Files
test/sf_manufacturing/models/agv_scheduling.py
2024-08-22 15:57:17 +08:00

266 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from odoo import models, fields, api, _
from odoo.exceptions import UserError
import logging
_logger = logging.getLogger(__name__)
class RepeatTaskException(UserError):
pass
class AgvScheduling(models.Model):
_name = 'sf.agv.scheduling'
_description = 'agv调度'
_order = 'id desc'
name = fields.Char('任务单号', index=True, copy=False)
def _get_agv_route_type_selection(self):
return self.env['sf.agv.task.route'].fields_get(['route_type'])['route_type']['selection']
agv_route_type = fields.Selection(selection=_get_agv_route_type_selection, string='任务类型', required=True)
agv_route_id = fields.Many2one('sf.agv.task.route', '任务路线')
start_site_id = fields.Many2one('sf.agv.site', '起点接驳站', required=True)
end_site_id = fields.Many2one('sf.agv.site', '终点接驳站', tracking=True)
site_state = fields.Selection([
('占用', '占用'),
('空闲', '空闲')], string='终点接驳站状态', default='占用')
state = fields.Selection([
('待下发', '待下发'),
('配送中', '配送中'),
('已配送', '已配送'),
('已取消', '已取消')], string='状态', default='待下发', tracking=True)
workorder_ids = fields.Many2many('mrp.workorder', 'sf_agv_scheduling_mrp_workorder_ref', string='关联工单')
task_create_time = fields.Datetime('任务创建时间')
task_delivery_time = fields.Datetime('任务下发时间')
task_completion_time = fields.Datetime('任务完成时间')
task_duration = fields.Char('任务时长', compute='_compute_task_duration')
@api.depends('agv_route_type')
def _compute_delivery_workpieces(self):
for record in self:
if record.agv_route_type == '运送空料架':
record.delivery_workpieces = '/'
else:
record.delivery_workpieces = ''.join(record.workorder_ids.mapped('production_id.name'))
delivery_workpieces = fields.Char('配送工件', compute=_compute_delivery_workpieces)
@api.model
def web_search_read(self, domain=None, fields=None, offset=0, limit=None, order=None, count_limit=None):
domain = domain or []
new_domain = []
for index, item in enumerate(domain):
if isinstance(item, list):
if item[0] == 'delivery_workpieces':
new_domain.append('&')
new_domain.append(['workorder_ids.production_id.name', item[1], item[2]])
new_domain.append(['agv_route_type', '!=', '运送空料架'])
continue
new_domain.append(item)
return super(AgvScheduling, self).web_search_read(new_domain, fields, limit=limit, offset=offset)
@api.depends('task_completion_time', 'task_delivery_time')
def _compute_task_duration(self):
for rec in self:
if rec.task_completion_time and rec.task_delivery_time:
rec.task_duration = str(rec.task_completion_time - rec.task_delivery_time)
else:
rec.task_duration = ''
@api.model_create_multi
def create(self, vals_list):
# We generate a standard reference
for vals in vals_list:
vals['name'] = self.env['ir.sequence'].next_by_code('sf.agv.scheduling') or _('New')
return super().create(vals_list)
def add_scheduling(self, agv_start_site_name, agv_route_type, workorders):
""" add_scheduling(agv_start_site_id, agv_route_type, workorders) -> agv_scheduling
新增AGV调度
params:
agv_start_site_name: AGV起点接驳站名称
agv_route_type: AGV任务类型
workorders: 工单
"""
_logger.info('创建AGV调度任务\r\n起点为【%s】,任务类型为【%s】,工单为【%s' % (agv_start_site_name, agv_route_type, workorders))
if not workorders:
raise UserError(_('工单不能为空'))
agv_start_site = self.env['sf.agv.site'].sudo().search([('name', '=', agv_start_site_name)], limit=1)
if not agv_start_site:
raise UserError(_('不存在名称为【%s】的接驳站,请先创建!' % agv_start_site_name))
# 如果存在相同任务类型工单的AGV调度任务则提示错误
agv_scheduling = self.sudo().search([
('workorder_ids', 'in', workorders.ids),
('agv_route_type', '=', agv_route_type),
('state', 'in', ['待下发', '配送中'])
], limit=1)
if agv_scheduling:
# 计算agv_scheduling.workorder_ids与workorders的交集
repetitive_workorders = agv_scheduling.workorder_ids & workorders
raise RepeatTaskException(
'制造订单号【%s】已存在于【%s】AGV调度任务请勿重复下发' %
(','.join(repetitive_workorders.mapped('production_id.name')), agv_scheduling.name)
)
vals = {
'start_site_id': agv_start_site.id,
'agv_route_type': agv_route_type,
'workorder_ids': workorders.ids,
# 'workpiece_delivery_ids': deliveries.mapped('id') if deliveries else [],
'task_create_time': fields.Datetime.now()
}
# 如果只有唯一任务路线,则自动赋予终点接驳站跟任务名称
agv_routes = self.env['sf.agv.task.route'].sudo().search([
('route_type', '=', agv_route_type),
('start_site_id', '=', agv_start_site.id)
])
if not agv_routes:
raise UserError(_('不存在起点为【%s】的【%s】任务路线,请先创建!' % (agv_start_site_name, agv_route_type)))
idle_route = None
if len(agv_routes) == 1:
idle_route = agv_routes[0]
vals.update({'end_site_id': idle_route.end_site_id.id, 'agv_route_id': idle_route.id})
else:
# 判断终点接驳站是否为空闲
idle_routes = agv_routes.filtered(lambda r: r.end_site_id.state == '空闲')
if idle_routes:
# 将空闲的路线按照终点接驳站名称排序
idle_routes = sorted(idle_routes, key=lambda r: r.end_site_id.name)
idle_route = idle_routes[0]
vals.update({'end_site_id': idle_route.end_site_id.id, 'agv_route_id': idle_route.id})
try:
scheduling = self.env['sf.agv.scheduling'].sudo().create(vals)
# 触发空闲接驳站状态更新,触发新任务下发
if idle_route and idle_route.end_site_id.state == '空闲':
scheduling.dispatch_scheduling(idle_route)
except Exception as e:
_logger.error('添加AGV调度任务失败: %s', e)
raise UserError(_('添加AGV调度任务失败: %s', e))
return scheduling
def on_site_state_change(self, agv_site_id, agv_site_state):
"""
响应AGV接驳站站点状态变化
params:
agv_site_id: 接驳站ID
agv_site_state: 站点状态('空闲', '占用'
"""
if agv_site_state == '空闲':
# 查询终点接驳站为agv_site_id的AGV路线
task_routes = self.env['sf.agv.task.route'].sudo().search([('end_site_id', '=', agv_site_id)])
agv_scheduling = self.env['sf.agv.scheduling'].sudo().search(
[('state', '=', '待下发'), ('agv_route_type', 'in', task_routes.mapped('route_type'))],
order='id asc',
limit=1
)
task_route = task_routes.filtered(
lambda r: r.start_site_id == agv_scheduling.start_site_id and r.start_site_id == agv_scheduling.start_site_id
)
if task_route:
# 下发AGV调度任务并修改接驳站状态为占用
agv_scheduling.dispatch_scheduling(task_route)
def _delivery_avg(self):
config = self.env['res.config.settings'].get_values()
position_code_arr = [{
'positionCode': self.start_site_id.name,
'code': '00'
}, {
'positionCode': self.end_site_id.name,
'code': '00'
}]
res = {'reqCode': self.name, 'reqTime': '', 'clientCode': '', 'tokenCode': '',
'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'],
'positionCodePath': position_code_arr,
'podCode': '',
'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '', 'materialLot': '',
'data': ''}
try:
logging.info('AGV请求路径:%s' % config['agv_rcs_url'])
logging.info('AGV-json:%s' % res)
headers = {'Content-Type': 'application/json'}
ret = requests.post((config['agv_rcs_url']), json=res, headers=headers)
ret = ret.json()
logging.info('config-ret:%s' % ret)
if ret['code'] == 0:
return True
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('config-e:%s' % e)
raise UserError("工件配送请求agv失败:%s" % e)
def button_cancel(self):
# 弹出二次确认窗口后执行
for rec in self:
if rec.state != '待下发':
raise UserError('只有待下发状态的AGV调度任务才能取消')
rec.state = '已取消'
def finish_scheduling(self):
"""
完成调度任务
"""
for rec in self:
if rec.state != '配送中':
return False
_logger.info('AGV任务调度完成任务%s' % rec)
rec.state = '已配送'
rec.task_completion_time = fields.Datetime.now()
def dispatch_scheduling(self, agv_task_route):
"""
下发调度任务
params:
agv_route sf.agv.task.route对象
"""
for rec in self:
if rec.state != '待下发':
return False
_logger.info('AGV任务调度下发调度任务路线为%s' % agv_task_route)
rec.state = '配送中'
rec.task_delivery_time = fields.Datetime.now()
rec.site_state = '空闲'
rec.end_site_id = agv_task_route.end_site_id.id
rec.agv_route_id = agv_task_route.id
# rec._delivery_avg()
# 更新接驳站状态
rec.env['sf.agv.site'].update_site_state({rec.end_site_id.name: '占用'}, False)
def write(self, vals):
if vals.get('state', False):
if vals['state'] == '已取消':
self.env['sf.workpiece.delivery'].search([('agv_scheduling_id', '=', self.id)]).write({'status': '待下发'})
elif vals['state'] == '已配送':
self.env['sf.workpiece.delivery'].search([('agv_scheduling_id', '=', self.id)]).write({
'status': '已配送',
'feeder_station_destination_id': self.end_site_id.id,
'route_id': self.agv_route_id.id,
'task_completion_time': fields.Datetime.now()
})
elif vals['state'] == '配送中':
self.env['sf.workpiece.delivery'].search([('agv_scheduling_id', '=', self.id)]).write({
'feeder_station_destination_id': self.end_site_id.id,
'route_id': self.agv_route_id.id,
'task_delivery_time': fields.Datetime.now()
})
return super().write(vals)
class ResMrpWorkOrder(models.Model):
_inherit = 'mrp.workorder'
agv_scheduling_ids = fields.Many2many(
'sf.agv.scheduling',
'sf_agv_scheduling_mrp_workorder_ref',
string='AGV调度',
domain=[('state', '!=', '已取消')])