diff --git a/sf_manufacturing/__manifest__.py b/sf_manufacturing/__manifest__.py index 855c1990..34237994 100644 --- a/sf_manufacturing/__manifest__.py +++ b/sf_manufacturing/__manifest__.py @@ -20,6 +20,7 @@ 'data/sf_work_individuation_page.xml', 'data/agv_scheduling_data.xml', 'data/product_data.xml', + 'data/automation_folder_data.xml', 'security/group_security.xml', 'security/ir.model.access.csv', 'wizard/workpiece_delivery_views.xml', diff --git a/sf_manufacturing/controllers/controllers.py b/sf_manufacturing/controllers/controllers.py index 70b0b87b..a794d0e8 100644 --- a/sf_manufacturing/controllers/controllers.py +++ b/sf_manufacturing/controllers/controllers.py @@ -2,16 +2,18 @@ import logging import json from datetime import datetime +import base64 from odoo.addons.sf_manufacturing.models.agv_scheduling import RepeatTaskException from odoo import http from odoo.http import request +from odoo.exceptions import MissingError from odoo.addons.sf_base.decorators.api_log import api_log class Manufacturing_Connect(http.Controller): - @http.route('/AutoDeviceApi/GetWoInfo', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False, + @http.route('/AutoDeviceApi/GetWoInfo', type='json', auth='none', methods=['GET', 'POST'], csrf=False, cors="*") @api_log('获取工单', requester='中控系统') def get_Work_Info(self, **kw): @@ -54,7 +56,7 @@ class Manufacturing_Connect(http.Controller): logging.info('get_Work_Info error:%s' % e) return json.JSONEncoder().encode(res) - @http.route('/AutoDeviceApi/GetShiftPlan', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False, + @http.route('/AutoDeviceApi/GetShiftPlan', type='json', auth='none', methods=['GET', 'POST'], csrf=False, cors="*") @api_log('获取日计划', requester='中控系统') def get_ShiftPlan(self, **kw): @@ -108,7 +110,7 @@ class Manufacturing_Connect(http.Controller): logging.info('get_ShiftPlan error:%s' % e) return json.JSONEncoder().encode(res) - @http.route('/AutoDeviceApi/QcCheck', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False, + @http.route('/AutoDeviceApi/QcCheck', type='json', auth='none', methods=['GET', 'POST'], csrf=False, cors="*") @api_log('工件预调(前置三元检测)', requester='中控系统') def get_qcCheck(self, **kw): @@ -704,3 +706,59 @@ class Manufacturing_Connect(http.Controller): res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)} logging.info('AGVDownProduct error:%s' % e) return json.JSONEncoder().encode(res) + + @http.route('/api/upload_three_check_data', type='http', auth='public', methods=['POST'], csrf=False, cros='*') + def upload_three_check_data(self): + + res = {'Succeed': True, 'ErrorCode': 200, 'Messages': '上传成功'} + uploaded_files = request.httprequest.files.getlist('file') + if uploaded_files: + + try: + for uploaded_file in uploaded_files: + file_content = uploaded_file.read() + file_name = uploaded_file.filename + + production_name = '/'.join(file_name.split('_')[:-1]) + processing_panel = file_name.split('_')[-1].split('.')[0] + # 找到对应的工单 + production_id = request.env['mrp.production'].sudo().search([('name', '=', production_name)]) + wo = production_id.workorder_ids.filtered(lambda wo: wo.processing_panel == processing_panel and wo.routing_type == '装夹预调') + + if not wo: + raise MissingError('工单不存在') + + folder_id = request.env.ref('sf_manufacturing.documents_pre_three_element_detection_folder') + + document = request.env['documents.document'].sudo().search([('res_model', '=', 'mrp.workorder'), ('res_id', '=', wo.id)]) + if document and document.attachment_id: + attachment = request.env['ir.attachment'].sudo().create({ + 'name': file_name, + 'type': 'binary', + 'datas': base64.b64encode(file_content), + 'res_model': 'mrp.workorder', + 'res_id': wo.id, + }) + document.write({'attachment_id': attachment.id}) + else: + # Create ir.attachment record + attachment = request.env['ir.attachment'].sudo().create({ + 'name': file_name, + 'type': 'binary', + 'datas': base64.b64encode(file_content), + 'res_model': 'mrp.workorder', + 'res_id': wo.id, + }) + + # 创建 documents.document 记录 + request.env['documents.document'].sudo().create({ + 'name': file_name, + 'attachment_id': attachment.id, + 'folder_id': folder_id.id, + 'res_model': 'mrp.workorder', + 'res_id': wo.id, + }) + except Exception as e: + res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)} + + return json.JSONEncoder().encode(res) \ No newline at end of file diff --git a/sf_manufacturing/data/automation_folder_data.xml b/sf_manufacturing/data/automation_folder_data.xml new file mode 100644 index 00000000..450632ba --- /dev/null +++ b/sf_manufacturing/data/automation_folder_data.xml @@ -0,0 +1,16 @@ + + + + + + 自动化 + 存放自动化生产流程相关文件 + 20 + + + 前置三元检测报告 + + 1 + + + \ No newline at end of file diff --git a/sf_manufacturing/models/mrp_workorder.py b/sf_manufacturing/models/mrp_workorder.py index 35cd2234..3b0d761d 100644 --- a/sf_manufacturing/models/mrp_workorder.py +++ b/sf_manufacturing/models/mrp_workorder.py @@ -6,6 +6,7 @@ import urllib.parse from datetime import date from datetime import datetime, timedelta import requests +import tempfile import os import math from lxml import etree @@ -771,146 +772,168 @@ class ResMrpWorkOrder(models.Model): # 获取三次元检测点数据 def get_three_check_datas(self): ftp_resconfig = self.env['res.config.settings'].get_values() - ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), - ftp_resconfig['ftp_user'], - ftp_resconfig['ftp_password']) + # ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), + # ftp_resconfig['ftp_user'], + # ftp_resconfig['ftp_password']) - local_dir_path = '/ftp/before' - os.makedirs(local_dir_path, exist_ok=True) + # local_dir_path = '/ftp/before' + # os.makedirs(local_dir_path, exist_ok=True) local_filename = self.save_name + '.xls' - local_file_path = os.path.join(local_dir_path, local_filename) - logging.info('local_file_path:%s' % local_file_path) - # remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename - remote_path = '/ThreeTest/XT/Before/' + local_filename - logging.info('remote_path:%s' % remote_path) - is_get_detection_file = self.env['ir.config_parameter'].sudo().get_param('is_get_detection_file') - if not is_get_detection_file: - paload_data = { - "filename": local_filename - } - if not ftp_resconfig['get_check_file_path']: - raise UserError('请先配置获取检测报告地址') - url = ftp_resconfig['get_check_file_path'] + '/get/check/report' - response = requests.post(url, json=paload_data) - logging.info('response:%s' % response.json()) - if response.json().get('detail'): - raise UserError(response.json().get('detail')) + # local_file_path = os.path.join(local_dir_path, local_filename) + # logging.info('local_file_path:%s' % local_file_path) + # # remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename + # remote_path = '/ThreeTest/XT/Before/' + local_filename + # logging.info('remote_path:%s' % remote_path) + # is_get_detection_file = self.env['ir.config_parameter'].sudo().get_param('is_get_detection_file') + # if not is_get_detection_file: + base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') + paload_data = { + "filename": local_filename, + "sf_host": base_url + } + if not ftp_resconfig['get_check_file_path']: + raise UserError('请先配置获取检测报告地址') + url = 'http://127.0.0.1:8999' + '/get/check/report' + response = requests.post(url, json=paload_data) + # logging.info('response:%s' % response.json()) + # if response.json().get('detail'): + # raise UserError(response.json().get('detail')) - if not ftp.file_exists(remote_path): - raise UserError(f"文件不存在: {remote_path}") + # if not ftp.file_exists(remote_path): + # raise UserError(f"文件不存在: {remote_path}") - with open(local_file_path, 'wb') as local_file: - ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write) - logging.info('下载文件成功') - # 解析本地文件 - # file_path = 'WH_MO_00099.xls' # 使用下载的实际文件路径 - parser = etree.XMLParser(recover=True) # Using recover to handle errors - tree = etree.parse(local_file_path, parser) - logging.info('tree:%s' % tree) - root = tree.getroot() - logging.info('root:%s' % root) + # with open(local_file_path, 'wb') as local_file: + # ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write) + # logging.info('下载文件成功') + + document = self.env['documents.document'].sudo().search([('res_model', '=', 'mrp.workorder'), ('res_id', '=', self.id)]) + binary_data = base64.b64decode(document.attachment_id.datas) - # 准备一个外部字典来存储以PT为键的坐标字典 - pt_coordinates = {} - # 遍历每个工作表和行 - for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'): - sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name') - logging.info('sheet_name:%s' % sheet_name) - if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表 - current_pt = None - for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'): - cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell')) - for i, cell in enumerate(cells): - data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data') - if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空 - # 检查是否是PT标识 - logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据 - if "PT" in data_cell.text: - current_pt = data_cell.text - pt_coordinates[current_pt] = [] - elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None: - # 确保当前单元格后面还有单元格存在,以获取理论值 - if i + 1 < len(cells): - next_cell = cells[i + 1] - theory_value = next_cell.find( - './/{urn:schemas-microsoft-com:office:spreadsheet}Data') - if theory_value is not None: - # 为当前PT键添加坐标数据 - pt_coordinates[current_pt].append({ - data_cell.text: float(theory_value.text) - }) - logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}") - logging.info('pt_coordinates=====%s' % pt_coordinates) - # pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}], - # 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]} + # 创建临时文件保存响应内容 + with tempfile.NamedTemporaryFile(delete=False, suffix='.xls') as temp_file: + temp_file.write(binary_data) + temp_file_path = temp_file.name + + try: + # 使用临时文件进行解析 + parser = etree.XMLParser(recover=True) + tree = etree.parse(temp_file_path, parser) + logging.info('tree:%s' % tree) + root = tree.getroot() + logging.info('root:%s' % root) + - # 检查是否存在PT1等键 - if 'PT1' in pt_coordinates and pt_coordinates['PT1']: - self.X1_axis = pt_coordinates['PT3'][0]['X'] - self.Y1_axis = pt_coordinates['PT3'][1]['Y'] - self.Z1_axis = pt_coordinates['PT3'][2]['Z'] - else: - raise UserError('PT1点未测或数据错误') - if 'PT2' in pt_coordinates and pt_coordinates['PT2']: - self.X2_axis = pt_coordinates['PT4'][0]['X'] - self.Y2_axis = pt_coordinates['PT4'][1]['Y'] - self.Z2_axis = pt_coordinates['PT4'][2]['Z'] - else: - raise UserError('PT2点未测或数据错误') - if 'PT3' in pt_coordinates and pt_coordinates['PT3']: - self.X3_axis = pt_coordinates['PT5'][0]['X'] - self.Y3_axis = pt_coordinates['PT5'][1]['Y'] - self.Z3_axis = pt_coordinates['PT5'][2]['Z'] - else: - raise UserError('PT3点未测或数据错误') - if 'PT4' in pt_coordinates and pt_coordinates['PT4']: - self.X4_axis = pt_coordinates['PT6'][0]['X'] - self.Y4_axis = pt_coordinates['PT6'][1]['Y'] - self.Z4_axis = pt_coordinates['PT6'][2]['Z'] - else: - raise UserError('PT4点未测或数据错误') - if 'PT5' in pt_coordinates and pt_coordinates['PT5']: - self.X5_axis = pt_coordinates['PT7'][0]['X'] - self.Y5_axis = pt_coordinates['PT7'][1]['Y'] - self.Z5_axis = pt_coordinates['PT7'][2]['Z'] - else: - raise UserError('PT5点未测或数据错误') - if 'PT6' in pt_coordinates and pt_coordinates['PT6']: - self.X6_axis = pt_coordinates['PT8'][0]['X'] - self.Y6_axis = pt_coordinates['PT8'][1]['Y'] - self.Z6_axis = pt_coordinates['PT8'][2]['Z'] - else: - raise UserError('PT6点未测或数据错误') - if 'PT7' in pt_coordinates and pt_coordinates['PT7']: - self.X7_axis = pt_coordinates['PT9'][0]['X'] - self.Y7_axis = pt_coordinates['PT9'][1]['Y'] - self.Z7_axis = pt_coordinates['PT9'][2]['Z'] - else: - raise UserError('PT7点未测或数据错误') - if 'PT8' in pt_coordinates and pt_coordinates['PT8']: - self.X8_axis = pt_coordinates['PT10'][0]['X'] - self.Y8_axis = pt_coordinates['PT10'][1]['Y'] - self.Z8_axis = pt_coordinates['PT10'][2]['Z'] - else: - raise UserError('PT8点未测或数据错误') - if 'PT9' in pt_coordinates and pt_coordinates['PT9']: - self.X9_axis = pt_coordinates['PT1'][0]['X'] - self.Y9_axis = pt_coordinates['PT1'][1]['Y'] - self.Z9_axis = pt_coordinates['PT1'][2]['Z'] - else: - raise UserError('PT9点未测或数据错误') - if 'PT10' in pt_coordinates and pt_coordinates['PT10']: - self.X10_axis = pt_coordinates['PT2'][0]['X'] - self.Y10_axis = pt_coordinates['PT2'][1]['Y'] - self.Z10_axis = pt_coordinates['PT2'][2]['Z'] - else: - raise UserError('PT10点未测或数据错误') + # 准备一个外部字典来存储以PT为键的坐标字典 + pt_coordinates = {} + # 遍历每个工作表和行 + for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'): + sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name') + logging.info('sheet_name:%s' % sheet_name) + if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表 + current_pt = None + for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'): + cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell')) + for i, cell in enumerate(cells): + data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data') + if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空 + # 检查是否是PT标识 + logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据 + if "PT" in data_cell.text: + current_pt = data_cell.text + pt_coordinates[current_pt] = [] + elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None: + # 确保当前单元格后面还有单元格存在,以获取理论值 + if i + 1 < len(cells): + next_cell = cells[i + 1] + theory_value = next_cell.find( + './/{urn:schemas-microsoft-com:office:spreadsheet}Data') + if theory_value is not None: + # 为当前PT键添加坐标数据 + pt_coordinates[current_pt].append({ + data_cell.text: float(theory_value.text) + }) + logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}") + logging.info('pt_coordinates=====%s' % pt_coordinates) + # pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}], + # 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]} - self.data_state = True - self.getcenter() + # 检查是否存在PT1等键 + if 'PT1' in pt_coordinates and pt_coordinates['PT1']: + self.X1_axis = pt_coordinates['PT3'][0]['X'] + self.Y1_axis = pt_coordinates['PT3'][1]['Y'] + self.Z1_axis = pt_coordinates['PT3'][2]['Z'] + else: + raise UserError('PT1点未测或数据错误') + if 'PT2' in pt_coordinates and pt_coordinates['PT2']: + self.X2_axis = pt_coordinates['PT4'][0]['X'] + self.Y2_axis = pt_coordinates['PT4'][1]['Y'] + self.Z2_axis = pt_coordinates['PT4'][2]['Z'] + else: + raise UserError('PT2点未测或数据错误') + if 'PT3' in pt_coordinates and pt_coordinates['PT3']: + self.X3_axis = pt_coordinates['PT5'][0]['X'] + self.Y3_axis = pt_coordinates['PT5'][1]['Y'] + self.Z3_axis = pt_coordinates['PT5'][2]['Z'] + else: + raise UserError('PT3点未测或数据错误') + if 'PT4' in pt_coordinates and pt_coordinates['PT4']: + self.X4_axis = pt_coordinates['PT6'][0]['X'] + self.Y4_axis = pt_coordinates['PT6'][1]['Y'] + self.Z4_axis = pt_coordinates['PT6'][2]['Z'] + else: + raise UserError('PT4点未测或数据错误') + if 'PT5' in pt_coordinates and pt_coordinates['PT5']: + self.X5_axis = pt_coordinates['PT7'][0]['X'] + self.Y5_axis = pt_coordinates['PT7'][1]['Y'] + self.Z5_axis = pt_coordinates['PT7'][2]['Z'] + else: + raise UserError('PT5点未测或数据错误') + if 'PT6' in pt_coordinates and pt_coordinates['PT6']: + self.X6_axis = pt_coordinates['PT8'][0]['X'] + self.Y6_axis = pt_coordinates['PT8'][1]['Y'] + self.Z6_axis = pt_coordinates['PT8'][2]['Z'] + else: + raise UserError('PT6点未测或数据错误') + if 'PT7' in pt_coordinates and pt_coordinates['PT7']: + self.X7_axis = pt_coordinates['PT9'][0]['X'] + self.Y7_axis = pt_coordinates['PT9'][1]['Y'] + self.Z7_axis = pt_coordinates['PT9'][2]['Z'] + else: + raise UserError('PT7点未测或数据错误') + if 'PT8' in pt_coordinates and pt_coordinates['PT8']: + self.X8_axis = pt_coordinates['PT10'][0]['X'] + self.Y8_axis = pt_coordinates['PT10'][1]['Y'] + self.Z8_axis = pt_coordinates['PT10'][2]['Z'] + else: + raise UserError('PT8点未测或数据错误') + if 'PT9' in pt_coordinates and pt_coordinates['PT9']: + self.X9_axis = pt_coordinates['PT1'][0]['X'] + self.Y9_axis = pt_coordinates['PT1'][1]['Y'] + self.Z9_axis = pt_coordinates['PT1'][2]['Z'] + else: + raise UserError('PT9点未测或数据错误') + if 'PT10' in pt_coordinates and pt_coordinates['PT10']: + self.X10_axis = pt_coordinates['PT2'][0]['X'] + self.Y10_axis = pt_coordinates['PT2'][1]['Y'] + self.Z10_axis = pt_coordinates['PT2'][2]['Z'] + else: + raise UserError('PT10点未测或数据错误') - return True + self.data_state = True + self.getcenter() + return True + + except Exception as e: + logging.error('解析文件失败: %s' % str(e)) + raise UserError(f'解析文件失败: {str(e)}') + + finally: + # 清理临时文件 + try: + os.unlink(temp_file_path) + except Exception as e: + logging.warning(f'清理临时文件失败: {str(e)}') # ftp.download_file('three_check_datas.xls', '/home/ftpuser/three_check_datas.xls') # ftp.close() # data = xlrd.open_workbook('/home/ftpuser/three_check_datas.xls')