import re import json import logging import base64 import urllib.parse from datetime import date from datetime import datetime, timedelta import requests import os import math from lxml import etree from dateutil.relativedelta import relativedelta # import subprocess from odoo import api, fields, models, SUPERUSER_ID, _ from odoo.addons.sf_base.commons.common import Common from odoo.exceptions import UserError, ValidationError from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController class ResMrpWorkOrder(models.Model): _inherit = 'mrp.workorder' _order = 'sequence asc' product_tmpl_name = fields.Char('坯料产品名称', related='production_bom_id.bom_line_ids.product_id.name') product_tmpl_id_length = fields.Float(string='坯料长度(mm)', related='material_length', readonly=True, store=False) product_tmpl_id_width = fields.Float(string='坯料宽度(mm)', related='material_width', readonly=True, store=False) product_tmpl_id_height = fields.Float(string='坯料高度(mm)', related='material_height', readonly=True, store=False) # product_tmpl_id_length = fields.Float(related='production_id.product_tmpl_id.length', readonly=True, store=True, # string="坯料长度(mm)") # product_tmpl_id_width = fields.Float(related='production_id.product_tmpl_id.width', readonly=True, store=True, # string="坯料宽度(mm)") # product_tmpl_id_height = fields.Float(related='production_id.product_tmpl_id.height', readonly=True, store=True, # string="坯料高度(mm)") product_tmpl_id_materials_id = fields.Many2one(related='production_id.product_tmpl_id.materials_id', readonly=True, store=True, check_company=True, string="材料") product_tmpl_id_materials_type_id = fields.Many2one(related='production_id.product_tmpl_id.materials_type_id', readonly=True, store=True, check_company=True, string="型号") # workcenter_id = fields.Many2one('mrp.workcenter', string='工作中心', required=False) users_ids = fields.Many2many("res.users", 'users_workorder', related="workcenter_id.users_ids") processing_panel = fields.Char('加工面') processing_panel_selection = fields.Selection([ ('ZM', 'ZM'), ('FM', 'FM'), ('YC', 'YC'), ('QC', 'QC'), ('HC', 'HC'), ('ZC', 'ZC')], string="加工面", compute='_compute_processing_panel_selection', store=True) sequence = fields.Integer(string='工序') routing_type = fields.Selection([ ('装夹预调', '装夹预调'), ('CNC加工', 'CNC加工'), ('解除装夹', '解除装夹'), ('切割', '切割'), ('表面工艺', '表面工艺'), ('线切割', '线切割'), ('人工线下加工', '人工线下加工') ], string="工序类型") results = fields.Char('结果') state = fields.Selection([ ('pending', '等待其他工单'), ('waiting', '等待组件'), ('ready', '就绪'), ('progress', '进行中'), ('to be detected', "待检测"), ('done', '已完工'), ('rework', '返工'), ('cancel', '取消')], string='Status', compute='_compute_state', store=True, default='pending', copy=False, readonly=True, recursive=True, index=True, tracking=True) delivery_warning = fields.Selection([('normal', '正常'), ('warning', '告警'), ('overdue', '逾期')], string='时效', tracking=True) back_button_display = fields.Boolean(default=False, compute='_compute_back_button_display', store=True) @api.depends('state') def _compute_back_button_display(self): for record in self: sorted_workorders = record.production_id.workorder_ids.filtered(lambda w: w.state != 'cancel').sorted( key=lambda w: w.sequence) if not sorted_workorders: continue position = next((idx for idx, workorder in enumerate(sorted_workorders) if workorder.id == record.id), -1) cur_workorder = sorted_workorders[position] if position == len(sorted_workorders) - 1: picking_ids = cur_workorder.production_id.sale_order_id.picking_ids finished_product_area = picking_ids.filtered( lambda picking: picking.location_dest_id.name == '成品存货区' and picking.state == 'done' ) if finished_product_area: moves = self.env['stock.move'].search([ ('name', '=', cur_workorder.production_id.name), ('state', '!=', 'cancel') ]) finish_move = next((move for move in moves if move.location_dest_id.name == '制造后'), None) if not finish_move and not cur_workorder.is_subcontract and not cur_workorder.name == '解除装夹': record.back_button_display = True else: record.back_button_display = any( finish_move.move_dest_ids.ids not in move.ids and record.state == 'done' for picking in finished_product_area for move in picking.move_ids ) else: if record.state == 'done': record.back_button_display = True else: record.back_button_display = False # tag_type if cur_workorder.is_subcontract or cur_workorder.name == '解除装夹' or any( detection_result.processing_panel == cur_workorder.processing_panel and detection_result.routing_type == cur_workorder.routing_type and cur_workorder.tag_type !='重新加工' for detection_result in cur_workorder.production_id.detection_result_ids ): record.back_button_display = False else: next_workorder = sorted_workorders[position + 1] next_state = next_workorder.state if (next_state == 'ready' or ( next_workorder.state == 'waiting' and next_workorder.is_subcontract)) and cur_workorder.state == 'done': record.back_button_display = True else: record.back_button_display = False if cur_workorder.is_subcontract or cur_workorder.name == '解除装夹' or any( detection_result.processing_panel == cur_workorder.processing_panel and detection_result.routing_type == cur_workorder.routing_type and cur_workorder.tag_type !='重新加工' for detection_result in cur_workorder.production_id.detection_result_ids ): record.back_button_display = False date_planned_start = fields.Datetime(tracking=True) @api.depends('processing_panel') def _compute_processing_panel_selection(self): for record in self: if record.processing_panel in ['ZM', 'FM', 'YC', 'QC', 'HC', 'ZC']: record.processing_panel_selection = record.processing_panel else: record.processing_panel_selection = False @api.depends('production_id.manual_quotation') def _compute_manual_quotation(self): for item in self: item.manual_quotation = item.production_id.manual_quotation manual_quotation = fields.Boolean('人工编程', default=False, compute=_compute_manual_quotation, store=True) def button_back(self): if self.production_id.state == 'rework': raise UserError('制造订单为返工时不能进行工单回退') sorted_workorders = self.production_id.workorder_ids.filtered(lambda w: w.state != 'cancel').sorted( key=lambda w: w.sequence) position = next((idx for idx, workorder in enumerate(sorted_workorders) if workorder.id == self.id), -1) cur_workorder = sorted_workorders[position] if position == len(sorted_workorders) - 1: # 末工序 picking_ids = cur_workorder.production_id.sale_order_id.picking_ids finished_product_area = picking_ids.filtered( lambda picking: picking.location_dest_id.name == '成品存货区' and picking.state == 'done' ) moves = self.env['stock.move'].search([ ('name', '=', cur_workorder.production_id.name), ('state', '!=', 'cancel') ]) finish_move = next((move for move in moves if move.location_dest_id.name == '制造后'), None) or [] if any( finish_move.move_dest_ids.ids in move.ids for picking in finished_product_area for move in picking.move_ids ): raise UserError('已入库,无法回退') else: moves = self.env['stock.move'].search([ ('name', '=', cur_workorder.production_id.name), ('state', '!=', 'cancel') ]) move_lines = self.env['stock.move.line'].search([ ('reference', '=', cur_workorder.production_id.name), ('state', '!=', 'cancel') ]) moves.state = 'assigned' external_assistance = move_lines.filtered( lambda picking: picking.location_id.name != '外协线边仓' ) external_assistance.state = 'assigned' # move_lines.state = 'assigned' self.time_ids.date_end = None cur_workorder.state = 'progress' cur_workorder.production_id.state = 'progress' quality_check = self.env['quality.check'].search( [('workorder_id', '=', self.id)]) for check_order in quality_check: if check_order.point_id.is_inspect: check_order.quality_state = 'waiting' else: check_order.quality_state = 'none' # move_dest_ids finished_quants = moves.mapped('move_line_ids.lot_id.quant_ids') finished_quants.quantity = 0 finish_move = next((move for move in moves if move.location_dest_id.name == '制造后'), None) finish_move.move_dest_ids.reserved_availability = 0 finish_move.move_dest_ids.move_line_ids.state = 'draft' finish_move.move_dest_ids.move_line_ids.unlink() # finish_move.move_dest_ids.move_line_ids.reserved_uom_qty = 0 else: next_workorder = sorted_workorders[position + 1] next_state = next_workorder.state if next_state not in ['pending', 'waiting', 'ready']: raise UserError('下工序已经开始,无法回退') if next_workorder.is_subcontract: next_workorder.picking_ids.write({'state': 'waiting'}) next_workorder.state = 'pending' self.time_ids.date_end = None cur_workorder.state = 'progress' cur_workorder.production_id.state = 'progress' quality_check = self.env['quality.check'].search( [('workorder_id', '=', self.id)]) for check_order in quality_check: if check_order.point_id.is_inspect: check_order.quality_state = 'waiting' else: check_order.quality_state = 'none' def _compute_working_users(self): super()._compute_working_users() for item in self: if item.state == 'to be detected': if self.env.user.has_group('sf_base.group_sf_equipment_user'): item.is_user_working = True @api.onchange('users_ids') def get_user_permissions(self): uid = self.env.uid for workorder in self: if workorder.users_ids: list_user_id = [] for item in workorder.users_ids: list_user_id.append(item.id) if uid in list_user_id: workorder.user_permissions = True else: workorder.user_permissions = False else: workorder.user_permissions = False user_permissions = fields.Boolean('用户权限', compute='get_user_permissions') programming_no = fields.Char('编程单号') work_state = fields.Char('业务状态') programming_state = fields.Char('编程状态') cnc_worksheet = fields.Binary( '工作指令', readonly=True) material_center_point = fields.Char(string='坯料中心点') X1_axis = fields.Float(default=0) Y1_axis = fields.Float(default=0) Z1_axis = fields.Float(default=0) X2_axis = fields.Float(default=0) Y2_axis = fields.Float(default=0) Z2_axis = fields.Float(default=0) X3_axis = fields.Float(default=0) Y3_axis = fields.Float(default=0) Z3_axis = fields.Float(default=0) X4_axis = fields.Float(default=0) Y4_axis = fields.Float(default=0) Z4_axis = fields.Float(default=0) X5_axis = fields.Float(default=0) Y5_axis = fields.Float(default=0) Z5_axis = fields.Float(default=0) X6_axis = fields.Float(default=0) Y6_axis = fields.Float(default=0) Z6_axis = fields.Float(default=0) X7_axis = fields.Float(default=0) Y7_axis = fields.Float(default=0) Z7_axis = fields.Float(default=0) X8_axis = fields.Float(default=0) Y8_axis = fields.Float(default=0) Z8_axis = fields.Float(default=0) X9_axis = fields.Float(default=0) Y9_axis = fields.Float(default=0) Z9_axis = fields.Float(default=0) X10_axis = fields.Float(default=0) Y10_axis = fields.Float(default=0) Z10_axis = fields.Float(default=0) X_deviation_angle = fields.Integer(string="X轴偏差度", default=0) test_results = fields.Selection([("合格", "合格"), ("返工", "返工")], default='合格', string="检测结果", tracking=True) cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工程序") cmm_ids = fields.One2many("sf.cmm.program", 'workorder_id', string="CMM程序") tray_code = fields.Char(string="托盘编码") glb_file = fields.Binary("glb模型文件", related='production_id.model_file') is_subcontract = fields.Boolean(string='是否外协') surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数") picking_ids = fields.Many2many('stock.picking', string='外协出入库单', compute='_compute_surface_technics_picking_ids') purchase_id = fields.Many2many('purchase.order', string='外协采购单') surface_technics_picking_count = fields.Integer("外协出入库", compute='_compute_surface_technics_picking_ids') surface_technics_purchase_count = fields.Integer("外协采购", compute='_compute_surface_technics_purchase_ids') # 是否绑定托盘 is_trayed = fields.Boolean(string='是否绑定托盘', default=False) tag_type = fields.Selection([("重新加工", "重新加工")], string="标签", tracking=True) technology_design_id = fields.Many2one('sf.technology.design') def _compute_default_construction_period_status(self): need_list = ['pending', 'waiting', 'ready', 'progress', 'to be detected', 'done'] try: if self.state not in need_list: return False if not self.date_planned_finished: return False hours = self.get_hours_diff() if hours >= 12: return '正常' elif hours > 0 and hours < 12 and self.state != 'done': return '预警' elif hours > 0 and hours < 12 and self.state == 'done': return '正常' else: return '已逾期' except Exception as e: logging.error("Error processing production ID {}: {}".format(self.id, e)) raise e @api.depends('state', 'date_planned_finished') def _compute_construction_period_status(self): for worker in self: construction_period_status = worker._compute_default_construction_period_status() if construction_period_status and worker.construction_period_status != construction_period_status: worker.construction_period_status = construction_period_status construction_period_status = fields.Selection([('正常', '正常'), ('预警', '预警'), ('已逾期', '已逾期')], string='工期状态', store=True, compute='_compute_construction_period_status', default=lambda self: self._compute_default_construction_period_status()) def get_page_all_records(self, model_name, func, domain, page_size=100): # 获取模型对象 model = self.env[model_name].sudo() # 初始化分页参数 page_number = 1 while True: # 计算偏移量 offset = (page_number - 1) * page_size # 获取当前页的数据 records = model.search(domain, limit=page_size, offset=offset) # 如果没有更多记录,退出循环 if not records: break # 将当前页的数据添加到结果列表 func(records) # 增加页码 page_number += 1 def run_compute_construction_period_status(self, records): records._compute_construction_period_status() def _corn_update_construction_period_status(self): need_list = ['pending', 'waiting', 'ready', 'progress', 'to be detected'] # need_list = [ # 'progress', # 'to be detected'] self.get_page_all_records('mrp.workorder', self.run_compute_construction_period_status, [('state', 'in', need_list)], 100) def get_hours_diff(self): # 获取当前日期和时间 current_datetime = fields.Datetime.now() # 将 date_field 转换为 datetime 对象 if self.date_planned_finished: date_obj = fields.Datetime.from_string(self.date_planned_finished) # 将 date 对象转换为 datetime 对象,设置时间为 00:00:00 # date_obj = datetime.datetime.combine(date_obj, datetime.time.min) # 计算两个日期之间的差值 delta = date_obj - current_datetime # 返回差值的小时数 return int(delta.total_seconds() / 3600) else: return 0.0 @api.depends('name', 'production_id.name') def _compute_surface_technics_picking_ids(self): for workorder in self: if workorder.routing_type == '表面工艺': domain = [('origin', '=', workorder.production_id.name), ('state', 'not in', ['cancel']), ('partner_id', '=', workorder.supplier_id.id)] previous_workorder = self.env['mrp.workorder'].search( [('sequence', '=', workorder.sequence - 1), ('routing_type', '=', '表面工艺'), ('production_id', '=', workorder.production_id.id)]) # if previous_workorder: # if previous_workorder.supplier_id != workorder.supplier_id: # domain += [('surface_technics_parameters_id', '=', workorder.surface_technics_parameters_id.id)] # else: domain += [('surface_technics_parameters_id', '=', workorder.surface_technics_parameters_id.id)] picking_ids = self.env['stock.picking'].search(domain, order='id asc') workorder.surface_technics_picking_count = len(picking_ids) workorder.picking_ids = picking_ids.ids else: workorder.surface_technics_picking_count = 0 def action_view_surface_technics_picking(self): self.ensure_one() action = self.env["ir.actions.actions"]._for_xml_id("stock.action_picking_tree_all") if len(self.picking_ids) > 1: action['domain'] = [('id', 'in', self.picking_ids.ids)] elif self.picking_ids: # action['name'] = '工艺外协' action['res_id'] = self.picking_ids.id action['views'] = [(self.env.ref('stock.view_picking_form').id, 'form')] if 'views' in action: action['views'] += [(state, view) for state, view in action['views'] if view != 'form'] action['context'] = dict(self._context, default_origin=self.name) return action @api.depends('state', 'production_id.name') def _compute_surface_technics_purchase_ids(self): for order in self: if order.routing_type == '表面工艺' and order.state not in ['cancel']: domain = [('purchase_type', '=', 'consignment'), ('origin', 'like', '%' + self.production_id.name + '%'), ('state', '!=', 'cancel')] purchase = self.env['purchase.order'].search(domain) order.surface_technics_purchase_count = 0 if not purchase: order.surface_technics_purchase_count = 0 for po in purchase: if any( line.product_id and line.product_id.server_product_process_parameters_id == order.surface_technics_parameters_id for line in po.order_line): order.surface_technics_purchase_count = 1 else: order.surface_technics_purchase_count = 0 def action_view_surface_technics_purchase(self): self.ensure_one() # if self.routing_type == '表面工艺': # if self.production_id.production_type == '自动化产线加工': # domain = [('programming_no', '=', self.production_id.programming_no)] # else: # domain = [('origin', '=', self.production_id.origin)] # production_programming = self.env['mrp.production'].search(domain, order='name asc') # production_list = [production.name for production in production_programming] # production_no_remanufacture = production_programming.filtered(lambda a: a.is_remanufacture is False) # technology_design = self.env['sf.technology.design'].search( # [('process_parameters_id', '=', self.surface_technics_parameters_id.id), # ('production_id', '=', self.production_id.id)]) # if technology_design.is_auto is False: # domain = [('origin', '=', self.production_id.name)] # else: purchase_orders_id = self._get_surface_technics_purchase_ids() result = { "type": "ir.actions.act_window", "res_model": "purchase.order", "res_id": purchase_orders_id.id, # "domain": [['id', 'in', self.purchase_id]], "name": _("Purchase Orders"), 'view_mode': 'form', } return result def _get_surface_technics_purchase_ids(self): domain = [('origin', 'like', '%' + self.production_id.name + '%'), ('purchase_type', '=', 'consignment')] purchase_orders = self.env['purchase.order'].search(domain) purchase_orders_id = self.env['purchase.order'] for po in purchase_orders: for line in po.order_line: if line.product_id.server_product_process_parameters_id == self.surface_technics_parameters_id: if line.product_qty == 1: purchase_orders_id = line.order_id return purchase_orders_id supplier_id = fields.Many2one('res.partner', string='外协供应商') equipment_id = fields.Many2one('maintenance.equipment', string='加工设备', tracking=True) # 保存名称 save_name = fields.Char(string='检测文件保存名称', compute='_compute_save_name') # 获取数据状态 data_state = fields.Boolean(string='获取数据状态', default=False) # 坯料长宽高 material_length = fields.Float(string='长') material_width = fields.Float(string='宽') material_height = fields.Float(string='高') # 零件图号 part_number = fields.Char(related='production_id.part_number', string='零件图号') machining_drawings = fields.Binary('2D加工图纸', related='production_id.part_drawing', readonly=True) quality_standard = fields.Binary('质检标准', related='production_id.quality_standard', readonly=True) part_name = fields.Char(related='production_id.part_name', string='零件名称') # 工序状态 process_state = fields.Selection([ ('待装夹', '待装夹'), ('待检测', '待检测'), ('待加工', '装夹预调完工'), ('待解除装夹', '待解除装夹'), ('已完工', '已完工'), ], string='工序状态', default='待装夹', readonly='True') # 加工图纸 processing_drawing = fields.Binary(string='加工图纸') # 功能刀具状态 tool_state = fields.Selection([('0', '正常'), ('1', '缺刀'), ('2', '无效刀')], string='功能刀具状态', default='0', store=True, compute='_compute_tool_state') tool_state_remark = fields.Text(string='功能刀具状态备注(缺刀)', compute='_compute_tool_state_remark', store=True) reserved_duration = fields.Float('预留时长', default=30, tracking=True) @api.depends('cnc_ids.tool_state') def _compute_tool_state_remark(self): for item in self: if item.cnc_ids: if item.cnc_ids.filtered(lambda a: a.tool_state == '2'): item.tool_state_remark = None elif item.cnc_ids.filtered(lambda a: a.tool_state == '1'): tool_state_remark = [] cnc_ids = item.cnc_ids.filtered(lambda a: a.tool_state == '1') for cnc_id in cnc_ids: if cnc_id.cutting_tool_name not in tool_state_remark: tool_state_remark.append(cnc_id.cutting_tool_name) item.tool_state_remark = f"{item.processing_panel}缺刀:{tool_state_remark}" else: item.tool_state_remark = None @api.depends('cnc_ids.tool_state') def _compute_tool_state(self): for item in self: if item.cnc_ids: if item.cnc_ids.filtered(lambda a: a.tool_state == '2'): item.tool_state = '2' elif item.cnc_ids.filtered(lambda a: a.tool_state == '1'): item.tool_state = '1' else: item.tool_state = '0' @api.depends('production_id') def _compute_save_name(self): """ 保存名称 """ for record in self: if record.processing_panel: tem_name = record.production_id.name.replace('/', '_') record.save_name = tem_name + '_' + record.processing_panel else: record.save_name = '' schedule_state = fields.Selection(related='production_id.schedule_state', store=True) # 工件装夹信息 functional_fixture_code = fields.Char(string="功能夹具编码", readonly=True) functional_fixture_serial_number = fields.Char(string="功能夹具序列号", readonly=True) functional_fixture_id = fields.Many2one('sf.functional.fixture', string="功能夹具") functional_fixture_type_id = fields.Many2one('sf.functional.fixture.type', string="功能夹具类型", readonly=True) chuck_serial_number = fields.Char(string="卡盘序列号") chuck_name = fields.Char(string="卡盘名称") chuck_brand_id = fields.Many2one('sf.machine.brand', string="卡盘品牌") chuck_type_id = fields.Char(string="卡盘类型") chuck_model_id = fields.Char(string="卡盘型号") tray_serial_number = fields.Char(string="托盘序列号") tray_product_id = fields.Many2one('product.product', string="托盘名称") tray_brand_id = fields.Many2one('sf.machine.brand', string="托盘品牌") tray_type_id = fields.Many2one('sf.fixture.material', string="托盘类型") tray_model_id = fields.Many2one('sf.fixture.model', string="托盘型号") total_wight = fields.Float(string="总重量") maximum_carrying_weight = fields.Char(string="最大承载重量[kg]") maximum_clamping_force = fields.Char(string="最大夹持力[n]") preset_program_information = fields.Char(string="预调程序信息") workpiece_delivery_ids = fields.One2many('sf.workpiece.delivery', 'workorder_id', '工件配送') is_delivery = fields.Boolean('是否配送完成', default=False) rfid_code = fields.Char('RFID码') rfid_code_old = fields.Char('RFID码(已解除)') is_test_env = fields.Boolean('测试环境', default=False) production_line_id = fields.Many2one('sf.production.line', related='production_id.production_line_id', string='生产线', store=True, tracking=True) production_line_state = fields.Selection( [('待上产线', '待上产线'), ('已上产线', '已上产线'), ('已下产线', '已下产线')], string='上/下产线', default='待上产线', tracking=True) detection_report = fields.Binary('检测报告', readonly=False) is_remanufacture = fields.Boolean(string='重新生成制造订单', default=False) is_fetchcnc = fields.Boolean(string='重新获取NC程序', default=False) reason = fields.Selection( [("programming", "编程"), ("cutter", "刀具"), ("clamping", "装夹"), ("operate computer", "操机"), ("technology", "工艺"), ("customer redrawing", "客户改图")], string="原因", tracking=True) detailed_reason = fields.Text('详细原因') is_rework = fields.Boolean(string='是否返工', default=False) # rework_flag = fields.Boolean(string='返工标志', compute='_compute_rework_flag') # # @api.depends('state', 'production_line_state') # def _compute_rework_flag(self): # for record in self: # if record.state == 'done' and record.routing_type == '装夹预调': # next_workorder = record.production_id.workorder_ids.filtered( # lambda w: w.sequence == record.sequence + 1) # if next_workorder and next_workorder.routing_type == 'CNC加工' and next_workorder.state in ['ready', # 'waiting', # 'pending'] and next_workorder.production_line_state == '待上产线': # record.rework_flag = False # elif next_workorder and next_workorder.routing_type == '表面工艺' and next_workorder.state in ['ready', # 'waiting', # 'pending']: # record.rework_flag = False # else: # record.rework_flag = True # else: # record.rework_flag = True # # def button_rework(self): # for item in self: # item.state = 'progress' # for time_id in item.time_ids: # time_id.write({'date_end': None}) def button_change_env(self): self.is_test_env = not self.is_test_env @api.constrains('blocked_by_workorder_ids') def _check_no_cyclic_dependencies(self): if self.production_id.state not in ['rework', 'technology_to_confirmed', 'confirmed'] and self.state not in [ 'rework']: if not self._check_m2m_recursion('blocked_by_workorder_ids'): raise ValidationError(_("您不能创建周期性的依赖关系.")) def _plan_workorder(self, replan=False): self.ensure_one() # Plan workorder after its predecessors start_date = max(self.production_id.date_planned_start, datetime.now()) for workorder in self.blocked_by_workorder_ids: if workorder.state in ['done', 'cancel', 'rework']: continue if workorder.production_id.state not in ['technology_to_confirmed', 'confirmed']: workorder._plan_workorder(replan) start_date = max(start_date, workorder.date_planned_finished) # Plan only suitable workorders if self.state not in ['pending', 'waiting', 'ready']: return if self.leave_id: if replan: self.leave_id.unlink() else: return # Consider workcenter and alternatives workcenters = self.workcenter_id | self.workcenter_id.alternative_workcenter_ids best_finished_date = datetime.max vals = {} for workcenter in workcenters: # Compute theoretical duration if self.workcenter_id == workcenter: duration_expected = self.duration_expected else: duration_expected = self._get_duration_expected(alternative_workcenter=workcenter) from_date, to_date = workcenter._get_first_available_slot(start_date, duration_expected) # If the workcenter is unavailable, try planning on the next one if not from_date: continue # Check if this workcenter is better than the previous ones if to_date and to_date < best_finished_date: best_start_date = from_date best_finished_date = to_date best_workcenter = workcenter vals = { 'workcenter_id': workcenter.id, 'duration_expected': duration_expected, } # If none of the workcenter are available, raise if best_finished_date == datetime.max: raise UserError(_('Impossible to plan the workorder. Please check the workcenter availabilities.')) # Create leave on chosen workcenter calendar leave = self.env['resource.calendar.leaves'].create({ 'name': self.display_name, 'calendar_id': best_workcenter.resource_calendar_id.id, 'date_from': best_start_date, 'date_to': best_finished_date, 'resource_id': best_workcenter.resource_id.id, 'time_type': 'other' }) vals['leave_id'] = leave.id self.write(vals) # @api.onchange('rfid_code') # def _onchange(self): # if self.rfid_code and self.state == 'progress': # self.workpiece_delivery_ids[0].write({'rfid_code': self.rfid_code}) def get_plan_workorder(self, production_line): tomorrow = (date.today() + timedelta(days=+1)).strftime("%Y-%m-%d") tomorrow_start = tomorrow + ' 00:00:00' tomorrow_end = tomorrow + ' 23:59:59' logging.info('tomorrow:%s' % tomorrow) sql = """ SELECT * FROM mrp_workorder WHERE state!='rework' to_char(date_planned_start::timestamp + '8 hour','YYYY-MM-DD HH:mm:SS')>= %s AND to_char(date_planned_finished::timestamp + '8 hour','YYYY-MM-DD HH:mm:SS')<= %s """ params = [tomorrow_start, tomorrow_end] if production_line: sql += "AND production_line_id = %s" params.append(production_line) self.env.cr.execute(sql, params) ids = [t[0] for t in self.env.cr.fetchall()] return [('id', 'in', ids)] @api.onchange('functional_fixture_id') def _onchange_functional_fixture_id(self): if self.functional_fixture_id: self.functional_fixture_code = self.functional_fixture_id.code self.functional_fixture_type_id = self.functional_fixture_id.type_id.id def get_no_data(self, production_id): process_parameter_workorder = self.search( [('surface_technics_parameters_id', '!=', False), ('production_id', '=', production_id)]) return process_parameter_workorder # 获取三次元检测点数据 def get_three_check_datas(self): ftp_resconfig = self.env['res.config.settings'].get_values() ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'], ftp_resconfig['ftp_password']) local_dir_path = '/ftp/before' os.makedirs(local_dir_path, exist_ok=True) local_filename = self.save_name + '.xls' local_file_path = os.path.join(local_dir_path, local_filename) logging.info('local_file_path:%s' % local_file_path) remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename logging.info('remote_path:%s' % remote_path) is_get_detection_file = self.env['ir.config_parameter'].sudo().get_param('is_get_detection_file') if not is_get_detection_file: paload_data = { "filename": local_filename } if not ftp_resconfig['get_check_file_path']: raise UserError('请先配置获取检测报告地址') url = ftp_resconfig['get_check_file_path'] + '/get/check/report' response = requests.post(url, json=paload_data) logging.info('response:%s' % response.json()) if response.json().get('detail'): raise UserError(response.json().get('detail')) if not ftp.file_exists(remote_path): raise UserError(f"文件不存在: {remote_path}") with open(local_file_path, 'wb') as local_file: ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write) logging.info('下载文件成功') # 解析本地文件 # file_path = 'WH_MO_00099.xls' # 使用下载的实际文件路径 parser = etree.XMLParser(recover=True) # Using recover to handle errors tree = etree.parse(local_file_path, parser) logging.info('tree:%s' % tree) root = tree.getroot() logging.info('root:%s' % root) # 准备一个外部字典来存储以PT为键的坐标字典 pt_coordinates = {} # 遍历每个工作表和行 for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'): sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name') logging.info('sheet_name:%s' % sheet_name) if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表 current_pt = None for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'): cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell')) for i, cell in enumerate(cells): data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data') if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空 # 检查是否是PT标识 logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据 if "PT" in data_cell.text: current_pt = data_cell.text pt_coordinates[current_pt] = [] elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None: # 确保当前单元格后面还有单元格存在,以获取理论值 if i + 1 < len(cells): next_cell = cells[i + 1] theory_value = next_cell.find( './/{urn:schemas-microsoft-com:office:spreadsheet}Data') if theory_value is not None: # 为当前PT键添加坐标数据 pt_coordinates[current_pt].append({ data_cell.text: float(theory_value.text) }) logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}") logging.info('pt_coordinates=====%s' % pt_coordinates) # pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}], # 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]} # 检查是否存在PT1等键 if 'PT1' in pt_coordinates and pt_coordinates['PT1']: self.X1_axis = pt_coordinates['PT3'][0]['X'] self.Y1_axis = pt_coordinates['PT3'][1]['Y'] self.Z1_axis = pt_coordinates['PT3'][2]['Z'] else: raise UserError('PT1点未测或数据错误') if 'PT2' in pt_coordinates and pt_coordinates['PT2']: self.X2_axis = pt_coordinates['PT4'][0]['X'] self.Y2_axis = pt_coordinates['PT4'][1]['Y'] self.Z2_axis = pt_coordinates['PT4'][2]['Z'] else: raise UserError('PT2点未测或数据错误') if 'PT3' in pt_coordinates and pt_coordinates['PT3']: self.X3_axis = pt_coordinates['PT5'][0]['X'] self.Y3_axis = pt_coordinates['PT5'][1]['Y'] self.Z3_axis = pt_coordinates['PT5'][2]['Z'] else: raise UserError('PT3点未测或数据错误') if 'PT4' in pt_coordinates and pt_coordinates['PT4']: self.X4_axis = pt_coordinates['PT6'][0]['X'] self.Y4_axis = pt_coordinates['PT6'][1]['Y'] self.Z4_axis = pt_coordinates['PT6'][2]['Z'] else: raise UserError('PT4点未测或数据错误') if 'PT5' in pt_coordinates and pt_coordinates['PT5']: self.X5_axis = pt_coordinates['PT7'][0]['X'] self.Y5_axis = pt_coordinates['PT7'][1]['Y'] self.Z5_axis = pt_coordinates['PT7'][2]['Z'] else: raise UserError('PT5点未测或数据错误') if 'PT6' in pt_coordinates and pt_coordinates['PT6']: self.X6_axis = pt_coordinates['PT8'][0]['X'] self.Y6_axis = pt_coordinates['PT8'][1]['Y'] self.Z6_axis = pt_coordinates['PT8'][2]['Z'] else: raise UserError('PT6点未测或数据错误') if 'PT7' in pt_coordinates and pt_coordinates['PT7']: self.X7_axis = pt_coordinates['PT9'][0]['X'] self.Y7_axis = pt_coordinates['PT9'][1]['Y'] self.Z7_axis = pt_coordinates['PT9'][2]['Z'] else: raise UserError('PT7点未测或数据错误') if 'PT8' in pt_coordinates and pt_coordinates['PT8']: self.X8_axis = pt_coordinates['PT10'][0]['X'] self.Y8_axis = pt_coordinates['PT10'][1]['Y'] self.Z8_axis = pt_coordinates['PT10'][2]['Z'] else: raise UserError('PT8点未测或数据错误') if 'PT9' in pt_coordinates and pt_coordinates['PT9']: self.X9_axis = pt_coordinates['PT1'][0]['X'] self.Y9_axis = pt_coordinates['PT1'][1]['Y'] self.Z9_axis = pt_coordinates['PT1'][2]['Z'] else: raise UserError('PT9点未测或数据错误') if 'PT10' in pt_coordinates and pt_coordinates['PT10']: self.X10_axis = pt_coordinates['PT2'][0]['X'] self.Y10_axis = pt_coordinates['PT2'][1]['Y'] self.Z10_axis = pt_coordinates['PT2'][2]['Z'] else: raise UserError('PT10点未测或数据错误') self.data_state = True self.getcenter() return True # ftp.download_file('three_check_datas.xls', '/home/ftpuser/three_check_datas.xls') # ftp.close() # data = xlrd.open_workbook('/home/ftpuser/three_check_datas.xls') # table = data.sheets()[0] # nrows = table.nrows # # 点坐标列表 # point_list = [] # datas = [] # for i in range(1, nrows): # datas.append(table.row_values(i)) # return datas # 计算配料中心点和与x轴倾斜度方法 def getcenter(self): try: x1 = self.X1_axis x2 = self.X2_axis x3 = self.X3_axis x4 = self.X4_axis x5 = self.X5_axis x6 = self.X6_axis x7 = self.X7_axis x8 = self.X8_axis y1 = self.Y1_axis y2 = self.Y2_axis y3 = self.Y3_axis y4 = self.Y4_axis y5 = self.Y5_axis y6 = self.Y6_axis y7 = self.Y7_axis y8 = self.Y8_axis z1 = self.Z9_axis z2 = self.Z10_axis x0 = ((x3 - x4) * (x2 * y1 - x1 * y2) - (x1 - x2) * (x4 * y3 - x3 * y4)) / ( (x3 - x4) * (y1 - y2) - (x1 - x2) * (y3 - y4)) y0 = ((y3 - y4) * (y2 * x1 - y1 * x2) - (y1 - y2) * (y4 * x3 - y3 * x4)) / ( (y3 - y4) * (x1 - x2) - (y1 - y2) * (x3 - x4)) x1 = ((x7 - x8) * (x6 * y5 - x5 * y6) - (x5 - x6) * (x8 * y7 - x7 * y8)) / ( (x7 - x8) * (y5 - y6) - (x5 - x6) * (y7 - y8)) y1 = ((y7 - y8) * (y6 * x5 - y5 * x6) - (y5 - y6) * (y8 * x7 - y7 * x8)) / ( (y7 - y8) * (x5 - x6) - (y5 - y6) * (x7 - x8)) x = (x0 + x1) / 2 y = (y0 + y1) / 2 z = (z1 + z2) / 2 jd = math.atan2((x5 - x6), (y5 - y6)) jdz = jd * 180 / math.pi print("(%.2f,%.2f)" % (x, y)) self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z)) self.X_deviation_angle = jdz logging.info("坯料中心点坐标:(%.2f,%.2f)" % (x, y)) logging.info("X轴偏差度数:%.2f" % jdz) # 将补偿值写入CNC加工工单 workorder = self.env['mrp.workorder'].browse(self.ids) work = workorder.production_id.workorder_ids work.compensation_value_x = eval(self.material_center_point)[0] work.compensation_value_y = eval(self.material_center_point)[1] # work.process_state = '待加工' # self.sudo().production_id.process_state = '待加工' # self.sudo().production_id.process_state = '待加工' self.date_finished = datetime.now() workorder.button_finish() except Exception as e: # 重新抛出捕获到的异常信息 raise UserError(str(e)) def button_workpiece_delivery(self): if self.routing_type == '装夹预调': for item in self.workpiece_delivery_ids: if not item.route_id: raise UserError('【工件配送】明细中请选择【任务路线】') else: if self.state == 'done': if item.is_cnc_program_down is True: if item.status == '待下发': return { 'name': _('确认'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.workpiece.delivery.wizard', 'target': 'new', 'context': { 'default_workorder_id': self.id, }} else: raise UserError(_("该制造订单还未下发CNC程序单,无法进行工件配送")) else: raise UserError(_("该工单暂未完成,无法进行工件配送")) def button_rework_pre(self): return { 'name': _('返工'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.rework.wizard', 'target': 'new', 'context': { 'default_workorder_id': self.id, 'default_production_id': self.production_id.id, # 'default_programming_state': self.production_id.programming_state, 'default_routing_type': self.routing_type }} # 拼接工单对象属性值 def json_workorder_str(self, production, route): # 计算预计时长duration_expected routing_types = ['切割', '装夹预调', 'CNC加工', '解除装夹'] if route.route_id.routing_type in routing_types: routing_workcenter = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', route.routing_type if hasattr(route, 'routing_type') else route.route_id.routing_type)]) duration_expected = routing_workcenter.time_cycle reserved_duration = routing_workcenter.reserved_duration else: duration_expected = 60 reserved_duration = 30 workorders_values_str = [0, '', { 'product_uom_id': production.product_uom_id.id, 'qty_producing': 0, 'operation_id': False, 'name': route.name if hasattr(route, 'routing_type') else route.route_id.name, 'processing_panel': False if hasattr(route, 'routing_type') else route.panel, 'sequence': route.sequence, 'quality_point_ids': False if hasattr(route, 'routing_type') else route.route_id.quality_point_ids, 'routing_type': route.routing_type if hasattr(route, 'routing_type') else route.route_id.routing_type, 'workcenter_id': False if hasattr(route, 'routing_type') else self.env[ 'mrp.routing.workcenter'].get_workcenter(route.route_id.workcenter_ids.ids, route.route_id.routing_type, production.product_id), # 设定初始化值,避免出现变成bool问题 'date_planned_start': datetime.now(), 'date_planned_finished': datetime.now() + timedelta(days=1), 'duration_expected': duration_expected, 'duration': 0, 'technology_design_id': route.id, # 'tag_type': '重新加工' if item is False else False, 'reserved_duration': reserved_duration, }] return workorders_values_str def _json_workpiece_delivery_list(self): # 修改在装夹工单完成后,生成上产线的工件配送单 # up_route = self.env['sf.agv.task.route'].search([('route_type', '=', '上产线')], limit=1, order='id asc') # down_route = self.env['sf.agv.task.route'].search([('route_type', '=', '下产线')], limit=1, order='id asc') return [ [0, '', { 'production_id': self.production_id.id, 'production_line_id': self.production_id.production_line_id.id, 'type': '上产线', 'is_cnc_program_down': True, 'rfid_code': self.rfid_code # 'route_id': up_route.id, # 'feeder_station_start_id': agv_start_site_id, # 'feeder_station_destination_id': up_route.end_site_id.id } ], # [0, '', # {'production_id': production.id, 'production_line_id': production.production_line_id.id, 'type': '下产线', # 'route_id': down_route.id, # 'feeder_station_start_id': down_route.start_site_id.id, # 'feeder_station_destination_id': down_route.end_site_id.id}] ] # 拼接工单对象属性值(表面工艺) def _json_workorder_surface_process_str(self, production, route, supplier_id): workorders_values_str = [0, '', { 'product_uom_id': production.product_uom_id.id, 'qty_producing': 0, 'operation_id': False, 'name': route.process_parameters_id.display_name, 'processing_panel': '', 'sequence': route.sequence, 'technology_design_id': route.id, 'routing_type': '表面工艺', 'surface_technics_parameters_id': route.process_parameters_id.id, 'work_state': '', 'supplier_id': supplier_id, 'is_subcontract': True if route.process_parameters_id.gain_way == '外协' else False, 'workcenter_id': self.env[ 'mrp.workcenter'].get_process_outsourcing_workcenter() if route.process_parameters_id.gain_way == '外协' else self.env['mrp.routing.workcenter'].get_workcenter(route.route_id.workcenter_ids.ids, route.route_id.routing_type, production.product_id), 'date_planned_start': datetime.now(), 'date_planned_finished': datetime.now() + timedelta(days=1), 'duration_expected': 60, 'duration': 0 }] return workorders_values_str # 维修模块按钮 def button_maintenance_req(self): self.ensure_one() return { 'name': _('New Maintenance Request'), 'view_mode': 'form', 'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')], 'res_model': 'maintenance.request', 'type': 'ir.actions.act_window', 'context': { 'default_company_id': self.company_id.id, 'default_workorder_id': self.id, 'default_production_id': self.production_id.id, 'discard_on_footer_button': True, }, 'target': 'new', 'domain': [('workorder_id', '=', self.id)] } # tray_id = fields.Many2one('sf.tray', string="托盘信息", tracking=True) # 扫码绑定托盘方法 # def gettray(self): # if self.tray_code != False: # values = self.env['sf.tray'].search([("code", "=", self.tray_code)]) # if values: # if values.state == "占用": # raise UserError('该托盘已占用') # if values.state == "报损": # raise UserError('该托盘已损坏') # else: # values.update({ # 'workorder_id': self, # 'production_id': self.production_id, # 'state': '占用', # }) # self.work_state = "已绑定" # orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)]) # for a in orders: # a.tray_id = values # else: # raise UserError('该托盘编码已失效') # else: # raise UserError('托盘码不能为空') # 验证坯料序列号是否正确 def pro_code_is_ok(self, barcode): if barcode is not False: if barcode != self.pro_code: raise UserError('坯料序列号错误') return False else: return True pro_code = fields.Char(string='坯料序列号') pro_code_ok = fields.Boolean(default=False) # 托盘扫码绑定 # def gettray_auto(self, barcode): # if barcode != False: # values = self.env['sf.tray'].search([("code", "=", barcode)]) # # if values: # if values.state == "占用": # raise UserError('该托盘已占用') # if values.state == "报损": # raise UserError('该托盘已损坏') # else: # values.update({ # 'workorder_id': self, # 'production_id': self.production_id, # 'state': '占用', # }) # self.work_state = "已绑定" # orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)]) # for a in orders: # a.tray_id = values # # return values # # # return { # # 'name': _('New Maintenance Request'), # # 'view_mode': 'form', # # 'res_model': 'maintenance.request', # # 'type': 'ir.actions.act_window', # # 'context': { # # 'default_company_id': self.company_id.id, # # 'default_production_id': self.id, # # }, # # 'domain': [('production_id', '=', self.id)], # # } # else: # raise UserError('该托盘编码已失效') # else: # raise UserError('托盘码不能为空') # 解除托盘绑定 # def unbindtray(self): # tray = self.env['sf.tray'].search([("production_id", "=", self.production_id.id)]) # if tray: # tray.unclamp() # self.tray_id = False # return { # 'name': _('New Maintenance Request'), # 'view_mode': 'form', # 'res_model': 'maintenance.request', # 'res_id':self.id, # 'type': 'ir.actions.act_window', # 'context': { # 'default_company_id': self.company_id.id, # 'default_production_id': self.id, # }, # 'domain': [('production_id', '=', self.id)], # 'target':'new' # } def json_workorder_str1(self, k, production, route): workorders_values_str = [0, '', { 'product_uom_id': production.product_uom_id.id, 'qty_producing': 0, 'operation_id': False, 'name': '%s(返工)' % route.route_workcenter_id.name, 'processing_panel': k, 'routing_type': route.routing_type, 'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids, route.routing_type, production.product_id), 'date_planned_start': datetime.now(), 'date_planned_finished': datetime.now() + timedelta(days=1), 'duration_expected': 60, 'duration': 0, 'manual_quotation': production.workorder_ids.filtered( lambda t: t.routing_type == 'CNC加工').manual_quotation, 'rfid_code': production.workorder_ids.filtered(lambda t: t.routing_type == 'CNC加工').rfid_code, 'cnc_ids': production.workorder_ids.filtered(lambda t: t.routing_type == 'CNC加工').cnc_ids, 'cmm_ids': production.workorder_ids.filtered(lambda t: t.routing_type == 'CNC加工').cmm_ids, }] return workorders_values_str @api.depends('production_availability', 'blocked_by_workorder_ids', 'blocked_by_workorder_ids.state', 'production_id.tool_state', 'production_id.schedule_state', 'sequence', 'production_id.programming_state') def _compute_state(self): for workorder in self: # 如果工单的工序没有进行排序则跳出循环 if workorder.production_id.workorder_ids.filtered(lambda wk: wk.sequence == 0): continue # ===== 对所有按序号排序的非[进行中、完成、返工、取消]状态的工单,除了第一条之外的工单状态都设置为[等待其他工单] ===== # logging.info(workorder.state) work_ids = workorder.production_id.workorder_ids.filtered( lambda wk: wk.state not in ['done', 'rework', 'cancel']) if not work_ids: continue min_sequence_wk = min(work_ids, key=lambda wk: wk.sequence) if workorder.state in ['done', 'rework', 'cancel', 'progress', 'to be detected']: continue else: if workorder != min_sequence_wk: if workorder.state != 'pending': workorder.state = 'pending' continue # ================= 如果制造订单制造类型为【人工线下加工】========================== if (workorder.production_id.production_type == '人工线下加工' and workorder.production_id.schedule_state == '已排' and len(workorder.production_id.picking_ids.filtered( lambda w: w.state not in ['done', 'cancel'])) == 0): # and workorder.production_id.programming_state == '已编程' if workorder.is_subcontract is True: purchase_orders_id = self._get_surface_technics_purchase_ids() if purchase_orders_id.state == 'purchase': workorder.state = 'ready' move_out = workorder.move_subcontract_workorder_ids[1] for mo in move_out: if mo.state != 'done': mo.write({'state': 'assigned', 'production_id': False}) if not mo.move_line_ids: self.env['stock.move.line'].create( mo.get_move_line(workorder.production_id, workorder)) continue else: workorder.state = 'waiting' continue elif workorder.routing_type == '人工线下加工': if workorder.production_id.programming_state == '已编程': workorder.state = 'ready' else: workorder.state = 'ready' continue # ================= 如果制造订单刀具状态为[无效刀、缺刀] 或者 制造订单状态为[返工]========================== if (workorder.production_id.tool_state in ['1', '2'] or workorder.production_id.state == 'rework' or workorder.production_id.schedule_state != '已排' or workorder.production_id.reservation_state not in ['assigned']): if workorder.state != 'waiting': workorder.state = 'waiting' continue if workorder.production_id.programming_state == '已编程': workorder.state = 'ready' elif workorder.state != 'waiting': workorder.state = 'waiting' # =========== 特殊工艺工单处理 =================== # if workorder.routing_type == '表面工艺' and workorder.is_subcontrac: # purchase_order = self.env['purchase.order'].search( # [('origin', 'ilike', workorder.production_id.name)]) # if purchase_order.picking_ids.filtered(lambda p: p.state in ['waiting', 'confirmed', 'assigned']): # workorder.state = 'waiting' # continue if workorder.technology_design_id.routing_tag == 'special': if workorder.is_subcontract is False: workorder.state = 'ready' else: if len(workorder.production_id.picking_ids.filtered( lambda w: w.state not in ['done', 'cancel'])) == 0 and workorder.production_id.programming_state == '已编程': purchase_orders_id = self._get_surface_technics_purchase_ids() if purchase_orders_id: if purchase_orders_id.state == 'purchase': workorder.state = 'ready' move_out = workorder.move_subcontract_workorder_ids[1] for mo in move_out: if mo.state != 'done': mo.write({'state': 'assigned', 'production_id': False}) if not mo.move_line_ids: self.env['stock.move.line'].create( mo.get_move_line(workorder.production_id, workorder)) else: workorder.state = 'waiting' # 重写工单开始按钮方法 def button_start(self): # 判断工单状态是否为等待组件 if self.state in ['waiting', 'pending']: raise UserError('制造订单【%s】缺少组件信息!' % self.production_id.name) if self.routing_type == 'CNC加工': self.env['sf.production.plan'].sudo().search([('name', '=', self.production_id.name)]).write({ 'state': 'processing', 'actual_start_time': datetime.now() }) if self.sequence == 1: # 判断是否有坯料的序列号信息 boolean = False if self.production_id.move_raw_ids: if self.production_id.move_raw_ids[0].product_id.categ_type == '坯料': if self.production_id.move_raw_ids[0].move_line_ids: if self.production_id.move_raw_ids[0].move_line_ids: if self.production_id.move_raw_ids[0].move_line_ids[0].lot_id.name: boolean = True else: boolean = True if not boolean: raise UserError('制造订单【%s】缺少组件的序列号信息!' % self.production_id.name) self.pro_code = self.production_id.move_raw_ids[0].move_line_ids[0].lot_id.name # cnc校验 if self.production_id.production_type == '自动化产线加工': cnc_workorder = self.search( [('production_id', '=', self.production_id.id), ('routing_type', '=', 'CNC加工')], limit=1, order='id asc') # if not cnc_workorder.cnc_ids: # raise UserError(_('该制造订单还未下发CNC程序,请稍后再试')) else: if self.production_id.tool_state in ['1', '2']: if self.production_id.tool_state == '1': state = '缺刀' else: state = '无效刀' raise UserError( f'制造订单【{self.production_id.name}】功能刀具状态为【{state}】!') if self.routing_type == '解除装夹': ''' 记录开始时间 ''' self.date_start = datetime.now() # 表面工艺外协出库单 if self.routing_type == '表面工艺': if self.is_subcontract is True: move_out = self.move_subcontract_workorder_ids[1] # move_out = self.env['stock.move'].search( # [('location_id', '=', self.env['stock.location'].search( # [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id), # ('location_dest_id', '=', self.env['stock.location'].search( # [('barcode', 'ilike', 'VL-SPOC')]).id), # ('origin', '=', self.production_id.name), ('state', 'not in', ['cancel', 'done'])]) for mo in move_out: if mo.state != 'done': mo.write({'state': 'assigned', 'production_id': False}) if not mo.move_line_ids: self.env['stock.move.line'].create(mo.get_move_line(self.production_id, self)) # product_qty = mo.product_uom._compute_quantity( # mo.product_uom_qty, mo.product_id.uom_id, rounding_method='HALF-UP') # available_quantity = self.env['stock.quant']._get_available_quantity( # mo.product_id, # mo.location_id, # lot_id=mo.move_line_ids.lot_id, # strict=False, # ) # mo._update_reserved_quantity( # product_qty, # available_quantity, # mo.location_id, # lot_id=mo.move_line_ids.lot_id, # strict=False, # ) # move_out._action_assign() if self.state == 'waiting' or self.state == 'ready' or self.state == 'progress': self.move_raw_ids = self.production_id.move_raw_ids self.move_raw_ids[0].write({ 'materiel_length': self.move_raw_ids[0].product_id.length, 'materiel_width': self.move_raw_ids[0].product_id.width, 'materiel_height': self.move_raw_ids[0].product_id.height }) self.write({ 'material_length': self.move_raw_ids[0].product_id.length, 'material_width': self.move_raw_ids[0].product_id.width, 'material_height': self.move_raw_ids[0].product_id.height }) self.ensure_one() if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)): return True # As button_start is automatically called in the new view if self.state in ('done', 'cancel'): return True if self.product_tracking == 'serial': self.qty_producing = 1.0 else: self.qty_producing = self.qty_remaining self.env['mrp.workcenter.productivity'].create( self._prepare_timeline_vals(self.duration, datetime.now()) ) if self.production_id.state != 'progress': self.production_id.write({ 'date_start': datetime.now(), }) if self.state == 'progress': return True start_date = datetime.now() vals = { 'state': 'progress', 'date_start': start_date, } if not self.leave_id: leave = self.env['resource.calendar.leaves'].create({ 'name': self.display_name, 'calendar_id': self.workcenter_id.resource_calendar_id.id, 'date_from': start_date, 'date_to': start_date + relativedelta(minutes=self.duration_expected), 'resource_id': self.workcenter_id.resource_id.id, 'time_type': 'other' }) vals['leave_id'] = leave.id return self.write(vals) else: if self.date_planned_start > start_date: vals['date_planned_start'] = start_date # if self.date_planned_finished and self.date_planned_finished < start_date: # vals['date_planned_finished'] = start_date return self.write(vals) else: raise UserError(_('请先完成上一步工单')) def button_finish(self): for record in self: if record.routing_type == '装夹预调': if not record.rfid_code and record.is_rework is False: raise UserError("请扫RFID码进行绑定") if record.is_rework is False: if not record.material_center_point: raise UserError("坯料中心点为空,请检查") # if record.X_deviation_angle <= 0: # raise UserError("X偏差角度小于等于0,请检查!本次计算的X偏差角度为:%s" % record.X_deviation_angle) record.process_state = '待加工' # record.write({'process_state': '待加工'}) record.production_id.process_state = '待加工' # 生成工件配送单 record.workpiece_delivery_ids = record._json_workpiece_delivery_list() if record.routing_type == 'CNC加工' or record.individuation_page_PTD is True: if record.routing_type == 'CNC加工': record.process_state = '待解除装夹' # record.write({'process_state': '待加工'}) record.production_id.process_state = '待解除装夹' self.env['sf.production.plan'].sudo().search([('name', '=', record.production_id.name)]).write({ 'state': 'finished', 'actual_end_time': datetime.now() }) record.production_id.write({'detection_result_ids': [(0, 0, { 'rework_reason': record.reason, 'detailed_reason': record.detailed_reason, 'processing_panel': record.processing_panel, 'routing_type': record.routing_type, 'handle_result': '待处理' if record.test_results in ['返工', '报废'] or record.is_rework is True else '', 'test_results': record.test_results, 'test_report': record.detection_report})], 'is_scrap': True if record.test_results == '报废' else False }) if record.routing_type == '解除装夹': ''' 记录结束时间 ''' record.date_finished = datetime.now() if record.routing_type == '表面工艺': if record.picking_ids: picks = record.picking_ids.filtered(lambda p: p.state not in ('done')) if picks: raise UserError('请先完成该工单的工艺外协再进行操作') # 表面工艺外协,最后一张工单 workorders = self.production_id.workorder_ids subcontract_workorders = workorders.filtered( lambda wo: wo.is_subcontract == True and wo.state != 'cancel').sorted('sequence') if self == subcontract_workorders[-1]: # 给下一个库存移动就绪 self.move_subcontract_workorder_ids[0].move_dest_ids._action_done() # self.production_id.button_mark_done() tem_date_planned_finished = record.date_planned_finished tem_date_finished = record.date_finished logging.info('routing_type:%s' % record.routing_type) super().button_finish() logging.info('date_planned_finished:%s' % record.date_planned_finished) # 表面工艺工单完成不走该修改 if record.routing_type != '表面工艺': record.write({ 'date_planned_finished': tem_date_planned_finished # 保持原值 }) # if record.routing_type == 'CNC加工' and record.test_results in ['返工', '报废']: # record.production_id.action_cancel() # record.production_id.workorder_ids.write({'rfid_code': False, 'rfid_code_old': record.rfid_code}) # if record.is_remanufacture is True: # record.recreateManufacturingOrWorkerOrder() is_production_id = False rework_workorder = record.production_id.workorder_ids.filtered(lambda p: p.state == 'rework') done_workorder = record.production_id.workorder_ids.filtered(lambda p1: p1.state in ['done']) if (len(rework_workorder) + len(done_workorder) == len( record.production_id.workorder_ids.filtered(lambda wo: wo.state != 'cancel'))) or ( len(done_workorder) == len( record.production_id.workorder_ids.filtered(lambda wo: wo.state != 'cancel'))): is_production_id = True if record.routing_type in ['解除装夹'] or ( record.is_rework is True and record.routing_type in ['装夹预调']): for workorder in record.production_id.workorder_ids: if workorder.processing_panel == record.processing_panel: rfid_code = workorder.rfid_code workorder.write({'rfid_code_old': rfid_code, 'rfid_code': False}) self.env['stock.lot'].sudo().search([('rfid', '=', rfid_code)]).write( {'tool_material_status': '可用'}) if workorder.rfid_code: raise ValidationError(f'【{workorder.name}】工单解绑失败,请重新点击完成按钮!!!') # workorder.rfid_code_old = rfid_code # workorder.rfid_code = False logging.info('workorder.rfid_code:%s' % workorder.rfid_code) # if is_production_id is True and record.routing_type in ['解除装夹', '表面工艺', '切割']: if is_production_id is True: logging.info('product_qty:%s' % record.production_id.product_qty) for move_raw_id in record.production_id.move_raw_ids: move_raw_id.quantity_done = move_raw_id.product_uom_qty record.process_state = '已完工' record.production_id.process_state = '已完工' # if record.routing_type in ['表面工艺']: # raw_move = self.env['stock.move'].sudo().search( # [('origin', '=', record.production_id.name), # ('procure_method', 'in', ['make_to_order', 'make_to_stock']), # ('state', '!=', 'done')]) # if raw_move: # raw_move.write({'state': 'done'}) if record.production_id.state != 'rework': record.production_id.button_mark_done1() # record.production_id.state = 'done' # ============工单完成,修改对应[质检单]的值===================== if record.check_ids.filtered(lambda qc: qc.quality_state in ('waiting', 'none')): check_ids = record.check_ids.filtered(lambda qc: qc.quality_state in ('waiting', 'none')) if record.test_results == '合格': check_ids.write({'test_results': record.test_results}) for check_id in check_ids: check_id.do_pass() elif record.test_results in ('返工', '报废'): check_ids.write({ 'test_results': record.test_results, 'reason': record.reason, 'detailed_reason': record.detailed_reason}) for check_id in check_ids: check_id.do_fail() # ====================================================== # 解绑托盘 def unbind_tray(self): for item in self: self.env['stock.lot'].sudo().search([('rfid', '=', item.rfid_code)]).write( {'tool_material_status': '可用'}) self.production_id.workorder_ids.write({ 'rfid_code': False, 'tray_serial_number': False, 'tray_product_id': False, 'tray_brand_id': False, 'tray_type_id': False, 'tray_model_id': False, 'is_trayed': False}) # 将FTP的检测报告文件下载到临时目录 def download_reportfile_tmp(self, workorder, reportpath): logging.info('reportpath/ftp地址:%s' % reportpath) logging.info('processing_panel:%s' % workorder.processing_panel) serverdir = os.path.join('/tmp', workorder.production_id.name.replace('/', '_'), 'detection', workorder.processing_panel) ftp_resconfig = self.env['res.config.settings'].get_values() ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'], ftp_resconfig['ftp_password']) if not ftp.file_exists_1(reportpath): logging.info('文件不存在:%s' % reportpath) download_state = ftp.download_program_file(reportpath, serverdir) logging.info('download_state:%s' % download_state) return download_state # 根据中控系统提供的检测文件地址去ftp里对应的制造订单里获取 def get_detection_file(self, workorder, reportPath): serverdir = os.path.join('/tmp', workorder.production_id.name.replace('/', '_'), 'detection', workorder.processing_panel) logging.info('get_detection_file-serverdir:%s' % serverdir) for root, dirs, files in os.walk(serverdir): for filename in files: if filename == os.path.basename(reportPath): report_file_path = os.path.join(root, filename) logging.info('get_detection_file-report_file_path:%s' % report_file_path) workorder.detection_report = base64.b64encode(open(report_file_path, 'rb').read()) return True def print_method(self): """ 解除装夹处调用关联制造订单的关联序列号的打印方法 """ if self.production_id: if self.production_id.lot_producing_id: self.production_id.lot_producing_id.print_single_method() else: raise UserError("无关联制造订单或关联序列号,无法打印。请检查!") @api.model def get_views(self, views, options=None): res = super().get_views(views, options) if res['views'].get('list', {}) and self.env.context.get('search_default_workcenter_id'): workcenter = self.env['mrp.workcenter'].browse(self.env.context.get('search_default_workcenter_id')) tree_view = res['views']['list'] if workcenter.name == '工件拆卸中心': arch = etree.fromstring(tree_view['arch']) # 查找 tree 标签 tree_element = arch.xpath("//tree")[0] tree_element.set('js_class', 'remove_focus_list_view') # 查找或创建 header 标签 header_element = tree_element.find('header') if header_element is None: header_element = etree.Element('header') tree_element.insert(0, header_element) # 创建并添加按钮元素 button_element = etree.Element('button', { 'name': 'button_delivery', 'type': 'object', 'string': '解除装夹', 'class': 'btn-primary jikimo_button_confirm', # 'className': 'btn-primary', 'modifiers': '{"force_show": 1}' }) header_element.append(button_element) # 更新 tree_view 的 arch tree_view['arch'] = etree.tostring(arch, encoding='unicode') return res def button_delivery(self): # production_ids = [] # workorder_ids = [] delivery_type = '运送空料架' # max_num = 4 # 最大配送数量 # feeder_station_start_id = False # if len(self) > max_num: # raise UserError('仅限于拆卸1-4个制造订单,请重新选择') # for item in self: # if item.state != 'ready': # raise UserError('请选择状态为【就绪】的工单进行解除装夹') # # production_ids.append(item.production_id.id) # workorder_ids.append(item.id) # if not feeder_station_start_id: # down_product_agv_scheduling = item.get_down_product_agv_scheduling() # if down_product_agv_scheduling: # feeder_station_start_id = down_product_agv_scheduling.end_site_id.id return { 'name': _('确认'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.workpiece.delivery.wizard', 'target': 'new', 'context': { # 'default_delivery_ids': [(6, 0, delivery_ids)], # 'default_production_ids': [(6, 0, production_ids)], 'default_delivery_type': delivery_type, # 'default_workorder_ids': [(6, 0, workorder_ids)], 'default_workcenter_id': self.env.context.get('default_workcenter_id'), 'default_confirm_button': '确认解除', # 'default_feeder_station_start_id': feeder_station_start_id, }} move_subcontract_workorder_ids = fields.One2many('stock.move', 'subcontract_workorder_id', string='组件') # ==============================配置化页签--个性化记录=================================== routing_workcenter_id = fields.Many2one('mrp.routing.workcenter', compute='_compute_routing_workcenter_id', store=True) individuation_page_ids = fields.Many2many('sf.work.individuation.page', string='个性化记录', store=True, compute='_compute_individuation_page_ids') individuation_page_PTD = fields.Boolean('个性化记录(是否显示后置三元检测[PTD]页签)', default=False) @api.depends('name') def _compute_routing_workcenter_id(self): for mw in self: routing_workcenter_id = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', mw.name), ('routing_type', '=', mw.routing_type)]) if routing_workcenter_id: mw.routing_workcenter_id = routing_workcenter_id.id @api.depends('routing_workcenter_id.individuation_page_ids') def _compute_individuation_page_ids(self): for mw in self: if mw.routing_workcenter_id: mw.individuation_page_ids = mw.routing_workcenter_id.individuation_page_ids.ids # 初始化页签配置 mw.individuation_page_PTD = False # 根据工单对应的【作业_个性化记录】配置页签 if any(item.code == 'PTD' for item in mw.routing_workcenter_id.individuation_page_ids): mw.individuation_page_PTD = True # ============================================================================================= is_inspect = fields.Boolean('需送检', compute='_compute_is_inspect', store=True, default=False) @api.depends('check_ids.is_inspect') def _compute_is_inspect(self): for item in self: if item.check_ids: is_inspect = False for check_id in item.check_ids: if check_id.is_inspect: is_inspect = True break item.is_inspect = is_inspect def do_inspect(self): """送检""" # 修改工单状态 self.write({'state': 'to be detected'}) # 若关联的【质量检查_需送检】=true,则质量检查单的状态从“等待”更新为“待处理” self.check_ids.filtered(lambda ch: ch.is_inspect is True and ch.quality_state == 'waiting').write( {'quality_state': 'none'}) class CNCprocessing(models.Model): _name = 'sf.cnc.processing' _description = "CNC加工" _rec_name = 'program_name' _order = 'sequence_number,id' cnc_id = fields.Many2one('ir.attachment') sequence_number = fields.Integer('序号') program_name = fields.Char('程序名') functional_tool_type_id = fields.Many2one('sf.functional.cutting.tool.model', string='功能刀具类型') cutting_tool_name = fields.Char('刀具名称') cutting_tool_no = fields.Char('刀号') processing_type = fields.Char('加工类型') margin_x_y = fields.Char('余量_X/Y') margin_z = fields.Char('余量_Z') depth_of_processing_z = fields.Char('加工深度(Z)') cutting_tool_extension_length = fields.Char('刀具伸出长度') cutting_tool_handle_type = fields.Char('刀柄型号') estimated_processing_time = fields.Char('预计加工时间') remark = fields.Text('备注') workorder_id = fields.Many2one('mrp.workorder', string="工单") production_id = fields.Many2one('mrp.production', string="制造订单") button_state = fields.Boolean(string='是否已经下发') program_path = fields.Char('程序文件路径') program_create_date = fields.Datetime('程序创建日期') tool_state = fields.Selection([('0', '正常'), ('1', '缺刀'), ('2', '无效刀')], string='刀具状态', default='0') # mrs下发编程单创建CNC加工 def cnc_processing_create(self, cnc_workorder, ret, program_path, program_path_tmp): cnc_processing = None for obj in ret['programming_list']: workorder = self.env['mrp.workorder'].search( [('production_id.name', '=', cnc_workorder.name), ('processing_panel', '=', obj['processing_panel']), ('routing_type', '=', 'CNC加工')]) logging.info('workorder:%s' % workorder.id) if obj['program_name'] in program_path: logging.info('obj:%s' % obj['program_name']) cnc_processing = self.env['sf.cnc.processing'].create({ 'workorder_id': workorder.id, 'sequence_number': obj['sequence_number'], 'program_name': obj['program_name'], 'cutting_tool_name': obj['cutting_tool_name'], 'cutting_tool_no': obj['cutting_tool_no'], 'processing_type': obj['processing_type'], 'margin_x_y': obj['margin_x_y'], 'margin_z': obj['margin_z'], 'depth_of_processing_z': obj['depth_of_processing_z'], 'cutting_tool_extension_length': obj['cutting_tool_extension_length'], 'cutting_tool_handle_type': obj['cutting_tool_handle_type'], 'estimated_processing_time': obj['estimated_processing_time'], 'remark': obj['remark'], 'program_path': program_path.replace('/tmp', '/home/ftp/ftp_root/NC') }) cnc_processing.get_cnc_processing_file(program_path_tmp, cnc_processing, program_path) cnc_workorder.write({'programming_state': '已编程', 'work_state': '已编程'}) return cnc_processing def _json_cnc_processing(self, panel, ret): cnc_processing = [] if ret is not False: for item in ret['programming_list']: if item['processing_panel'] == panel and item['ftp_path'].find('.dmi') == -1: cnc_processing.append((0, 0, { 'sequence_number': item['sequence_number'], 'program_name': item['program_name'], 'cutting_tool_name': item['cutting_tool_name'], 'cutting_tool_no': item['cutting_tool_no'], 'processing_type': item['processing_type'], 'margin_x_y': item['margin_x_y'], 'margin_z': item['margin_z'], 'depth_of_processing_z': item['depth_of_processing_z'], 'cutting_tool_extension_length': item['cutting_tool_extension_length'], 'cutting_tool_handle_type': item['cutting_tool_handle_type'], 'estimated_processing_time': item['estimated_processing_time'], 'program_path': item['ftp_path'], 'program_create_date': datetime.strptime(item['program_create_date'], '%Y-%m-%d %H:%M:%S'), 'remark': item['remark'] })) return cnc_processing # 根据程序名和加工面匹配到ftp里对应的Nc程序名,可优化为根据cnc_processing.program_path进行匹配 def get_cnc_processing_file(self, serverdir, cnc_processing, program_path): logging.info('serverdir:%s' % serverdir) logging.info('cnc_processing:%s' % cnc_processing) for root, dirs, files in os.walk(serverdir): for f in files: logging.info('splitext(f):%s' % os.path.splitext(f)[1]) if os.path.splitext(f)[1] == ".pdf": full_path = os.path.join(serverdir, root, f) cnc_processing.workorder_id.cnc_worksheet = base64.b64encode( open(full_path, 'rb').read()) else: if f in program_path: # if cnc_processing.program_name == f.split('.')[0]: cnc_file_path = os.path.join(serverdir, root, f) self.write_file(cnc_file_path, cnc_processing) # 创建附件(nc文件) def attachment_create(self, name, data): attachment = self.env['ir.attachment'].create({ 'datas': base64.b64encode(data), 'type': 'binary', 'public': True, 'description': '程序文件', 'name': name }) return attachment # 将FTP的多面的程序单文件下载到临时目录 def download_file_tmp(self, production_no, processing_panel): remotepath = os.path.join('/home/ftp/ftp_root/NC', production_no, 'return', processing_panel) serverdir = os.path.join('/tmp', production_no, 'return', processing_panel) ftp_resconfig = self.env['res.config.settings'].get_values() ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'], ftp_resconfig['ftp_password']) if not ftp.file_exists_1(remotepath): logging.info('目录不存在:%s' % remotepath) download_state = ftp.download_program_file(remotepath, serverdir) logging.info('download_state:%s' % download_state) return download_state # 将nc文件存到attach的datas里 def write_file(self, nc_file_path, cnc): nc_file_name = nc_file_path.split('/') logging.info('file_name:%s' % nc_file_name[-1]) if os.path.exists(nc_file_path): with open(nc_file_path, 'rb') as file: data_bytes = file.read() attachment = self.attachment_create(cnc.program_name + nc_file_name[-1], data_bytes) cnc.write({'cnc_id': attachment.id}) file.close() else: return False # end_date = datetime.now() # for workorder in self: # if workorder.state in ('done', 'cancel'): # continue # workorder.end_all() # vals = { # 'qty_produced': workorder.qty_produced or workorder.qty_producing or workorder.qty_production, # 'state': 'done', # 'date_finished': end_date, # 'date_planned_finished': end_date, # 'costs_hour': workorder.workcenter_id.costs_hour # } # if not workorder.date_start: # vals['date_start'] = end_date # if not workorder.date_planned_start or end_date < workorder.date_planned_start: # vals['date_planned_start'] = end_date # workorder.with_context(bypass_duration_calculation=True).write(vals) return True class SfWorkOrderBarcodes(models.Model): """ 智能工厂工单处扫码绑定托盘 """ _name = "mrp.workorder" _inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"] def on_barcode_scanned(self, barcode): logging.info('Rfid:%s' % barcode) if 'O-CMD' in barcode: return None workorder = self.env['mrp.workorder'].browse(self.ids) # workorder_preset = self.env['mrp.workorder'].search( # [('routing_type', '=', '装夹预调'), ('rfid_code', '=', barcode)]) workorder_olds = self.env['mrp.workorder'].search( [('routing_type', '=', '装夹预调'), ('rfid_code', '=', barcode)]) if workorder_olds: name = '' tem_list = [] for workorder in workorder_olds: tem_list.append(workorder.production_id.name) for i in list(set(tem_list)): name = '%s %s' % (name, i) raise UserError('该托盘已绑定【%s】制造订单,请先解除绑定!!!' % name) if workorder: if workorder.routing_type == '装夹预调': if workorder.state in ['done']: work_state = {'done': '已完工'} raise UserError('装夹%s,请勿重复扫码' % work_state.get(workorder.state)) lots = self.env['stock.lot'].sudo().search([('rfid', '=', barcode)]) logging.info("托盘信息:%s" % lots) if lots: for lot in lots: if lot.product_id.categ_type == '夹具': val = { 'tray_serial_number': lot.name, 'tray_product_id': lot.product_id.id, 'tray_brand_id': lot.product_id.brand_id.id, 'tray_type_id': lot.product_id.fixture_material_id.id, 'tray_model_id': lot.product_id.fixture_model_id.id, 'rfid_code': barcode } workorder.write(val) self.write(val) workorder_rfid = self.env['mrp.workorder'].search( [('production_id', '=', workorder.production_id.id), ('processing_panel', '=', workorder.processing_panel)]) if workorder_rfid: for item in workorder_rfid: item.write({'rfid_code': barcode}) lot.sudo().write({'tool_material_status': '在用'}) logging.info("Rfid[%s]绑定成功!!!" % barcode) else: raise UserError('该Rfid【%s】绑定的是【%s】, 不是托盘!!!' % (barcode, lot.product_id.name)) self.process_state = '待检测' self.date_start = datetime.now() self.is_trayed = True else: raise UserError('没有找到Rfid为【%s】的托盘信息!!!' % barcode) # stock_move_line = self.env['stock.move.line'].search([('lot_name', '=', barcode)]) # if stock_move_line.product_id.categ_type == '夹具': # workorder.write({ # 'tray_serial_number': stock_move_line.lot_name, # 'tray_product_id': stock_move_line.product_id.id, # 'tray_brand_id': stock_move_line.product_id.brand_id.id, # 'tray_type_id': stock_move_line.product_id.fixture_material_id.id, # 'tray_model_id': stock_move_line.product_id.fixture_model_id.id # }) # workorder.button_start() # # return { # # 'type': 'ir.actions.act_window', # # 'res_model': 'mrp.workorder', # # 'view_mode': 'form', # # 'domain': [('id', 'in', workorder.id)], # # 'target': 'current' # # } # else: # embryo_stock_lot = self.env['stock.lot'].search([('name', '=', barcode)]) # if embryo_stock_lot: # embryo_stock_move_line = self.env['stock.move.line'].search( # [('product_id', '=', embryo_stock_lot.product_id.id), # ('reference', '=', workorder.production_id.name), # ('lot_id', '=', embryo_stock_lot.id), # ('product_category_name', '=', '坯料')]) # if embryo_stock_move_line: # bom_production = self.env['mrp.production'].search( # [('product_id', '=', embryo_stock_lot.product_id.id), # ('origin', '=', workorder.production_id.name)], limit=1, order='id asc') # workpiece_delivery = self.env['sf.workpiece.delivery'].search( # [('workorder_id', '=', workorder.id)], limit=1, order='id asc') # if workpiece_delivery: # embryo_workpiece_code = workpiece_delivery.workpiece_code # if bom_production: # if workpiece_delivery.workpiece_code and bom_production.name not in \ # workpiece_delivery.workpiece_code: # embryo_workpiece_code = workpiece_delivery.workpiece_code + ',' + \ # bom_production.name # if not workpiece_delivery.workpiece_code: # embryo_workpiece_code = bom_production.name # workpiece_delivery.write({'workpiece_code': embryo_workpiece_code}) # else: # raise UserError('工件生产线不一致,请重新确认') # else: # workorder_rfid = self.env['mrp.workorder'].search( # [('production_id', '=', workorder.production_id.id)]) # if workorder_rfid: # for item in workorder_rfid: # item.write({'rfid_code': barcode}) class WorkPieceDelivery(models.Model): _name = "sf.workpiece.delivery" _inherit = ['mail.thread', 'mail.activity.mixin'] _description = '工件配送' name = fields.Char('单据编码') workorder_id = fields.Many2one('mrp.workorder', string='工单', readonly=True) workorder_state = fields.Selection(related='workorder_id.state', string='工单状态') rfid_code = fields.Char(related='workorder_id.rfid_code', string='rfid码', store=True) production_id = fields.Many2one('mrp.production', string='制造订单号', readonly=True) production_line_id = fields.Many2one('sf.production.line', string='目的生产线', tracking=True) plan_start_processing_time = fields.Datetime('计划开始加工时间', readonly=True) route_id = fields.Many2one('sf.agv.task.route', '任务路线', tracking=True) feeder_station_start_id = fields.Many2one('sf.agv.site', '起点接驳站') feeder_station_destination_id = fields.Many2one('sf.agv.site', '目的接驳站') task_delivery_time = fields.Datetime('任务下发时间') task_completion_time = fields.Datetime('任务完成时间') def _get_agv_route_type_selection(self): return self.env['sf.agv.task.route'].fields_get(['route_type'])['route_type']['selection'] type = fields.Selection(selection=_get_agv_route_type_selection, string='类型') delivery_duration = fields.Float('配送时长', compute='_compute_delivery_duration') status = fields.Selection( [('待下发', '待下发'), ('已下发', '待配送'), ('已配送', '已配送'), ('已取消', '已取消')], string='状态', default='待下发', tracking=True) is_cnc_program_down = fields.Boolean('程序是否下发', default=False, tracking=True) is_manual_work = fields.Boolean('人工操作', default=False) active = fields.Boolean(string="有效", default=True) agv_scheduling_id = fields.Many2one('sf.agv.scheduling', 'AGV任务调度') @api.model def create(self, vals): if vals.get('route_id') and vals.get('type') is None: vals['type'] = '运送空料架' else: if vals.get('name', '/') == '/' or vals.get('name', '/') is False: vals['name'] = self.env['ir.sequence'].next_by_code('sf.workpiece.delivery') or '/' obj = super(WorkPieceDelivery, self).create(vals) if obj.type == '运送空料架': if obj.name is False: obj.name = "运送空料架路线:%s-%s" % ( obj.feeder_station_start_id.name, obj.feeder_station_destination_id.name) return obj # @api.constrains('route_id') # def _check_route_id(self): # if self.type == '运送空料架': # if self.route_id and self.name is False: # route = self.sudo().search( # [('route_id', '=', self.route_id.id), ('id', '!=', self.id), ('name', 'ilike', '运送空料架路线')]) # if route: # raise UserError("该任务路线已存在,请重新选择") # @api.constrains('name') # def _check_name(self): # if self.type == '运送空料架': # wd = self.sudo().search([('name', '=', self.name), ('id', '!=', self.id)]) # if wd: # raise UserError("该名称已存在") def action_delivery_history(self): return { 'name': _('配送历史'), 'type': 'ir.actions.act_window', 'view_mode': 'tree', 'res_model': 'sf.workpiece.delivery', 'view_id': self.env.ref('sf_manufacturing.sf_workpiece_delivery_empty_racks_tree').id, 'domain': [('type', '=', '运送空料架'), ('route_id', '=', self.route_id.id), ('name', 'ilike', 'WDO')] } @api.onchange('route_id') def onchange_route(self): if self.route_id: self.feeder_station_start_id = self.route_id.start_site_id.id self.feeder_station_destination_id = self.route_id.end_site_id.id # 工件配送 def button_delivery(self): delivery_ids = [] production_ids = [] workorder_ids = [] is_cnc_down = 0 is_not_production_line = 0 same_production_line_id = None delivery_type = '上产线' max_num = 4 # 最大配送数量 if len(self) > max_num: raise UserError('仅限于配送1-4个制造订单,请重新选择') for item in self: if item.status != '待下发': raise UserError('请选择状态为【待下发】的制造订单进行配送') if same_production_line_id is None: same_production_line_id = item.production_line_id.id if item.production_line_id.id != same_production_line_id: is_not_production_line += 1 if item.is_cnc_program_down is False: is_cnc_down += 1 if is_cnc_down == 0 and is_not_production_line == 0: delivery_ids.append(item.id) production_ids.append(item.production_id.id) workorder_ids.append(item.workorder_id.id) if is_cnc_down >= 1: raise UserError('您所选择制造订单的【CNC程序】暂未下发,请在程序下发后再进行配送') if is_not_production_line >= 1: raise UserError('您所选择制造订单的【目的生产线】不一致,请重新确认') return { 'name': _('确认'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.workpiece.delivery.wizard', 'target': 'new', 'context': { 'default_delivery_ids': [(6, 0, delivery_ids)], 'default_production_ids': [(6, 0, production_ids)], 'default_delivery_type': delivery_type, 'default_workorder_ids': [(6, 0, workorder_ids)], 'default_confirm_button': '确认配送' }} # 验证agv站点是否可用 def _check_avgsite_state(self): is_free = False agv_site = self.env['sf.agv.site'].search([]) if agv_site: has_site = agv_site.update_site_state() if has_site is True: for item in self: if item.type in ['上产线', '下产线']: logging.info('工件配送-起点状态:%s-%s' % ( item.feeder_station_start_id.name, item.feeder_station_start_id.state)) logging.info('工件配送-终点状态:%s-%s' % ( item.feeder_station_destination_id.name, item.feeder_station_destination_id.state)) if ( item.feeder_station_start_id.state == '占用' and item.feeder_station_destination_id.state == '空闲') or ( item.feeder_station_start_id.state == '空闲' and item.feeder_station_destination_id.state == '空闲'): is_free = True else: if item.feeder_station_destination_id.state == '空闲': is_free = True logging.info('is_free:%s' % is_free) return is_free else: raise UserError("接驳站暂未反馈站点实时状态,请稍后再试") def delivery_avg(self): is_agv_task_dispatch = self.env['ir.config_parameter'].sudo().get_param('is_agv_task_dispatch') if is_agv_task_dispatch: self._delivery_avg() # 配送至avg小车 def _delivery_avg(self): config = self.env['res.config.settings'].get_values() positionCode_Arr = [] delivery_Arr = [] feeder_station_start = None feeder_station_destination = None route_id = None for item in self: if route_id is None: route_id = item.route_id.id if feeder_station_start is None: feeder_station_start = item.feeder_station_start_id if feeder_station_destination is None: feeder_station_destination = item.feeder_station_destination_id if item.type in ['上产线', '下产线']: item.route_id = route_id item.feeder_station_start_id = feeder_station_start.id item.feeder_station_destination_id = feeder_station_destination.id delivery_Arr.append(item.name) else: self = self.create( {'name': self.env['ir.sequence'].next_by_code('sf.workpiece.delivery'), 'route_id': self.route_id.id, 'feeder_station_start_id': self.feeder_station_start_id.id, 'feeder_station_destination_id': self.feeder_station_destination_id.id}) delivery_Arr.append(self.name) delivery_str = ','.join(map(str, delivery_Arr)) if feeder_station_start is not None: positionCode_Arr.append({ 'positionCode': feeder_station_start.name, 'code': '00' }) if feeder_station_destination is not None: positionCode_Arr.append({ 'positionCode': feeder_station_destination.name, 'code': '00' }) res = {'reqCode': delivery_str, 'reqTime': '', 'clientCode': '', 'tokenCode': '', 'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'], 'positionCodePath': positionCode_Arr, 'podCode': '', 'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '', 'materialLot': '', 'data': ''} try: logging.info('AGV请求路径:%s' % config['agv_rcs_url']) logging.info('AGV-json:%s' % res) headers = {'Content-Type': 'application/json'} ret = requests.post((config['agv_rcs_url']), json=res, headers=headers) ret = ret.json() logging.info('config-ret:%s' % ret) if ret['code'] == 0: req_codes = ret['reqCode'].split(',') for delivery_item in self: for req_code in req_codes: if delivery_item.name == req_code.strip(): logging.info('delivery_item-name:%s' % delivery_item.name) delivery_item.write({ 'task_delivery_time': fields.Datetime.now(), 'status': '已下发' }) if delivery_item.type == "上产线": delivery_item.workorder_id.write({'is_delivery': True}) else: raise UserError(ret['message']) except Exception as e: logging.info('config-e:%s' % e) raise UserError("工件配送请求agv失败:%s" % e) @api.depends('task_delivery_time', 'task_completion_time') def _compute_delivery_duration(self): for obj in self: if obj.task_delivery_time and obj.task_completion_time: obj.delivery_duration = round( (obj.task_completion_time - obj.task_delivery_time).total_seconds() / 60.0, 2) else: obj.delivery_duration = 0.0 class CMMprogram(models.Model): _name = 'sf.cmm.program' _description = "CMM程序" sequence_number = fields.Integer('序号') program_name = fields.Char('程序名') remark = fields.Text('备注') workorder_id = fields.Many2one('mrp.workorder', string="工单") production_id = fields.Many2one('mrp.production', string="制造订单") program_path = fields.Char('程序文件路径') program_create_date = fields.Datetime('程序创建日期') def _json_cmm_program(self, panel, ret): cmm_program = [] if ret is not False: for item in ret['programming_list']: if item['processing_panel'] == panel and item['ftp_path'].find('.dmi') != -1: cmm_program.append((0, 0, { 'sequence_number': 1, 'program_name': item['program_name'], 'program_path': item['ftp_path'], 'program_create_date': datetime.strptime(item['program_create_date'], '%Y-%m-%d %H:%M:%S'), })) return cmm_program def update_work_start_end(self, date_planned_start, date_planned_end): self.leave_id.write({ 'date_from': date_planned_start, 'date_to': date_planned_end, }) self.date_planned_finished = datetime.datetime.today() + datetime.timedelta(days=100) self.date_planned_start = date_planned_start self.date_planned_finished = date_planned_end routing_workcenter = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', self.routing_type)]) self.write({'date_planned_start': date_planned_start, 'date_planned_finished': date_planned_end, 'duration_expected': routing_workcenter.time_cycle}) def auto_production_process(self, last_time, is_first, type_map): date_planned_end = None date_planned_start = None duration_expected = datetime.timedelta(minutes=self.duration_expected) reserve_time = datetime.timedelta(minutes=self.reserved_duration) if is_first: # 第一轮加工 if self.routing_type == '装夹预调': date_planned_end = last_time - reserve_time date_planned_start = date_planned_end - duration_expected elif self.routing_type == 'CNC加工': date_planned_start = last_time date_planned_end = last_time + duration_expected last_time = date_planned_end else: date_planned_start = last_time + reserve_time date_planned_end = date_planned_start + duration_expected last_time = date_planned_end type_map.update({self.routing_type: True}) else: date_planned_start = last_time + reserve_time date_planned_end = date_planned_start + duration_expected last_time = date_planned_end return date_planned_start, date_planned_end, last_time def manual_offline_process(self, last_time, is_first): date_planned_end = None date_planned_start = None duration_expected = datetime.timedelta(minutes=self.duration_expected) reserve_time = datetime.timedelta(minutes=self.reserved_duration) if is_first: date_planned_start = last_time date_planned_end = last_time + duration_expected else: date_planned_start = last_time + reserve_time date_planned_end = date_planned_start + duration_expected return date_planned_start, date_planned_end, last_time