import logging
from datetime import timedelta

import requests

from odoo import models, fields, api, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Upper bound (seconds) for the HTTP call to the AGV control system, so that an
# unreachable endpoint cannot block an Odoo worker forever.
AGV_REQUEST_TIMEOUT = 30


class RepeatTaskException(UserError):
    """Raised when a work order already belongs to an open AGV task of the same type."""


class AgvScheduling(models.Model):
    """AGV scheduling task: moves the workpieces of work orders between docking sites."""

    _name = 'sf.agv.scheduling'
    _description = 'agv调度'
    _order = 'id desc'

    name = fields.Char('任务单号', index=True, copy=False)
    agv_route_id = fields.Many2one('sf.agv.task.route', '任务路线')

    def _get_agv_route_type_selection(self):
        """Reuse the selection values declared on ``sf.agv.task.route.route_type``."""
        return self.env['sf.agv.task.route'].fields_get(['route_type'])['route_type']['selection']

    agv_route_type = fields.Selection(selection=_get_agv_route_type_selection, string='任务类型', required=True)
    start_site_id = fields.Many2one('sf.agv.site', '起点接驳站', required=True)
    end_site_id = fields.Many2one('sf.agv.site', '终点接驳站', tracking=True)
    site_state = fields.Selection([
        ('占用', '占用'),
        ('空闲', '空闲')], string='终点接驳站状态', default='占用')
    state = fields.Selection([
        ('待下发', '待下发'),
        ('配送中', '配送中'),
        ('已配送', '已配送'),
        ('已取消', '已取消')], string='状态', default='待下发', tracking=True)
    workorder_ids = fields.Many2many('mrp.workorder', 'sf_agv_scheduling_mrp_workorder_ref', string='关联工单')
    task_create_time = fields.Datetime('任务创建时间')
    task_delivery_time = fields.Datetime('任务下发时间')
    task_completion_time = fields.Datetime('任务完成时间')
    task_duration = fields.Char('任务时长', compute='_compute_task_duration')

    # The dotted dependency is declared because the value is built from the
    # manufacturing order names reached through the linked work orders.
    @api.depends('agv_route_type', 'workorder_ids.production_id.name')
    def _compute_delivery_workpieces(self):
        """Show the manufacturing order names being delivered, or '/' for empty-rack runs."""
        for record in self:
            if record.agv_route_type == '运送空料架':
                record.delivery_workpieces = '/'
            else:
                record.delivery_workpieces = '、'.join(record.workorder_ids.mapped('production_id.name'))

    # Declared after the method because the compute is passed as a function object.
    delivery_workpieces = fields.Char('配送工件', compute=_compute_delivery_workpieces)

    @api.model
    def web_search_read(self, domain=None, fields=None, offset=0, limit=None, order=None, count_limit=None):
        """Make the non-stored ``delivery_workpieces`` field searchable.

        Every ``delivery_workpieces`` leaf of ``domain`` is replaced by an AND of
        a condition on ``workorder_ids.production_id.name`` (same operator and
        value) and ``agv_route_type != '运送空料架'``. All other domain items are
        passed through untouched.
        """
        new_domain = []
        for item in domain or []:
            # Leaves may arrive as lists (JSON-RPC) or tuples (Python callers).
            if isinstance(item, (list, tuple)) and len(item) == 3 and item[0] == 'delivery_workpieces':
                new_domain.extend([
                    '&',
                    ['workorder_ids.production_id.name', item[1], item[2]],
                    ['agv_route_type', '!=', '运送空料架'],
                ])
                continue
            new_domain.append(item)
        return super().web_search_read(new_domain, fields, offset, limit, order, count_limit)

    @api.depends('task_completion_time', 'task_delivery_time')
    def _compute_task_duration(self):
        """Render completion time minus delivery time as text; empty if either is missing."""
        for rec in self:
            if rec.task_completion_time and rec.task_delivery_time:
                rec.task_duration = str(rec.task_completion_time - rec.task_delivery_time)
            else:
                rec.task_duration = ''

    @api.model_create_multi
    def create(self, vals_list):
        """Create tasks, always assigning ``name`` from the ``sf.agv.scheduling`` sequence."""
        for vals in vals_list:
            vals['name'] = self.env['ir.sequence'].next_by_code('sf.agv.scheduling') or _('New')
        return super().create(vals_list)

    def add_scheduling(self, agv_start_site_name, agv_route_type, workorders):
        """Create an AGV scheduling task and dispatch it at once when possible.

        params:
            agv_start_site_name: name of the start docking site (``sf.agv.site``)
            agv_route_type: AGV task type (a ``route_type`` selection value)
            workorders: ``mrp.workorder`` recordset to attach to the task

        Returns the created ``sf.agv.scheduling`` record.

        Raises:
            UserError: no work orders given, unknown start site, no route of
                this type from the start site, or creation/dispatch failed.
            RepeatTaskException: one of the work orders already belongs to a
                pending or in-delivery task of the same type.
        """
        _logger.info('创建AGV调度任务\r\n起点为【%s】,任务类型为【%s】,工单为【%s】',
                     agv_start_site_name, agv_route_type, workorders)
        if not workorders:
            raise UserError(_('工单不能为空'))
        agv_start_site = self.env['sf.agv.site'].sudo().search([('name', '=', agv_start_site_name)], limit=1)
        if not agv_start_site:
            # Arguments are passed to _() so the untranslated template is the lookup key.
            raise UserError(_('不存在名称为【%s】的接驳站,请先创建!', agv_start_site_name))

        # Refuse to create a second open task of the same type for a work order.
        agv_scheduling = self.sudo().search([
            ('workorder_ids', 'in', workorders.ids),
            ('agv_route_type', '=', agv_route_type),
            ('state', 'in', ['待下发', '配送中'])
        ], limit=1)
        if agv_scheduling:
            # Only the work orders shared with the existing task are reported.
            repetitive_workorders = agv_scheduling.workorder_ids & workorders
            raise RepeatTaskException(
                '制造订单号【%s】已存在于【%s】AGV调度任务,请勿重复下发!'
                % (','.join(repetitive_workorders.mapped('production_id.name')), agv_scheduling.name)
            )

        vals = {
            'start_site_id': agv_start_site.id,
            'agv_route_type': agv_route_type,
            'workorder_ids': workorders.ids,
            'task_create_time': fields.Datetime.now()
        }

        agv_routes = self.env['sf.agv.task.route'].sudo().search([
            ('route_type', '=', agv_route_type),
            ('start_site_id', '=', agv_start_site.id)
        ])
        if not agv_routes:
            raise UserError(_('不存在起点为【%s】的【%s】任务路线,请先创建!', agv_start_site_name, agv_route_type))

        # A single candidate route is assigned unconditionally. With several
        # candidates, pick the one whose end site is idle, ordered by end-site
        # name. With none idle, the task is created without a route.
        idle_route = None
        if len(agv_routes) == 1:
            idle_route = agv_routes[0]
        else:
            idle_routes = agv_routes.filtered(lambda r: r.end_site_id.state == '空闲')
            if idle_routes:
                idle_route = idle_routes.sorted(key=lambda r: r.end_site_id.name)[0]
        if idle_route:
            vals.update({'end_site_id': idle_route.end_site_id.id, 'agv_route_id': idle_route.id})

        try:
            scheduling = self.env['sf.agv.scheduling'].sudo().create(vals)
            # Dispatch immediately only when the chosen end site is idle.
            if idle_route and idle_route.end_site_id.state == '空闲':
                scheduling.dispatch_scheduling(idle_route)
        except Exception as e:
            _logger.exception('添加AGV调度任务失败: %s', e)
            raise UserError(_('添加AGV调度任务失败: %s', e)) from e

        return scheduling

    def on_site_state_change(self, agv_site_id, agv_site_state):
        """React to a docking-site state change.

        params:
            agv_site_id: id of the docking site whose state changed
            agv_site_state: new site state ('空闲' or '占用')

        When the site became idle, the oldest pending task that can use a route
        ending at this site is dispatched. Only one task is dispatched per call.
        """
        if agv_site_state != '空闲':
            return
        # Routes ending at the site that just became idle.
        task_routes = self.env['sf.agv.task.route'].sudo().search([('end_site_id', '=', agv_site_id)])
        agv_schedulings = self.env['sf.agv.scheduling'].sudo().search(
            [('state', '=', '待下发'), ('agv_route_type', 'in', task_routes.mapped('route_type'))],
            order='id asc',
        )
        for agv_scheduling in agv_schedulings:
            # Keep the routes starting at the task's start site.
            matched_task_routes = task_routes.filtered(
                lambda r: r.start_site_id == agv_scheduling.start_site_id
            )
            # A task with a preset end site only matches routes ending there.
            if agv_scheduling.end_site_id:
                matched_task_routes = matched_task_routes.filtered(
                    lambda r: r.end_site_id == agv_scheduling.end_site_id
                )
            if matched_task_routes:
                # NOTE(review): the original indentation was lost. The break is
                # assumed to belong to this branch, i.e. stop after the first
                # task actually dispatched. Confirm against the original file.
                agv_scheduling.dispatch_scheduling(matched_task_routes[0])
                break

    def _delivery_avg(self):
        """Send this task to the AGV control system over HTTP.

        Posts a JSON payload to the ``agv_rcs_url`` setting. Returns True when
        the response ``code`` equals 0.

        Raises:
            UserError: on any failure (network error, timeout, invalid JSON,
                or a non-zero response code).
        """
        self.ensure_one()
        config = self.env['res.config.settings'].get_values()
        position_code_arr = [{
            'positionCode': self.start_site_id.name,
            'code': '00'
        }, {
            'positionCode': self.end_site_id.name,
            'code': '00'
        }]
        res = {'reqCode': self.name, 'reqTime': '', 'clientCode': '', 'tokenCode': '',
               'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'],
               'positionCodePath': position_code_arr,
               'podCode': '',
               'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '',
               'data': ''}
        try:
            _logger.info('AGV请求路径:%s', config['agv_rcs_url'])
            _logger.info('AGV-json:%s', res)
            headers = {'Content-Type': 'application/json'}
            # requests never times out by default, hence the explicit timeout.
            response = requests.post(config['agv_rcs_url'], json=res, headers=headers,
                                     timeout=AGV_REQUEST_TIMEOUT)
            ret = response.json()
            _logger.info('config-ret:%s', ret)
            if ret['code'] == 0:
                return True
            raise UserError(ret['message'])
        except Exception as e:
            # Everything, including the UserError raised just above, is
            # reported to the user with the same prefix.
            _logger.exception('config-e:%s', e)
            raise UserError("工件配送请求agv失败:%s" % e) from e

    def button_cancel(self):
        """Cancel the tasks (run after the confirmation dialog) and notify the user."""
        self.write({'state': '已取消'})
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'target': 'new',
            'params': {
                'message': '任务取消成功!',
                'type': 'success',
                'sticky': False,
                'next': {'type': 'ir.actions.act_window_close'},
            }
        }

    def finish_scheduling(self):
        """Mark in-delivery tasks as delivered and stamp the completion time.

        Returns False as soon as a task that is not in delivery is met; tasks
        after it in the recordset are left untouched. Otherwise returns None.
        """
        for rec in self:
            if rec.state != '配送中':
                return False
            _logger.info('AGV任务调度:完成任务%s', rec)
            rec.write({
                'state': '已配送',
                'task_completion_time': fields.Datetime.now(),
            })

    def dispatch_scheduling(self, agv_task_route):
        """Dispatch the tasks along the given route.

        params:
            agv_task_route: ``sf.agv.task.route`` record to use

        Returns False as soon as a task that is neither pending nor in delivery
        is met; tasks after it in the recordset are left untouched. Otherwise
        returns None.
        """
        is_agv_task_dispatch = self.env['ir.config_parameter'].sudo().get_param('is_agv_task_dispatch')
        for rec in self:
            if rec.state not in ['待下发', '配送中']:
                return False
            _logger.info('AGV任务调度:下发调度任务,路线为%s', agv_task_route)
            # One write, so that the write() hook sees the new end site and
            # route together with the state change.
            rec.write({
                'state': '配送中',
                'task_delivery_time': fields.Datetime.now(),
                'site_state': '空闲',
                'end_site_id': agv_task_route.end_site_id.id,
                'agv_route_id': agv_task_route.id,
            })
            if is_agv_task_dispatch:
                rec._delivery_avg()
            # Mark the end site as occupied.
            # NOTE(review): the original indentation was lost. This call is
            # assumed to run whether or not the HTTP dispatch is enabled.
            rec.env['sf.agv.site'].update_site_state({rec.end_site_id.name: '占用'}, False)

    def write(self, vals):
        """Propagate state changes to the linked ``sf.workpiece.delivery`` records.

        - '已取消': deliveries go back to status '待下发'.
        - '已配送': deliveries are marked delivered with end site, route and
          completion time.
        - '配送中': deliveries receive end site, route and delivery time.

        The end site and route are taken from ``vals`` when it carries them,
        otherwise from the record's current values.
        """
        new_state = vals.get('state')
        if new_state in ('已取消', '已配送', '配送中'):
            delivery_model = self.env['sf.workpiece.delivery']
            # Per record: supports multi-record writes, and an empty recordset
            # no longer matches deliveries that have no scheduling at all.
            for rec in self:
                deliveries = delivery_model.search([('agv_scheduling_id', '=', rec.id)])
                if new_state == '已取消':
                    deliveries.write({'status': '待下发'})
                    continue
                delivery_vals = {
                    'feeder_station_destination_id': vals.get('end_site_id', rec.end_site_id.id),
                    'route_id': vals.get('agv_route_id', rec.agv_route_id.id),
                }
                if new_state == '已配送':
                    delivery_vals.update({
                        'status': '已配送',
                        'task_completion_time': fields.Datetime.now(),
                    })
                else:
                    delivery_vals['task_delivery_time'] = fields.Datetime.now()
                deliveries.write(delivery_vals)
        return super().write(vals)

    def _dispatched_recently(self):
        """Return True when the task was dispatched less than 10 minutes ago."""
        # A task that was never dispatched has no delivery time; comparing
        # False with a datetime would raise TypeError.
        return bool(
            self.task_delivery_time
            and self.task_delivery_time > fields.Datetime.now() - timedelta(minutes=10)
        )

    def button_cancel_confirm(self):
        """Cancel directly, or ask for confirmation if dispatched in the last 10 minutes."""
        self.ensure_one()
        if self._dispatched_recently():
            return {
                'type': 'ir.actions.client',
                'tag': 'agv_scheduling_cancel_confirm',
                'params': {
                    'agv_scheduling_id': self.id,
                }
            }
        return self.button_cancel()

    def button_resend_confirm(self):
        """Resend directly, or ask for confirmation if dispatched in the last 10 minutes."""
        self.ensure_one()
        if self._dispatched_recently():
            return {
                'type': 'ir.actions.client',
                'tag': 'agv_scheduling_resend_confirm',
                'params': {
                    'agv_scheduling_id': self.id,
                    'context': self.env.context,
                }
            }
        return self.button_resend()

    def button_resend(self):
        """Dispatch the task again along its current route and notify the user."""
        self.dispatch_scheduling(self.agv_route_id)
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'target': 'new',
            'params': {
                'message': '任务重新下发成功!',
                'type': 'success',
                'sticky': False,
                'next': {'type': 'ir.actions.act_window_close'},
            }
        }


class ResMrpWorkOrder(models.Model):
    """Adds the link from work orders to their non-cancelled AGV scheduling tasks."""

    _inherit = 'mrp.workorder'

    agv_scheduling_ids = fields.Many2many(
        'sf.agv.scheduling',
        'sf_agv_scheduling_mrp_workorder_ref',
        string='AGV调度',
        domain=[('state', '!=', '已取消')])

    def get_down_product_agv_scheduling(self):
        """Return the delivered '下产线' AGV tasks of the matching finished CNC work orders.

        Looks, within the same manufacturing order, for done 'CNC加工' work
        orders on the same processing panel as this one. Returns None when
        there is no such work order.
        """
        self.ensure_one()
        workorder_ids = self.production_id.workorder_ids
        cnc_workorder = workorder_ids.filtered(
            lambda w: w.routing_type == 'CNC加工' and w.state == 'done' and w.processing_panel == self.processing_panel
        )
        if not cnc_workorder:
            return None
        agv_schedulings = cnc_workorder.agv_scheduling_ids
        return agv_schedulings.filtered(lambda a: a.state == '已配送' and a.agv_route_type == '下产线')