Files
jikimo_sf/sf_manufacturing/models/agv_setting.py
2024-08-14 17:34:50 +08:00

122 lines
5.3 KiB
Python

# -*- coding: utf-8 -*-
import requests
import logging
import time
from odoo import fields, models, api
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class AgvSetting(models.Model):
_name = 'sf.agv.site'
_description = 'agv站点'
name = fields.Char('位置编号')
# owning_region = fields.Char('所属区域')
state = fields.Selection([
('占用', '占用'),
('空闲', '空闲')], string='状态')
divide_the_work = fields.Char('主要分工')
active = fields.Boolean('有效', default=True)
workcenter_id = fields.Many2one(string='所属区域', comodel_name='mrp.workcenter', tracking=True,
domain=[('is_agv_scheduling', '=', True)])
# name必须唯一
_sql_constraints = [
('name_uniq', 'unique (name)', '站点编号必须唯一!'),
]
# def update_site_state(self):
# # 调取中控的接驳站接口并修改对应站点的状态
# config = self.env['res.config.settings'].get_values()
# # token = sf_sync_config['token'Ba F2CF5DCC-1A00-4234-9E95-65603F70CC8A]
# headers = {'Authorization': config['center_control_Authorization']}
# center_control_url = config['center_control_url'] + "/AutoDeviceApi/GetAgvStationState?date="
# timestamp = int(time.time())
# center_control_url += str(timestamp)
# logging.info('工件配送-请求中控地址:%s' % center_control_url)
# try:
# center_control_r = requests.get(center_control_url, headers=headers, timeout=10) # 设置超时为60秒
# ret = center_control_r.json()
# logging.info('工件配送-请求中控站点信息:%s' % ret)
# self.env['center_control.interface.log'].sudo().create(
# {'content': ret, 'name': 'AutoDeviceApi/GetAgvStationState?date=%s' % str(timestamp)})
# if ret['Succeed'] is True:
# datas = ret['Datas']
# for item in self:
# for da in datas:
# if da['DeviceId'] == item.name:
# if da['AtHome'] is True:
# item.state = '占用'
# else:
# item.state = '空闲'
# return True
# except requests.exceptions.Timeout:
# logging.error('工件配送-请求中控接口超时')
# return False
# except requests.exceptions.RequestException as e:
# logging.error('工件配送-请求中控接口错误: %s', e)
# return False
def update_site_state(self, agv_site_state_arr, notify=True):
"""
更新接驳站状态
params:
agv_site_state_arr: {'A01': '空闲', 'B01': '占用'}
notify: 是否通知调度(非中控发起的状态改变不触发调度任务)
"""
if isinstance(agv_site_state_arr, dict):
for agv_site_name, is_occupy in agv_site_state_arr.items():
agv_site = self.env['sf.agv.site'].sudo().search([('name', '=', agv_site_name)])
if agv_site:
agv_site.state = is_occupy
if notify:
self.env['sf.agv.scheduling'].on_site_state_change(agv_site.id, agv_site.state)
else:
_logger.error("更新失败:接驳站站点错误!%s" % agv_site_name)
raise UserError("更新失败:接驳站站点错误!")
class AgvTaskRoute(models.Model):
_name = 'sf.agv.task.route'
_description = 'agv任务路线'
name = fields.Char('名称')
type = fields.Selection([
('F01', '搬运'), ], string='任务类型', default="F01")
route_type = fields.Selection([
('上产线', '上产线'), ('下产线', '下产线'), ('运送空料架', '运送空料架')], string='类型')
start_site_id = fields.Many2one('sf.agv.site', '起点接驳站')
end_site_id = fields.Many2one('sf.agv.site', '终点接驳站')
destination_production_line_id = fields.Many2one('sf.production.line', '目的生产线')
active = fields.Boolean('有效', default=True)
@api.constrains('end_site_id')
def _check_end_site_id(self):
if self.end_site_id:
if self.end_site_id == self.start_site_id:
raise UserError("您选择的终点接驳站与起点接驳站重复,请重新选择")
workcenter_id = fields.Many2one(string='所属区域', comodel_name='mrp.workcenter', domain=[('is_agv_scheduling', '=', True)],
compute="_compute_region")
@api.depends('end_site_id')
def _compute_region(self):
for record in self:
if record.end_site_id:
record.workcenter_id = record.end_site_id.workcenter_id
else:
record.workcenter_id = None
class Center_controlInterfaceLog(models.Model):
_name = 'center_control.interface.log'
_description = '中控接口调用日志'
_order = 'id desc'
name = fields.Char('接口名称')
content = fields.Char('接口内容')
interface_call_date = fields.Datetime("调用时间", default=fields.Datetime.now, readonly=True)
active = fields.Boolean('有效', default=True)