# Files
# test/sf_manufacturing/models/mrp_workcenter.py
# 2025-04-23 11:06:03 +08:00
#
# 271 lines
# 14 KiB
# Python

import datetime
import logging
from datetime import timedelta, time
from collections import defaultdict
from odoo import fields, models, api
from odoo.addons.resource.models.resource import Intervals
from odoo.exceptions import UserError, ValidationError
import math
# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
class ResWorkcenter(models.Model):
    # Extension of ``mrp.workcenter`` (also inheriting ``mail.thread``) that adds
    # equipment / production-line links, chatter logging of the allowed users,
    # maintenance-aware unavailability intervals and the capacity checks defined
    # below.  Documented with ``#`` comments rather than a class docstring so the
    # class ``__doc__`` stays exactly as it was.
    _name = "mrp.workcenter"
    _inherit = ['mrp.workcenter', 'mail.thread']

    # Production line display name
    production_line_show = fields.Char(string='生产线名称')
    equipment_id = fields.Many2one('maintenance.equipment', string="设备", tracking=True)
    production_line_id = fields.Many2one('sf.production.line', string='生产线',
                                         related='equipment_id.production_line_id', store=True)
    is_process_outsourcing = fields.Boolean('工艺外协')
    users_ids = fields.Many2many("res.users", 'users_workcenter', tracking=True)

    @api.constrains('name')
    def _check_unique_name_code(self):
        """Reject a work center whose ``name`` is already used by another record.

        Only ``name`` is compared (despite the method name, no code field is
        part of the search domain).

        Raises:
            ValidationError: another work center with the same name exists.
        """
        for record in self:
            duplicate = self.search([
                ('name', '=', record.name),
                ('id', '!=', record.id),  # exclude the record being validated
            ], limit=1)
            if duplicate:
                # ValidationError (not the builtin ValueError) is what the ORM
                # expects from a constraint method.
                raise ValidationError('记录已存在')

    def write(self, vals):
        """Write ``vals`` and post a chatter message when ``users_ids`` changes.

        The users before and after the write are compared per record, so the
        method also works on multi-record recordsets (``message_post`` is
        called on one record at a time).

        Args:
            vals: field values to write.

        Returns:
            The result of the parent ``write``.
        """
        if 'users_ids' not in vals:
            return super(ResWorkcenter, self).write(vals)

        users_before = {record.id: record.users_ids for record in self}
        res = super(ResWorkcenter, self).write(vals)
        for record in self:
            previous_users = users_before[record.id]
            added_users = record.users_ids - previous_users
            removed_users = previous_users - record.users_ids
            if added_users or removed_users:
                message = "增加 → %s ; 移除 → %s (可操作用户)" % (
                    added_users.mapped('name'), removed_users.mapped('name'))
                record.message_post(body=message)
        return res

    name = fields.Char('Work Center', related='resource_id.name', store=True, readonly=False, tracking=True)
    time_efficiency = fields.Float('Time Efficiency', related='resource_id.time_efficiency', default=100, store=True,
                                   readonly=False, tracking=True)
    # NOTE: an earlier plain ``default_capacity`` Float used to be declared at
    # this point; it was overwritten by the computed ``default_capacity``
    # declared further down in this class body, so only that one is kept.
    oee_target = fields.Float(
        string='OEE Target', help="Overall Effective Efficiency Target in percentage", default=90, tracking=True)
    oee = fields.Float(compute='_compute_oee', help='Overall Equipment Effectiveness, based on the last month',
                       store=True)

    time_start = fields.Float('Setup Time', tracking=True)
    time_stop = fields.Float('Cleanup Time', tracking=True)
    costs_hour = fields.Float(string='Cost per hour', help='Hourly processing cost.', default=0.0, tracking=True)

    # Equipment state, mirrored from the linked equipment record.
    equipment_status = fields.Selection(
        [("正常", "正常"), ("故障停机", "故障停机"), ("计划维保", "计划维保"), ("空闲", "空闲"),
         ("封存(报废)", "封存(报废)")],
        string="设备状态", related='equipment_id.state')

    equipment_image = fields.Binary('设备图片', related='equipment_id.machine_tool_picture')

    def get_process_outsourcing_workcenter(self):
        """Return the id of a work center flagged ``is_process_outsourcing``.

        Returns:
            The id of the first matching work center, or ``False`` when none
            exists.  ``limit=1`` keeps ``.id`` valid even if several work
            centers carry the flag.
        """
        outsourcing_workcenter = self.env['mrp.workcenter'].search(
            [('is_process_outsourcing', '=', True)], limit=1)
        return outsourcing_workcenter.id

    def action_work_order(self):
        """Return the work-order action for this work center.

        With ``desktop_list_view`` set in the context the parent action is
        used; otherwise the ``sf_manufacturing.mrp_workorder_action_tablet``
        action is returned.
        """
        if self.env.context.get('desktop_list_view', False):
            return super(ResWorkcenter, self).action_work_order()
        return self.env["ir.actions.actions"]._for_xml_id("sf_manufacturing.mrp_workorder_action_tablet")

    def _get_unavailability_intervals(self, start_datetime, end_datetime):
        """Merge maintenance requests into the parent's unavailability intervals.

        Args:
            start_datetime: start of the window to inspect.
            end_datetime: end of the window to inspect.

        Returns:
            dict mapping each work center id in ``self`` to a list of
            ``(start, end)`` datetime tuples combining the parent's intervals
            with the scheduled maintenance requests found by the query.
        """
        res = super(ResWorkcenter, self)._get_unavailability_intervals(start_datetime, end_datetime)
        if not self:
            return res
        # The SQL text is kept verbatim (hence flush-left inside the literal).
        sql = """
SELECT workcenter_id, ARRAY_AGG((schedule_date || '|' || schedule_date + INTERVAL '1h' * duration)) as date_intervals
FROM maintenance_request
LEFT JOIN maintenance_equipment
ON maintenance_request.equipment_id = maintenance_equipment.id
WHERE
schedule_date IS NOT NULL
AND duration IS NOT NULL
AND equipment_id IS NOT NULL
AND maintenance_equipment.workcenter_id IS NOT NULL
AND maintenance_equipment.workcenter_id IN %s
AND (schedule_date, schedule_date + INTERVAL '1 hour') OVERLAPS (%s, %s)
GROUP BY maintenance_equipment.workcenter_id;
"""
        self.env.cr.execute(sql, [tuple(self.ids), fields.Datetime.to_string(start_datetime.astimezone()),
                                  fields.Datetime.to_string(end_datetime.astimezone())])
        res_maintenance = defaultdict(list)
        for wc_row in self.env.cr.dictfetchall():
            # Each aggregated entry is "start|end"; split it back into datetimes.
            res_maintenance[wc_row.get('workcenter_id')] = [
                [fields.Datetime.to_datetime(i) for i in intervals.split('|')]
                for intervals in wc_row.get('date_intervals')
            ]

        # ``datetime`` is imported as a module in this file, so the class has
        # to be reached through ``datetime.datetime``.
        from_timestamp = datetime.datetime.fromtimestamp
        for wc_id in self.ids:
            # Intervals takes (start, stop, records) triples; an empty
            # maintenance.request recordset fills the third slot.
            intervals_previous_list = [(s.timestamp(), e.timestamp(), self.env['maintenance.request']) for s, e in
                                       res[wc_id]]
            intervals_maintenances_list = [(m[0].timestamp(), m[1].timestamp(), self.env['maintenance.request'])
                                           for m in res_maintenance[wc_id]]
            final_intervals_wc = Intervals(intervals_previous_list + intervals_maintenances_list)
            res[wc_id] = [(from_timestamp(s), from_timestamp(e)) for s, e, _ in final_intervals_wc]
        return res

    # Whether AGV delivery is available
    is_agv_scheduling = fields.Boolean(string="AGV所属区域", tracking=True)

    # Production line optimisation
    available_machine_number = fields.Integer(string="可用机台数量")
    single_machine_capacity = fields.Float(string="单台小时产能")
    production_line_hour_capacity = fields.Float(string="生产线小时产能", readonly=True,
                                                 compute='_compute_production_line_hour_capacity')
    effective_working_hours_day = fields.Float(string="日有效工作时长", default=0, readonly=True,
                                               compute='_compute_effective_working_hours_day')
    default_capacity = fields.Float(
        string='生产线日产能', compute='_compute_production_line_day_capacity', readonly=True)

    @api.depends('production_line_hour_capacity', 'effective_working_hours_day')
    def _compute_production_line_day_capacity(self):
        """Compute the daily capacity: hourly line capacity x effective hours, rounded to 2 decimals."""
        for record in self:
            record.default_capacity = round(
                record.production_line_hour_capacity * record.effective_working_hours_day, 2)

    @api.depends('resource_calendar_id', 'resource_calendar_id.attendance_ids',
                 'resource_calendar_id.attendance_ids.hour_to', 'resource_calendar_id.attendance_ids.hour_from')
    def _compute_effective_working_hours_day(self):
        """Compute today's effective working hours from the resource calendar.

        Sums ``hour_to - hour_from`` over the calendar attendances whose
        ``dayofweek`` matches the current weekday.  The total is accumulated
        locally and always assigned, so every record receives a value.
        """
        today = self.get_current_day_of_week(datetime.datetime.now())
        for record in self:
            total_hours = 0
            for attendance in record.resource_calendar_id.attendance_ids:
                if attendance.dayofweek != today:
                    continue
                # NOTE(review): the truthiness test skips attendances starting
                # at 0.0 (midnight) - kept as in the original; confirm intent.
                if attendance.hour_from and attendance.hour_to:
                    total_hours += attendance.hour_to - attendance.hour_from
            record.effective_working_hours_day = total_hours

    def _compute_effective_working_hours_day1(self, date):
        """Return the effective working hours for the weekday of ``date``.

        Args:
            date: date/datetime whose weekday selects the attendances.

        Returns:
            Sum of ``hour_to - hour_from`` over the matching attendances of
            every record in ``self``.
        """
        weekday = self.get_current_day_of_week(date)
        effective_working_hours_day = 0
        for record in self:
            for attendance in record.resource_calendar_id.attendance_ids:
                if attendance.dayofweek != weekday:
                    continue
                if attendance.hour_from and attendance.hour_to:
                    effective_working_hours_day += attendance.hour_to - attendance.hour_from
        return effective_working_hours_day

    def get_current_day_of_week(self, datetime):
        """Return ``datetime.weekday()`` as a string ('0' = Monday ... '6' = Sunday).

        The parameter name shadows the ``datetime`` module inside this method
        only; it is kept unchanged for callers passing it by keyword.
        """
        return str(datetime.weekday())

    @api.depends('single_machine_capacity', 'available_machine_number')
    def _compute_production_line_hour_capacity(self):
        """Compute the hourly line capacity: per-machine capacity x machine count, rounded to 2 decimals."""
        for record in self:
            record.production_line_hour_capacity = round(
                record.single_machine_capacity * record.available_machine_number, 2)

    def deal_with_workcenter_calendar(self, start_date):
        """Tell whether the planned start falls inside the work center's calendar.

        Args:
            start_date: planned start datetime; 8 hours are added before the
                check (the original comment states this converts to Beijing
                time).

        Returns:
            True when an attendance of the first record's calendar matches
            both the weekday and the time of day; False otherwise, including
            for an empty recordset.
        """
        start_date = start_date + timedelta(hours=8)  # convert to Beijing time
        if not self:
            return False
        record = self[0]
        weekday = record.get_current_day_of_week(start_date)
        return any(
            attendance.dayofweek == weekday
            and self.is_between_times(attendance.hour_from, attendance.hour_to, start_date)
            for attendance in record.resource_calendar_id.attendance_ids
        )

    def is_between_times(self, hour_from, hour_to, start_date):
        """Tell whether the time of ``start_date`` lies within [hour_from, hour_to].

        Args:
            hour_from: lower bound as float hours (e.g. 8.5 for 08:30).
            hour_to: upper bound as float hours.
            start_date: datetime whose time-of-day is tested.

        Returns:
            True when the time of day is within the inclusive range.

        The comparison is done on integer microseconds since midnight rather
        than on ``datetime.time`` objects, so bounds such as 24.0 or a helper
        result of 60 minutes (e.g. 23.99) no longer raise ``ValueError``.
        """
        micro = 10 ** 6
        from_hours, from_minutes = self.get_integer_and_decimal_parts(hour_from)
        to_hours, to_minutes = self.get_integer_and_decimal_parts(hour_to)
        lower_bound = (from_hours * 60 + from_minutes) * 60 * micro
        upper_bound = (to_hours * 60 + to_minutes) * 60 * micro

        moment = start_date.time()
        moment_value = ((moment.hour * 60 + moment.minute) * 60 + moment.second) * micro + moment.microsecond
        return lower_bound <= moment_value <= upper_bound

    def get_integer_and_decimal_parts(self, value):
        """Split float hours into ``(hours, minutes)``.

        The fractional part is rounded to 2 decimals, multiplied by 60 and
        rounded up, so the minutes component can reach 60 (e.g. 23.99 gives
        ``(23, 60)``).

        Args:
            value: time of day expressed as float hours.

        Returns:
            Tuple ``(int hours, int minutes)``.
        """
        integer_part = int(value)
        decimal_part = value - integer_part
        if decimal_part > 0:
            decimal_part = round(decimal_part, 2) * 60
        return int(integer_part), math.ceil(decimal_part)

    def deal_available_default_capacity(self, date_planned):
        """Check whether the planned day still has daily capacity left.

        Args:
            date_planned: datetime of the planned start.

        Returns:
            False when the quantity of the non-draft, non-cancelled plans
            starting that day already reaches the day's capacity; True
            otherwise (including when no plan exists for that day).
        """
        date_planned_start = date_planned.strftime('%Y-%m-%d')
        date_planned_end = (date_planned + timedelta(days=1)).strftime('%Y-%m-%d')
        plan_ids = self.env['sf.production.plan'].sudo().search([
            ('date_planned_start', '>=', date_planned_start),
            ('date_planned_start', '<', date_planned_end),
            ('state', 'not in', ['draft', 'cancel']),
        ])
        if not plan_ids:
            return True

        sum_qty = sum(plan.product_qty for plan in plan_ids)
        date_planned_working_hours = self._compute_effective_working_hours_day1(date_planned)
        default_capacity = round(
            self.production_line_hour_capacity * date_planned_working_hours, 2)
        _logger.info('排程日期:%s,计划数量:%s,日产能:%s,日工时:%s',
                     date_planned, sum_qty, default_capacity, date_planned_working_hours)
        return not sum_qty >= default_capacity

    def deal_available_single_machine_capacity(self, date_planned, count):
        """Check the hourly line capacity for the hour of ``date_planned``.

        Args:
            date_planned: datetime of the planned start.
            count: quantity being scheduled; used as the compared quantity
                only when no plan already starts in that hour.

        Returns:
            True when the capacity is not exceeded.

        Raises:
            UserError: the compared quantity exceeds the hourly line capacity.
        """
        date_planned_start = date_planned.strftime('%Y-%m-%d %H:00:00')
        date_planned_end = (date_planned + timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00')
        plan_ids = self.env['sf.production.plan'].sudo().search([
            ('date_planned_start', '>=', date_planned_start),
            ('date_planned_start', '<', date_planned_end),
            ('state', 'not in', ['draft', 'cancel']),
        ])
        # NOTE(review): when plans already exist, ``count`` is not added to
        # their total - kept as in the original; confirm this is intended.
        sum_qty = sum(plan.product_qty for plan in plan_ids) if plan_ids else count
        production_line_hour_capacity = self.production_line_hour_capacity
        if sum_qty > production_line_hour_capacity:
            message = '当前计划开始时间不能预约排程,超过生产线小时产能(%d件)%d' % (
                production_line_hour_capacity, count)
            raise UserError(message)
        return True
class ResWorkcenterProductivity(models.Model):
    # Extension of ``mrp.workcenter.productivity`` that redeclares the
    # work-center link with ``required=False``.
    _inherit = 'mrp.workcenter.productivity'

    # Many2one to ``mrp.workcenter``, declared as not required here.
    workcenter_id = fields.Many2one(comodel_name='mrp.workcenter', required=False)