Compare commits

...

2 Commits

Author SHA1 Message Date
胡尧
6c8677e9e6 修改服务地址 2025-06-05 09:40:18 +08:00
胡尧
e8a968c5a7 修改工单获取前置三元检测方法 2025-05-15 08:45:55 +08:00
4 changed files with 234 additions and 134 deletions

View File

@@ -20,6 +20,7 @@
'data/sf_work_individuation_page.xml', 'data/sf_work_individuation_page.xml',
'data/agv_scheduling_data.xml', 'data/agv_scheduling_data.xml',
'data/product_data.xml', 'data/product_data.xml',
'data/automation_folder_data.xml',
'security/group_security.xml', 'security/group_security.xml',
'security/ir.model.access.csv', 'security/ir.model.access.csv',
'wizard/workpiece_delivery_views.xml', 'wizard/workpiece_delivery_views.xml',

View File

@@ -2,16 +2,18 @@
import logging import logging
import json import json
from datetime import datetime from datetime import datetime
import base64
from odoo.addons.sf_manufacturing.models.agv_scheduling import RepeatTaskException from odoo.addons.sf_manufacturing.models.agv_scheduling import RepeatTaskException
from odoo import http from odoo import http
from odoo.http import request from odoo.http import request
from odoo.exceptions import MissingError
from odoo.addons.sf_base.decorators.api_log import api_log from odoo.addons.sf_base.decorators.api_log import api_log
class Manufacturing_Connect(http.Controller): class Manufacturing_Connect(http.Controller):
@http.route('/AutoDeviceApi/GetWoInfo', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False, @http.route('/AutoDeviceApi/GetWoInfo', type='json', auth='none', methods=['GET', 'POST'], csrf=False,
cors="*") cors="*")
@api_log('获取工单', requester='中控系统') @api_log('获取工单', requester='中控系统')
def get_Work_Info(self, **kw): def get_Work_Info(self, **kw):
@@ -54,7 +56,7 @@ class Manufacturing_Connect(http.Controller):
logging.info('get_Work_Info error:%s' % e) logging.info('get_Work_Info error:%s' % e)
return json.JSONEncoder().encode(res) return json.JSONEncoder().encode(res)
@http.route('/AutoDeviceApi/GetShiftPlan', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False, @http.route('/AutoDeviceApi/GetShiftPlan', type='json', auth='none', methods=['GET', 'POST'], csrf=False,
cors="*") cors="*")
@api_log('获取日计划', requester='中控系统') @api_log('获取日计划', requester='中控系统')
def get_ShiftPlan(self, **kw): def get_ShiftPlan(self, **kw):
@@ -108,7 +110,7 @@ class Manufacturing_Connect(http.Controller):
logging.info('get_ShiftPlan error:%s' % e) logging.info('get_ShiftPlan error:%s' % e)
return json.JSONEncoder().encode(res) return json.JSONEncoder().encode(res)
@http.route('/AutoDeviceApi/QcCheck', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False, @http.route('/AutoDeviceApi/QcCheck', type='json', auth='none', methods=['GET', 'POST'], csrf=False,
cors="*") cors="*")
@api_log('工件预调(前置三元检测)', requester='中控系统') @api_log('工件预调(前置三元检测)', requester='中控系统')
def get_qcCheck(self, **kw): def get_qcCheck(self, **kw):
@@ -704,3 +706,59 @@ class Manufacturing_Connect(http.Controller):
res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)} res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)}
logging.info('AGVDownProduct error:%s' % e) logging.info('AGVDownProduct error:%s' % e)
return json.JSONEncoder().encode(res) return json.JSONEncoder().encode(res)
@http.route('/api/upload_three_check_data', type='http', auth='public', methods=['POST'], csrf=False, cros='*')
def upload_three_check_data(self):
res = {'Succeed': True, 'ErrorCode': 200, 'Messages': '上传成功'}
uploaded_files = request.httprequest.files.getlist('file')
if uploaded_files:
try:
for uploaded_file in uploaded_files:
file_content = uploaded_file.read()
file_name = uploaded_file.filename
production_name = '/'.join(file_name.split('_')[:-1])
processing_panel = file_name.split('_')[-1].split('.')[0]
# 找到对应的工单
production_id = request.env['mrp.production'].sudo().search([('name', '=', production_name)])
wo = production_id.workorder_ids.filtered(lambda wo: wo.processing_panel == processing_panel and wo.routing_type == '装夹预调')
if not wo:
raise MissingError('工单不存在')
folder_id = request.env.ref('sf_manufacturing.documents_pre_three_element_detection_folder')
document = request.env['documents.document'].sudo().search([('res_model', '=', 'mrp.workorder'), ('res_id', '=', wo.id)])
if document and document.attachment_id:
attachment = request.env['ir.attachment'].sudo().create({
'name': file_name,
'type': 'binary',
'datas': base64.b64encode(file_content),
'res_model': 'mrp.workorder',
'res_id': wo.id,
})
document.write({'attachment_id': attachment.id})
else:
# Create ir.attachment record
attachment = request.env['ir.attachment'].sudo().create({
'name': file_name,
'type': 'binary',
'datas': base64.b64encode(file_content),
'res_model': 'mrp.workorder',
'res_id': wo.id,
})
# 创建 documents.document 记录
request.env['documents.document'].sudo().create({
'name': file_name,
'attachment_id': attachment.id,
'folder_id': folder_id.id,
'res_model': 'mrp.workorder',
'res_id': wo.id,
})
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)}
return json.JSONEncoder().encode(res)

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<data noupdate="1">
<!-- 创建自动化文件夹 -->
<record id="documents_automation_folder" model="documents.folder">
<field name="name">自动化</field>
<field name="description">存放自动化生产流程相关文件</field>
<field name="sequence">20</field>
</record>
<record id="documents_pre_three_element_detection_folder" model="documents.folder">
<field name="name">前置三元检测报告</field>
<field name="parent_folder_id" ref="documents_automation_folder"/>
<field name="sequence">1</field>
</record>
</data>
</odoo>

