# -*- coding: utf-8 -*- import asyncio import base64 import cProfile import concurrent import datetime import io import logging import json import os import pstats import re import threading import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import requests from itertools import groupby from collections import defaultdict, namedtuple from odoo import api, fields, models, SUPERUSER_ID, _, tools from odoo.exceptions import UserError, ValidationError from odoo.addons.sf_base.commons.common import Common from odoo.tools import float_compare, float_round, float_is_zero, format_datetime class MrpProduction(models.Model): _inherit = 'mrp.production' _description = "制造订单" _order = 'create_date desc' sale_order_id = fields.Many2one('sale.order', string='销售订单', compute='_compute_sale_order_id', store=True) deadline_of_delivery = fields.Date('订单交期', tracking=True, compute='_compute_deadline_of_delivery', store=True) # tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘") maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests") request_ids = fields.One2many('maintenance.request', 'production_id') model_file = fields.Binary('模型文件', related='product_id.model_file') schedule_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')], string='排程状态', default='未排') work_order_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')], string='工单状态', default='未排') detection_result_ids = fields.One2many('sf.detection.result', 'production_id', '检测报告') tool_state = fields.Selection([('0', '正常'), ('1', '缺刀'), ('2', '无效刀')], string='功能刀具状态', default='0', store=True, compute='_compute_tool_state') tool_state_remark = fields.Text(string='功能刀具状态备注(缺刀)', compute='_compute_tool_state_remark', store=True) tool_state_remark2 = fields.Text(string='功能刀具状态备注(无效刀)', readonly=True) @api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id') def _compute_sale_order_id(self): for production in self: # 初始化 sale_order_id 为 False sale_order_id = False # 使用正则表达式查找产品名称中的 'S' 开头的字母数字字符串 match = re.search(r'S\d+', production.product_id.with_context(lang='zh_CN').name) # 从字符串开始匹配 if match: result = match.group(0) try: # 查找与匹配的字符串相符的销售订单 sale_order = self.env['sale.order'].search( [('name', '=', result)], limit=1, order='id asc' ) if sale_order: production.sale_order_id = sale_order.id else: logging.warning("No sale order found for production {} with product {} (name match: {})".format( production.id, production.product_id.name, result)) except Exception as e: logging.error("Error while fetching sale order for production {}: {}".format(production.id, str(e))) @api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id') def _compute_deadline_of_delivery(self): for production in self: # 确保 procurement_group_id 和相关字段存在 if production.procurement_group_id: # 获取相关的 sale_id sale_order_id = production.procurement_group_id.mrp_production_ids.mapped( 'move_dest_ids.group_id.sale_id') # 确保 sale_order_id 是有效的 ID 列表 if sale_order_id: # 获取 sale.order 记录 sale_id = self.env['sale.order'].sudo().browse(sale_order_id.ids) # 使用 mapped 返回的 ID 列表 # 处理 sale_id if sale_id: # 假设我们只需要第一个 sale_id production.deadline_of_delivery = sale_id[0].deadline_of_delivery if sale_id else False else: production.deadline_of_delivery = False else: production.deadline_of_delivery = False def _compute_default_delivery_status(self): try: if self.state == 'cancel': return False if not self.deadline_of_delivery: return False hours = self.get_hours_diff() if hours >= 48: return '正常' elif hours > 0 and hours < 48 and self.state != 'done': return '预警' elif hours > 0 and hours < 48 and self.state == 'done': return '正常' else: return '已逾期' except Exception as e: logging.error("Error processing production ID {}: {}".format(self.id, e)) raise e @api.depends('state', 'deadline_of_delivery') def _compute_delivery_status(self): for production in self: delivery_status = production._compute_default_delivery_status() if delivery_status and production.delivery_status != delivery_status: production.delivery_status = delivery_status delivery_status = fields.Selection([('正常', '正常'), ('预警', '预警'), ('已逾期', '已逾期')], string='交期状态', store=True, compute='_compute_delivery_status', default=lambda self: self._compute_default_delivery_status()) def get_page_all_records(self, model_name, func, domain, page_size=100): # 获取模型对象 model = self.env[model_name].sudo() # 初始化分页参数 page_number = 1 while True: # 计算偏移量 offset = (page_number - 1) * page_size # 获取当前页的数据 records = model.search(domain, limit=page_size, offset=offset) # 如果没有更多记录,退出循环 if not records: break # 将当前页的数据添加到结果列表 func(records) # 增加页码 page_number += 1 def run_compute_delivery_status(self, records): records._compute_delivery_status() def _corn_update_delivery_status(self): need_list = [ 'draft', 'technology_to_confirmed', 'confirmed', 'pending_cam', 'progress', 'rework', 'scrap', 'to_close', ] # previous_workorder = self.env['mrp.production'].search([('state', 'in', need_list)]) self.get_page_all_records('mrp.production', self.run_compute_delivery_status, [('state', 'in', need_list)], 100) def get_hours_diff(self): # 获取当前日期和时间 current_datetime = fields.Datetime.now() # 将 date_field 转换为 datetime 对象 if self.deadline_of_delivery: date_obj = fields.Date.from_string(self.deadline_of_delivery) # 将 date 对象转换为 datetime 对象,设置时间为 00:00:00 date_obj = datetime.datetime.combine(date_obj, datetime.time.min) # 计算两个日期之间的差值 delta = date_obj - current_datetime # 返回差值的小时数 return int(delta.total_seconds() / 3600) else: return 0.0 @api.depends('workorder_ids.tool_state_remark') def _compute_tool_state_remark(self): for item in self: if item.workorder_ids: workorder_ids = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel']) if workorder_ids.filtered(lambda a: a.tool_state == '1'): work_ids = workorder_ids.filtered(lambda a: a.tool_state == '1') tool_state_remark = '' for work_id in work_ids: if tool_state_remark == '': tool_state_remark = f'{work_id.tool_state_remark}' else: tool_state_remark = f"{tool_state_remark}\n{work_id.tool_state_remark}" item.tool_state_remark = tool_state_remark else: item.tool_state_remark = False @api.depends('workorder_ids.tool_state') def _compute_tool_state(self): for item in self: if item.workorder_ids: tool_state = item.tool_state workorder_ids = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel']) if workorder_ids.filtered(lambda a: a.tool_state == '2'): item.tool_state = '2' elif workorder_ids.filtered(lambda a: a.tool_state == '1'): item.tool_state = '1' else: item.tool_state = '0' if tool_state == '2' and item.tool_state != '2': item.detection_result_ids.filtered( lambda a: a.detailed_reason == '无效功能刀具' and a.handle_result == '待处理').write( {'handle_result': '已处理'}) # state = fields.Selection(selection_add=[ # ('pending_scheduling', '待排程'), # ('pending_processing', '待加工'), # ('completed', '已完工') # ]) state = fields.Selection([ ('draft', '草稿'), ('technology_to_confirmed', '待工艺确认'), ('confirmed', '待排程'), ('pending_cam', '待加工'), ('progress', '加工中'), ('rework', '返工'), ('scrap', '报废'), ('to_close', 'To Close'), ('done', 'Done'), ('cancel', '已取消')], string='State', compute='_compute_state', copy=False, index=True, readonly=True, store=True, tracking=True, help=" * Draft: The MO is not confirmed yet.\n" " * Confirmed: The MO is confirmed, the stock rules and the reordering of the components are trigerred.\n" " * In Progress: The production has started (on the MO or on the WO).\n" " * To Close: The production is done, the MO has to be closed.\n" " * Done: The MO is closed, the stock moves are posted. \n" " * Cancelled: The MO has been cancelled, can't be confirmed anymore.") check_status = fields.Boolean(string='启用状态', default=False, readonly=True) active = fields.Boolean(string='已归档', default=True) programming_no = fields.Char('编程单号') work_state = fields.Char('业务状态') programming_state = fields.Selection( [('编程中', '编程中'), ('已编程', '已编程'), ('已编程未下发', '已编程未下发'), ('已下发', '已下发')], string='编程状态', tracking=True) glb_file = fields.Binary("glb模型文件") production_line_id = fields.Many2one('sf.production.line', string='生产线', tracking=True) plan_start_processing_time = fields.Datetime('计划开始加工时间') is_rework = fields.Boolean(string='是否返工', default=False) # production_line_state = fields.Selection( # [('待上产线', '待上产线'), ('已上产线', '已上产线'), ('已下产线', '已下产线')], # string='上/下产线', default='待上产线', tracking=True) # 工序状态 # Todo 研究下用法 process_state = fields.Selection([ ('待装夹', '待装夹'), ('待检测', '待检测'), ('待加工', '待加工'), ('待解除装夹', '待解除装夹'), ('已完工', '已完工'), ], string='工序状态', default='待装夹') # 零件图号 part_number = fields.Char('零件图号', related='product_id.part_number', readonly=True) # 上传零件图纸 part_drawing = fields.Binary('零件图纸', related='product_id.machining_drawings', readonly=True) quality_standard = fields.Binary('质检标准', related='product_id.quality_standard', readonly=True) part_name = fields.Char(string='零件名称', related='product_id.part_name', readonly=True) @api.depends('product_id.manual_quotation') def _compute_manual_quotation(self): for item in self: item.manual_quotation = item.product_id.manual_quotation manual_quotation = fields.Boolean('人工编程', default=False, compute=_compute_manual_quotation, store=True) is_scrap = fields.Boolean('是否报废', default=False) is_remanufacture = fields.Boolean('是否重新制造', default=False) remanufacture_count = fields.Integer("重新制造订单数量", compute='_compute_remanufacture_production_ids') remanufacture_production_id = fields.Many2one('mrp.production', string='') technology_design_ids = fields.One2many('sf.technology.design', 'production_id', string='工艺设计') is_adjust = fields.Boolean('是否退回调整', default=False) @api.depends('remanufacture_production_id') def _compute_remanufacture_production_ids(self): for production in self: if production.remanufacture_production_id: remanufacture_production = self.env['mrp.production'].search( [('id', '=', production.remanufacture_production_id.id)]) if remanufacture_production: production.remanufacture_count = len(remanufacture_production) else: production.remanufacture_count = 0 def action_view_remanufacture_productions(self): self.ensure_one() mrp_production = self.env['mrp.production'].search( [('id', '=', self.remanufacture_production_id.id)]) action = { 'res_model': 'mrp.production', 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_id': mrp_production.id, } return action @api.depends( 'move_raw_ids.state', 'move_raw_ids.quantity_done', 'move_finished_ids.state', 'tool_state', 'workorder_ids.state', 'product_qty', 'qty_producing', 'schedule_state', 'programming_state', 'is_adjust') def _compute_state(self): for production in self: if not production.state or not production.product_uom_id: production.state = 'draft' elif production.state == 'cancel' or (production.move_finished_ids and all( move.state == 'cancel' for move in production.move_finished_ids)): production.state = 'cancel' elif production.workorder_ids and all( wo_state in ('done', 'cancel') for wo_state in production.workorder_ids.mapped('state')): production.state = 'to_close' elif not production.workorder_ids and float_compare(production.qty_producing, production.product_qty, precision_rounding=production.product_uom_id.rounding) >= 0: production.state = 'to_close' elif any(wo_state in ('progress', 'done') for wo_state in production.workorder_ids.mapped('state')): production.state = 'progress' elif production.product_uom_id and not float_is_zero(production.qty_producing, precision_rounding=production.product_uom_id.rounding): production.state = 'progress' elif any(not float_is_zero(move.quantity_done, precision_rounding=move.product_uom.rounding or move.product_id.uom_id.rounding) for move in production.move_raw_ids if move.product_id): production.state = 'progress' # 新添加的状态逻辑 if production.state in ['to_close', 'progress', 'technology_to_confirmed'] and production.schedule_state == '未排': if not production.workorder_ids or production.is_adjust is True: production.state = 'technology_to_confirmed' else: if production.is_adjust is True: production.state = 'technology_to_confirmed' else: production.state = 'confirmed' elif production.state == 'pending_cam' and production.schedule_state == '未排': production.state = 'confirmed' elif production.state == 'to_close' and production.schedule_state == '已排': production.state = 'pending_cam' elif production.state == 'confirmed' and production.is_adjust is True: production.state = 'technology_to_confirmed' if production.state == 'confirmed' and production.schedule_state == '已排': production.state = 'pending_cam' if (production.state == 'rework' and production.tool_state == '0' and production.schedule_state == '已排' and production.is_rework is False): production.state = 'pending_cam' if any((wo.test_results == '返工' and wo.state == 'done' and (production.programming_state in ['已编程'] or wo.individuation_page_PTD is True)) or (wo.is_rework is True and wo.state == 'done' and production.programming_state in ['编程中', '已编程']) for wo in production.workorder_ids) or production.is_rework is True: production.state = 'rework' if any(wo.test_results == '报废' and wo.state == 'done' for wo in production.workorder_ids): production.state = 'scrap' if any(dr.test_results == '报废' and dr.handle_result == '已处理' for dr in production.detection_result_ids): production.state = 'cancel' if production.workorder_ids and all( wo_state in ('done', 'rework', 'cancel') for wo_state in production.workorder_ids.mapped('state')): if production.state not in ['scrap', 'rework', 'cancel']: production.state = 'done' elif production.state == 'done': production.state = 'progress' # 退回调整 def technology_back_adjust(self): process_parameters = [] domain = [('state', '=', 'confirmed'), ('origin', '=', self.origin)] if self.production_type == '自动化产线加工': cloud_programming = self._cron_get_programming_state() if cloud_programming['send_state'] == 'sending': raise UserError(_("编程文件正在下发中,请稍后重试")) domain += [('programming_no', '=', self.programming_no)] # 带排程的制造订单 production_confirmed = self.env['mrp.production'].search(domain) for special in production_confirmed.technology_design_ids: if special.process_parameters_id: product_production_process = self.env['product.template'].search( [('server_product_process_parameters_id', '=', special.process_parameters_id.id)]) if not product_production_process: if special.process_parameters_id not in process_parameters: process_parameters.append(special.process_parameters_id.display_name) if process_parameters: raise UserError(_("【工艺设计】-【参数】为%s的在【产品】中不存在,请先创建", ", ".join(process_parameters))) if production_confirmed: production_count = self.env['mrp.production'].search_count([ ('origin', '=', self.origin), ('product_id', '=', self.product_id.id), ('state', '=', 'confirmed') ]) if production_count > 1: return { 'name': _('退回调整'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_re_adjust_wizard_form_view').id, 'form')], 'res_model': 'sf.production.technology.re_adjust.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} else: return { 'name': _('退回调整'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_re_adjust_wizard_confirm_form_view').id, 'form')], 'res_model': 'sf.production.technology.re_adjust.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} # 工艺确认 def technology_confirm(self): process_parameters = [] purchase_orders = [] parameters_not = [] # 获取原有的工单对应的工序 origin_designs = self.workorder_ids.technology_design_id # 获取已删除的工序 deleted_designs = origin_designs - self.technology_design_ids if deleted_designs: for deleted_design in deleted_designs: workorder = self.env['mrp.workorder'].search([('technology_design_id', '=', deleted_design.id)]) purchase = workorder._get_surface_technics_purchase_ids() if purchase.state not in ['cancel', 'draft', False]: purchase_orders.append(purchase.name) special_design = self.technology_design_ids.filtered( lambda a: a.routing_tag == 'special' and a.is_auto is False) for special in special_design: if special.route_id.routing_type == '表面工艺' and not special.process_parameters_id: parameters_not.append(special.route_id.name) if special.process_parameters_id: product_production_process = self.env['product.template'].search( [('server_product_process_parameters_id', '=', special.process_parameters_id.id)]) if not product_production_process: if special.process_parameters_id not in process_parameters: process_parameters.append(special.process_parameters_id.display_name) if purchase_orders: raise UserError(_("请联系工厂生产经理对该(%s)采购订单进行取消", ", ".join(purchase_orders))) if parameters_not: raise UserError(_("【工艺设计】-【工序】为%s未选择参数,请选择", ", ".join(parameters_not))) if process_parameters: raise UserError(_("【工艺设计】-【参数】为%s的在【产品】中不存在,请先创建", ", ".join(process_parameters))) # 判断同一个加工面的标准工序的顺序是否依次排序 error_panel = [] technology_design = self.technology_design_ids.filtered(lambda a: a.routing_tag == 'standard').sorted( key=lambda m: m.sequence) for index, design in enumerate(technology_design): routing_type = design.route_id.routing_type if index < len(technology_design) - 1: next_index = index + 1 next_design = technology_design[next_index] next_design_routing_type = next_design.route_id.routing_type # logging.info('当前工序和加工面: %s-%s' % (design.route_id.name, design.panel)) # logging.info('下一个工序和加工面: %s-%s' % (next_design.route_id.name, next_design.panel)) if design.panel is not False: if design.panel != next_design.panel: if index == 0: raise UserError('【加工面】为%s的标准工序里含有其他加工面的工序,请调整后重试' % design.panel) if routing_type not in ['解除装夹']: raise UserError('【加工面】为%s的标准工序顺序有误,请调整后重试' % design.panel) if design.panel == next_design.panel: if (routing_type == '装夹预调' and next_design_routing_type == '解除装夹') or ( routing_type == 'CNC加工' and next_design_routing_type == '装夹预调'): if design.panel not in error_panel: error_panel.append(design.panel) else: if not error_panel and not process_parameters: production_count = self.env['mrp.production'].search_count([ ('origin', '=', self.origin), ('product_id', '=', self.product_id.id), ('state', '=', 'technology_to_confirmed') ]) if production_count > 1: return { 'name': _('工艺确认'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_wizard_form_view').id, 'form')], 'res_model': 'sf.production.technology.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} else: return { 'name': _('工艺确认'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_wizard_confirm_form_view').id, 'form')], 'res_model': 'sf.production.technology.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} if error_panel: raise UserError(_("【加工面】为%s的标准工序顺序有误,请调整后重试", ", ".join(error_panel))) return True def action_check(self): """ 审核启用 """ self.check_status = True def action_uncheck(self): """ 审核禁用 """ self.check_status = False def archive(self): """ 归档 """ self.write({'active': False}) def unarchive(self): """ 取消归档 """ self.write({'active': True}) @api.depends('request_ids') def _compute_maintenance_count(self): for production in self: production.maintenance_count = len(production.request_ids) # 获取cloud编程单的状态 def _cron_get_programming_state(self): try: if not self: reproduction = self.env['mrp.production'].search( [('state', '=', 'rework'), ('programming_state', '=', '编程中'), ('is_rework', '=', True)]) else: reproduction = self if reproduction: programming_no_set = set([str(item.programming_no) for item in reproduction]) programming_no = list(programming_no_set) programming_no_str = ','.join(programming_no) res = {'programming_no': programming_no_str} logging.info('res=%s:' % res) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/get_state' config_url = configsettings['sf_url'] + url ret = requests.post(config_url, json=res, data=None, headers=config_header) ret = ret.json() result = json.loads(ret['result']) if result['status'] == 1: for item in result['programming_list']: if not self: for rp in reproduction: if rp.programming_no == item['programming_no']: rp.write({'programming_state': '已编程未下发' if item[ 'programming_state'] == '已编程' else '编程中'}) else: return item else: raise UserError(ret['message']) except Exception as e: logging.info('cron_get_programming_state error:%s' % e) # 编程单更新 # 增加触发时间参数 def update_programming_state(self, trigger_time=None): try: manufacturing_type = 'rework' if self.is_scrap: manufacturing_type = 'scrap' elif self.tool_state == '2': manufacturing_type = 'invalid_tool_rework' res = {'programming_no': self.programming_no, 'manufacturing_type': manufacturing_type, 'trigger_time': trigger_time} logging.info('res=%s:' % res) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/reset_state_again' config_url = configsettings['sf_url'] + url ret = requests.post(config_url, json=res, data=None, headers=config_header) ret = ret.json() result = json.loads(ret['result']) logging.info('update_programming_state-ret:%s' % result) if result['status'] == 1: self.write({'is_rework': True}) else: raise UserError(ret['message']) # # 增加对编程记录的更新 # cloud_programming = self._cron_get_programming_state() # if manufacturing_type == 'rework': # self.programming_record_ids.create({ # 'number': len(self.programming_record_ids) + 1, # 'production_id': self.id, # 'reason': '返工', # 'programming_method': cloud_programming['programme_way'], # 'current_programming_count': cloud_programming['reprogramming_num'], # 'target_production_id': cloud_programming['production_order_no'], # 'apply_time': trigger_time, # 'send_time': cloud_programming['send_time'], # }) # elif manufacturing_type == 'scrap': # self.programming_record_ids.create({ # 'number': len(self.programming_record_ids) + 1, # 'production_id': self.id, # 'reason': '报废', # 'programming_method': cloud_programming['programme_way'], # 'current_programming_count': cloud_programming['reprogramming_num'], # 'target_production_id': cloud_programming['production_order_no'], # 'apply_time': trigger_time, # 'send_time': cloud_programming['send_time'], # }) # elif manufacturing_type == 'invalid_tool_rework': # self.programming_record_ids.create({ # 'number': len(self.programming_record_ids) + 1, # 'production_id': self.id, # 'reason': '无效功能刀具', # 'programming_method': cloud_programming['programme_way'], # 'current_programming_count': cloud_programming['reprogramming_num'], # 'target_production_id': cloud_programming['production_order_no'], # 'apply_time': trigger_time, # 'send_time': cloud_programming['send_time'], # }) # else: # logging.info('无对应状态,不需更新编程记录') except Exception as e: logging.info('update_programming_state error:%s' % e) raise UserError("更新编程单状态失败,请联系管理员") # cnc程序获取 def fetchCNC(self, production_names): cnc = self.env['mrp.production'].search([('id', '=', self.id)]) quick_order = False if cnc.product_id.default_code: quick_order = self.env['quick.easy.order'].search( [('name', '=', cnc.product_id.default_code.rsplit('-', 1)[0])]) programme_way = False if cnc.manual_quotation is True: programme_way = 'manual operation' else: programme_way = 'auto' if quick_order: programme_way = 'manual operation' try: res = { 'production_no': production_names, 'machine_tool_code': '', 'product_name': cnc.product_id.name, 'remanufacture_type': '', 'model_code': cnc.product_id.model_code, 'material_code': self.env['sf.production.materials'].search( [('id', '=', cnc.product_id.materials_id.id)]).materials_no, 'material_type_code': self.env['sf.materials.model'].search( [('id', '=', cnc.product_id.materials_type_id.id)]).materials_no, 'machining_processing_panel': cnc.product_id.model_processing_panel, 'machining_precision': '', 'embryo_long': cnc.product_id.bom_ids.bom_line_ids.product_id.length, 'embryo_height': cnc.product_id.bom_ids.bom_line_ids.product_id.height, 'embryo_width': cnc.product_id.bom_ids.bom_line_ids.product_id.width, 'order_no': cnc.origin, 'model_order_no': cnc.product_id.default_code, 'user': cnc.env.user.name, 'programme_way': programme_way, 'model_file': '' if not cnc.product_id.model_file else base64.b64encode( cnc.product_id.model_file).decode('utf-8'), 'part_name': cnc.product_id.part_name, 'part_number': cnc.product_id.part_number, 'machining_drawings': base64.b64encode(cnc.product_id.machining_drawings).decode( 'utf-8') if cnc.product_id.machining_drawings else '', 'machining_drawings_name': cnc.product_id.machining_drawings_name, 'machining_drawings_mimetype': cnc.product_id.machining_drawings_mimetype, } # 打印出除了 model_file 之外的所有键值对 for key, value in res.items(): if key != 'model_file': logging.info('%s: %s' % (key, value)) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/create' config_url = configsettings['sf_url'] + url res['token'] = configsettings['token'] # res_str = json.dumps(res) ret = requests.post(config_url, json={}, data=res, headers=config_header) ret = ret.json() logging.info('fetchCNC-ret:%s' % ret) if ret['status'] == 1: self.write( {'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'}) else: raise UserError(ret['message']) except Exception as e: logging.info('fetchCNC error:%s' % e) raise UserError("cnc程序获取编程单失败,请联系管理员") # 维修模块按钮 def button_maintenance_req(self): self.ensure_one() return { 'name': _('New Maintenance Request'), 'view_mode': 'form', 'res_model': 'maintenance.request', 'type': 'ir.actions.act_window', 'context': { 'default_company_id': self.company_id.id, 'default_production_id': self.id, }, 'domain': [('production_id', '=', self.id)], } # 打开维修模块请求 def open_maintenance_request_mo(self): self.ensure_one() action = { 'name': _('Maintenance Requests'), 'view_mode': 'kanban,tree,form,pivot,graph,calendar', 'res_model': 'maintenance.request', 'type': 'ir.actions.act_window', 'context': { 'default_company_id': self.company_id.id, 'default_production_id': self.id, }, 'domain': [('production_id', '=', self.id)], } if self.maintenance_count == 1: production = self.env['maintenance.request'].search([('production_id', '=', self.id)]) action['view_mode'] = 'form' action['res_id'] = production.id return action def action_generate_serial(self): self.ensure_one() iot_code = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) or self.env[ 'ir.sequence'].next_by_code('stock.lot.serial') iot_code_name = re.sub('[\u4e00-\u9fa5]', "", iot_code) self.lot_producing_id = self.env['stock.lot'].create({ 'product_id': self.product_id.id, 'company_id': self.company_id.id, 'name': iot_code_name, }) if self.move_finished_ids.filtered(lambda m: m.product_id == self.product_id).move_line_ids: self.move_finished_ids.filtered( lambda m: m.product_id == self.product_id).move_line_ids.lot_id = self.lot_producing_id # if self.product_id.tracking == 'serial': # self._set_qty_producing() # 重载根据工序生成工单的程序:如果产品BOM中没有工序时, # 根据产品对应的模板类型中工序,去生成工单; # CNC加工工序的选取规则: # 如果自动报价有带过来预分配的机床, # 则根据设备找到工作中心;否则采用前面描述的工作中心分配机制; # 其他规则限制: 默认只分配给工作中心状态为非故障的工作中心; def process_production(self): workorders_values = [] # production = self.env['mrp.production'].browse(production_id) product_qty = self.product_uom_id._compute_quantity(self.product_qty, self.bom_id.product_uom_id) exploded_boms, dummy = self.bom_id.explode(self.product_id, product_qty / self.bom_id.product_qty, picking_type=self.bom_id.picking_type_id) for bom, bom_data in exploded_boms: # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders. if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[ 'parent_line'].bom_id.operation_ids != bom.operation_ids)): continue for operation in bom.operation_ids: if operation._skip_operation_line(bom_data['product']): continue workorders_values += [{ 'name': operation.name, 'production_id': self.id, 'workcenter_id': operation.workcenter_id.id, 'product_uom_id': self.product_uom_id.id, 'operation_id': operation.id, 'state': 'pending', }] if self.product_id.categ_id.type in ['成品', '坯料']: # # 根据工序设计生成工单 technology_design_ids = sorted(self.technology_design_ids, key=lambda x: x.sequence) for route in technology_design_ids: workorder_has = self.env['mrp.workorder'].search( [('technology_design_id', '=', route.id), ('production_id', '=', self.id)]) if not workorder_has: if route.route_id.routing_type not in ['表面工艺']: workorders_values.append( self.env['mrp.workorder'].json_workorder_str(self, route)) else: product_production_process = self.env['product.template'].search( [('server_product_process_parameters_id', '=', route.process_parameters_id.id)]) workorders_values.append( self.env[ 'mrp.workorder']._json_workorder_surface_process_str( self, route, product_production_process.seller_ids[0].partner_id.id)) return workorders_values def _set_workorder_duration_expected(self): try: # 在每个线程中创建独立的 ORM 环境 with api.Environment.manage(): new_cr = self.pool.cursor() self = self.with_env(self.env(cr=new_cr)) # program_ids = self.sudo().env['loyalty.program'].browse(program_ids.ids) # 使用独立的环境来处理数据库事务 # 在独立的环境中对 workorder 进行操作 production = self.sudo().env['mrp.production'].browse(self.id) workorders_values = production.process_production() production.write({'workorder_ids': workorders_values}) for workorder in production.workorder_ids: workorder.duration_expected = workorder._get_duration_expected() # 可以进行其他与工作单相关的操作 # workorder_env.write({'...'}) return production except Exception as e: logging.error(f"Error processing workorder {workorder.id}: {e}") # workorders_values = self.process_production(self) # print('_set_workorder_duration_expected wqio ', self) # self.write({'workorder_ids': workorders_values}) # for workorder in self.workorder_ids: # workorder.duration_expected = workorder._get_duration_expected() def _create_workorder3(self, item): with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: futures = [] for production in self: if not production.bom_id or not production.product_id: continue # 提交每个生产任务到线程池 futures.append(executor.submit(production._set_workorder_duration_expected)) # 等待所有线程完成任务 results = [] for future in futures: try: result = future.result() if result: results.append(result) except Exception as e: logging.error(f"Error processing production: {e}") return results # 外协出入库单处理 def get_subcontract_pick_purchase(self,productions): production_all = productions.sorted(lambda x: x.id) product_id_to_production_names = {} grouped_product_ids = {k: list(g) for k, g in groupby(production_all, key=lambda x: x.product_id.id)} for product_id, pd in grouped_product_ids.items(): product_id_to_production_names[product_id] = [p.name for p in pd] sorted_workorders = None for production in production_all: proc_workorders = [] process_parameter_workorder=production.workorder_ids.filtered(lambda w: w.surface_technics_parameters_id and w.is_subcontract and w.state!='cancel') # process_parameter_workorder = self.env['mrp.workorder'].search( # [('surface_technics_parameters_id', '!=', False), ('production_id', '=', production.id), # ('is_subcontract', '=', True), ('state', '!=', 'cancel')], order='sequence asc') if process_parameter_workorder: # 将这些特殊表面工艺工单的采购单与调拨单置为失效 for workorder in process_parameter_workorder: # workorder._get_surface_technics_purchase_ids().write({'state': 'cancel'}) workorder.move_subcontract_workorder_ids.write({'state': 'cancel'}) workorder.move_subcontract_workorder_ids.picking_id.write({'state': 'cancel'}) sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.sequence) if not sorted_workorders: return for workorders in reversed(sorted_workorders): self.env['stock.picking'].create_outcontract_picking(workorders, production, sorted_workorders) self.env['purchase.order'].get_purchase_order(workorders, production, product_id_to_production_names) # 工单排序 def _reset_work_order_sequence1(self, k): for rec in self: cnc_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工") cnc_back_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工(返工)") for work in rec.workorder_ids: if work.name == cnc_workorder.name and work.processing_panel == k: cnc_back_workorder.write({'sequence': work.sequence + 1}) print(cnc_back_workorder.sequence) elif work.routing_type not in ['装夹预调'] and work != cnc_back_workorder: work.sequence += 1 # 在制造订单上新增工单 def _create_workorder1(self, k): for production in self: if not production.bom_id or not production.product_id: continue workorders_values = [] product_qty = production.product_uom_id._compute_quantity(production.product_qty, production.bom_id.product_uom_id) exploded_boms, dummy = production.bom_id.explode(production.product_id, product_qty / production.bom_id.product_qty, picking_type=production.bom_id.picking_type_id) for bom, bom_data in exploded_boms: # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders. if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[ 'parent_line'].bom_id.operation_ids != bom.operation_ids)): continue for operation in bom.operation_ids: if operation._skip_operation_line(bom_data['product']): continue workorders_values += [{ 'name': operation.name, 'production_id': production.id, 'workcenter_id': operation.workcenter_id.id, 'product_uom_id': production.product_uom_id.id, 'operation_id': operation.id, 'state': 'pending', }] # 根据加工面板的面数及对应的成品工序模板生成工单 i = 0 production.product_id.model_processing_panel = k for k in (production.product_id.model_processing_panel.split(',')): routingworkcenter = self.env['sf.product.model.type.routing.sort'].search( [('product_model_type_id', '=', production.product_id.product_model_type_id.id)], order='sequence asc' ) i += 1 for route in routingworkcenter: if route.routing_type == 'CNC加工': workorders_values.append( self.env['mrp.workorder'].json_workorder_str1(k, production, route)) production.workorder_ids = workorders_values workorder = self.env['mrp.workorder'].browse(production.workorder_ids.ids) print(workorder) # for item in workorder: # workorder.duration_expected = workorder._get_duration_expected() def _create_workorder2(self, k): self._create_workorder1(k) self._reset_work_order_sequence1(k) return True # 需对不连续工单对应的采购单和外协出入库单做处理 def _reset_subcontract_pick_purchase(self): production_all = self.sorted(lambda x: x.id) product_id_to_production_names = {} grouped_product_ids = {k: list(g) for k, g in groupby(production_all, key=lambda x: x.product_id.id)} for product_id, pd in grouped_product_ids.items(): product_id_to_production_names[product_id] = [p.name for p in pd] for item in production_all: production_process = product_id_to_production_names.get(item.product_id.id) workorder_sf = item.workorder_ids.filtered(lambda sf: sf.routing_type == '表面工艺') for i, workorder in enumerate(workorder_sf): if i == 0: continue elif workorder.sequence != workorder_sf[i - 1].sequence + 1: # workorder.picking_ids.move_ids = False workorder.picking_ids = False purchase_order = self.env['purchase.order'].search( [('state', '=', 'draft'), ('origin', '=', item.name), ('purchase_type', '=', 'consignment')]) server_template = self.env['product.template'].search( [('server_product_process_parameters_id', '=', workorder.surface_technics_parameters_id.id), ('detailed_type', '=', 'service')]) for po in purchase_order: for line in po.order_line: if line.product_id == server_template.product_variant_id: continue if server_template.server_product_process_parameters_id != line.product_id.server_product_process_parameters_id: purchase_order_line = self.env['purchase.order.line'].search( [('product_id', '=', server_template.product_variant_id.id), ('id', '=', line.id), ('product_qty', '=', 1)], limit=1, order='id desc') if purchase_order_line: line.unlink() def _process_reset_work_order_sequence(self,rec): workorder_ids = rec.workorder_ids technology_design_ids = rec.technology_design_ids if workorder_ids.filtered(lambda item: item.state in ('返工', 'rework')): # 获取返工后新生成的工单 work_ids = workorder_ids.filtered(lambda item: item.sequence == 0) # 对工单进行逐个插入 for work_id in work_ids: order_rework_ids = rec.workorder_ids.filtered( lambda item: (item.sequence > 0 and work_id.name == item.name and work_id.processing_panel == item.processing_panel)) order_rework_ids = sorted(order_rework_ids, key=lambda item: item.sequence, reverse=True) work_id.sequence = order_rework_ids[0].sequence + 1 # 对该工单之后的工单工序进行加一 work_order_ids = rec.workorder_ids.filtered( lambda item: item.sequence >= work_id.sequence and item.id != work_id.id) for work in work_order_ids: work.sequence = work.sequence + 1 else: # 将工艺设计生成的工单序号赋值给工单的序号 for work in workorder_ids: td_ids = technology_design_ids.filtered( lambda item: (item.route_id.name in work.name and item.process_parameters_id and item.process_parameters_id == work.surface_technics_parameters_id) or (item.route_id.name == work.name and item.panel and item.panel == work.processing_panel)) if work.name == '人工线下加工': td_ids = technology_design_ids.filtered(lambda item: (item.route_id.name in work.name)) if td_ids: work.sequence = td_ids[0].sequence cancel_work_ids = workorder_ids.filtered(lambda item: item.state in ('已取消', 'cancel')) if cancel_work_ids: sequence = max(workorder_ids.filtered(lambda item: item.state not in ('已取消', 'cancel')), key=lambda w: w.sequence).sequence for cw in cancel_work_ids: cw.sequence = sequence + 1 def _reset_work_order_sequence(self,productions): """ 工单工序排序方法(新) """ for rec in productions: self._process_reset_work_order_sequence(rec) def _reset_work_order_sequence_1(self): """ 工单工序排序方法(旧) """ for rec in self: workorder_ids = rec.workorder_ids.filtered(lambda item: item.state in ('返工', 'rework')) # 产品模型类型 model_type_id = rec.product_id.product_model_type_id # 产品加工面板 model_processing_panel = rec.product_id.model_processing_panel if not workorder_ids: sequence_list = {} if model_type_id: if model_processing_panel: tmpl_num = 1 panel_list = model_processing_panel.split(',') for panel in panel_list: panel_sequence_list = {} # 成品工序 product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids if product_routing_tmpl_ids: for tmpl_id in product_routing_tmpl_ids: panel_sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num}) tmpl_num += 1 sequence_list.update({panel: panel_sequence_list}) # 表面工艺工序 # 模型类型的表面工艺工序模版 surface_tmpl_ids = model_type_id.surface_technics_routing_tmpl_ids # 产品选择的表面工艺参数 model_process_parameters_ids = rec.product_id.model_process_parameters_ids process_dict = {} if model_process_parameters_ids: for process_parameters_id in model_process_parameters_ids: process_id = process_parameters_id.process_id for surface_tmpl_id in surface_tmpl_ids: if process_id == surface_tmpl_id.route_workcenter_id.surface_technics_id: surface_tmpl_name = surface_tmpl_id.route_workcenter_id.name process_dict.update({int(process_id.sequence): '%s-%s' % ( surface_tmpl_name, process_parameters_id.name)}) process_list = sorted(process_dict.keys()) for process_num in process_list: sequence_list.update({process_dict.get(process_num): tmpl_num}) tmpl_num += 1 # 坯料工序 tmpl_num = 1 embryo_routing_tmpl_ids = model_type_id.embryo_routing_tmpl_ids if embryo_routing_tmpl_ids: for tmpl_id in embryo_routing_tmpl_ids: sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num}) tmpl_num += 1 else: raise ValidationError('该产品【加工面板】为空!') else: raise ValidationError('该产品没有选择【模版类型】!') logging.info('sequence_list: %s' % sequence_list) for work in rec.workorder_ids: work_name = work.name logging.info(work_name) if sequence_list.get(work_name): work.sequence = sequence_list[work_name] elif sequence_list.get(work.processing_panel): processing_panel = sequence_list.get(work.processing_panel) if processing_panel.get(work_name): work.sequence = processing_panel[work_name] else: raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name) else: raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name) # 当单个面触发返工时,将新生成的工单插入到返工工单下方,并且后面的所以工单工序重排 elif rec.workorder_ids.filtered(lambda item: item.sequence == 0): # 获取新增的返工工单 work_ids = rec.workorder_ids.filtered(lambda item: item.sequence == 0) # 获取当前返工面最后一个工单工序 sequence_max = sorted( rec.workorder_ids.filtered(lambda item: item.processing_panel == work_ids[0].processing_panel), key=lambda item: item.sequence, reverse=True)[0].sequence # 对当前返工工单之后的工单工序进行重排 work_order_ids = rec.workorder_ids.filtered(lambda item: item.sequence > sequence_max) for work_id in work_order_ids: work_id.sequence = work_id.sequence + 3 # 生成新增的返工工单的工序 # 成品工序 panel_sequence_list = {} product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids if product_routing_tmpl_ids: for tmpl_id in product_routing_tmpl_ids: sequence_max += 1 panel_sequence_list.update({tmpl_id.route_workcenter_id.name: sequence_max}) for work_id in work_ids: work_name = work_id.name if panel_sequence_list.get(work_name): work_id.sequence = panel_sequence_list[work_name] # 创建工单并进行排序 def _create_workorder(self, item): productions = self._create_workorder3(item) self._reset_work_order_sequence(productions) return productions def production_process(self, pro_plan): type_map = {'装夹预调': False, 'CNC加工': False, '解除装夹': False} # 最后一次加工结束时间 last_time = pro_plan.date_planned_start # 预置时间 works = self.workorder_ids for index, work in enumerate(works): count = type_map.get(work.routing_type) date_planned_end = None date_planned_start = None if self.production_type == '自动化产线加工': date_planned_start, date_planned_end, last_time = work.auto_production_process(last_time, count, type_map) elif self.production_type == '': date_planned_start, date_planned_end, last_time = work.manual_offline_process(last_time, index) work.update_work_start_end(date_planned_start, date_planned_end) # def def process_range_time(self): for production in self: works = production.workorder_ids pro_plan = self.env['sf.production.plan'].search([('production_id', '=', production.id)], limit=1) if not pro_plan: continue if production.production_type: production.production_process(pro_plan) # 修改标记已完成方法 def button_mark_done1(self): if not self.workorder_ids.filtered(lambda w: w.routing_type not in ['表面工艺']): self._button_mark_done_sanity_checks() if not self.env.context.get('button_mark_done_production_ids'): self = self.with_context(button_mark_done_production_ids=self.ids) res = self._pre_button_mark_done() if res is not True: return res if self.env.context.get('mo_ids_to_backorder'): productions_to_backorder = self.browse(self.env.context['mo_ids_to_backorder']) productions_not_to_backorder = self - productions_to_backorder else: productions_not_to_backorder = self productions_to_backorder = self.env['mrp.production'] backorders = productions_to_backorder and productions_to_backorder._split_productions() backorders = backorders - productions_to_backorder productions_not_to_backorder._post_inventory(cancel_backorder=True) # 查出最后一张工单完成入库操作 # if self.workorder_ids.filtered(lambda w: w.routing_type in ['表面工艺']): # move_finish = self.env['stock.move'].search([('created_production_id', '=', self.id)]) # if move_finish: # move_finish._action_assign() productions_to_backorder._post_inventory(cancel_backorder=True) # if completed products make other confirmed/partially_available moves available, assign them done_move_finished_ids = ( productions_to_backorder.move_finished_ids | productions_not_to_backorder.move_finished_ids).filtered( lambda m: m.state == 'done') done_move_finished_ids._trigger_assign() # Moves without quantity done are not posted => set them as done instead of canceling. In # case the user edits the MO later on and sets some consumed quantity on those, we do not # want the move lines to be canceled. (productions_not_to_backorder.move_raw_ids | productions_not_to_backorder.move_finished_ids).filtered( lambda x: x.state not in ('done', 'cancel')).write({ 'state': 'done', 'product_uom_qty': 0.0, }) for production in self: logging.info('qty_produced:%s' % production.qty_produced) if production.qty_produced == 0.0: production.qty_produced = 1.0 production.write({ 'date_finished': fields.Datetime.now(), 'product_qty': production.qty_produced, 'priority': '0', 'is_locked': True, 'state': 'done', }) for workorder in self.workorder_ids.filtered(lambda w: w.state not in ('done', 'cancel')): workorder.duration_expected = workorder._get_duration_expected() if not backorders: if self.env.context.get('from_workorder'): return { 'type': 'ir.actions.act_window', 'res_model': 'mrp.production', 'views': [[self.env.ref('mrp.mrp_production_form_view').id, 'form']], 'res_id': self.id, 'target': 'main', } if self.user_has_groups( 'mrp.group_mrp_reception_report') and self.picking_type_id.auto_show_reception_report: lines = self.move_finished_ids.filtered(lambda m: m.product_id.type == 'product' and m.state != 'cancel' and m.quantity_done and not m.move_dest_ids) if lines: if any(mo.show_allocation for mo in self): action = self.action_view_reception_report() return action logging.info('last-product_qty:%s' % production.product_qty) return True context = self.env.context.copy() context = {k: v for k, v in context.items() if not k.startswith('default_')} for k, v in context.items(): if k.startswith('skip_'): context[k] = False action = { 'res_model': 'mrp.production', 'type': 'ir.actions.act_window', 'context': dict(context, mo_ids_to_backorder=None, button_mark_done_production_ids=None) } if len(backorders) == 1: action.update({ 'view_mode': 'form', 'res_id': backorders[0].id, }) else: action.update({ 'name': _("Backorder MO"), 'domain': [('id', 'in', backorders.ids)], 'view_mode': 'tree,form', }) return action # 报废 def button_scrap_new(self): cloud_programming = self._cron_get_programming_state() return { 'name': _('报废'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.production.wizard', 'target': 'new', 'context': { 'default_mrp_production_id': self.id, 'is_remanufacture_flag': True, 'default_reprogramming_num': cloud_programming['reprogramming_num'], 'default_programming_states': cloud_programming['programming_state'], 'default_is_reprogramming': True if cloud_programming['programming_state'] in ['已下发'] else False } } # 返工 def button_rework(self): cloud_programming = None if self.programming_state in ['已编程']: cloud_programming = self._cron_get_programming_state() elif self.programming_state is False: cloud_programming = {} result_ids = self.detection_result_ids.filtered(lambda dr: dr.handle_result == '待处理') work_id_list = [] if result_ids: work_id_list = [self.workorder_ids.filtered( lambda wk: (wk.name == result_id.routing_type and wk.processing_panel == result_id.processing_panel and wk.state == 'done')).id for result_id in result_ids] workorder_ids = self.workorder_ids.filtered( lambda wk: wk.technology_design_id.routing_tag == 'standard' and wk.state not in ['rework', 'cancel']) logging.info('标准工艺工单【%s】' % workorder_ids) return { 'name': _('返工'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.rework.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_workorder_ids': workorder_ids.ids if workorder_ids.ids != [] else self.workorder_ids.ids, 'default_hidden_workorder_ids': ','.join(map(str, work_id_list)) if work_id_list != [] else '', 'default_reprogramming_num': cloud_programming.get('reprogramming_num') if cloud_programming else '', 'default_programming_state': cloud_programming.get('programming_state') if cloud_programming else '', 'default_is_reprogramming': True if cloud_programming and (cloud_programming.get('programming_state') in ['已下发']) else False } } # 更新程序 def do_update_program(self): program_production = self if len(program_production) >= 1: # same_product_id = None # is_not_same_product = 0 for item in program_production: # if same_product_id is None: # same_product_id = item.product_id # if item.product_id != same_product_id: # is_not_same_product += 1 if item.state != "rework" and item.programming_state != "已编程未下发": raise UserError("请选择状态为返工且已编程未下发的制造订单") # if is_not_same_product >= 1: # raise UserError("您选择的记录中含有其他产品的制造订单,请选择同一产品的制造订单") grouped_program_ids = {k: list(g) for k, g in groupby(program_production, key=lambda x: x.programming_no)} program_to_production_names = {} for programming_no, program_production in grouped_program_ids.items(): program_to_production_names[programming_no] = [production.name for production in program_production] for production in self: if production.programming_no in program_to_production_names: productions_not_delivered = self.env['mrp.production'].search( [('programming_no', '=', production.programming_no), ('programming_state', '=', '已编程未下发')]) productions = self.env['mrp.production'].search( [('programming_no', '=', production.programming_no), ('state', 'not in', ('cancel', 'done'))]) rework_workorder = production.workorder_ids.filtered(lambda m: m.state == 'rework') if rework_workorder: for rework_item in rework_workorder: pending_workorder = production.workorder_ids.filtered( lambda m1: m1.state in [ 'pending'] and m1.processing_panel == rework_item.processing_panel and m1.routing_type == 'CNC加工') if not pending_workorder.cnc_ids: production.get_new_program(rework_item.processing_panel) # production.write({'state': 'progress', 'programming_state': '已编程', 'is_rework': False}) productions_not_delivered.write( {'state': 'progress', 'programming_state': '已编程', 'is_rework': False}) # 对制造订单所以面的cnc工单的程序用刀进行校验 try: logging.info(f'已更新制造订单:{productions_not_delivered}') productions.production_cnc_tool_checkout() except Exception as e: logging.info(f'对cnc工单的程序用刀进行校验报错:{e}') # 从cloud获取重新编程过的最新程序 def get_new_program(self, processing_panel): try: res = {'programming_no': self.programming_no, 'processing_panel': processing_panel} configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/get_new_program' config_url = configsettings['sf_url'] + url r = requests.post(config_url, json=res, data=None, headers=config_header) r = r.json() result = json.loads(r['result']) if result['status'] == 1: program_path_tmp_panel = os.path.join('/tmp', result['folder_name'], 'return', processing_panel) if os.path.exists(program_path_tmp_panel): files_r = os.listdir(program_path_tmp_panel) if files_r: for file_name in files_r: file_path = os.path.join(program_path_tmp_panel, file_name) os.remove(file_path) download_state = self.env['sf.cnc.processing'].download_file_tmp(result['folder_name'], processing_panel) if download_state is False: raise UserError('编程单号为%s的CNC程序文件从FTP拉取失败' % (self.programming_no)) productions = self.env['mrp.production'].search( [('programming_no', '=', self.programming_no), ('state', 'not in', ('cancel', 'done'))]) if productions: for production in productions: panel_workorder = production.workorder_ids.filtered(lambda pw: pw.processing_panel == processing_panel and pw.routing_type == 'CNC加工' and pw.state not in ( 'rework', 'done')) if panel_workorder: if panel_workorder.cmm_ids: panel_workorder.cmm_ids.sudo().unlink() if panel_workorder.cnc_ids: panel_workorder.cnc_ids.sudo().unlink() # self.env['sf.cam.work.order.program.knife.plan'].sudo().unlink_cam_plan( # production) # program_path_tmp_panel = os.path.join('C://Users//43484//Desktop//fsdownload//test', # processing_panel) logging.info('program_path_tmp_panel:%s' % program_path_tmp_panel) files_panel = os.listdir(program_path_tmp_panel) if files_panel: for file in files_panel: file_extension = os.path.splitext(file)[1] if file_extension.lower() == '.pdf': panel_file_path = os.path.join(program_path_tmp_panel, file) logging.info('panel_file_path:%s' % panel_file_path) panel_workorder.write( {'cnc_ids': panel_workorder.cnc_ids.sudo()._json_cnc_processing(processing_panel, result), 'cmm_ids': panel_workorder.cmm_ids.sudo()._json_cmm_program(processing_panel, result), 'cnc_worksheet': base64.b64encode(open(panel_file_path, 'rb').read())}) logging.info('len(cnc_worksheet):%s' % len(panel_workorder.cnc_worksheet)) pre_workorder = production.workorder_ids.filtered(lambda ap: ap.routing_type == '装夹预调' and ap.processing_panel == processing_panel and ap.state not in ( 'rework', 'done')) if pre_workorder: pre_workorder.write( {'processing_drawing': base64.b64encode(open(panel_file_path, 'rb').read())}) # if production.state == 'rework' and production.programming_state == '已编程未下发': # production.write( # {'state': 'progress', 'programming_state': '已编程', 'is_rework': False}) # logging.info('返工含有已编程未下发的程序更新完成:%s' % production.name) else: raise UserError(result['message']) except Exception as e: logging.info('get_new_program error:%s' % e) raise UserError("从云平台获取最新程序失败,请联系管理员") def recreateManufacturing(self, item): """ 重新生成制造订单 """ if self.is_scrap is True: procurement_requests = [] sale_order = self.env['sale.order'].sudo().search([('name', '=', self.origin)]) # 查询出库移动记录 out_picking = self.env['stock.picking'].search( [('origin', '=', sale_order.name), ('name', 'ilike', 'WH/OUT/')]) move = out_picking.move_ids.filtered(lambda pd: pd.product_id == self.product_id) move_values = {'product_description_variants': '', 'date_planned': fields.Datetime.now(), 'date_deadline': fields.Datetime.now(), 'move_dest_ids': move, 'group_id': move.group_id, 'route_ids': [], 'warehouse_id': self.warehouse_id, 'priority': 0, 'orderpoint_id': False, 'product_packaging_id': False} procurement_requests.append(self.env['procurement.group'].Procurement( move.product_id, 1.0, move.product_uom, move.location_id, move.rule_id and move.rule_id.name or "/", sale_order.name, move.company_id, move_values)) self.env['procurement.group'].run(procurement_requests, raise_user_error=not self.env.context.get('from_orderpoint')) productions = self.env['mrp.production'].sudo().search( [('origin', '=', self.origin)], order='id desc', limit=1) productions.write({'programming_no': self.programming_no, 'is_remanufacture': True}) # mo_move = self.env['stock.move'].search( # [('origin', '=', sale_order.name), ('reference', 'ilike', 'WH/MO/')]) # if mo_move: # sfp_move = self.env['stock.move'].search( # [('origin', '=', sale_order.name), ('reference', 'ilike', 'WH/SFP/')], limit=1) # mo_move.write({'reference': sfp_move.reference, 'partner_id': sfp_move.partner_id.id, # 'picking_id': sfp_move.picking_id.id, 'picking_type_id': sfp_move.picking_type_id.id, # 'production_id': False}) # productions.procurement_group_id.mrp_production_ids.move_dest_ids.write( # {'group_id': self.env['procurement.group'].search([('name', '=', sale_order.name)])}) stock_picking_remanufacture = self.env['stock.picking'].search([('origin', '=', productions.name)]) for pick in stock_picking_remanufacture: if pick.name.startswith('WH/PC/') or pick.name.startswith('WH/INT/'): if pick.move_ids: product_type_id = pick.move_ids[0].product_id.categ_id if product_type_id.name == '坯料': location_id = self.env['stock.location'].search([('name', '=', '坯料存货区')]) if not location_id: logging.info(f'没有搜索到【坯料存货区】: {location_id}') break if pick.picking_type_id.name == '内部调拨': if pick.location_dest_id.product_type != product_type_id: pick.location_dest_id = location_id.id elif pick.picking_type_id.name == '生产发料': if pick.location_id.product_type != product_type_id: pick.location_id = location_id.id scarp_process_parameter_workorder = self.env['mrp.workorder'].search( [('surface_technics_parameters_id', '!=', False), ('production_id', '=', self.id), ('is_subcontract', '=', True)]) if scarp_process_parameter_workorder: production_programming = self.env['mrp.production'].search( [('programming_no', '=', self.programming_no), ('id', '!=', productions.id)], order='name asc') production_list = [production.name for production in production_programming] purchase_orders = self.env['purchase.order'].search([('origin', 'ilike', ','.join(production_list))]) for purchase_item in purchase_orders.order_line: for process_item in scarp_process_parameter_workorder: if purchase_item.product_id.categ_type == '表面工艺': if purchase_item.product_id.server_product_process_parameters_id == process_item.surface_technics_parameters_id: if purchase_orders.origin.find(productions.name) == -1: purchase_orders.origin += ',' + productions.name if item['is_reprogramming'] is False: productions.programming_state = '已编程' else: productions.programming_state = '编程中' return productions # 在之前的销售单上重新生成制造订单 def create_production1_values(self, production): production_values_str = {'origin': production.origin, 'product_id': production.product_id.id, 'programming_state': '已编程', 'product_description_variants': production.product_description_variants, 'product_qty': production.product_qty, 'product_uom_id': production.product_uom_id.id, 'location_src_id': production.location_src_id.id, 'location_dest_id': production.location_dest_id.id, 'bom_id': production.bom_id.id, 'date_deadline': production.date_deadline, 'date_planned_start': production.date_planned_start, 'date_planned_finished': production.date_planned_finished, # 'procurement_group_id': self.env["procurement.group"].create( # {'name': production.name}).id, 'propagate_cancel': production.propagate_cancel, 'orderpoint_id': production.orderpoint_id.id, 'picking_type_id': production.picking_type_id.id, 'company_id': production.company_id.id, 'move_dest_ids': production.move_dest_ids.ids, 'user_id': production.user_id.id} return production_values_str # 增加制造订单类型 production_type = fields.Selection( [('自动化产线加工', '自动化产线加工'), ('人工线下加工', '人工线下加工')], string='制造类型', compute='_compute_production_type', store=True ) @api.depends('product_id.is_manual_processing') def _compute_production_type(self): for production in self: production.production_type = '自动化产线加工' if not production.product_id.is_manual_processing else '人工线下加工' @api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id') def _compute_sale_order_count(self): for production in self: if production.sale_order_id: production.sale_order_count = 1 else: production.sale_order_count = 0 def action_view_sale_orders(self): if self.sale_order_id: action = { 'res_model': 'sale.order', 'type': 'ir.actions.act_window', } action.update({ 'view_mode': 'form', 'res_id': self.sale_order_id.id, }) return action @api.model_create_multi def create(self, vals_list): """ 重载创建制造订单的方法,单个制造订单,同一成品只创建一个采购组,用于后续单据的创建 """ product_group_id = {} is_custemer_group_id = {} # 客供料与非客供料 for vals in vals_list: if not vals.get('name', False) or vals['name'] == _('New'): picking_type_id = vals.get('picking_type_id') if not picking_type_id: picking_type_id = self._get_default_picking_type_id(vals.get('company_id', self.env.company.id)) vals['picking_type_id'] = picking_type_id vals['name'] = self.env['stock.picking.type'].browse(picking_type_id).sequence_id.next_by_id() product_id = self.env['product.product'].browse(vals['product_id']) is_self_process = product_id.materials_type_id and product_id.materials_type_id.gain_way and product_id.materials_type_id.gain_way != '自加工' is_customer_provided = product_id.is_customer_provided key = f"{is_self_process}_{is_customer_provided}" if not is_custemer_group_id.get(key): is_custemer_group_id[key] = self.env["procurement.group"].create({'name': vals.get('name')}).id # if not (is_first_customer or is_first_not_customer) and is_self_process: # is_first = True # group_id = self.env["procurement.group"].create({'name': vals.get('name')}).id if not vals.get('procurement_group_id'): if product_id.product_tmpl_id.single_manufacturing: if product_id.categ_id.name == '成品': vals['procurement_group_id'] = is_custemer_group_id[key] continue if product_id.id not in product_group_id.keys(): procurement_group_vals = self._prepare_procurement_group_vals(vals) procurement_group_id = self.env["procurement.group"].create(procurement_group_vals).id vals['procurement_group_id'] = procurement_group_id product_group_id[product_id.id] = procurement_group_id else: vals['procurement_group_id'] = product_group_id[product_id.id] else: vals['procurement_group_id'] = is_custemer_group_id[key] return super(MrpProduction, self).create(vals_list) @api.depends('procurement_group_id.stock_move_ids.created_purchase_line_id.order_id', 'procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id') def _compute_purchase_order_count(self): for production in self: # 找到来源的第一张制造订单的采购组 if production.product_id.product_tmpl_id.single_manufacturing == True: first_production = self.env['mrp.production'].search( [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)], limit=1, order='id asc') production.purchase_order_count = len( first_production.procurement_group_id.stock_move_ids.created_purchase_line_id.order_id | first_production.procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id) else: production.purchase_order_count = len( production.procurement_group_id.stock_move_ids.created_purchase_line_id.order_id | production.procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id) @api.depends('procurement_group_id', 'procurement_group_id.stock_move_ids.group_id') def _compute_picking_ids(self): for order in self: if order.product_id.product_tmpl_id.single_manufacturing == True and not order.is_remanufacture: first_order = self.env['mrp.production'].search( [('origin', '=', order.origin), ('product_id', '=', order.product_id.id)], limit=1, order='id asc') order.picking_ids = self.env['stock.picking'].search([ ('group_id', '=', first_order.procurement_group_id.id), ('group_id', '!=', False), ]) order.delivery_count = len(first_order.picking_ids) else: order.picking_ids = self.env['stock.picking'].search([ ('group_id', '=', order.procurement_group_id.id), ('group_id', '!=', False), ]) order.delivery_count = len(order.picking_ids) def action_view_purchase_orders(self): self.ensure_one() if self.is_remanufacture: production = self elif self.product_id.product_tmpl_id.single_manufacturing == True: production = self.env['mrp.production'].search( [('origin', '=', self.origin), ('product_id', '=', self.product_id.id)], limit=1, order='id asc') else: production = self purchase_order_ids = ( production.procurement_group_id.stock_move_ids.created_purchase_line_id.order_id | production.procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id).ids action = { 'res_model': 'purchase.order', 'type': 'ir.actions.act_window', } if len(purchase_order_ids) == 1: action.update({ 'view_mode': 'form', 'res_id': purchase_order_ids[0], }) else: action.update({ 'name': _("Purchase Order generated from %s", self.name), 'domain': [('id', 'in', purchase_order_ids)], 'view_mode': 'tree,form', }) return action def _subcontract_sanity_check(self): for production in self: if production.product_tracking != 'none' and not self.lot_producing_id: raise UserError(_('You must enter a serial number for %s') % production.product_id.name) for sml in production.move_raw_ids.move_line_ids: if sml.tracking != 'none' and not sml.lot_id: picking_ids = production.picking_ids.filtered( lambda p: p.state not in ['done', 'cancel']) picking_num = len(picking_ids) picking_info = ', '.join( ['%s:%s' % (picking.picking_type_id.name, picking.name) for picking in picking_ids] ) if picking_info: raise UserError(_('You have %s incomplete supplies: %s') % ( picking_num, picking_info)) else: raise UserError( _('You must enter a serial number for each line of %s') % sml.product_id.display_name) return True reprogramming_count = fields.Integer(string='重新编程次数', default=0) # 申请编程 def action_apply_programming(self): """ 检查前置条件:制造订单【状态】=“待排程、待加工”,制造订单的【编程状态】=“已编程”。 """ print('申请编程') if len(self) > 1: raise UserError('仅支持选择单个制造订单进行编程申请,请重新选择') for production in self: if production.state not in ['confirmed', 'pending_cam'] or production.programming_state != '已编程': raise UserError('不可操作。所选制造订单必须同时满足如下条件:\n1、制造订单状态:待排程 或 待加工;\n2、制造订单编程状态:已编程。\n请检查!') cloud_programming = production._cron_get_programming_state() if cloud_programming['programming_state'] in ['待编程', '已编程', '编程中']: raise UserError("当前编程单正在重新编程,请注意查看当前制造订单的“编程记录”确认进度!") return { 'type': 'ir.actions.act_window', 'res_model': 'sf.programming.reason', 'view_mode': 'form', 'target': 'new', 'context': { 'default_production_id': self.id, 'active_id': self.id, # 传当前时间 'default_apply_time': fields.Datetime.now(), }, 'view_id': self.env.ref('sf_manufacturing.sf_programming_reason_form_view').id, } # 编程记录 programming_record_ids = fields.One2many('sf.programming.record', 'production_id') # 编程单更新 def re_programming_update_programming_state(self): try: res = {'programming_no': self.programming_no, 'manufacturing_type': ''} logging.info('res=%s:' % res) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/reset_state_again' config_url = configsettings['sf_url'] + url ret = requests.post(config_url, json=res, data=None, headers=config_header) ret = ret.json() result = json.loads(ret['result']) logging.info('update_programming_state-ret:%s' % result) if result['status'] == 1: self.write({'is_rework': True}) else: raise UserError(ret['message']) except Exception as e: logging.info('update_programming_state error:%s' % e) raise UserError("更新编程单状态失败,请联系管理员") # 编程记录 class sf_programming_record(models.Model): _name = 'sf.programming.record' _description = "编程记录" production_id = fields.Many2one('mrp.production') # 编号、编程原因、编程方式、当前编程次数、目标制造单号、申请时间、下发时间 number = fields.Char('编号') reason = fields.Text('重新编程原因') programming_method = fields.Selection([ ('auto', '自动'), ('manual operation', '人工')], string="编程方式") current_programming_count = fields.Integer('当前编程次数') target_production_id = fields.Char('目标制造单号') apply_time = fields.Datetime('申请时间') send_time = fields.Datetime('下发时间') class sf_detection_result(models.Model): _name = 'sf.detection.result' _description = "检测结果" _order = 'handle_result_date desc, id asc' production_id = fields.Many2one('mrp.production') processing_panel = fields.Char('加工面') routing_type = fields.Selection([ ('装夹预调', '装夹预调'), ('CNC加工', 'CNC加工'), ('解除装夹', '解除装夹'), ('切割', '切割'), ('表面工艺', '表面工艺'), ('线切割', '线切割'), ('人工线下加工', '人工线下加工')], string="工序类型") rework_reason = fields.Selection( [("programming", "编程"), ("cutter", "刀具"), ("clamping", "装夹"), ("operate computer", "操机"), ("technology", "工艺"), ("customer redrawing", "客户改图")], string="原因", tracking=True) detailed_reason = fields.Text('详细原因') test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], string="检测结果", tracking=True) test_report = fields.Binary('检测报告', readonly=True) handle_result = fields.Selection([("待处理", "待处理"), ("已处理", "已处理")], default='', string="处理结果", tracking=True) handle_result_date = fields.Datetime('处理时间') handle_result_user = fields.Many2one('res.users', '处理人') # 查看检测报告 def button_look_test_report(self): return { 'res_model': 'sf.detection.result', 'type': 'ir.actions.act_window', 'res_id': self.id, 'views': [(self.env.ref('sf_manufacturing.sf_test_report_form').id, 'form')], 'target': 'new' } def write(self, vals): if vals.get('handle_result') and vals.get('handle_result') == '已处理': vals['handle_result_date'] = fields.Datetime.now() vals['handle_result_user'] = self.env.user.id return super(sf_detection_result, self).write(vals) class sf_processing_panel(models.Model): _name = 'sf.processing.panel' _description = "加工面" name = fields.Char('加工面') active = fields.Boolean('有效', default=True)