# -*- coding: utf-8 -*-
import base64
import io
import json
import logging
import os
import tempfile

import PyPDF2
import qrcode
from qrcode.main import QRCode

from odoo import http, fields, models
from odoo.http import request
from odoo.addons.sf_base.controllers.controllers import MultiInheritController


class Sf_Mrs_Connect(http.Controller, MultiInheritController):
    """Controller exposing the endpoint that receives programming orders pushed by MRS."""

    @http.route('/api/cnc_processing/create', type='json', auth='sf_token', methods=['GET', 'POST'], csrf=False,
                cors="*")
    def get_cnc_processing_create(self, **kw):
        """
        Receive a programming order pushed by MRS and apply it to the matching
        manufacturing orders.

        The request body is JSON whose ``result`` entry is itself a JSON string
        describing the programming order. The method pulls the program files of
        every processing panel, rewrites the CNC/CMM program lines and
        worksheets of the matching work orders, runs the tool checkout and
        maintains the programming records.

        :param kw: route keyword arguments (only logged)
        :return: JSON-encoded dict with ``status`` and ``message``; ``status``
            is 1 on success (``production_ids`` is added), 0 when no
            manufacturing order matches, -1 on an unexpected error (the cursor
            is rolled back), -2 when orders are waiting for technology
            confirmation or the file download failed, -3 when the tool
            checkout fails or raises
        """
        logging.info('get_cnc_processing_create:%s' % kw)
        try:
            res = {'status': 1, 'message': '成功'}
            ret = json.loads(request.httprequest.data)
            ret = json.loads(ret['result'])
            logging.info('下发编程单:%s' % ret)

            admin = request.env.ref("base.user_admin")
            production_model = request.env['mrp.production'].with_user(admin)
            domain = [('programming_no', '=', ret['programming_no'])]
            if ret['manufacturing_type'] in ('scrap', 'invalid_tool_rework', 'rework'):
                domain += [('state', 'not in', ['done', 'scrap', 'cancel'])]
            else:
                domain += [('state', 'in', ['confirmed', 'pending_cam', 'progress'])]
            productions = production_model.search(domain)
            productions_technology_to_confirmed = production_model.search(
                [('programming_no', '=', ret['programming_no']),
                 ('state', 'in', ['technology_to_confirmed'])])
            if productions_technology_to_confirmed:
                return json.dumps({'status': -2, 'message': '查询到待工艺确认的制造订单'})
            if not productions:
                return json.dumps({'status': 0, 'message': '没有查询到该制造订单'})

            panels = ret['processing_panel'].split(',')

            # Pull the program files of every processing panel.
            # NOTE(review): 'folder_name' and the panel names come straight from the
            # request payload and are joined into a filesystem path - confirm the
            # sender is trusted or validate them against path traversal.
            for panel in panels:
                panel_dir = os.path.join('/tmp', ret['folder_name'], 'return', panel)
                if os.path.exists(panel_dir):
                    # Drop files left in the directory before downloading again.
                    for file_name in os.listdir(panel_dir):
                        os.remove(os.path.join(panel_dir, file_name))
                download_state = request.env['sf.cnc.processing'].with_user(admin).download_file_tmp(
                    ret['folder_name'], panel)
                if download_state is False:
                    res['status'] = -2
                    res['message'] = '编程单号为%s的CNC程序文件从FTP拉取失败' % (ret['programming_no'])
                    return json.dumps(res)

            model_id = None
            for production in productions:
                # Manufacturing orders of one programming order share the same model.
                model_id = production.product_id.model_id
                production.write({'programming_state': '已编程', 'work_state': '已编程', 'is_rework': False})
                for panel in panels:
                    # CNC / manual offline work orders of this panel that are not
                    # in progress, done, rework or cancelled.
                    cnc_workorder_has = production.workorder_ids.filtered(
                        lambda ach: ach.routing_type in ['CNC加工', '人工线下加工']
                        and ach.state not in ['progress', 'done', 'rework', 'cancel']
                        and ach.processing_panel == panel)
                    if not cnc_workorder_has:
                        continue
                    if cnc_workorder_has.cnc_ids:
                        # Replace previously delivered program lines.
                        cnc_workorder_has.cmm_ids.sudo().unlink()
                        cnc_workorder_has.cnc_ids.sudo().unlink()
                    cnc_workorder_has.write({
                        'cnc_ids': cnc_workorder_has.cnc_ids.sudo()._json_cnc_processing(panel, ret),
                        'cmm_ids': cnc_workorder_has.cmm_ids.sudo()._json_cmm_program(panel, ret),
                    })

            for panel in panels:
                # Fixed: the state list used to read 'rework' 'cancel' (missing
                # comma), which collapsed into the single value 'reworkcancel'.
                cnc_workorder = productions.workorder_ids.filtered(
                    lambda ac: ac.routing_type in ['CNC加工', '人工线下加工']
                    and ac.state not in ['progress', 'done', 'rework', 'cancel']
                    and ac.processing_panel == panel)
                if not cnc_workorder:
                    continue
                program_path_tmp_panel = os.path.join('/tmp', ret['folder_name'], 'return', panel)
                panel_file_path = ''
                for file_name in os.listdir(program_path_tmp_panel):
                    if os.path.splitext(file_name)[1].lower() == '.pdf':
                        # When several PDFs exist the last one listed wins.
                        panel_file_path = os.path.join(program_path_tmp_panel, file_name)
                        logging.info('panel_file_path:%s' % panel_file_path)
                # If no PDF was found the path is empty and reading it raises,
                # which the outer handler reports as status -1.
                cnc_workorder.write({'cnc_worksheet': self._read_file_base64(panel_file_path)})
                # Same missing-comma fix as above.
                pre_workorder = productions.workorder_ids.filtered(
                    lambda ap: ap.routing_type in ['装夹预调', '人工线下加工']
                    and ap.state not in ['done', 'rework', 'cancel']
                    and ap.processing_panel == panel)
                if pre_workorder:
                    self._add_qr_code_to_pdf(panel_file_path, model_id)
                    pre_workorder.write({'processing_drawing': self._read_file_base64(panel_file_path)})

            productions.write({'programming_state': '已编程', 'work_state': '已编程'})
            productions.filtered(lambda p: p.production_type == '人工线下加工').write({'manual_quotation': True})
            logging.info('已更新制造订单编程状态:%s' % productions.ids)

            # Validate the tools used by the CNC programs of all panels.
            try:
                logging.info(f'已更新制造订单:{productions}')
                productions_temp = productions.filtered(lambda p: p.production_type == '自动化产线加工')
                if productions_temp.production_cnc_tool_checkout():
                    return json.dumps({'status': -3, 'message': '对cnc工单的程序用刀进行校验失败'})
            except Exception as e:
                logging.exception(f'对cnc工单的程序用刀进行校验报错:{e}')
                return json.dumps({'status': -3, 'message': '对cnc工单的程序用刀进行校验报错'})

            productions_reprogram = ','.join(productions.mapped('name'))

            # Update the programming records.
            correct_record_ids_obj = None
            correct_production_id = None
            for production in productions:
                logging.info('production====:%s' % production.name)
                record_ids_obj = production.programming_record_ids.filtered(
                    lambda r: r.current_programming_count == ret['reprogramming_num'])
                logging.info('record_ids_obj====:%s' % record_ids_obj)
                if record_ids_obj:
                    logging.info('record_ids_obj.reason====:%s' % record_ids_obj.reason)
                    record_ids_obj.write(
                        {'send_time': ret['send_time'], 'target_production_id': productions_reprogram})
                    logging.info('已更新编程记录:%s' % record_ids_obj)
                    correct_record_ids_obj = record_ids_obj
                    correct_production_id = production.id
                # NOTE(review): this runs for every production, whether or not a
                # record matched above, so a new record can be added next to the
                # one just updated - confirm this is intended.
                self._create_programming_record(production, ret, productions_reprogram)

            # Copy the matched record onto the productions that do not own it.
            for production in productions:
                logging.info('production====:%s' % production.name)
                if not correct_record_ids_obj:
                    continue
                logging.info('correct_record_ids_obj====:%s' % correct_record_ids_obj)
                if production.id == correct_production_id:
                    logging.info('跳过正确的制造订单')
                    continue
                logging.info('创建正确的制造订单的编程记录')
                production.programming_record_ids.create({
                    'number': len(production.programming_record_ids) + 1,
                    'production_id': production.id,
                    'reason': correct_record_ids_obj.reason,
                    'programming_method': correct_record_ids_obj.programming_method,
                    'current_programming_count': correct_record_ids_obj.current_programming_count,
                    'target_production_id': correct_record_ids_obj.target_production_id,
                    'apply_time': correct_record_ids_obj.apply_time,
                    'send_time': correct_record_ids_obj.send_time,
                })
                logging.info('已创建正确的制造订单的编程记录:%s' % production.name)

            res.update({'production_ids': productions.ids})
            return json.dumps(res)
        except Exception as e:
            request.cr.rollback()
            logging.exception('get_cnc_processing_create error:%s' % e)
            return json.dumps({'status': -1, 'message': '系统解析失败'})

    @staticmethod
    def _read_file_base64(file_path):
        """Return the base64-encoded content of ``file_path``, closing the file afterwards."""
        with open(file_path, 'rb') as file_obj:
            return base64.b64encode(file_obj.read())

    def _create_programming_record(self, production, ret, productions_reprogram):
        """
        Create the programming record that matches the pushed order, if any.

        The first matching case wins: first issue (``reprogramming_num`` is 0),
        state reset (``reset_flag``), rework, scrap, invalid functional tool
        (by ``manufacturing_type``), then a free-form ``reprogramming_reason``.
        When none applies only a log line is written.

        :param production: manufacturing order receiving the record
        :param ret: decoded programming order payload
        :param productions_reprogram: comma-joined names of all target orders
        """

        def create(reason, apply_time, number=None):
            # Only the first-issue record uses a fixed number; every other
            # record is numbered after the ones the order already has.
            if number is None:
                number = len(production.programming_record_ids) + 1
            return production.programming_record_ids.create({
                'number': number,
                'production_id': production.id,
                'reason': reason,
                'programming_method': ret['programme_way'],
                'current_programming_count': ret['reprogramming_num'],
                'target_production_id': productions_reprogram,
                'apply_time': apply_time,
                'send_time': ret['send_time'],
            })

        if ret['reprogramming_num'] == 0:
            logging.info('首次下发')
            create('首次下发', False, number=1)
            logging.info('已创建首次下发的编程记录:%s' % production.name)
        elif ret['reset_flag']:
            logging.info('重置状态')
            create('重置状态', False)
            logging.info('已创建重置状态的编程记录:%s' % production.name)
        elif ret['manufacturing_type'] == 'rework':
            logging.info('返工')
            rework_record_ids_obj = create('返工', ret['trigger_time'])
            logging.info('已创建返工的编程记录:%s' % production.name)
            logging.info('rework_record_ids_obj====:%s' % rework_record_ids_obj)
        elif ret['manufacturing_type'] == 'scrap':
            create('报废', ret['trigger_time'])
        elif ret['manufacturing_type'] == 'invalid_tool_rework':
            logging.info('无效功能刀具')
            create('无效功能刀具', ret['trigger_time'])
            logging.info('已创建无效功能刀具的编程记录:%s' % production.name)
        elif ret['reprogramming_reason']:
            create(ret['reprogramming_reason'], ret['trigger_time'])
        else:
            logging.info('无对应状态,不需更新编程记录')

    def _add_qr_code_to_pdf(self, panel_file_path, model_id):
        """
        Stamp a QR code encoding ``str(model_id)`` on the first page of a PDF.

        The code is drawn one inch square in the bottom-right corner with a
        0.4 inch margin, and the PDF at ``panel_file_path`` is rewritten in
        place; the remaining pages are copied unchanged.

        :param panel_file_path: path of the PDF to modify
        :param model_id: value encoded in the QR code
        :return: False when the file does not exist, True once it is rewritten
        """
        if not os.path.exists(panel_file_path):
            logging.warning(f'文件不存在: {panel_file_path}')
            return False

        from reportlab.pdfgen import canvas
        from reportlab.lib.units import inch
        from PyPDF2 import PdfFileReader, PdfFileWriter

        # Build the QR code image.
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(str(model_id))
        qr.make(fit=True)
        qr_img = qr.make_image(fill_color="black", back_color="white")

        # Load the source PDF into memory so the file on disk can be
        # overwritten later without a reader still depending on it.
        with open(panel_file_path, "rb") as source_file:
            existing_pdf = PdfFileReader(io.BytesIO(source_file.read()))
        output = PdfFileWriter()
        page = existing_pdf.getPage(0)
        page_width = float(page.mediaBox.getWidth())
        page_height = float(page.mediaBox.getHeight())

        qr_size = 1 * inch  # QR code edge length
        margin = 0.4 * inch  # distance from the page edges

        # A private temporary directory keeps concurrent calls from sharing
        # the intermediate files and removes them even when an error occurs.
        with tempfile.TemporaryDirectory() as temp_dir:
            qr_temp_path = os.path.join(temp_dir, 'qr_temp.png')
            output_temp_path = os.path.join(temp_dir, 'output_temp.pdf')
            qr_img.save(qr_temp_path)

            # One-page overlay of the same size as the first page, holding
            # only the QR code.
            c = canvas.Canvas(output_temp_path, pagesize=(page_width, page_height))
            c.drawImage(qr_temp_path, page_width - qr_size - margin, margin, width=qr_size, height=qr_size)
            c.save()

            with open(output_temp_path, "rb") as overlay_file:
                qr_pdf = PdfFileReader(io.BytesIO(overlay_file.read()))

        # Merge the overlay onto the first page, then append the other pages.
        page.mergePage(qr_pdf.getPage(0))
        output.addPage(page)
        for i in range(1, existing_pdf.getNumPages()):
            output.addPage(existing_pdf.getPage(i))

        # Serialise fully before touching the original so a failure while
        # writing the PDF structure cannot leave a truncated file behind.
        merged_pdf = io.BytesIO()
        output.write(merged_pdf)
        with open(panel_file_path, "wb") as output_file:
            output_file.write(merged_pdf.getvalue())
        return True