View File

@@ -6,6 +6,7 @@ import urllib.parse
from datetime import date from datetime import date
from datetime import datetime, timedelta from datetime import datetime, timedelta
import requests import requests
import tempfile
import os import os
import math import math
from lxml import etree from lxml import etree
@@ -771,146 +772,170 @@ class ResMrpWorkOrder(models.Model):
# 获取三次元检测点数据 # 获取三次元检测点数据
def get_three_check_datas(self): def get_three_check_datas(self):
ftp_resconfig = self.env['res.config.settings'].get_values() ftp_resconfig = self.env['res.config.settings'].get_values()
ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), # ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']),
ftp_resconfig['ftp_user'], # ftp_resconfig['ftp_user'],
ftp_resconfig['ftp_password']) # ftp_resconfig['ftp_password'])
local_dir_path = '/ftp/before' # local_dir_path = '/ftp/before'
os.makedirs(local_dir_path, exist_ok=True) # os.makedirs(local_dir_path, exist_ok=True)
local_filename = self.save_name + '.xls' local_filename = self.save_name + '.xls'
local_file_path = os.path.join(local_dir_path, local_filename) # local_file_path = os.path.join(local_dir_path, local_filename)
logging.info('local_file_path:%s' % local_file_path) # logging.info('local_file_path:%s' % local_file_path)
# remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename # # remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename
remote_path = '/ThreeTest/XT/Before/' + local_filename # remote_path = '/ThreeTest/XT/Before/' + local_filename
logging.info('remote_path:%s' % remote_path) # logging.info('remote_path:%s' % remote_path)
is_get_detection_file = self.env['ir.config_parameter'].sudo().get_param('is_get_detection_file') # is_get_detection_file = self.env['ir.config_parameter'].sudo().get_param('is_get_detection_file')
if not is_get_detection_file: # if not is_get_detection_file:
paload_data = { base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
"filename": local_filename paload_data = {
} "filename": local_filename,
if not ftp_resconfig['get_check_file_path']: "sf_host": base_url
raise UserError('请先配置获取检测报告地址') }
url = ftp_resconfig['get_check_file_path'] + '/get/check/report' if not ftp_resconfig['get_check_file_path']:
response = requests.post(url, json=paload_data) raise UserError('请先配置获取检测报告地址')
logging.info('response:%s' % response.json()) url = ftp_resconfig['get_check_file_path'] + '/get/check/report'
if response.json().get('detail'): response = requests.post(url, json=paload_data)
raise UserError(response.json().get('detail')) # logging.info('response:%s' % response.json())
# if response.json().get('detail'):
# raise UserError(response.json().get('detail'))
if not ftp.file_exists(remote_path): # if not ftp.file_exists(remote_path):
raise UserError(f"文件不存在: {remote_path}") # raise UserError(f"文件不存在: {remote_path}")
with open(local_file_path, 'wb') as local_file: # with open(local_file_path, 'wb') as local_file:
ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write) # ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write)
logging.info('下载文件成功') # logging.info('下载文件成功')
# 解析本地文件
# file_path = 'WH_MO_00099.xls' # 使用下载的实际文件路径
parser = etree.XMLParser(recover=True) # Using recover to handle errors
tree = etree.parse(local_file_path, parser)
logging.info('tree:%s' % tree)
root = tree.getroot()
logging.info('root:%s' % root)
# 准备一个外部字典来存储以PT为键的坐标字典 document = self.env['documents.document'].sudo().search([('res_model', '=', 'mrp.workorder'), ('res_id', '=', self.id)])
pt_coordinates = {} if not document:
# 遍历每个工作表和行 raise UserError(f"未获取到检测数据")
for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'): binary_data = base64.b64decode(document.attachment_id.datas)
sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name')
logging.info('sheet_name:%s' % sheet_name)
if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表
current_pt = None
for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'):
cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell'))
for i, cell in enumerate(cells):
data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data')
if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空
# 检查是否是PT标识
logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据
if "PT" in data_cell.text:
current_pt = data_cell.text
pt_coordinates[current_pt] = []
elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None:
# 确保当前单元格后面还有单元格存在,以获取理论值
if i + 1 < len(cells):
next_cell = cells[i + 1]
theory_value = next_cell.find(
'.//{urn:schemas-microsoft-com:office:spreadsheet}Data')
if theory_value is not None:
# 为当前PT键添加坐标数据
pt_coordinates[current_pt].append({
data_cell.text: float(theory_value.text)
})
logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}")
logging.info('pt_coordinates=====%s' % pt_coordinates)
# pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}],
# 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]}
# 检查是否存在PT1等键 # 创建临时文件保存响应内容
if 'PT1' in pt_coordinates and pt_coordinates['PT1']: with tempfile.NamedTemporaryFile(delete=False, suffix='.xls') as temp_file:
self.X1_axis = pt_coordinates['PT3'][0]['X'] temp_file.write(binary_data)
self.Y1_axis = pt_coordinates['PT3'][1]['Y'] temp_file_path = temp_file.name
self.Z1_axis = pt_coordinates['PT3'][2]['Z']
else:
raise UserError('PT1点未测或数据错误')
if 'PT2' in pt_coordinates and pt_coordinates['PT2']:
self.X2_axis = pt_coordinates['PT4'][0]['X']
self.Y2_axis = pt_coordinates['PT4'][1]['Y']
self.Z2_axis = pt_coordinates['PT4'][2]['Z']
else:
raise UserError('PT2点未测或数据错误')
if 'PT3' in pt_coordinates and pt_coordinates['PT3']:
self.X3_axis = pt_coordinates['PT5'][0]['X']
self.Y3_axis = pt_coordinates['PT5'][1]['Y']
self.Z3_axis = pt_coordinates['PT5'][2]['Z']
else:
raise UserError('PT3点未测或数据错误')
if 'PT4' in pt_coordinates and pt_coordinates['PT4']:
self.X4_axis = pt_coordinates['PT6'][0]['X']
self.Y4_axis = pt_coordinates['PT6'][1]['Y']
self.Z4_axis = pt_coordinates['PT6'][2]['Z']
else:
raise UserError('PT4点未测或数据错误')
if 'PT5' in pt_coordinates and pt_coordinates['PT5']:
self.X5_axis = pt_coordinates['PT7'][0]['X']
self.Y5_axis = pt_coordinates['PT7'][1]['Y']
self.Z5_axis = pt_coordinates['PT7'][2]['Z']
else:
raise UserError('PT5点未测或数据错误')
if 'PT6' in pt_coordinates and pt_coordinates['PT6']:
self.X6_axis = pt_coordinates['PT8'][0]['X']
self.Y6_axis = pt_coordinates['PT8'][1]['Y']
self.Z6_axis = pt_coordinates['PT8'][2]['Z']
else:
raise UserError('PT6点未测或数据错误')
if 'PT7' in pt_coordinates and pt_coordinates['PT7']:
self.X7_axis = pt_coordinates['PT9'][0]['X']
self.Y7_axis = pt_coordinates['PT9'][1]['Y']
self.Z7_axis = pt_coordinates['PT9'][2]['Z']
else:
raise UserError('PT7点未测或数据错误')
if 'PT8' in pt_coordinates and pt_coordinates['PT8']:
self.X8_axis = pt_coordinates['PT10'][0]['X']
self.Y8_axis = pt_coordinates['PT10'][1]['Y']
self.Z8_axis = pt_coordinates['PT10'][2]['Z']
else:
raise UserError('PT8点未测或数据错误')
if 'PT9' in pt_coordinates and pt_coordinates['PT9']:
self.X9_axis = pt_coordinates['PT1'][0]['X']
self.Y9_axis = pt_coordinates['PT1'][1]['Y']
self.Z9_axis = pt_coordinates['PT1'][2]['Z']
else:
raise UserError('PT9点未测或数据错误')
if 'PT10' in pt_coordinates and pt_coordinates['PT10']:
self.X10_axis = pt_coordinates['PT2'][0]['X']
self.Y10_axis = pt_coordinates['PT2'][1]['Y']
self.Z10_axis = pt_coordinates['PT2'][2]['Z']
else:
raise UserError('PT10点未测或数据错误')
self.data_state = True try:
self.getcenter() # 使用临时文件进行解析
parser = etree.XMLParser(recover=True)
tree = etree.parse(temp_file_path, parser)
logging.info('tree:%s' % tree)
root = tree.getroot()
logging.info('root:%s' % root)
return True
# 准备一个外部字典来存储以PT为键的坐标字典
pt_coordinates = {}
# 遍历每个工作表和行
for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'):
sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name')
logging.info('sheet_name:%s' % sheet_name)
if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表
current_pt = None
for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'):
cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell'))
for i, cell in enumerate(cells):
data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data')
if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空
# 检查是否是PT标识
logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据
if "PT" in data_cell.text:
current_pt = data_cell.text
pt_coordinates[current_pt] = []
elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None:
# 确保当前单元格后面还有单元格存在,以获取理论值
if i + 1 < len(cells):
next_cell = cells[i + 1]
theory_value = next_cell.find(
'.//{urn:schemas-microsoft-com:office:spreadsheet}Data')
if theory_value is not None:
# 为当前PT键添加坐标数据
pt_coordinates[current_pt].append({
data_cell.text: float(theory_value.text)
})
logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}")
logging.info('pt_coordinates=====%s' % pt_coordinates)
# pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}],
# 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]}
# 检查是否存在PT1等键
if 'PT1' in pt_coordinates and pt_coordinates['PT1']:
self.X1_axis = pt_coordinates['PT3'][0]['X']
self.Y1_axis = pt_coordinates['PT3'][1]['Y']
self.Z1_axis = pt_coordinates['PT3'][2]['Z']
else:
raise UserError('PT1点未测或数据错误')
if 'PT2' in pt_coordinates and pt_coordinates['PT2']:
self.X2_axis = pt_coordinates['PT4'][0]['X']
self.Y2_axis = pt_coordinates['PT4'][1]['Y']
self.Z2_axis = pt_coordinates['PT4'][2]['Z']
else:
raise UserError('PT2点未测或数据错误')
if 'PT3' in pt_coordinates and pt_coordinates['PT3']:
self.X3_axis = pt_coordinates['PT5'][0]['X']
self.Y3_axis = pt_coordinates['PT5'][1]['Y']
self.Z3_axis = pt_coordinates['PT5'][2]['Z']
else:
raise UserError('PT3点未测或数据错误')
if 'PT4' in pt_coordinates and pt_coordinates['PT4']:
self.X4_axis = pt_coordinates['PT6'][0]['X']
self.Y4_axis = pt_coordinates['PT6'][1]['Y']
self.Z4_axis = pt_coordinates['PT6'][2]['Z']
else:
raise UserError('PT4点未测或数据错误')
if 'PT5' in pt_coordinates and pt_coordinates['PT5']:
self.X5_axis = pt_coordinates['PT7'][0]['X']
self.Y5_axis = pt_coordinates['PT7'][1]['Y']
self.Z5_axis = pt_coordinates['PT7'][2]['Z']
else:
raise UserError('PT5点未测或数据错误')
if 'PT6' in pt_coordinates and pt_coordinates['PT6']:
self.X6_axis = pt_coordinates['PT8'][0]['X']
self.Y6_axis = pt_coordinates['PT8'][1]['Y']
self.Z6_axis = pt_coordinates['PT8'][2]['Z']
else:
raise UserError('PT6点未测或数据错误')
if 'PT7' in pt_coordinates and pt_coordinates['PT7']:
self.X7_axis = pt_coordinates['PT9'][0]['X']
self.Y7_axis = pt_coordinates['PT9'][1]['Y']
self.Z7_axis = pt_coordinates['PT9'][2]['Z']
else:
raise UserError('PT7点未测或数据错误')
if 'PT8' in pt_coordinates and pt_coordinates['PT8']:
self.X8_axis = pt_coordinates['PT10'][0]['X']
self.Y8_axis = pt_coordinates['PT10'][1]['Y']
self.Z8_axis = pt_coordinates['PT10'][2]['Z']
else:
raise UserError('PT8点未测或数据错误')
if 'PT9' in pt_coordinates and pt_coordinates['PT9']:
self.X9_axis = pt_coordinates['PT1'][0]['X']
self.Y9_axis = pt_coordinates['PT1'][1]['Y']
self.Z9_axis = pt_coordinates['PT1'][2]['Z']
else:
raise UserError('PT9点未测或数据错误')
if 'PT10' in pt_coordinates and pt_coordinates['PT10']:
self.X10_axis = pt_coordinates['PT2'][0]['X']
self.Y10_axis = pt_coordinates['PT2'][1]['Y']
self.Z10_axis = pt_coordinates['PT2'][2]['Z']
else:
raise UserError('PT10点未测或数据错误')
self.data_state = True
self.getcenter()
return True
except Exception as e:
logging.error('解析文件失败: %s' % str(e))
raise UserError(f'解析文件失败: {str(e)}')
finally:
# 清理临时文件
try:
os.unlink(temp_file_path)
except Exception as e:
logging.warning(f'清理临时文件失败: {str(e)}')
# ftp.download_file('three_check_datas.xls', '/home/ftpuser/three_check_datas.xls') # ftp.download_file('three_check_datas.xls', '/home/ftpuser/three_check_datas.xls')
# ftp.close() # ftp.close()
# data = xlrd.open_workbook('/home/ftpuser/three_check_datas.xls') # data = xlrd.open_workbook('/home/ftpuser/three_check_datas.xls')