import logging

from odoo import models, fields, api, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)


class RepeatTaskException(UserError):
    """Raised when a work order already has an open AGV task of the same type."""
    pass


class AgvScheduling(models.Model):
    """AGV scheduling task linking work orders to a start and an end docking site.

    A task moves through the states '待下发' (pending) -> '配送中' (in delivery)
    -> '已配送' (delivered); a pending task can also be set to '已取消'
    (cancelled) via ``button_cancel``.
    """
    _name = 'sf.agv.scheduling'
    _description = 'agv调度'

    name = fields.Char('任务单号', index=True, copy=False)

    def _get_agv_route_type_selection(self):
        """Reuse the selection values of ``sf.agv.task.route.route_type``."""
        return self.env['sf.agv.task.route'].fields_get(['route_type'])['route_type']['selection']

    agv_route_type = fields.Selection(selection=_get_agv_route_type_selection, string='任务类型', required=True)
    agv_route_name = fields.Char('任务路线名称')
    start_site_id = fields.Many2one('sf.agv.site', '起点接驳站', required=True)
    end_site_id = fields.Many2one('sf.agv.site', '终点接驳站', tracking=True)
    site_state = fields.Selection([
        ('占用', '占用'),
        ('空闲', '空闲')], string='终点接驳站状态', default='占用')
    state = fields.Selection([
        ('待下发', '待下发'),
        ('配送中', '配送中'),
        ('已配送', '已配送'),
        ('已取消', '已取消')], string='状态', default='待下发', tracking=True)
    workorder_ids = fields.Many2many('mrp.workorder', 'sf_agv_scheduling_mrp_workorder_ref', string='关联工单')
    task_create_time = fields.Datetime('任务创建时间')
    task_delivery_time = fields.Datetime('任务下发时间')
    task_completion_time = fields.Datetime('任务完成时间')
    task_duration = fields.Char('任务时长', compute='_compute_task_duration')
    # Referenced by method name (like task_duration) so the compute method
    # can be overridden by inheriting models.
    delivery_workpieces = fields.Char('配送工件', compute='_compute_delivery_workpieces')

    # The value is derived from the linked work orders' production names, so
    # those must be declared as dependencies as well as the route type.
    @api.depends('agv_route_type', 'workorder_ids.production_id.name')
    def _compute_delivery_workpieces(self):
        """Set ``delivery_workpieces`` to '/' for empty-rack runs, otherwise to
        the '、'-joined production names of the linked work orders."""
        for record in self:
            if record.agv_route_type == '运送空料架':
                record.delivery_workpieces = '/'
            else:
                record.delivery_workpieces = '、'.join(record.workorder_ids.mapped('production_id.name'))

    @api.depends('task_completion_time', 'task_delivery_time')
    def _compute_task_duration(self):
        """Set ``task_duration`` to ``str(completion - delivery)``, or '' when
        either timestamp is missing."""
        for rec in self:
            if rec.task_completion_time and rec.task_delivery_time:
                rec.task_duration = str(rec.task_completion_time - rec.task_delivery_time)
            else:
                rec.task_duration = ''

    @api.model_create_multi
    def create(self, vals_list):
        """Create records, always assigning ``name`` from the
        'sf.agv.scheduling' sequence (falling back to the translated 'New')."""
        for vals in vals_list:
            vals['name'] = self.env['ir.sequence'].next_by_code('sf.agv.scheduling') or _('New')
        return super().create(vals_list)

    def add_scheduling(self, agv_start_site_id, agv_route_type, workorders):
        """Create an AGV scheduling task for the given work orders.

        :param agv_start_site_id: id of the start docking site
        :param agv_route_type: AGV task (route) type
        :param workorders: ``mrp.workorder`` recordset to attach to the task
        :return: the created ``sf.agv.scheduling`` record
        :raises UserError: if ``workorders`` is empty, or if creating or
            dispatching the task fails
        :raises RepeatTaskException: if one of the work orders already has a
            pending or in-delivery task of the same type
        """
        if not workorders:
            raise UserError(_('工单不能为空'))
        # Refuse to create a second open task of the same type for a work order.
        agv_scheduling = self.sudo().search([
            ('workorder_ids', 'in', workorders.ids),
            ('agv_route_type', '=', agv_route_type),
            ('state', 'in', ['待下发', '配送中'])
        ], limit=1)
        if agv_scheduling:
            # Only the work orders shared with the existing task are reported.
            repetitive_workorders = agv_scheduling.workorder_ids & workorders
            raise RepeatTaskException(
                '制造订单号【%s】已存在与【%s】AGV调度任务,请勿重复下发!'
                % (','.join(repetitive_workorders.mapped('production_id.name')), agv_scheduling.name)
            )

        vals = {
            'start_site_id': agv_start_site_id,
            'agv_route_type': agv_route_type,
            'workorder_ids': workorders.ids,
            'task_create_time': fields.Datetime.now()
        }
        agv_routes = self.env['sf.agv.task.route'].sudo().search([
            ('route_type', '=', agv_route_type),
            ('start_site_id', '=', agv_start_site_id)
        ])
        if len(agv_routes) == 1:
            # A single matching route fixes the end site and the route name.
            vals.update({'end_site_id': agv_routes[0].end_site_id.id, 'agv_route_name': agv_routes[0].name})
        else:
            # Otherwise use the route whose end site is idle, taking the one
            # whose end-site name sorts first; with no idle route the end site
            # is left unset.
            idle_routes = agv_routes.filtered(lambda r: r.end_site_id.state == '空闲')
            if idle_routes:
                first_idle_route = min(idle_routes, key=lambda r: r.end_site_id.name)
                vals.update({
                    'end_site_id': first_idle_route.end_site_id.id,
                    'agv_route_name': first_idle_route.name,
                })
        try:
            scheduling = self.env['sf.agv.scheduling'].sudo().create(vals)
            # An idle end site means the task can be dispatched right away.
            if scheduling.end_site_id.state == '空闲':
                scheduling.dispatch_scheduling(scheduling.end_site_id.id, scheduling.end_site_id.state)
        except Exception as e:
            _logger.error('添加AGV调度任务失败: %s', e)
            # Chain the cause so the original traceback stays available.
            raise UserError(_('添加AGV调度任务失败: %s', e)) from e
        return scheduling

    def on_site_state_change(self, agv_site_id, agv_site_state):
        """React to a state change of an AGV docking site.

        :param agv_site_id: id of the docking site whose state changed
        :param agv_site_state: new site state; '空闲' (idle) is handled as
            "dispatch the oldest pending task", any other value (expected:
            '占用', occupied) as "finish the oldest in-delivery task"
        """
        if agv_site_state == '空闲':
            # Routes ending at this site determine which task types it can serve.
            task_routes = self.env['sf.agv.task.route'].sudo().search([('end_site_id', '=', agv_site_id)])
            agv_scheduling = self.env['sf.agv.scheduling'].sudo().search(
                [('state', '=', '待下发'), ('agv_route_type', 'in', task_routes.mapped('route_type'))],
                order='id asc',
                limit=1
            )
            if not agv_scheduling:
                # Nothing to dispatch: do not mark a site as occupied. Without
                # this guard the site name below would evaluate to False.
                return
            # Dispatch the task, then flag its end site as occupied.
            agv_scheduling.dispatch_scheduling(agv_site_id, agv_site_state)
            self.env['sf.agv.site'].update_site_state({agv_scheduling.end_site_id.name: '占用'}, False)
        else:
            # The end site becoming occupied is treated as task completion.
            agv_scheduling = self.env['sf.agv.scheduling'].sudo().search(
                [('state', '=', '配送中'), ('end_site_id', '=', agv_site_id)],
                order='id asc',
                limit=1
            )
            agv_scheduling.finish_scheduling()

    def _delivery_avg(self):
        """Build an AGV task request from ``self`` and POST it to the URL stored
        under ``agv_rcs_url`` in the ``res.config.settings`` values.

        NOTE(review): this method reads ``route_id``, ``feeder_station_start_id``,
        ``feeder_station_destination_id``, ``type``, ``status`` and
        ``workorder_id``, none of which are declared on this model in this file,
        and it uses ``requests``, which this file does not import. Its only
        call site (in ``dispatch_scheduling``) is commented out. It presumably
        originates from the workpiece-delivery model (see the
        'sf.workpiece.delivery' sequence code) -- confirm before re-enabling.

        :raises UserError: if the request fails or the response ``code`` is not 0
        """
        config = self.env['res.config.settings'].get_values()
        positionCode_Arr = []
        delivery_Arr = []
        feeder_station_start = None
        feeder_station_destination = None
        route_id = None
        for item in self:
            # The first record supplies the route and stations used for all.
            if route_id is None:
                route_id = item.route_id.id
            if feeder_station_start is None:
                feeder_station_start = item.feeder_station_start_id
            if feeder_station_destination is None:
                feeder_station_destination = item.feeder_station_destination_id
            if item.type in ['上产线', '下产线']:
                item.route_id = route_id
                item.feeder_station_start_id = feeder_station_start.id
                item.feeder_station_destination_id = feeder_station_destination.id
                delivery_Arr.append(item.name)
            else:
                # Rebinding ``self`` means every later use of ``self`` in this
                # method (including the response loop below) refers to the
                # newly created record.
                self = self.create(
                    {'name': self.env['ir.sequence'].next_by_code('sf.workpiece.delivery'),
                     'route_id': self.route_id.id,
                     'feeder_station_start_id': self.feeder_station_start_id.id,
                     'feeder_station_destination_id': self.feeder_station_destination_id.id})
                delivery_Arr.append(self.name)
        delivery_str = ','.join(map(str, delivery_Arr))
        if feeder_station_start is not None:
            positionCode_Arr.append({
                'positionCode': feeder_station_start.name,
                'code': '00'
            })
        if feeder_station_destination is not None:
            positionCode_Arr.append({
                'positionCode': feeder_station_destination.name,
                'code': '00'
            })
        res = {'reqCode': delivery_str, 'reqTime': '', 'clientCode': '', 'tokenCode': '',
               'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'],
               'positionCodePath': positionCode_Arr,
               'podCode': '',
               'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '',
               'data': ''}
        try:
            _logger.info('AGV请求路径:%s', config['agv_rcs_url'])
            _logger.info('AGV-json:%s', res)
            headers = {'Content-Type': 'application/json'}
            ret = requests.post((config['agv_rcs_url']), json=res, headers=headers)
            ret = ret.json()
            _logger.info('config-ret:%s', ret)
            if ret['code'] == 0:
                # Mark every record whose name is echoed back in reqCode.
                req_codes = ret['reqCode'].split(',')
                for delivery_item in self:
                    for req_code in req_codes:
                        if delivery_item.name == req_code.strip():
                            _logger.info('delivery_item-name:%s', delivery_item.name)
                            delivery_item.write({
                                'task_delivery_time': fields.Datetime.now(),
                                'status': '待配送'
                            })
                            if delivery_item.type == "上产线":
                                delivery_item.workorder_id.write({'is_delivery': True})
            else:
                raise UserError(ret['message'])
        except Exception as e:
            # This also catches the UserError raised just above and re-wraps it.
            _logger.info('config-e:%s', e)
            raise UserError("工件配送请求agv失败:%s" % e) from e

    def button_cancel(self):
        """Cancel the tasks in ``self``; intended to run after the user confirms.

        :raises UserError: if a record is not in the '待下发' (pending) state
        """
        for rec in self:
            if rec.state != '待下发':
                raise UserError('只有待下发状态的AGV调度任务才能取消!')
            rec.state = '已取消'

    def finish_scheduling(self):
        """Mark in-delivery tasks as delivered and stamp the completion time.

        Processing stops at the first record that is not in the '配送中' state:
        ``False`` is returned and the remaining records are left untouched.
        Otherwise the method returns ``None``.
        """
        for rec in self:
            if rec.state != '配送中':
                return False
            rec.state = '已配送'
            rec.task_completion_time = fields.Datetime.now()

    def dispatch_scheduling(self, agv_end_site_id, agv_site_state):
        """Mark pending tasks as in delivery towards the given end site.

        :param agv_end_site_id: id of the end docking site to assign
        :param agv_site_state: end-site state to record on the task

        Processing stops at the first record that is not in the '待下发' state:
        ``False`` is returned and the remaining records are left untouched.
        Otherwise the method returns ``None``.
        """
        for rec in self:
            if rec.state != '待下发':
                return False
            # NOTE: the AGV request (rec._delivery_avg()) is currently disabled.
            rec.state = '配送中'
            rec.task_delivery_time = fields.Datetime.now()
            rec.site_state = agv_site_state
            rec.end_site_id = agv_end_site_id


class ResMrpWorkOrder(models.Model):
    """Add to ``mrp.workorder`` the AGV scheduling tasks linked through the
    'sf_agv_scheduling_mrp_workorder_ref' relation table."""
    _inherit = 'mrp.workorder'

    # The domain excludes cancelled tasks.
    agv_scheduling_ids = fields.Many2many(
        'sf.agv.scheduling',
        'sf_agv_scheduling_mrp_workorder_ref',
        string='AGV调度',
        domain=[('state', '!=', '已取消')])