import datetime from collections import defaultdict from odoo import fields, models, api from odoo.addons.resource.models.resource import Intervals class ResWorkcenter(models.Model): _name = "mrp.workcenter" _inherit = ['mrp.workcenter', 'mail.thread'] # 生产线显示 production_line_show = fields.Char(string='生产线名称') equipment_id = fields.Many2one('maintenance.equipment', string="设备", tracking=True) production_line_id = fields.Many2one('sf.production.line', string='生产线', related='equipment_id.production_line_id', store=True) is_process_outsourcing = fields.Boolean('工艺外协') users_ids = fields.Many2many("res.users", 'users_workcenter', tracking=True) def write(self, vals): if 'users_ids' in vals: old_users = self.users_ids res = super(ResWorkcenter, self).write(vals) new_users = self.users_ids added_users = new_users - old_users removed_users = old_users - new_users if added_users or removed_users: message = "增加 → %s ; 移除 → %s (可操作用户)" % ( # ','.join(added_users.mapped('name')), ','.join(removed_users.mapped('name'))) added_users.mapped('name'), removed_users.mapped('name')) self.message_post(body=message) return res return super(ResWorkcenter, self).write(vals) name = fields.Char('Work Center', related='resource_id.name', store=True, readonly=False, tracking=True) time_efficiency = fields.Float('Time Efficiency', related='resource_id.time_efficiency', default=100, store=True, readonly=False, tracking=True) default_capacity = fields.Float( 'Capacity', default=1.0, help="Default number of pieces (in product UoM) that can be produced in parallel (at the same time) at this work center. For example: the capacity is 5 and you need to produce 10 units, then the operation time listed on the BOM will be multiplied by two. However, note that both time before and after production will only be counted once.", tracking=True) oee_target = fields.Float( string='OEE Target', help="Overall Effective Efficiency Target in percentage", default=90, tracking=True) oee = fields.Float(compute='_compute_oee', help='Overall Equipment Effectiveness, based on the last month', store=True) time_start = fields.Float('Setup Time', tracking=True) time_stop = fields.Float('Cleanup Time', tracking=True) costs_hour = fields.Float(string='Cost per hour', help='Hourly processing cost.', default=0.0, tracking=True) equipment_status = fields.Selection( [("正常", "正常"), ("故障停机", "故障停机"), ("计划维保", "计划维保"), ("空闲", "空闲"), ("封存(报废)", "封存(报废)")], string="设备状态", related='equipment_id.state') # @api.depends('equipment_id') # def _compute_equipment_id(self): # for record in self: # if record: # record.equipment_status = record.equipment_id.state equipment_image = fields.Binary('设备图片', related='equipment_id.machine_tool_picture') # 查询工艺外协加工中心 def get_process_outsourcing_workcenter(self): outsourcing_workcenter = self.env['mrp.workcenter'].search([('is_process_outsourcing', '=', True)]) return outsourcing_workcenter.id # @api.onchange('machine_tool_id') # def update_machine_tool_is_binding(self): # machine_tool = self.env["sf.machine_tool"].search([('is_binding', '=', True)]) # if machine_tool: # for item in machine_tool: # workcenter_machine_tool = self.env["mrp.workcenter"].search([('machine_tool_id', '=', item.id)]) # if workcenter_machine_tool: # if self.machine_tool_id.id: # if workcenter_machine_tool.id != self.machine_tool_id.id: # self.machine_tool_id.is_binding = True # else: # self.machine_tool_id.is_binding = True # else: # self.machine_tool_id.is_binding = True # item.is_binding = False # else: # self.machine_tool_id.is_binding = True def action_work_order(self): if not self.env.context.get('desktop_list_view', False): action = self.env["ir.actions.actions"]._for_xml_id("sf_manufacturing.mrp_workorder_action_tablet") return action else: return super(ResWorkcenter, self).action_work_order() def _get_unavailability_intervals(self, start_datetime, end_datetime): res = super(ResWorkcenter, self)._get_unavailability_intervals(start_datetime, end_datetime) if not self: return res sql = """ SELECT workcenter_id, ARRAY_AGG((schedule_date || '|' || schedule_date + INTERVAL '1h' * duration)) as date_intervals FROM maintenance_request LEFT JOIN maintenance_equipment ON maintenance_request.equipment_id = maintenance_equipment.id WHERE schedule_date IS NOT NULL AND duration IS NOT NULL AND equipment_id IS NOT NULL AND maintenance_equipment.workcenter_id IS NOT NULL AND maintenance_equipment.workcenter_id IN %s AND (schedule_date, schedule_date + INTERVAL '1 hour') OVERLAPS (%s, %s) GROUP BY maintenance_equipment.workcenter_id; """ self.env.cr.execute(sql, [tuple(self.ids), fields.Datetime.to_string(start_datetime.astimezone()), fields.Datetime.to_string(end_datetime.astimezone())]) res_maintenance = defaultdict(list) for wc_row in self.env.cr.dictfetchall(): res_maintenance[wc_row.get('workcenter_id')] = [ [fields.Datetime.to_datetime(i) for i in intervals.split('|')] for intervals in wc_row.get('date_intervals') ] for wc_id in self.ids: intervals_previous_list = [(s.timestamp(), e.timestamp(), self.env['maintenance.request']) for s, e in res[wc_id]] intervals_maintenances_list = [(m[0].timestamp(), m[1].timestamp(), self.env['maintenance.request']) for m in res_maintenance[wc_id]] final_intervals_wc = Intervals(intervals_previous_list + intervals_maintenances_list) res[wc_id] = [(datetime.fromtimestamp(s), datetime.fromtimestamp(e)) for s, e, _ in final_intervals_wc] return res # AGV是否可配送 is_agv_scheduling = fields.Boolean(string="AGV所属区域", tracking=True) # 生产线优化 available_machine_number = fields.Integer(string="可用机台数量") single_machine_capacity = fields.Float(string="单台小时产能") production_line_hour_capacity = fields.Float(string="生产线小时产能", readonly=True, _compute='_compute_production_line_hour_capacity', store=True) effective_working_hours_day = fields.Float(string="日有效工作时长", default=0, readonly=True, _compute='_compute_effective_working_hours_day', store=True) default_capacity = fields.Float( '生产线日产能', default=2.0, _compute='_compute_production_line_day_capacity', store=True, readonly=True) # 计算生产线日产能 @api.depends('production_line_hour_capacity', 'effective_working_hours_day') def _compute_production_line_day_capacity(self): for record in self: record.default_capacity = round( record.production_line_hour_capacity * record.effective_working_hours_day, 2) # 计算日有效工作时长 @api.depends('attendance_ids', 'attendance_ids.hour_to', 'attendance_ids.hour_from') def _compute_effective_working_hours_day(self): for record in self: attendance_ids = record.resource_calendar_id.attendance_ids.filter( lambda r: r.dayofweek == datetime.now().weekday()) if attendance_ids: for attendance_id in attendance_ids: if attendance_id.hour_from and attendance_id.hour_to: record.effective_working_hours_day += attendance_id.hour_to - attendance_id.hour_from else: record.effective_working_hours_day = 0 # 计算生产线小时产能 @api.depends('single_machine_capacity', 'available_machine_number') def _compute_production_line_hour_capacity(self): for record in self: record.production_line_hour_capacity = round( record.single_machine_capacity * record.available_machine_number, 2) class ResWorkcenterProductivity(models.Model): _inherit = 'mrp.workcenter.productivity' workcenter_id = fields.Many2one('mrp.workcenter', required=False)