Files
jikimo_sf/sf_manufacturing/models/agv_scheduling.py

350 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import requests
from datetime import timedelta
from odoo import models, fields, api, _
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class RepeatTaskException(UserError):
pass
class AgvScheduling(models.Model):
_name = 'sf.agv.scheduling'
_description = 'agv调度'
_order = 'id desc'
name = fields.Char('任务单号', index=True, copy=False)
agv_route_id = fields.Many2one('sf.agv.task.route', '任务路线')
def _get_agv_route_type_selection(self):
return self.env['sf.agv.task.route'].fields_get(['route_type'])['route_type']['selection']
agv_route_type = fields.Selection(selection=_get_agv_route_type_selection, string='任务类型', required=True)
start_site_id = fields.Many2one('sf.agv.site', '起点接驳站', required=True)
end_site_id = fields.Many2one('sf.agv.site', '终点接驳站', tracking=True)
site_state = fields.Selection([
('占用', '占用'),
('空闲', '空闲')], string='终点接驳站状态', default='占用')
state = fields.Selection([
('待下发', '待下发'),
('配送中', '配送中'),
('已配送', '已配送'),
('已取消', '已取消')], string='状态', default='待下发', tracking=True)
workorder_ids = fields.Many2many('mrp.workorder', 'sf_agv_scheduling_mrp_workorder_ref', string='关联工单')
task_create_time = fields.Datetime('任务创建时间')
task_delivery_time = fields.Datetime('任务下发时间')
task_completion_time = fields.Datetime('任务完成时间')
task_duration = fields.Char('任务时长', compute='_compute_task_duration')
@api.depends('agv_route_type')
def _compute_delivery_workpieces(self):
for record in self:
if record.agv_route_type == '运送空料架':
record.delivery_workpieces = '/'
else:
record.delivery_workpieces = ''.join(record.workorder_ids.mapped('production_id.name'))
delivery_workpieces = fields.Char('配送工件', compute=_compute_delivery_workpieces)
@api.model
def web_search_read(self, domain=None, fields=None, offset=0, limit=None, order=None, count_limit=None):
domain = domain or []
new_domain = []
for item in domain:
if isinstance(item, list):
if item[0] == 'delivery_workpieces':
new_domain.append('&')
new_domain.append(['workorder_ids.production_id.name', item[1], item[2]])
new_domain.append(['agv_route_type', '!=', '运送空料架'])
continue
new_domain.append(item)
return super(AgvScheduling, self).web_search_read(new_domain, fields, offset, limit, order, count_limit)
@api.depends('task_completion_time', 'task_delivery_time')
def _compute_task_duration(self):
for rec in self:
if rec.task_completion_time and rec.task_delivery_time:
rec.task_duration = str(rec.task_completion_time - rec.task_delivery_time)
else:
rec.task_duration = ''
@api.model_create_multi
def create(self, vals_list):
# We generate a standard reference
for vals in vals_list:
vals['name'] = self.env['ir.sequence'].next_by_code('sf.agv.scheduling') or _('New')
return super().create(vals_list)
def add_scheduling(self, agv_start_site_name, agv_route_type, workorders):
""" add_scheduling(agv_start_site_id, agv_route_type, workorders) -> agv_scheduling
新增AGV调度
params:
agv_start_site_name: AGV起点接驳站名称
agv_route_type: AGV任务类型
workorders: 工单
"""
scheduling = None
_logger.info('创建AGV调度任务\r\n起点为【%s】,任务类型为【%s】,工单为【%s' % (agv_start_site_name, agv_route_type, workorders))
if not workorders:
raise UserError(_('工单不能为空'))
agv_start_sites = self.env['sf.agv.site'].sudo().search([('name', '=', agv_start_site_name)])
if not agv_start_sites:
raise UserError(_('不存在名称为【%s】的接驳站,请先创建!' % agv_start_site_name))
# 如果存在相同任务类型工单的AGV调度任务则提示错误
agv_scheduling = self.sudo().search([
('workorder_ids', 'in', workorders.ids),
('agv_route_type', '=', agv_route_type),
('state', 'in', ['待下发', '配送中'])
], limit=1)
if agv_scheduling:
# 计算agv_scheduling.workorder_ids与workorders的交集
repetitive_workorders = agv_scheduling.workorder_ids & workorders
raise RepeatTaskException(
'制造订单号【%s】已存在于【%s】AGV调度任务请勿重复下发' %
(','.join(repetitive_workorders.mapped('production_id.name')), agv_scheduling.name)
)
# 如果只有唯一任务路线,则自动赋予终点接驳站跟任务名称
agv_routes = self.env['sf.agv.task.route'].sudo().search([
('route_type', '=', agv_route_type),
('start_site_id', 'in', agv_start_sites.ids)
])
vals = {
'agv_route_type': agv_route_type,
'workorder_ids': workorders.ids,
# 'workpiece_delivery_ids': deliveries.mapped('id') if deliveries else [],
'task_create_time': fields.Datetime.now()
}
if not agv_routes:
raise UserError(_('不存在起点为【%s】的【%s】任务路线,请先创建!' % (agv_start_site_name, agv_route_type)))
# 如果路线中包含起点与终点相同的接驳站则不创建AGV调度任务
if agv_routes.filtered(lambda r: r.start_site_id.name == r.end_site_id.name):
return True
# 配送类型相同的接驳站为同一个,取第一个即可
vals.update({
'start_site_id': agv_routes[0].start_site_id.id,
})
idle_route = None
if len(agv_routes) == 1:
idle_route = agv_routes[0]
vals.update({
'end_site_id': idle_route.end_site_id.id, 'agv_route_id': idle_route.id
})
else:
# 判断终点接驳站是否为空闲
idle_routes = agv_routes.filtered(lambda r: r.end_site_id.state == '空闲')
if idle_routes:
# 将空闲的路线按照终点接驳站名称排序
idle_routes = sorted(idle_routes, key=lambda r: r.end_site_id.name)
idle_route = idle_routes[0]
vals.update({
'end_site_id': idle_route.end_site_id.id, 'agv_route_id': idle_route.id
})
try:
scheduling = self.env['sf.agv.scheduling'].sudo().create(vals)
# 触发空闲接驳站状态更新,触发新任务下发
if idle_route and idle_route.end_site_id.state == '空闲':
scheduling.dispatch_scheduling(idle_route)
except Exception as e:
_logger.error('添加AGV调度任务失败: %s', e)
raise UserError(_('添加AGV调度任务失败: %s', e))
return scheduling
def on_site_state_change(self, agv_site_id, agv_site_state):
"""
响应AGV接驳站站点状态变化
params:
agv_site_id: 接驳站ID
agv_site_state: 站点状态('空闲', '占用'
"""
if agv_site_state == '空闲':
# 查询终点接驳站为agv_site_id的AGV路线
task_routes = self.env['sf.agv.task.route'].sudo().search([('end_site_id', '=', agv_site_id)])
agv_schedulings = self.env['sf.agv.scheduling'].sudo().search(
[('state', '=', '待下发'), ('agv_route_type', 'in', task_routes.mapped('route_type'))],
order='id asc',
)
for agv_scheduling in agv_schedulings:
# 找到所有起点接驳站匹配的路线
start_matched_task_routes = task_routes.filtered(
lambda r: r.start_site_id == agv_scheduling.start_site_id
)
# 如果调度任务有终点接驳站,找到终点接驳站匹配的路线
if agv_scheduling.end_site_id:
matched_task_routes = start_matched_task_routes.filtered(
lambda r: r.end_site_id == agv_scheduling.end_site_id
)
else:
matched_task_routes = start_matched_task_routes
if matched_task_routes:
# 下发AGV调度任务并修改接驳站状态为占用
agv_scheduling.dispatch_scheduling(matched_task_routes[0])
break;
def _delivery_avg(self):
config = self.env['res.config.settings'].get_values()
position_code_arr = [{
'positionCode': self.start_site_id.name,
'code': '00'
}, {
'positionCode': self.end_site_id.name,
'code': '00'
}]
res = {'reqCode': self.name, 'reqTime': '', 'clientCode': '', 'tokenCode': '',
'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'],
'positionCodePath': position_code_arr,
'podCode': '',
'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '', 'materialLot': '',
'data': ''}
try:
logging.info('AGV请求路径:%s' % config['agv_rcs_url'])
logging.info('AGV-json:%s' % res)
headers = {'Content-Type': 'application/json'}
ret = requests.post((config['agv_rcs_url']), json=res, headers=headers)
ret = ret.json()
logging.info('config-ret:%s' % ret)
if ret['code'] == 0:
return True
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('config-e:%s' % e)
raise UserError("工件配送请求agv失败:%s" % e)
def button_cancel(self):
# 弹出二次确认窗口后执行
for rec in self:
rec.state = '已取消'
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'target': 'new',
'params': {
'message': '任务取消成功!',
'type': 'success',
'sticky': False,
'next': {'type': 'ir.actions.act_window_close'},
}
}
def finish_scheduling(self):
"""
完成调度任务
"""
for rec in self:
if rec.state != '配送中':
return False
_logger.info('AGV任务调度完成任务%s' % rec)
rec.state = '已配送'
rec.task_completion_time = fields.Datetime.now()
def dispatch_scheduling(self, agv_task_route):
"""
下发调度任务
params:
agv_route sf.agv.task.route对象
"""
for rec in self:
if rec.state not in ['待下发', '配送中']:
return False
_logger.info('AGV任务调度下发调度任务路线为%s' % agv_task_route)
rec.state = '配送中'
rec.task_delivery_time = fields.Datetime.now()
rec.site_state = '空闲'
rec.end_site_id = agv_task_route.end_site_id.id
rec.agv_route_id = agv_task_route.id
is_agv_task_dispatch = self.env['ir.config_parameter'].sudo().get_param('is_agv_task_dispatch')
if is_agv_task_dispatch:
rec._delivery_avg()
# 更新接驳站状态
rec.env['sf.agv.site'].update_site_state({rec.end_site_id.name: '占用'}, False)
def write(self, vals):
if vals.get('state', False):
if vals['state'] == '已取消':
self.env['sf.workpiece.delivery'].search([('agv_scheduling_id', '=', self.id)]).write({'status': '待下发'})
elif vals['state'] == '已配送':
self.env['sf.workpiece.delivery'].search([('agv_scheduling_id', '=', self.id)]).write({
'status': '已配送',
'feeder_station_destination_id': self.end_site_id.id,
'route_id': self.agv_route_id.id,
'task_completion_time': fields.Datetime.now()
})
elif vals['state'] == '配送中':
self.env['sf.workpiece.delivery'].search([('agv_scheduling_id', '=', self.id)]).write({
'feeder_station_destination_id': self.end_site_id.id,
'route_id': self.agv_route_id.id,
'task_delivery_time': fields.Datetime.now()
})
return super().write(vals)
def button_cancel_confirm(self):
if self.task_delivery_time > fields.Datetime.now() - timedelta(minutes=10):
return {
'type': 'ir.actions.client',
'tag': 'agv_scheduling_cancel_confirm',
'params': {
'agv_scheduling_id': self.id,
}
}
else:
return self.button_cancel()
def button_resend_confirm(self):
if self.task_delivery_time > fields.Datetime.now() - timedelta(minutes=10):
return {
'type': 'ir.actions.client',
'tag': 'agv_scheduling_resend_confirm',
'params': {
'agv_scheduling_id': self.id,
'context': self.env.context,
}
}
else:
return self.button_resend()
def button_resend(self):
self.dispatch_scheduling(self.agv_route_id)
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'target': 'new',
'params': {
'message': '任务重新下发成功!',
'type': 'success',
'sticky': False,
'next': {'type': 'ir.actions.act_window_close'},
}
}
class ResMrpWorkOrder(models.Model):
_inherit = 'mrp.workorder'
agv_scheduling_ids = fields.Many2many(
'sf.agv.scheduling',
'sf_agv_scheduling_mrp_workorder_ref',
string='AGV调度',
domain=[('state', '!=', '已取消')])
def get_down_product_agv_scheduling(self):
"""
获取关联的制造订单下产线的agv任务
"""
self.ensure_one()
workorder_ids = self.production_id.workorder_ids
cnc_workorder = workorder_ids.filtered(
lambda w: w.routing_type == 'CNC加工' and w.state == 'done' and w.processing_panel == self.processing_panel
)
if cnc_workorder:
agv_schedulingss = cnc_workorder.agv_scheduling_ids
return agv_schedulingss.filtered(lambda a: a.state == '已配送' and a.agv_route_type == '下产线')
else:
return None