# -*- coding: utf-8 -*-
# Part of YiZuo. See LICENSE file for full copyright and licensing details.
import logging
from datetime import datetime, date

from odoo.exceptions import UserError
from odoo import models, api, fields

_logger = logging.getLogger(__name__)


def convert_datetime(obj):
    """Return the ISO 8601 string for a ``datetime`` or ``date`` object.

    :param obj: value to serialize.
    :return: ``obj.isoformat()``.
    :raises TypeError: if ``obj`` is neither a ``datetime`` nor a ``date``.
    """
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


class WorkpieceDeliveryWizard(models.TransientModel):
    """Wizard that collects work orders (by RFID scan) and dispatches an AGV task."""

    _name = 'sf.workpiece.delivery.wizard'
    _inherit = ["barcodes.barcode_events_mixin"]
    _description = '工件配送'

    rfid_code = fields.Char('rfid码')
    delivery_ids = fields.Many2many('sf.workpiece.delivery', string='配送单')
    workorder_ids = fields.Many2many('mrp.workorder', string='工单')
    production_ids = fields.Many2many('mrp.production', string='制造订单号')
    destination_production_line_id = fields.Many2one('sf.production.line', '目的生产线')
    route_id = fields.Many2one('sf.agv.task.route', '任务路线', domain=[('route_type', 'in', ['上产线', '下产线'])])
    feeder_station_start_id = fields.Many2one('sf.agv.site', '起点接驳站')
    feeder_station_destination_id = fields.Many2one('sf.agv.site', '目的接驳站')
    workcenter_id = fields.Many2one(string='所属区域', comodel_name='mrp.workcenter', tracking=True)
    confirm_button = fields.Char('按钮名称')

    @api.onchange('delivery_type')
    def _onchange_type(self):
        """Restrict the start-station and work-center choices to the selected delivery type.

        :return: an onchange result whose ``domain`` limits ``feeder_station_start_id``
            and ``workcenter_id`` to the sites/work centers of the matching routes,
            or empty domains when no delivery type is selected.
        """
        if not self.delivery_type:
            return {
                'domain': {
                    'feeder_station_start_id': [],
                    'workcenter_id': [],
                }
            }

        routes = self.env['sf.agv.task.route'].search([('route_type', '=', self.delivery_type)])
        if self.workcenter_id:
            # Keep only the routes that start in the currently selected work center.
            routes = routes.filtered(lambda a: a.start_site_id.workcenter_id.id == self.workcenter_id.id)
        start_site_ids = routes.mapped('start_site_id.id')
        workcenter_ids = routes.mapped('end_site_id.workcenter_id.id')
        if workcenter_ids:
            # Preselect the first work center reachable through the remaining routes.
            self.workcenter_id = workcenter_ids[0]
        return {
            'domain': {
                'feeder_station_start_id': [('id', 'in', start_site_ids)],
                'workcenter_id': [('id', 'in', workcenter_ids)],
            }
        }

    def _get_agv_route_type_selection(self):
        """Return the selection list of ``sf.agv.task.route.route_type``."""
        return self.env['sf.agv.task.route'].fields_get(['route_type'])['route_type']['selection']

    # Declared after the helper above because the field references it by name.
    delivery_type = fields.Selection(selection=_get_agv_route_type_selection, string='类型')

    def dispatch_confirm(self):
        """Ask the client for confirmation, or dispatch immediately.

        With fewer than four work orders the ``dispatch_confirm`` client action is
        returned (carrying the work-order count, the wizard id and the context).
        Otherwise the task is dispatched right away through :meth:`confirm`.

        :return: an action dictionary.
        :raises UserError: propagated from :meth:`confirm` when dispatching fails.
        """
        if len(self.workorder_ids) < 4:
            return {
                'type': 'ir.actions.client',
                'tag': 'dispatch_confirm',
                'params': {
                    'workorder_count': len(self.workorder_ids),
                    'active_id': self.id,
                    'context': self.env.context
                }
            }
        # confirm() already returns the success notification action. The former
        # ``scheduling["name"]`` lookup on that action dict raised KeyError.
        return self.confirm()

    def confirm(self):
        """Create the AGV scheduling for the selected work orders and notify the user.

        Linked delivery orders are updated with the scheduling data, and work
        orders whose ``routing_type`` is '解除装夹' and whose state is ``ready`` are
        started and finished.

        :return: a ``display_notification`` client action that closes the wizard.
        :raises UserError: if no work order is selected or any step fails; the
            original exception is chained as the cause.
        """
        try:
            if not self.workorder_ids:
                raise UserError('制造订单不能为空')
            scheduling = self.env['sf.agv.scheduling'].add_scheduling(
                agv_start_site_name=self.feeder_station_start_id.name,
                agv_route_type=self.delivery_type,
                workorders=self.workorder_ids,
            )
            # Linked delivery orders are marked as dispatched.
            # NOTE(review): the branch at the end of this method allows
            # add_scheduling() to return True; in that case the attribute accesses
            # below would fail when delivery_ids is set - confirm whether that
            # combination can occur.
            if self.delivery_ids:
                val = {
                    'status': '已下发',
                    'agv_scheduling_id': scheduling.id,
                    'feeder_station_start_id': scheduling.start_site_id.id,
                }
                # When the scheduling state is already '配送中', also record the
                # destination, the route and the dispatch time on the delivery orders.
                if scheduling.state == '配送中':
                    val.update({
                        'feeder_station_destination_id': scheduling.end_site_id.id,
                        'route_id': scheduling.agv_route_id.id,
                        'task_delivery_time': fields.Datetime.now()
                    })
                self.delivery_ids.write(val)
            # Ready '解除装夹' work orders are started and finished right away.
            for item in self.workorder_ids:
                if item.routing_type == '解除装夹' and item.state == 'ready':
                    item.button_start()
                    item.button_finish()
            # return scheduling.read()[0]
            if scheduling is True:
                message = '解除装夹成功'
            else:
                message = f'任务下发成功!AGV任务调度编号为【{scheduling.name}】'
            return {
                'type': 'ir.actions.client',
                'tag': 'display_notification',
                'target': 'new',
                'params': {
                    'message': message,
                    'type': 'success',
                    'sticky': False,
                    'next': {'type': 'ir.actions.act_window_close'},
                }
            }
        except Exception as e:
            _logger.info('%s任务下发失败:%s', self.delivery_type, e)
            raise UserError(f'{self.delivery_type}任务下发失败:{e}') from e

    # NOTE: disabled implementation, kept commented out.
    # def recognize_production(self):
    #     # production_ids = []
    #     # delivery_ids = []
    #     # aa = self.production_ids.workorder_ids.filtered(
    #     #     lambda b: b.routing_type == "装夹预调").workpiece_delivery_ids.filtered(
    #     #     lambda c: c.rfid_code == self.rfid_code)
    #     # logging.info('aa:%s' % aa)
    #     if len(self.production_ids) == 4:
    #         raise UserError('只能配送四个制造订单')
    #     else:
    #         if self.rfid_code:
    #             wd = self.env['sf.workpiece.delivery'].search(
    #                 [('type', '=', self.delivery_ids[0].type), ('rfid_code', '=', self.rfid_code),
    #                  ('status', '=', self.delivery_ids[0].status)])
    #             if wd:
    #                 if wd.production_line_id.id == self.delivery_ids[0].production_line_id.id:
    #                     # production_ids.append(wd.production_id)
    #                     # delivery_ids.append(wd.id)
    #                     # add the records to the matching many2many fields
    #                     self.production_ids |= wd.production_id
    #                     self.delivery_ids |= wd
    #                     self.rfid_code = False
    #                     # self.production_ids = [(6, 0, production_ids)]
    #                     # self.delivery_ids = [(6, 0, delivery_ids)]
    #                 else:
    #                     raise UserError('该rfid对应的制造订单号为%s的目的生产线不一致' % wd.production_id.name)
    #             return {
    #                 'name': _('确认'),
    #                 'type': 'ir.actions.act_window',
    #                 'view_mode': 'form',
    #                 'res_model': 'sf.workpiece.delivery.wizard',
    #                 'target': 'new',
    #                 'context': {
    #                     'default_delivery_ids': [(6, 0, self.delivery_ids.ids)],
    #                     'default_production_ids': [(6, 0, self.production_ids.ids)],
    #                     'default_route_id': self.delivery_ids[0].route_id.id,
    #                     'default_type': self.delivery_ids[0].type
    #                 }}

    @api.onchange('route_id')
    def onchange_route(self):
        """Copy the start and end sites of the selected route onto the wizard."""
        if self.route_id:
            self.feeder_station_start_id = self.route_id.start_site_id.id
            self.feeder_station_destination_id = self.route_id.end_site_id.id

    def on_barcode_scanned(self, barcode):
        """Handle a scanned barcode: either a feeder-station name or an RFID code.

        :param barcode: the scanned text.
        :raises UserError: if no matching work order exists, if the work order's
            production line differs from the one already collected, or if the
            work order is not located at the selected start station.
        """
        delivery_type = self.env.context.get('default_delivery_type')
        # A non-numeric barcode may be the name of an AGV feeder station.
        if not barcode.isdigit():
            agv_site = self.env['sf.agv.site'].search([('name', '=', barcode)])
            if agv_site:
                self.feeder_station_start_id = agv_site.id
                return

        # Start from an empty recordset so that an unsupported or missing delivery
        # type ends in the "work order not found" error below instead of an
        # UnboundLocalError.
        workorder = self.env['mrp.workorder']
        if delivery_type == '上产线':
            workorder = self.env['mrp.workorder'].search(
                [('production_line_state', '=', '待上产线'), ('rfid_code', '=', barcode),
                 ('state', '=', 'done')])
            # Collect the matching delivery order that is still waiting for dispatch.
            delivery = self.env['sf.workpiece.delivery'].search(
                [('type', '=', '上产线'), ('rfid_code', '=', barcode), ('status', '=', '待下发')])
            if delivery:
                self.delivery_ids |= delivery
        elif delivery_type == '运送空料架':
            workorder = self.env['mrp.workorder'].search(
                [('routing_type', '=', '解除装夹'), ('rfid_code', '=', barcode), ('state', '=', 'ready')])

        if not workorder:
            raise UserError('该rfid码对应的工单不存在')

        if (self.production_ids
                and workorder.production_line_id.id != self.production_ids[0].production_line_id.id):
            raise UserError(f'该rfid对应的制造订单号为{workorder.production_id.name}的目的生产线不一致')
        if workorder.routing_type == '解除装夹':
            # Calls the work order's print method (finished-product barcode,
            # according to the original comment).
            workorder.print_method()
        # Add the records to the wizard's many2many fields.
        self.production_ids |= workorder.production_id
        self.workorder_ids |= workorder
        down_product_agv_scheduling = workorder.get_down_product_agv_scheduling()
        if down_product_agv_scheduling:
            if not self.feeder_station_start_id:
                self.feeder_station_start_id = down_product_agv_scheduling.end_site_id.id
            elif self.feeder_station_start_id.id != down_product_agv_scheduling.end_site_id.id:
                raise UserError(f'该rfid不在{self.feeder_station_start_id.name}接驳站内')
        return

    @api.onchange('feeder_station_start_id')
    def on_start_id_change(self):
        """Reject a start station that differs from where the first work order was delivered.

        Only applies to the '运送空料架' delivery type.

        :raises UserError: if the selected station does not match the end site of
            the first work order's down-product AGV scheduling.
        """
        if self.delivery_type == '运送空料架' and self.workorder_ids:
            down_product_agv_scheduling = self.workorder_ids[0].get_down_product_agv_scheduling()
            if (down_product_agv_scheduling and self.feeder_station_start_id
                    and down_product_agv_scheduling.end_site_id.id != self.feeder_station_start_id.id):
                raise UserError('当前接驳站不匹配!')