# -*- coding: utf-8 -*- import base64 import datetime import logging import json import os import re import requests from itertools import groupby from collections import defaultdict, namedtuple from odoo import api, fields, models, SUPERUSER_ID, _ from odoo.exceptions import UserError, ValidationError from odoo.addons.sf_base.commons.common import Common from odoo.tools import float_compare, float_round, float_is_zero, format_datetime class MrpProduction(models.Model): _inherit = 'mrp.production' _description = "制造订单" _order = 'create_date desc' sale_order_id = fields.Many2one('sale.order', string='销售订单', compute='_compute_sale_order_id', store=True) deadline_of_delivery = fields.Date('订单交期', tracking=True, compute='_compute_deadline_of_delivery', store=True) # tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘") maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests") request_ids = fields.One2many('maintenance.request', 'production_id') model_file = fields.Binary('模型文件', related='product_id.model_file') schedule_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')], string='排程状态', default='未排') work_order_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')], string='工单状态', default='未排') detection_result_ids = fields.One2many('sf.detection.result', 'production_id', '检测报告') tool_state = fields.Selection([('0', '正常'), ('1', '缺刀'), ('2', '无效刀')], string='功能刀具状态', default='0', store=True, compute='_compute_tool_state') tool_state_remark = fields.Text(string='功能刀具状态备注(缺刀)', compute='_compute_tool_state_remark', store=True) tool_state_remark2 = fields.Text(string='功能刀具状态备注(无效刀)', readonly=True) @api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id') def _compute_sale_order_id(self): for production in self: # 初始化 sale_order_id 为 False sale_order_id = False # 使用正则表达式查找产品名称中的 'S' 开头的字母数字字符串 match = re.search(r'S\d+', production.product_id.with_context(lang='zh_CN').name) # 从字符串开始匹配 if match: result = match.group(0) try: # 查找与匹配的字符串相符的销售订单 sale_order = self.env['sale.order'].search( [('name', '=', result)], limit=1, order='id asc' ) if sale_order: production.sale_order_id = sale_order.id else: logging.warning("No sale order found for production {} with product {} (name match: {})".format( production.id, production.product_id.name, result)) except Exception as e: logging.error("Error while fetching sale order for production {}: {}".format(production.id, str(e))) @api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id') def _compute_deadline_of_delivery(self): for production in self: # 确保 procurement_group_id 和相关字段存在 if production.procurement_group_id: # 获取相关的 sale_id sale_order_id = production.procurement_group_id.mrp_production_ids.mapped( 'move_dest_ids.group_id.sale_id') # 确保 sale_order_id 是有效的 ID 列表 if sale_order_id: # 获取 sale.order 记录 sale_id = self.env['sale.order'].sudo().browse(sale_order_id.ids) # 使用 mapped 返回的 ID 列表 # 处理 sale_id if sale_id: # 假设我们只需要第一个 sale_id production.deadline_of_delivery = sale_id[0].deadline_of_delivery if sale_id else False else: production.deadline_of_delivery = False else: production.deadline_of_delivery = False def _compute_default_delivery_status(self): try: if self.state == 'cancel': return False if not self.deadline_of_delivery: return False hours = self.get_hours_diff() if hours >= 48: return '正常' elif hours > 0 and hours < 48 and self.state != 'done': return '预警' elif hours > 0 and hours < 48 and self.state == 'done': return '正常' else: return '已逾期' except Exception as e: logging.error("Error processing production ID {}: {}".format(self.id, e)) raise e @api.depends('state', 'deadline_of_delivery') def _compute_delivery_status(self): for production in self: delivery_status = production._compute_default_delivery_status() if delivery_status and production.delivery_status != delivery_status: production.delivery_status = delivery_status delivery_status = fields.Selection([('正常', '正常'), ('预警', '预警'), ('已逾期', '已逾期')], string='交期状态', store=True, compute='_compute_delivery_status', default=lambda self: self._compute_default_delivery_status()) def get_page_all_records(self, model_name, func, domain, page_size=100): # 获取模型对象 model = self.env[model_name].sudo() # 初始化分页参数 page_number = 1 while True: # 计算偏移量 offset = (page_number - 1) * page_size # 获取当前页的数据 records = model.search(domain, limit=page_size, offset=offset) # 如果没有更多记录,退出循环 if not records: break # 将当前页的数据添加到结果列表 func(records) # 增加页码 page_number += 1 def run_compute_delivery_status(self, records): records._compute_delivery_status() def _corn_update_delivery_status(self): need_list = [ 'draft', 'technology_to_confirmed', 'confirmed', 'pending_cam', 'progress', 'rework', 'scrap', 'to_close', ] # previous_workorder = self.env['mrp.production'].search([('state', 'in', need_list)]) self.get_page_all_records('mrp.production', self.run_compute_delivery_status, [('state', 'in', need_list)], 100) def get_hours_diff(self): # 获取当前日期和时间 current_datetime = fields.Datetime.now() # 将 date_field 转换为 datetime 对象 if self.deadline_of_delivery: date_obj = fields.Date.from_string(self.deadline_of_delivery) # 将 date 对象转换为 datetime 对象,设置时间为 00:00:00 date_obj = datetime.datetime.combine(date_obj, datetime.time.min) # 计算两个日期之间的差值 delta = date_obj - current_datetime # 返回差值的小时数 return int(delta.total_seconds() / 3600) else: return 0.0 @api.depends('workorder_ids.tool_state_remark') def _compute_tool_state_remark(self): for item in self: if item.workorder_ids: workorder_ids = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel']) if workorder_ids.filtered(lambda a: a.tool_state == '1'): work_ids = workorder_ids.filtered(lambda a: a.tool_state == '1') tool_state_remark = '' for work_id in work_ids: if tool_state_remark == '': tool_state_remark = f'{work_id.tool_state_remark}' else: tool_state_remark = f"{tool_state_remark}\n{work_id.tool_state_remark}" item.tool_state_remark = tool_state_remark else: item.tool_state_remark = False @api.depends('workorder_ids.tool_state') def _compute_tool_state(self): for item in self: if item.workorder_ids: tool_state = item.tool_state workorder_ids = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel']) if workorder_ids.filtered(lambda a: a.tool_state == '2'): item.tool_state = '2' elif workorder_ids.filtered(lambda a: a.tool_state == '1'): item.tool_state = '1' else: item.tool_state = '0' if tool_state == '2' and item.tool_state != '2': item.detection_result_ids.filtered( lambda a: a.detailed_reason == '无效功能刀具' and a.handle_result == '待处理').write( {'handle_result': '已处理'}) # state = fields.Selection(selection_add=[ # ('pending_scheduling', '待排程'), # ('pending_processing', '待加工'), # ('completed', '已完工') # ]) state = fields.Selection([ ('draft', '草稿'), ('technology_to_confirmed', '待工艺确认'), ('confirmed', '待排程'), ('pending_cam', '待加工'), ('progress', '加工中'), ('rework', '返工'), ('scrap', '报废'), ('to_close', 'To Close'), ('done', 'Done'), ('cancel', '已取消')], string='State', compute='_compute_state', copy=False, index=True, readonly=True, store=True, tracking=True, help=" * Draft: The MO is not confirmed yet.\n" " * Confirmed: The MO is confirmed, the stock rules and the reordering of the components are trigerred.\n" " * In Progress: The production has started (on the MO or on the WO).\n" " * To Close: The production is done, the MO has to be closed.\n" " * Done: The MO is closed, the stock moves are posted. \n" " * Cancelled: The MO has been cancelled, can't be confirmed anymore.") check_status = fields.Boolean(string='启用状态', default=False, readonly=True) active = fields.Boolean(string='已归档', default=True) programming_no = fields.Char('编程单号') work_state = fields.Char('业务状态') programming_state = fields.Selection( [('编程中', '编程中'), ('已编程', '已编程'), ('已编程未下发', '已编程未下发'), ('已下发', '已下发')], string='编程状态', tracking=True) glb_file = fields.Binary("glb模型文件") production_line_id = fields.Many2one('sf.production.line', string='生产线', tracking=True) plan_start_processing_time = fields.Datetime('计划开始加工时间') is_rework = fields.Boolean(string='是否返工', default=False) # production_line_state = fields.Selection( # [('待上产线', '待上产线'), ('已上产线', '已上产线'), ('已下产线', '已下产线')], # string='上/下产线', default='待上产线', tracking=True) # 工序状态 # Todo 研究下用法 process_state = fields.Selection([ ('待装夹', '待装夹'), ('待检测', '待检测'), ('待加工', '待加工'), ('待解除装夹', '待解除装夹'), ('已完工', '已完工'), ], string='工序状态', default='待装夹') # 零件图号 part_number = fields.Char('零件图号', related='product_id.part_number', readonly=True) # 上传零件图纸 part_drawing = fields.Binary('零件图纸', related='product_id.machining_drawings', readonly=True) quality_standard = fields.Binary('质检标准', related='product_id.quality_standard', readonly=True) part_name = fields.Char(string='零件名称', related='product_id.part_name', readonly=True) @api.depends('product_id.manual_quotation') def _compute_manual_quotation(self): for item in self: item.manual_quotation = item.product_id.manual_quotation manual_quotation = fields.Boolean('人工编程', default=False, compute=_compute_manual_quotation, store=True) is_scrap = fields.Boolean('是否报废', default=False) is_remanufacture = fields.Boolean('是否重新制造', default=False) remanufacture_count = fields.Integer("重新制造订单数量", compute='_compute_remanufacture_production_ids') remanufacture_production_id = fields.Many2one('mrp.production', string='') technology_design_ids = fields.One2many('sf.technology.design', 'production_id', string='工艺设计') is_adjust = fields.Boolean('是否退回调整', default=False) @api.depends('remanufacture_production_id') def _compute_remanufacture_production_ids(self): for production in self: if production.remanufacture_production_id: remanufacture_production = self.env['mrp.production'].search( [('id', '=', production.remanufacture_production_id.id)]) if remanufacture_production: production.remanufacture_count = len(remanufacture_production) else: production.remanufacture_count = 0 def action_view_remanufacture_productions(self): self.ensure_one() mrp_production = self.env['mrp.production'].search( [('id', '=', self.remanufacture_production_id.id)]) action = { 'res_model': 'mrp.production', 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_id': mrp_production.id, } return action @api.depends( 'move_raw_ids.state', 'move_raw_ids.quantity_done', 'move_finished_ids.state', 'tool_state', 'workorder_ids.state', 'product_qty', 'qty_producing', 'schedule_state', 'programming_state', 'is_adjust') def _compute_state(self): for production in self: if not production.state or not production.product_uom_id: production.state = 'draft' elif production.state == 'cancel' or (production.move_finished_ids and all( move.state == 'cancel' for move in production.move_finished_ids)): production.state = 'cancel' elif production.workorder_ids and all( wo_state in ('done', 'cancel') for wo_state in production.workorder_ids.mapped('state')): production.state = 'to_close' elif not production.workorder_ids and float_compare(production.qty_producing, production.product_qty, precision_rounding=production.product_uom_id.rounding) >= 0: production.state = 'to_close' elif any(wo_state in ('progress', 'done') for wo_state in production.workorder_ids.mapped('state')): production.state = 'progress' elif production.product_uom_id and not float_is_zero(production.qty_producing, precision_rounding=production.product_uom_id.rounding): production.state = 'progress' elif any(not float_is_zero(move.quantity_done, precision_rounding=move.product_uom.rounding or move.product_id.uom_id.rounding) for move in production.move_raw_ids if move.product_id): production.state = 'progress' # 新添加的状态逻辑 if production.state in ['to_close', 'progress', 'technology_to_confirmed'] and production.schedule_state == '未排': if not production.workorder_ids or production.is_adjust is True: production.state = 'technology_to_confirmed' else: if production.is_adjust is True: production.state = 'technology_to_confirmed' else: production.state = 'confirmed' elif production.state == 'pending_cam' and production.schedule_state == '未排': production.state = 'confirmed' elif production.state == 'to_close' and production.schedule_state == '已排': production.state = 'pending_cam' elif production.state == 'confirmed' and production.is_adjust is True: production.state = 'technology_to_confirmed' if production.state == 'confirmed' and production.schedule_state == '已排': production.state = 'pending_cam' if (production.state == 'rework' and production.tool_state == '0' and production.schedule_state == '已排' and production.is_rework is False): production.state = 'pending_cam' if any((wo.test_results == '返工' and wo.state == 'done' and (production.programming_state in ['已编程'] or wo.individuation_page_PTD is True)) or (wo.is_rework is True and wo.state == 'done' and production.programming_state in ['编程中', '已编程']) for wo in production.workorder_ids): production.state = 'rework' if any(wo.test_results == '报废' and wo.state == 'done' for wo in production.workorder_ids): production.state = 'scrap' if any(dr.test_results == '报废' and dr.handle_result == '已处理' for dr in production.detection_result_ids): production.state = 'cancel' if production.workorder_ids and all( wo_state in ('done', 'rework', 'cancel') for wo_state in production.workorder_ids.mapped('state')): if production.state not in ['scrap', 'rework', 'cancel']: production.state = 'done' elif production.state == 'done': production.state = 'progress' # 退回调整 def technology_back_adjust(self): process_parameters = [] domain = [('state', '=', 'confirmed'), ('origin', '=', self.origin)] if self.production_type == '自动化产线加工': cloud_programming = self._cron_get_programming_state() if cloud_programming['send_state'] == 'sending': raise UserError(_("编程文件正在下发中,请稍后重试")) domain += [('programming_no', '=', self.programming_no)] # 带排程的制造订单 production_confirmed = self.env['mrp.production'].search(domain) for special in production_confirmed.technology_design_ids: if special.process_parameters_id: product_production_process = self.env['product.template'].search( [('server_product_process_parameters_id', '=', special.process_parameters_id.id)]) if not product_production_process: if special.process_parameters_id not in process_parameters: process_parameters.append(special.process_parameters_id.display_name) if process_parameters: raise UserError(_("【工艺设计】-【参数】为%s的在【产品】中不存在,请先创建", ", ".join(process_parameters))) if production_confirmed: production_count = self.env['mrp.production'].search_count([ ('origin', '=', self.origin), ('product_id', '=', self.product_id.id), ('state', '=', 'confirmed') ]) if production_count > 1: return { 'name': _('退回调整'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_re_adjust_wizard_form_view').id, 'form')], 'res_model': 'sf.production.technology.re_adjust.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} else: return { 'name': _('退回调整'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_re_adjust_wizard_confirm_form_view').id, 'form')], 'res_model': 'sf.production.technology.re_adjust.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} # 工艺确认 def technology_confirm(self): process_parameters = [] purchase_orders = [] parameters_not = [] # 获取原有的工单对应的工序 origin_designs = self.workorder_ids.technology_design_id # 获取已删除的工序 deleted_designs = origin_designs - self.technology_design_ids if deleted_designs: for deleted_design in deleted_designs: workorder = self.env['mrp.workorder'].search([('technology_design_id', '=', deleted_design.id)]) purchase = workorder._get_surface_technics_purchase_ids() if purchase.state not in ['cancel', 'draft', False]: purchase_orders.append(purchase.name) special_design = self.technology_design_ids.filtered( lambda a: a.routing_tag == 'special' and a.is_auto is False) for special in special_design: if special.route_id.routing_type == '表面工艺' and not special.process_parameters_id: parameters_not.append(special.route_id.name) if special.process_parameters_id: product_production_process = self.env['product.template'].search( [('server_product_process_parameters_id', '=', special.process_parameters_id.id)]) if not product_production_process: if special.process_parameters_id not in process_parameters: process_parameters.append(special.process_parameters_id.display_name) if purchase_orders: raise UserError(_("请联系工厂生产经理对该(%s)采购订单进行取消", ", ".join(purchase_orders))) if parameters_not: raise UserError(_("【工艺设计】-【工序】为%s未选择参数,请选择", ", ".join(parameters_not))) if process_parameters: raise UserError(_("【工艺设计】-【参数】为%s的在【产品】中不存在,请先创建", ", ".join(process_parameters))) # 判断同一个加工面的标准工序的顺序是否依次排序 error_panel = [] technology_design = self.technology_design_ids.filtered(lambda a: a.routing_tag == 'standard').sorted( key=lambda m: m.sequence) for index, design in enumerate(technology_design): routing_type = design.route_id.routing_type if index < len(technology_design) - 1: next_index = index + 1 next_design = technology_design[next_index] next_design_routing_type = next_design.route_id.routing_type # logging.info('当前工序和加工面: %s-%s' % (design.route_id.name, design.panel)) # logging.info('下一个工序和加工面: %s-%s' % (next_design.route_id.name, next_design.panel)) if design.panel is not False: if design.panel != next_design.panel: if index == 0: raise UserError('【加工面】为%s的标准工序里含有其他加工面的工序,请调整后重试' % design.panel) if routing_type not in ['解除装夹']: raise UserError('【加工面】为%s的标准工序顺序有误,请调整后重试' % design.panel) if design.panel == next_design.panel: if (routing_type == '装夹预调' and next_design_routing_type == '解除装夹') or ( routing_type == 'CNC加工' and next_design_routing_type == '装夹预调'): if design.panel not in error_panel: error_panel.append(design.panel) else: if not error_panel and not process_parameters: production_count = self.env['mrp.production'].search_count([ ('origin', '=', self.origin), ('product_id', '=', self.product_id.id), ('state', '=', 'technology_to_confirmed') ]) if production_count > 1: return { 'name': _('工艺确认'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_wizard_form_view').id, 'form')], 'res_model': 'sf.production.technology.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} else: return { 'name': _('工艺确认'), 'type': 'ir.actions.act_window', 'views': [(self.env.ref( 'sf_manufacturing.sf_production_technology_wizard_confirm_form_view').id, 'form')], 'res_model': 'sf.production.technology.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_origin': self.origin, }} if error_panel: raise UserError(_("【加工面】为%s的标准工序顺序有误,请调整后重试", ", ".join(error_panel))) return True def action_check(self): """ 审核启用 """ self.check_status = True def action_uncheck(self): """ 审核禁用 """ self.check_status = False def archive(self): """ 归档 """ self.write({'active': False}) def unarchive(self): """ 取消归档 """ self.write({'active': True}) @api.depends('request_ids') def _compute_maintenance_count(self): for production in self: production.maintenance_count = len(production.request_ids) # 获取cloud编程单的状态 def _cron_get_programming_state(self): try: if not self: reproduction = self.env['mrp.production'].search( [('state', '=', 'rework'), ('programming_state', '=', '编程中'), ('is_rework', '=', True)]) else: reproduction = self if reproduction: programming_no_set = set([str(item.programming_no) for item in reproduction]) programming_no = list(programming_no_set) programming_no_str = ','.join(programming_no) res = {'programming_no': programming_no_str} logging.info('res=%s:' % res) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/get_state' config_url = configsettings['sf_url'] + url ret = requests.post(config_url, json=res, data=None, headers=config_header) ret = ret.json() result = json.loads(ret['result']) if result['status'] == 1: for item in result['programming_list']: if not self: for rp in reproduction: if rp.programming_no == item['programming_no']: rp.write({'programming_state': '已编程未下发' if item[ 'programming_state'] == '已编程' else '编程中'}) else: return item else: raise UserError(ret['message']) except Exception as e: logging.info('cron_get_programming_state error:%s' % e) # 编程单更新 # 增加触发时间参数 def update_programming_state(self, trigger_time=None): try: manufacturing_type = 'rework' if self.is_scrap: manufacturing_type = 'scrap' elif self.tool_state == '2': manufacturing_type = 'invalid_tool_rework' res = {'programming_no': self.programming_no, 'manufacturing_type': manufacturing_type, 'trigger_time': trigger_time} logging.info('res=%s:' % res) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/reset_state_again' config_url = configsettings['sf_url'] + url ret = requests.post(config_url, json=res, data=None, headers=config_header) ret = ret.json() result = json.loads(ret['result']) logging.info('update_programming_state-ret:%s' % result) if result['status'] == 1: self.write({'is_rework': True}) else: raise UserError(ret['message']) # # 增加对编程记录的更新 # cloud_programming = self._cron_get_programming_state() # if manufacturing_type == 'rework': # self.programming_record_ids.create({ # 'number': len(self.programming_record_ids) + 1, # 'production_id': self.id, # 'reason': '返工', # 'programming_method': cloud_programming['programme_way'], # 'current_programming_count': cloud_programming['reprogramming_num'], # 'target_production_id': cloud_programming['production_order_no'], # 'apply_time': trigger_time, # 'send_time': cloud_programming['send_time'], # }) # elif manufacturing_type == 'scrap': # self.programming_record_ids.create({ # 'number': len(self.programming_record_ids) + 1, # 'production_id': self.id, # 'reason': '报废', # 'programming_method': cloud_programming['programme_way'], # 'current_programming_count': cloud_programming['reprogramming_num'], # 'target_production_id': cloud_programming['production_order_no'], # 'apply_time': trigger_time, # 'send_time': cloud_programming['send_time'], # }) # elif manufacturing_type == 'invalid_tool_rework': # self.programming_record_ids.create({ # 'number': len(self.programming_record_ids) + 1, # 'production_id': self.id, # 'reason': '无效功能刀具', # 'programming_method': cloud_programming['programme_way'], # 'current_programming_count': cloud_programming['reprogramming_num'], # 'target_production_id': cloud_programming['production_order_no'], # 'apply_time': trigger_time, # 'send_time': cloud_programming['send_time'], # }) # else: # logging.info('无对应状态,不需更新编程记录') except Exception as e: logging.info('update_programming_state error:%s' % e) raise UserError("更新编程单状态失败,请联系管理员") # cnc程序获取 def fetchCNC(self, production_names): cnc = self.env['mrp.production'].search([('id', '=', self.id)]) quick_order = False if cnc.product_id.default_code: quick_order = self.env['quick.easy.order'].search( [('name', '=', cnc.product_id.default_code.rsplit('-', 1)[0])]) programme_way = False if cnc.manual_quotation is True: programme_way = 'manual operation' else: programme_way = 'auto' if quick_order: programme_way = 'manual operation' try: res = { 'production_no': production_names, 'machine_tool_code': '', 'product_name': cnc.product_id.name, 'remanufacture_type': '', 'model_code': cnc.product_id.model_code, 'material_code': self.env['sf.production.materials'].search( [('id', '=', cnc.product_id.materials_id.id)]).materials_no, 'material_type_code': self.env['sf.materials.model'].search( [('id', '=', cnc.product_id.materials_type_id.id)]).materials_no, 'machining_processing_panel': cnc.product_id.model_processing_panel, 'machining_precision': '', 'embryo_long': cnc.product_id.bom_ids.bom_line_ids.product_id.length, 'embryo_height': cnc.product_id.bom_ids.bom_line_ids.product_id.height, 'embryo_width': cnc.product_id.bom_ids.bom_line_ids.product_id.width, 'order_no': cnc.origin, 'model_order_no': cnc.product_id.default_code, 'user': cnc.env.user.name, 'programme_way': programme_way, 'model_file': '' if not cnc.product_id.model_file else base64.b64encode( cnc.product_id.model_file).decode('utf-8'), 'part_name': cnc.product_id.part_name, 'part_number': cnc.product_id.part_number, 'machining_drawings': base64.b64encode(cnc.product_id.machining_drawings).decode( 'utf-8') if cnc.product_id.machining_drawings else '', 'machining_drawings_name': cnc.product_id.machining_drawings_name, 'machining_drawings_mimetype': cnc.product_id.machining_drawings_mimetype, } # 打印出除了 model_file 之外的所有键值对 for key, value in res.items(): if key != 'model_file': logging.info('%s: %s' % (key, value)) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/create' config_url = configsettings['sf_url'] + url res['token'] = configsettings['token'] # res_str = json.dumps(res) ret = requests.post(config_url, json={}, data=res, headers=config_header) ret = ret.json() logging.info('fetchCNC-ret:%s' % ret) if ret['status'] == 1: self.write( {'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'}) else: raise UserError(ret['message']) except Exception as e: logging.info('fetchCNC error:%s' % e) raise UserError("cnc程序获取编程单失败,请联系管理员") # 维修模块按钮 def button_maintenance_req(self): self.ensure_one() return { 'name': _('New Maintenance Request'), 'view_mode': 'form', 'res_model': 'maintenance.request', 'type': 'ir.actions.act_window', 'context': { 'default_company_id': self.company_id.id, 'default_production_id': self.id, }, 'domain': [('production_id', '=', self.id)], } # 打开维修模块请求 def open_maintenance_request_mo(self): self.ensure_one() action = { 'name': _('Maintenance Requests'), 'view_mode': 'kanban,tree,form,pivot,graph,calendar', 'res_model': 'maintenance.request', 'type': 'ir.actions.act_window', 'context': { 'default_company_id': self.company_id.id, 'default_production_id': self.id, }, 'domain': [('production_id', '=', self.id)], } if self.maintenance_count == 1: production = self.env['maintenance.request'].search([('production_id', '=', self.id)]) action['view_mode'] = 'form' action['res_id'] = production.id return action def action_generate_serial(self): self.ensure_one() iot_code = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) or self.env[ 'ir.sequence'].next_by_code('stock.lot.serial') iot_code_name = re.sub('[\u4e00-\u9fa5]', "", iot_code) self.lot_producing_id = self.env['stock.lot'].create({ 'product_id': self.product_id.id, 'company_id': self.company_id.id, 'name': iot_code_name, }) if self.move_finished_ids.filtered(lambda m: m.product_id == self.product_id).move_line_ids: self.move_finished_ids.filtered( lambda m: m.product_id == self.product_id).move_line_ids.lot_id = self.lot_producing_id # if self.product_id.tracking == 'serial': # self._set_qty_producing() # 重载根据工序生成工单的程序:如果产品BOM中没有工序时, # 根据产品对应的模板类型中工序,去生成工单; # CNC加工工序的选取规则: # 如果自动报价有带过来预分配的机床, # 则根据设备找到工作中心;否则采用前面描述的工作中心分配机制; # 其他规则限制: 默认只分配给工作中心状态为非故障的工作中心; def _create_workorder3(self, item): for production in self: if not production.bom_id or not production.product_id: continue workorders_values = [] product_qty = production.product_uom_id._compute_quantity(production.product_qty, production.bom_id.product_uom_id) exploded_boms, dummy = production.bom_id.explode(production.product_id, product_qty / production.bom_id.product_qty, picking_type=production.bom_id.picking_type_id) for bom, bom_data in exploded_boms: # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders. if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[ 'parent_line'].bom_id.operation_ids != bom.operation_ids)): continue for operation in bom.operation_ids: if operation._skip_operation_line(bom_data['product']): continue workorders_values += [{ 'name': operation.name, 'production_id': production.id, 'workcenter_id': operation.workcenter_id.id, 'product_uom_id': production.product_uom_id.id, 'operation_id': operation.id, 'state': 'pending', }] if production.product_id.categ_id.type in ['成品', '坯料']: # # 根据工序设计生成工单 technology_design_ids = sorted(production.technology_design_ids, key=lambda x: x.sequence) for route in technology_design_ids: workorder_has = self.env['mrp.workorder'].search( [('technology_design_id', '=', route.id), ('production_id', '=', production.id)]) if not workorder_has: if route.route_id.routing_type not in ['表面工艺']: workorders_values.append( self.env['mrp.workorder'].json_workorder_str(production, route)) else: product_production_process = self.env['product.template'].search( [('server_product_process_parameters_id', '=', route.process_parameters_id.id)]) workorders_values.append( self.env[ 'mrp.workorder']._json_workorder_surface_process_str( production, route, product_production_process.seller_ids[0].partner_id.id)) production.workorder_ids = workorders_values for workorder in production.workorder_ids: workorder.duration_expected = workorder._get_duration_expected() # 外协出入库单处理 def get_subcontract_pick_purchase(self): production_all = self.sorted(lambda x: x.id) product_id_to_production_names = {} grouped_product_ids = {k: list(g) for k, g in groupby(production_all, key=lambda x: x.product_id.id)} for product_id, pd in grouped_product_ids.items(): product_id_to_production_names[product_id] = [p.name for p in pd] sorted_workorders = None for production in production_all: proc_workorders = [] process_parameter_workorder = self.env['mrp.workorder'].search( [('surface_technics_parameters_id', '!=', False), ('production_id', '=', production.id), ('is_subcontract', '=', True), ('state', '!=', 'cancel')], order='sequence asc') if process_parameter_workorder: # 将这些特殊表面工艺工单的采购单与调拨单置为失效 for workorder in process_parameter_workorder: # workorder._get_surface_technics_purchase_ids().write({'state': 'cancel'}) workorder.move_subcontract_workorder_ids.write({'state': 'cancel'}) workorder.move_subcontract_workorder_ids.picking_id.write({'state': 'cancel'}) sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.sequence) if not sorted_workorders: return for workorders in reversed(sorted_workorders): self.env['stock.picking'].create_outcontract_picking(workorders, production, sorted_workorders) self.env['purchase.order'].get_purchase_order(workorders, production, product_id_to_production_names) # 工单排序 def _reset_work_order_sequence1(self, k): for rec in self: cnc_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工") cnc_back_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工(返工)") for work in rec.workorder_ids: if work.name == cnc_workorder.name and work.processing_panel == k: cnc_back_workorder.write({'sequence': work.sequence + 1}) print(cnc_back_workorder.sequence) elif work.routing_type not in ['装夹预调'] and work != cnc_back_workorder: work.sequence += 1 # 在制造订单上新增工单 def _create_workorder1(self, k): for production in self: if not production.bom_id or not production.product_id: continue workorders_values = [] product_qty = production.product_uom_id._compute_quantity(production.product_qty, production.bom_id.product_uom_id) exploded_boms, dummy = production.bom_id.explode(production.product_id, product_qty / production.bom_id.product_qty, picking_type=production.bom_id.picking_type_id) for bom, bom_data in exploded_boms: # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders. if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[ 'parent_line'].bom_id.operation_ids != bom.operation_ids)): continue for operation in bom.operation_ids: if operation._skip_operation_line(bom_data['product']): continue workorders_values += [{ 'name': operation.name, 'production_id': production.id, 'workcenter_id': operation.workcenter_id.id, 'product_uom_id': production.product_uom_id.id, 'operation_id': operation.id, 'state': 'pending', }] # 根据加工面板的面数及对应的成品工序模板生成工单 i = 0 production.product_id.model_processing_panel = k for k in (production.product_id.model_processing_panel.split(',')): routingworkcenter = self.env['sf.product.model.type.routing.sort'].search( [('product_model_type_id', '=', production.product_id.product_model_type_id.id)], order='sequence asc' ) i += 1 for route in routingworkcenter: if route.routing_type == 'CNC加工': workorders_values.append( self.env['mrp.workorder'].json_workorder_str1(k, production, route)) production.workorder_ids = workorders_values workorder = self.env['mrp.workorder'].browse(production.workorder_ids.ids) print(workorder) # for item in workorder: # workorder.duration_expected = workorder._get_duration_expected() def _create_workorder2(self, k): self._create_workorder1(k) self._reset_work_order_sequence1(k) return True # 需对不连续工单对应的采购单和外协出入库单做处理 def _reset_subcontract_pick_purchase(self): production_all = self.sorted(lambda x: x.id) product_id_to_production_names = {} grouped_product_ids = {k: list(g) for k, g in groupby(production_all, key=lambda x: x.product_id.id)} for product_id, pd in grouped_product_ids.items(): product_id_to_production_names[product_id] = [p.name for p in pd] for item in production_all: production_process = product_id_to_production_names.get(item.product_id.id) workorder_sf = item.workorder_ids.filtered(lambda sf: sf.routing_type == '表面工艺') for i, workorder in enumerate(workorder_sf): if i == 0: continue elif workorder.sequence != workorder_sf[i - 1].sequence + 1: # workorder.picking_ids.move_ids = False workorder.picking_ids = False purchase_order = self.env['purchase.order'].search( [('state', '=', 'draft'), ('origin', '=', item.name), ('purchase_type', '=', 'consignment')]) server_template = self.env['product.template'].search( [('server_product_process_parameters_id', '=', workorder.surface_technics_parameters_id.id), ('detailed_type', '=', 'service')]) for po in purchase_order: for line in po.order_line: if line.product_id == server_template.product_variant_id: continue if server_template.server_product_process_parameters_id != line.product_id.server_product_process_parameters_id: purchase_order_line = self.env['purchase.order.line'].search( [('product_id', '=', server_template.product_variant_id.id), ('id', '=', line.id), ('product_qty', '=', 1)], limit=1, order='id desc') if purchase_order_line: line.unlink() def _reset_work_order_sequence(self): """ 工单工序排序方法(新) """ for rec in self: workorder_ids = rec.workorder_ids technology_design_ids = rec.technology_design_ids if workorder_ids.filtered(lambda item: item.state in ('返工', 'rework')): # 获取返工后新生成的工单 work_ids = workorder_ids.filtered(lambda item: item.sequence == 0) # 对工单进行逐个插入 for work_id in work_ids: order_rework_ids = rec.workorder_ids.filtered( lambda item: (item.sequence > 0 and work_id.name == item.name and work_id.processing_panel == item.processing_panel)) order_rework_ids = sorted(order_rework_ids, key=lambda item: item.sequence, reverse=True) work_id.sequence = order_rework_ids[0].sequence + 1 # 对该工单之后的工单工序进行加一 work_order_ids = rec.workorder_ids.filtered( lambda item: item.sequence >= work_id.sequence and item.id != work_id.id) for work in work_order_ids: work.sequence = work.sequence + 1 else: # 将工艺设计生成的工单序号赋值给工单的序号 for work in workorder_ids: td_ids = technology_design_ids.filtered( lambda item: (item.route_id.name in work.name and item.process_parameters_id and item.process_parameters_id == work.surface_technics_parameters_id) or (item.route_id.name == work.name and item.panel and item.panel == work.processing_panel)) if work.name == '人工线下加工': td_ids = technology_design_ids.filtered(lambda item: (item.route_id.name in work.name)) if td_ids: work.sequence = td_ids[0].sequence cancel_work_ids = workorder_ids.filtered(lambda item: item.state in ('已取消', 'cancel')) if cancel_work_ids: sequence = max(workorder_ids.filtered(lambda item: item.state not in ('已取消', 'cancel')), key=lambda w: w.sequence).sequence for cw in cancel_work_ids: cw.sequence = sequence + 1 def _reset_work_order_sequence_1(self): """ 工单工序排序方法(旧) """ for rec in self: workorder_ids = rec.workorder_ids.filtered(lambda item: item.state in ('返工', 'rework')) # 产品模型类型 model_type_id = rec.product_id.product_model_type_id # 产品加工面板 model_processing_panel = rec.product_id.model_processing_panel if not workorder_ids: sequence_list = {} if model_type_id: if model_processing_panel: tmpl_num = 1 panel_list = model_processing_panel.split(',') for panel in panel_list: panel_sequence_list = {} # 成品工序 product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids if product_routing_tmpl_ids: for tmpl_id in product_routing_tmpl_ids: panel_sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num}) tmpl_num += 1 sequence_list.update({panel: panel_sequence_list}) # 表面工艺工序 # 模型类型的表面工艺工序模版 surface_tmpl_ids = model_type_id.surface_technics_routing_tmpl_ids # 产品选择的表面工艺参数 model_process_parameters_ids = rec.product_id.model_process_parameters_ids process_dict = {} if model_process_parameters_ids: for process_parameters_id in model_process_parameters_ids: process_id = process_parameters_id.process_id for surface_tmpl_id in surface_tmpl_ids: if process_id == surface_tmpl_id.route_workcenter_id.surface_technics_id: surface_tmpl_name = surface_tmpl_id.route_workcenter_id.name process_dict.update({int(process_id.sequence): '%s-%s' % ( surface_tmpl_name, process_parameters_id.name)}) process_list = sorted(process_dict.keys()) for process_num in process_list: sequence_list.update({process_dict.get(process_num): tmpl_num}) tmpl_num += 1 # 坯料工序 tmpl_num = 1 embryo_routing_tmpl_ids = model_type_id.embryo_routing_tmpl_ids if embryo_routing_tmpl_ids: for tmpl_id in embryo_routing_tmpl_ids: sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num}) tmpl_num += 1 else: raise ValidationError('该产品【加工面板】为空!') else: raise ValidationError('该产品没有选择【模版类型】!') logging.info('sequence_list: %s' % sequence_list) for work in rec.workorder_ids: work_name = work.name logging.info(work_name) if sequence_list.get(work_name): work.sequence = sequence_list[work_name] elif sequence_list.get(work.processing_panel): processing_panel = sequence_list.get(work.processing_panel) if processing_panel.get(work_name): work.sequence = processing_panel[work_name] else: raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name) else: raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name) # 当单个面触发返工时,将新生成的工单插入到返工工单下方,并且后面的所以工单工序重排 elif rec.workorder_ids.filtered(lambda item: item.sequence == 0): # 获取新增的返工工单 work_ids = rec.workorder_ids.filtered(lambda item: item.sequence == 0) # 获取当前返工面最后一个工单工序 sequence_max = sorted( rec.workorder_ids.filtered(lambda item: item.processing_panel == work_ids[0].processing_panel), key=lambda item: item.sequence, reverse=True)[0].sequence # 对当前返工工单之后的工单工序进行重排 work_order_ids = rec.workorder_ids.filtered(lambda item: item.sequence > sequence_max) for work_id in work_order_ids: work_id.sequence = work_id.sequence + 3 # 生成新增的返工工单的工序 # 成品工序 panel_sequence_list = {} product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids if product_routing_tmpl_ids: for tmpl_id in product_routing_tmpl_ids: sequence_max += 1 panel_sequence_list.update({tmpl_id.route_workcenter_id.name: sequence_max}) for work_id in work_ids: work_name = work_id.name if panel_sequence_list.get(work_name): work_id.sequence = panel_sequence_list[work_name] # 创建工单并进行排序 def _create_workorder(self, item): self._create_workorder3(item) self._reset_work_order_sequence() return True def production_process(self, pro_plan): type_map = {'装夹预调': False, 'CNC加工': False, '解除装夹': False} # 最后一次加工结束时间 last_time = pro_plan.date_planned_start # 预置时间 works = self.workorder_ids for index, work in enumerate(works): count = type_map.get(work.routing_type) date_planned_end = None date_planned_start = None if self.production_type == '自动化产线加工': date_planned_start, date_planned_end, last_time = work.auto_production_process(last_time, count, type_map) elif self.production_type == '': date_planned_start, date_planned_end, last_time = work.manual_offline_process(last_time, index) work.update_work_start_end(date_planned_start, date_planned_end) # def def process_range_time(self): for production in self: works = production.workorder_ids pro_plan = self.env['sf.production.plan'].search([('production_id', '=', production.id)], limit=1) if not pro_plan: continue if production.production_type: production.production_process(pro_plan) # 修改标记已完成方法 def button_mark_done1(self): if not self.workorder_ids.filtered(lambda w: w.routing_type not in ['表面工艺']): self._button_mark_done_sanity_checks() if not self.env.context.get('button_mark_done_production_ids'): self = self.with_context(button_mark_done_production_ids=self.ids) res = self._pre_button_mark_done() if res is not True: return res if self.env.context.get('mo_ids_to_backorder'): productions_to_backorder = self.browse(self.env.context['mo_ids_to_backorder']) productions_not_to_backorder = self - productions_to_backorder else: productions_not_to_backorder = self productions_to_backorder = self.env['mrp.production'] backorders = productions_to_backorder and productions_to_backorder._split_productions() backorders = backorders - productions_to_backorder productions_not_to_backorder._post_inventory(cancel_backorder=True) # 查出最后一张工单完成入库操作 # if self.workorder_ids.filtered(lambda w: w.routing_type in ['表面工艺']): # move_finish = self.env['stock.move'].search([('created_production_id', '=', self.id)]) # if move_finish: # move_finish._action_assign() productions_to_backorder._post_inventory(cancel_backorder=True) # if completed products make other confirmed/partially_available moves available, assign them done_move_finished_ids = ( productions_to_backorder.move_finished_ids | productions_not_to_backorder.move_finished_ids).filtered( lambda m: m.state == 'done') done_move_finished_ids._trigger_assign() # Moves without quantity done are not posted => set them as done instead of canceling. In # case the user edits the MO later on and sets some consumed quantity on those, we do not # want the move lines to be canceled. (productions_not_to_backorder.move_raw_ids | productions_not_to_backorder.move_finished_ids).filtered( lambda x: x.state not in ('done', 'cancel')).write({ 'state': 'done', 'product_uom_qty': 0.0, }) for production in self: logging.info('qty_produced:%s' % production.qty_produced) if production.qty_produced == 0.0: production.qty_produced = 1.0 production.write({ 'date_finished': fields.Datetime.now(), 'product_qty': production.qty_produced, 'priority': '0', 'is_locked': True, 'state': 'done', }) for workorder in self.workorder_ids.filtered(lambda w: w.state not in ('done', 'cancel')): workorder.duration_expected = workorder._get_duration_expected() if not backorders: if self.env.context.get('from_workorder'): return { 'type': 'ir.actions.act_window', 'res_model': 'mrp.production', 'views': [[self.env.ref('mrp.mrp_production_form_view').id, 'form']], 'res_id': self.id, 'target': 'main', } if self.user_has_groups( 'mrp.group_mrp_reception_report') and self.picking_type_id.auto_show_reception_report: lines = self.move_finished_ids.filtered(lambda m: m.product_id.type == 'product' and m.state != 'cancel' and m.quantity_done and not m.move_dest_ids) if lines: if any(mo.show_allocation for mo in self): action = self.action_view_reception_report() return action logging.info('last-product_qty:%s' % production.product_qty) return True context = self.env.context.copy() context = {k: v for k, v in context.items() if not k.startswith('default_')} for k, v in context.items(): if k.startswith('skip_'): context[k] = False action = { 'res_model': 'mrp.production', 'type': 'ir.actions.act_window', 'context': dict(context, mo_ids_to_backorder=None, button_mark_done_production_ids=None) } if len(backorders) == 1: action.update({ 'view_mode': 'form', 'res_id': backorders[0].id, }) else: action.update({ 'name': _("Backorder MO"), 'domain': [('id', 'in', backorders.ids)], 'view_mode': 'tree,form', }) return action # 报废 def button_scrap_new(self): cloud_programming = self._cron_get_programming_state() return { 'name': _('报废'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.production.wizard', 'target': 'new', 'context': { 'default_mrp_production_id': self.id, 'is_remanufacture_flag': True, 'default_reprogramming_num': cloud_programming['reprogramming_num'], 'default_programming_states': cloud_programming['programming_state'], 'default_is_reprogramming': True if cloud_programming['programming_state'] in ['已下发'] else False } } # 返工 def button_rework(self): cloud_programming = None if self.programming_state in ['已编程']: cloud_programming = self._cron_get_programming_state() elif self.programming_state is False: cloud_programming = {} result_ids = self.detection_result_ids.filtered(lambda dr: dr.handle_result == '待处理') work_id_list = [] if result_ids: work_id_list = [self.workorder_ids.filtered( lambda wk: (wk.name == result_id.routing_type and wk.processing_panel == result_id.processing_panel and wk.state == 'done')).id for result_id in result_ids] workorder_ids = self.workorder_ids.filtered( lambda wk: wk.technology_design_id.routing_tag == 'standard' and wk.state not in ['rework', 'cancel']) logging.info('标准工艺工单【%s】' % workorder_ids) return { 'name': _('返工'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.rework.wizard', 'target': 'new', 'context': { 'default_production_id': self.id, 'default_workorder_ids': workorder_ids.ids if workorder_ids.ids != [] else self.workorder_ids.ids, 'default_hidden_workorder_ids': ','.join(map(str, work_id_list)) if work_id_list != [] else '', 'default_reprogramming_num': cloud_programming.get('reprogramming_num') if cloud_programming else '', 'default_programming_state': cloud_programming.get('programming_state') if cloud_programming else '', 'default_is_reprogramming': True if cloud_programming and (cloud_programming.get('programming_state') in ['已下发']) else False } } # 更新程序 def do_update_program(self): program_production = self if len(program_production) >= 1: # same_product_id = None # is_not_same_product = 0 for item in program_production: # if same_product_id is None: # same_product_id = item.product_id # if item.product_id != same_product_id: # is_not_same_product += 1 if item.state != "rework" and item.programming_state != "已编程未下发": raise UserError("请选择状态为返工且已编程未下发的制造订单") # if is_not_same_product >= 1: # raise UserError("您选择的记录中含有其他产品的制造订单,请选择同一产品的制造订单") grouped_program_ids = {k: list(g) for k, g in groupby(program_production, key=lambda x: x.programming_no)} program_to_production_names = {} for programming_no, program_production in grouped_program_ids.items(): program_to_production_names[programming_no] = [production.name for production in program_production] for production in self: if production.programming_no in program_to_production_names: productions_not_delivered = self.env['mrp.production'].search( [('programming_no', '=', production.programming_no), ('programming_state', '=', '已编程未下发')]) productions = self.env['mrp.production'].search( [('programming_no', '=', production.programming_no), ('state', 'not in', ('cancel', 'done'))]) rework_workorder = production.workorder_ids.filtered(lambda m: m.state == 'rework') if rework_workorder: for rework_item in rework_workorder: pending_workorder = production.workorder_ids.filtered( lambda m1: m1.state in [ 'pending'] and m1.processing_panel == rework_item.processing_panel and m1.routing_type == 'CNC加工') if not pending_workorder.cnc_ids: production.get_new_program(rework_item.processing_panel) # production.write({'state': 'progress', 'programming_state': '已编程', 'is_rework': False}) productions_not_delivered.write( {'state': 'progress', 'programming_state': '已编程', 'is_rework': False}) # 对制造订单所以面的cnc工单的程序用刀进行校验 try: logging.info(f'已更新制造订单:{productions_not_delivered}') productions.production_cnc_tool_checkout() except Exception as e: logging.info(f'对cnc工单的程序用刀进行校验报错:{e}') # 从cloud获取重新编程过的最新程序 def get_new_program(self, processing_panel): try: res = {'programming_no': self.programming_no, 'processing_panel': processing_panel} configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/get_new_program' config_url = configsettings['sf_url'] + url r = requests.post(config_url, json=res, data=None, headers=config_header) r = r.json() result = json.loads(r['result']) if result['status'] == 1: program_path_tmp_panel = os.path.join('/tmp', result['folder_name'], 'return', processing_panel) if os.path.exists(program_path_tmp_panel): files_r = os.listdir(program_path_tmp_panel) if files_r: for file_name in files_r: file_path = os.path.join(program_path_tmp_panel, file_name) os.remove(file_path) download_state = self.env['sf.cnc.processing'].download_file_tmp(result['folder_name'], processing_panel) if download_state is False: raise UserError('编程单号为%s的CNC程序文件从FTP拉取失败' % (self.programming_no)) productions = self.env['mrp.production'].search( [('programming_no', '=', self.programming_no), ('state', 'not in', ('cancel', 'done'))]) if productions: for production in productions: panel_workorder = production.workorder_ids.filtered(lambda pw: pw.processing_panel == processing_panel and pw.routing_type == 'CNC加工' and pw.state not in ( 'rework', 'done')) if panel_workorder: if panel_workorder.cmm_ids: panel_workorder.cmm_ids.sudo().unlink() if panel_workorder.cnc_ids: panel_workorder.cnc_ids.sudo().unlink() # self.env['sf.cam.work.order.program.knife.plan'].sudo().unlink_cam_plan( # production) # program_path_tmp_panel = os.path.join('C://Users//43484//Desktop//fsdownload//test', # processing_panel) logging.info('program_path_tmp_panel:%s' % program_path_tmp_panel) files_panel = os.listdir(program_path_tmp_panel) if files_panel: for file in files_panel: file_extension = os.path.splitext(file)[1] if file_extension.lower() == '.pdf': panel_file_path = os.path.join(program_path_tmp_panel, file) logging.info('panel_file_path:%s' % panel_file_path) panel_workorder.write( {'cnc_ids': panel_workorder.cnc_ids.sudo()._json_cnc_processing(processing_panel, result), 'cmm_ids': panel_workorder.cmm_ids.sudo()._json_cmm_program(processing_panel, result), 'cnc_worksheet': base64.b64encode(open(panel_file_path, 'rb').read())}) logging.info('len(cnc_worksheet):%s' % len(panel_workorder.cnc_worksheet)) pre_workorder = production.workorder_ids.filtered(lambda ap: ap.routing_type == '装夹预调' and ap.processing_panel == processing_panel and ap.state not in ( 'rework', 'done')) if pre_workorder: pre_workorder.write( {'processing_drawing': base64.b64encode(open(panel_file_path, 'rb').read())}) # if production.state == 'rework' and production.programming_state == '已编程未下发': # production.write( # {'state': 'progress', 'programming_state': '已编程', 'is_rework': False}) # logging.info('返工含有已编程未下发的程序更新完成:%s' % production.name) else: raise UserError(result['message']) except Exception as e: logging.info('get_new_program error:%s' % e) raise UserError("从云平台获取最新程序失败,请联系管理员") def recreateManufacturing(self, item): """ 重新生成制造订单 """ if self.is_scrap is True: procurement_requests = [] sale_order = self.env['sale.order'].sudo().search([('name', '=', self.origin)]) # 查询出库移动记录 out_picking = self.env['stock.picking'].search( [('origin', '=', sale_order.name), ('name', 'ilike', 'WH/OUT/')]) move = out_picking.move_ids.filtered(lambda pd: pd.product_id == self.product_id) move_values = {'product_description_variants': '', 'date_planned': fields.Datetime.now(), 'date_deadline': fields.Datetime.now(), 'move_dest_ids': move, 'group_id': move.group_id, 'route_ids': [], 'warehouse_id': self.warehouse_id, 'priority': 0, 'orderpoint_id': False, 'product_packaging_id': False} procurement_requests.append(self.env['procurement.group'].Procurement( move.product_id, 1.0, move.product_uom, move.location_id, move.rule_id and move.rule_id.name or "/", sale_order.name, move.company_id, move_values)) self.env['procurement.group'].run(procurement_requests, raise_user_error=not self.env.context.get('from_orderpoint')) productions = self.env['mrp.production'].sudo().search( [('origin', '=', self.origin)], order='id desc', limit=1) productions.write({'programming_no': self.programming_no, 'is_remanufacture': True}) # mo_move = self.env['stock.move'].search( # [('origin', '=', sale_order.name), ('reference', 'ilike', 'WH/MO/')]) # if mo_move: # sfp_move = self.env['stock.move'].search( # [('origin', '=', sale_order.name), ('reference', 'ilike', 'WH/SFP/')], limit=1) # mo_move.write({'reference': sfp_move.reference, 'partner_id': sfp_move.partner_id.id, # 'picking_id': sfp_move.picking_id.id, 'picking_type_id': sfp_move.picking_type_id.id, # 'production_id': False}) # productions.procurement_group_id.mrp_production_ids.move_dest_ids.write( # {'group_id': self.env['procurement.group'].search([('name', '=', sale_order.name)])}) stock_picking_remanufacture = self.env['stock.picking'].search([('origin', '=', productions.name)]) for pick in stock_picking_remanufacture: if pick.name.startswith('WH/PC/') or pick.name.startswith('WH/INT/'): if pick.move_ids: product_type_id = pick.move_ids[0].product_id.categ_id if product_type_id.name == '坯料': location_id = self.env['stock.location'].search([('name', '=', '坯料存货区')]) if not location_id: logging.info(f'没有搜索到【坯料存货区】: {location_id}') break if pick.picking_type_id.name == '内部调拨': if pick.location_dest_id.product_type != product_type_id: pick.location_dest_id = location_id.id elif pick.picking_type_id.name == '生产发料': if pick.location_id.product_type != product_type_id: pick.location_id = location_id.id scarp_process_parameter_workorder = self.env['mrp.workorder'].search( [('surface_technics_parameters_id', '!=', False), ('production_id', '=', self.id), ('is_subcontract', '=', True)]) if scarp_process_parameter_workorder: production_programming = self.env['mrp.production'].search( [('programming_no', '=', self.programming_no), ('id', '!=', productions.id)], order='name asc') production_list = [production.name for production in production_programming] purchase_orders = self.env['purchase.order'].search([('origin', 'ilike', ','.join(production_list))]) for purchase_item in purchase_orders.order_line: for process_item in scarp_process_parameter_workorder: if purchase_item.product_id.categ_type == '表面工艺': if purchase_item.product_id.server_product_process_parameters_id == process_item.surface_technics_parameters_id: if purchase_orders.origin.find(productions.name) == -1: purchase_orders.origin += ',' + productions.name if item['is_reprogramming'] is False: productions.programming_state = '已编程' else: productions.programming_state = '编程中' return productions # 在之前的销售单上重新生成制造订单 def create_production1_values(self, production): production_values_str = {'origin': production.origin, 'product_id': production.product_id.id, 'programming_state': '已编程', 'product_description_variants': production.product_description_variants, 'product_qty': production.product_qty, 'product_uom_id': production.product_uom_id.id, 'location_src_id': production.location_src_id.id, 'location_dest_id': production.location_dest_id.id, 'bom_id': production.bom_id.id, 'date_deadline': production.date_deadline, 'date_planned_start': production.date_planned_start, 'date_planned_finished': production.date_planned_finished, # 'procurement_group_id': self.env["procurement.group"].create( # {'name': production.name}).id, 'propagate_cancel': production.propagate_cancel, 'orderpoint_id': production.orderpoint_id.id, 'picking_type_id': production.picking_type_id.id, 'company_id': production.company_id.id, 'move_dest_ids': production.move_dest_ids.ids, 'user_id': production.user_id.id} return production_values_str # 增加制造订单类型 production_type = fields.Selection( [('自动化产线加工', '自动化产线加工'), ('人工线下加工', '人工线下加工')], string='制造类型', compute='_compute_production_type', store=True ) @api.depends('product_id.is_manual_processing') def _compute_production_type(self): for production in self: production.production_type = '自动化产线加工' if not production.product_id.is_manual_processing else '人工线下加工' @api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id') def _compute_sale_order_count(self): for production in self: if production.sale_order_id: production.sale_order_count = 1 else: production.sale_order_count = 0 def action_view_sale_orders(self): if self.sale_order_id: action = { 'res_model': 'sale.order', 'type': 'ir.actions.act_window', } action.update({ 'view_mode': 'form', 'res_id': self.sale_order_id.id, }) return action @api.model_create_multi def create(self, vals_list): """ 重载创建制造订单的方法,单个制造订单,同一成品只创建一个采购组,用于后续单据的创建 """ product_group_id = {} is_custemer_group_id = {} # 客供料与非客供料 for vals in vals_list: if not vals.get('name', False) or vals['name'] == _('New'): picking_type_id = vals.get('picking_type_id') if not picking_type_id: picking_type_id = self._get_default_picking_type_id(vals.get('company_id', self.env.company.id)) vals['picking_type_id'] = picking_type_id vals['name'] = self.env['stock.picking.type'].browse(picking_type_id).sequence_id.next_by_id() product_id = self.env['product.product'].browse(vals['product_id']) is_self_process = product_id.materials_type_id and product_id.materials_type_id.gain_way and product_id.materials_type_id.gain_way != '自加工' is_customer_provided = product_id.is_customer_provided key = f"{is_self_process}_{is_customer_provided}" if not is_custemer_group_id.get(key): is_custemer_group_id[key] = self.env["procurement.group"].create({'name': vals.get('name')}).id # if not (is_first_customer or is_first_not_customer) and is_self_process: # is_first = True # group_id = self.env["procurement.group"].create({'name': vals.get('name')}).id if not vals.get('procurement_group_id'): if product_id.product_tmpl_id.single_manufacturing: if product_id.categ_id.name == '成品': vals['procurement_group_id'] = is_custemer_group_id[key] continue if product_id.id not in product_group_id.keys(): procurement_group_vals = self._prepare_procurement_group_vals(vals) procurement_group_id = self.env["procurement.group"].create(procurement_group_vals).id vals['procurement_group_id'] = procurement_group_id product_group_id[product_id.id] = procurement_group_id else: vals['procurement_group_id'] = product_group_id[product_id.id] else: vals['procurement_group_id'] = is_custemer_group_id[key] return super(MrpProduction, self).create(vals_list) @api.depends('procurement_group_id.stock_move_ids.created_purchase_line_id.order_id', 'procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id') def _compute_purchase_order_count(self): for production in self: # 找到来源的第一张制造订单的采购组 if production.product_id.product_tmpl_id.single_manufacturing == True: first_production = self.env['mrp.production'].search( [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)], limit=1, order='id asc') production.purchase_order_count = len( first_production.procurement_group_id.stock_move_ids.created_purchase_line_id.order_id | first_production.procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id) else: production.purchase_order_count = len( production.procurement_group_id.stock_move_ids.created_purchase_line_id.order_id | production.procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id) @api.depends('procurement_group_id', 'procurement_group_id.stock_move_ids.group_id') def _compute_picking_ids(self): for order in self: if order.product_id.product_tmpl_id.single_manufacturing == True and not order.is_remanufacture: first_order = self.env['mrp.production'].search( [('origin', '=', order.origin), ('product_id', '=', order.product_id.id)], limit=1, order='id asc') order.picking_ids = self.env['stock.picking'].search([ ('group_id', '=', first_order.procurement_group_id.id), ('group_id', '!=', False), ]) order.delivery_count = len(first_order.picking_ids) else: order.picking_ids = self.env['stock.picking'].search([ ('group_id', '=', order.procurement_group_id.id), ('group_id', '!=', False), ]) order.delivery_count = len(order.picking_ids) def action_view_purchase_orders(self): self.ensure_one() if self.is_remanufacture: production = self elif self.product_id.product_tmpl_id.single_manufacturing == True: production = self.env['mrp.production'].search( [('origin', '=', self.origin), ('product_id', '=', self.product_id.id)], limit=1, order='id asc') else: production = self purchase_order_ids = ( production.procurement_group_id.stock_move_ids.created_purchase_line_id.order_id | production.procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id).ids action = { 'res_model': 'purchase.order', 'type': 'ir.actions.act_window', } if len(purchase_order_ids) == 1: action.update({ 'view_mode': 'form', 'res_id': purchase_order_ids[0], }) else: action.update({ 'name': _("Purchase Order generated from %s", self.name), 'domain': [('id', 'in', purchase_order_ids)], 'view_mode': 'tree,form', }) return action def _subcontract_sanity_check(self): for production in self: if production.product_tracking != 'none' and not self.lot_producing_id: raise UserError(_('You must enter a serial number for %s') % production.product_id.name) for sml in production.move_raw_ids.move_line_ids: if sml.tracking != 'none' and not sml.lot_id: picking_ids = production.picking_ids.filtered( lambda p: p.state not in ['done', 'cancel']) picking_num = len(picking_ids) picking_info = ', '.join( ['%s:%s' % (picking.picking_type_id.name, picking.name) for picking in picking_ids] ) if picking_info: raise UserError(_('You have %s incomplete supplies: %s') % ( picking_num, picking_info)) else: raise UserError( _('You must enter a serial number for each line of %s') % sml.product_id.display_name) return True reprogramming_count = fields.Integer(string='重新编程次数', default=0) # 申请编程 def action_apply_programming(self): """ 检查前置条件:制造订单【状态】=“待排程、待加工”,制造订单的【编程状态】=“已编程”。 """ print('申请编程') if len(self) > 1: raise UserError('仅支持选择单个制造订单进行编程申请,请重新选择') for production in self: if production.state not in ['confirmed', 'pending_cam'] or production.programming_state != '已编程': raise UserError('不可操作。所选制造订单必须同时满足如下条件:\n1、制造订单状态:待排程 或 待加工;\n2、制造订单编程状态:已编程。\n请检查!') cloud_programming = production._cron_get_programming_state() if cloud_programming['programming_state'] in ['待编程', '已编程', '编程中']: raise UserError("当前编程单正在重新编程,请注意查看当前制造订单的“编程记录”确认进度!") return { 'type': 'ir.actions.act_window', 'res_model': 'sf.programming.reason', 'view_mode': 'form', 'target': 'new', 'context': { 'default_production_id': self.id, 'active_id': self.id, # 传当前时间 'default_apply_time': fields.Datetime.now(), }, 'view_id': self.env.ref('sf_manufacturing.sf_programming_reason_form_view').id, } # 编程记录 programming_record_ids = fields.One2many('sf.programming.record', 'production_id') # 编程单更新 def re_programming_update_programming_state(self): try: res = {'programming_no': self.programming_no, 'manufacturing_type': ''} logging.info('res=%s:' % res) configsettings = self.env['res.config.settings'].get_values() config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key']) url = '/api/intelligent_programming/reset_state_again' config_url = configsettings['sf_url'] + url ret = requests.post(config_url, json=res, data=None, headers=config_header) ret = ret.json() result = json.loads(ret['result']) logging.info('update_programming_state-ret:%s' % result) if result['status'] == 1: self.write({'is_rework': True}) else: raise UserError(ret['message']) except Exception as e: logging.info('update_programming_state error:%s' % e) raise UserError("更新编程单状态失败,请联系管理员") # 编程记录 class sf_programming_record(models.Model): _name = 'sf.programming.record' _description = "编程记录" production_id = fields.Many2one('mrp.production') # 编号、编程原因、编程方式、当前编程次数、目标制造单号、申请时间、下发时间 number = fields.Char('编号') reason = fields.Text('重新编程原因') programming_method = fields.Selection([ ('auto', '自动'), ('manual operation', '人工')], string="编程方式") current_programming_count = fields.Integer('当前编程次数') target_production_id = fields.Char('目标制造单号') apply_time = fields.Datetime('申请时间') send_time = fields.Datetime('下发时间') class sf_detection_result(models.Model): _name = 'sf.detection.result' _description = "检测结果" _order = 'handle_result_date desc, id asc' production_id = fields.Many2one('mrp.production') processing_panel = fields.Char('加工面') routing_type = fields.Selection([ ('装夹预调', '装夹预调'), ('CNC加工', 'CNC加工'), ('解除装夹', '解除装夹'), ('切割', '切割'), ('表面工艺', '表面工艺'), ('线切割', '线切割'), ('人工线下加工', '人工线下加工')], string="工序类型") rework_reason = fields.Selection( [("programming", "编程"), ("cutter", "刀具"), ("clamping", "装夹"), ("operate computer", "操机"), ("technology", "工艺"), ("customer redrawing", "客户改图")], string="原因", tracking=True) detailed_reason = fields.Text('详细原因') test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], string="检测结果", tracking=True) test_report = fields.Binary('检测报告', readonly=True) handle_result = fields.Selection([("待处理", "待处理"), ("已处理", "已处理")], default='', string="处理结果", tracking=True) handle_result_date = fields.Datetime('处理时间') handle_result_user = fields.Many2one('res.users', '处理人') # 查看检测报告 def button_look_test_report(self): return { 'res_model': 'sf.detection.result', 'type': 'ir.actions.act_window', 'res_id': self.id, 'views': [(self.env.ref('sf_manufacturing.sf_test_report_form').id, 'form')], 'target': 'new' } def write(self, vals): if vals.get('handle_result') and vals.get('handle_result') == '已处理': vals['handle_result_date'] = fields.Datetime.now() vals['handle_result_user'] = self.env.user.id return super(sf_detection_result, self).write(vals) class sf_processing_panel(models.Model): _name = 'sf.processing.panel' _description = "加工面" name = fields.Char('加工面') active = fields.Boolean('有效', default=True)