# -*- coding: utf-8 -*-
import base64
import json
import logging
from datetime import datetime, timedelta

import requests

from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError

_logger = logging.getLogger(__name__)


# SF production scheduling
class sf_production_plan(models.Model):
    """Scheduling plan attached to a manufacturing order.

    A plan carries the planned/actual time window, the chosen production
    line and a scheduling state, and pushes the resulting dates onto the
    work orders of the linked ``mrp.production``.
    """
    _name = 'sf.production.plan'
    _description = 'sf_production_plan'
    _inherit = ['mail.thread']
    _order = 'state_order asc, write_date desc'

    part_name = fields.Char('零件名称', related='product_id.part_name', readonly=True)
    part_number = fields.Char('零件图号', related='product_id.part_number', readonly=True)
    state = fields.Selection([
        ('draft', '待排程'),
        ('done', '已排程'),
        ('processing', '加工中'),
        ('finished', '已完成'),
        ('cancel', '已取消')
    ], string='状态', tracking=True)
    # Stored numeric rank of `state`; only exists so `_order` can sort by it.
    state_order = fields.Integer(compute='_compute_state_order', store=True)

    name = fields.Char(string='制造订单')
    active = fields.Boolean(string='已归档', default=True)
    order_deadline = fields.Datetime(string='订单交期')
    production_id = fields.Many2one('mrp.production', '关联制造订单')
    product_qty = fields.Float(string='数量', digits='Product Unit of Measure', required=True, default=0.0)
    production_line_id = fields.Many2one('sf.production.line', string='生产线')
    date_planned_start = fields.Datetime(string='计划开始时间')
    date_planned_finished = fields.Datetime(string='计划结束时间')
    # Scheduling direction: backward ("reverse", the default) or forward ("positive").
    schedule_setting = fields.Selection([
        ('reverse', '倒排'),
        ('positive', '顺排')], string='排程设置', default='reverse')
    product_id = fields.Many2one('product.product', '关联产品')
    origin = fields.Char(string='销售订单')
    # Actual processing duration (minutes), actual start and actual end.
    actual_process_time = fields.Float(string='实际加工时长(分钟)', digits=(16, 2),
                                       compute='_compute_actual_process_time')
    actual_start_time = fields.Datetime(string='实际开始时间')
    actual_end_time = fields.Datetime(string='实际结束时间')
    shift = fields.Char(string='班次')
    sequence = fields.Integer(string='序号', copy=False, readonly=True, index=True)
    current_operation_name = fields.Char(string='当前工序名称', size=64, default='生产计划')
    # Manufacturing type, mirrored from the linked manufacturing order.
    production_type = fields.Selection(
        [('自动化产线加工', '自动化产线加工'), ('人工线下加工', '人工线下加工')],
        string='制造类型',
        related='production_id.production_type',
        store=True
    )

    @api.depends('state')
    def _compute_state_order(self):
        """Map ``state`` to its sort rank.

        States missing from the mapping (``cancel`` and an unset state) get
        rank 0 and therefore sort first under the ascending ``_order``.
        """
        order_mapping = {
            'draft': 1,
            'done': 2,
            'processing': 3,
            'finished': 4
        }
        for record in self:
            record.state_order = order_mapping.get(record.state, 0)

    @api.onchange('date_planned_start')
    def date_planned_start_onchange(self):
        """Default the planned end to one hour after the planned start."""
        if self.date_planned_start:
            self.date_planned_finished = self.date_planned_start + timedelta(hours=1)

    def deal_no_date_planned_finished(self):
        """Backfill a planned end on scheduled plans that lack one.

        Applies to every plan in state processing/done/finished whose planned
        end is empty and whose planned start is set; the end becomes
        start + 1 hour.
        """
        plans = self.env['sf.production.plan'].search(
            [('date_planned_finished', '=', False), ('state', 'in', ['processing', 'done', 'finished'])])
        for item in plans:
            if item.date_planned_start:
                item.date_planned_finished = item.date_planned_start + timedelta(hours=1)

    def deal_no_order_deadline(self):
        """Backfill an order deadline (planned start + 7 days) where it is empty."""
        plans = self.env['sf.production.plan'].sudo().search(
            [('order_deadline', '=', False)])
        for item in plans:
            if item.date_planned_start:
                item.order_deadline = item.date_planned_start + timedelta(days=7)

    @api.model
    def search_read(self, domain=None, fields=None, offset=0, limit=None, order=None):
        """Hide plans whose manufacturing order is not yet ready for scheduling.

        Plans whose manufacturing order is in state ``draft`` or
        ``technology_to_confirmed`` are excluded from the result.

        The extra condition is added to a copy of ``domain`` so the caller's
        list is never mutated.
        """
        domain = list(domain or []) + [
            ('production_id.state', 'not in', ['draft', 'technology_to_confirmed'])]
        return super(sf_production_plan, self).search_read(domain, fields, offset, limit, order)

    @api.depends('actual_start_time', 'actual_end_time')
    def _compute_actual_process_time(self):
        """Compute the actual processing duration in minutes (0 when incomplete)."""
        for item in self:
            if item.actual_start_time and item.actual_end_time:
                item.actual_process_time = (item.actual_end_time - item.actual_start_time).total_seconds() / 60
            else:
                item.actual_process_time = None

    @api.onchange('production_line_id')
    def _compute_production_line_id(self):
        """Propagate the chosen production line to the manufacturing order.

        Also writes the line and the planned start onto the workpiece
        deliveries of the work orders whose routing type is "装夹预调".
        """
        for item in self:
            item.sudo().production_id.production_line_id = item.production_line_id.id
            item.sudo().production_id.workorder_ids.filtered(
                lambda b: b.routing_type == "装夹预调").workpiece_delivery_ids.write(
                {'production_line_id': item.production_line_id.id,
                 'plan_start_processing_time': item.date_planned_start})

    @api.model
    def get_import_templates(self):
        """returns the xlsx import template file"""
        return [{
            'label': _('导入计划数据'),
            'template': '/sf_plan/static/src/xlsx/sf_production_plan.xlsx'
        }]

    @api.model
    def _compute_orderpoint_id(self):
        pass

    def test_sale_order(self):
        """Debug helper: create a sale order as the admin user with dummy delivery data."""
        company_id = self.env.ref('base.main_company').sudo()
        date = datetime.today()
        aaa = self.env['sale.order'].with_user(self.env.ref("base.user_admin")).sale_order_create(
            company_id, 'delivery_name', 'delivery_telephone', 'delivery_address', date)
        _logger.info('aaa %s', aaa)

    def action_view_production_schedule(self):
        """Fill in a missing planned end and mark the plan as scheduled.

        When a planned start exists but no planned end, the end is set to
        start + purchase cycle + buffer period.

        :return: the computed planned end, or ``None`` when nothing was
            changed (no start, or the end was already set).
        """
        self.ensure_one()
        if not self.date_planned_start or self.date_planned_finished:
            return None
        purchase_cycle = 3  # days
        buffer_period = 1  # days
        self.date_planned_finished = self.date_planned_start + timedelta(days=purchase_cycle + buffer_period)
        # Use the selection key: the label '已排程' is not a valid value for `state`.
        self.state = 'done'
        return self.date_planned_finished

    def cancel_plan(self):
        """Clear the planned end and put the plan back to draft."""
        self.ensure_one()
        self.date_planned_finished = None
        self.state = 'draft'

    def unlink(self):
        """Delete the plans and shift down the sequence of the plans after them."""
        sequence_to_reorder = self.mapped('sequence')
        res = super().unlink()
        if not sequence_to_reorder:
            # Empty recordset: nothing was deleted, and min() of nothing would raise.
            return res
        # NOTE(review): every later plan is shifted by exactly 1, even when
        # several plans are deleted at once - confirm whether gaps are acceptable.
        records_to_reorder = self.search([('sequence', '>', min(sequence_to_reorder))])
        for record in records_to_reorder:
            record.sequence -= 1
        return res

    def _get_pl_no(self):
        """Generate the next code of the form ``PL<yymmdd>-<counter>``.

        The counter restarts at 001 each day and is otherwise the last
        issued counter + 1 (zero-padded to at least three digits).

        NOTE(review): relies on a ``pl_no`` field that is not declared in
        this class - presumably added elsewhere; confirm.
        """
        sf_pl_no = self.env['sf.production.plan'].sudo().search(
            [('pl_no', '!=', '')], limit=1, order="id desc")
        today_date = datetime.today().strftime('%y%m%d')
        first_of_day = 'PL' + today_date + '-001'
        if not sf_pl_no:
            return first_of_day
        last_pl_no = sf_pl_no.pl_no
        if last_pl_no[2:8] != today_date:
            return first_of_day
        # Parse everything after the last '-' so counters above 999 keep increasing
        # (slicing the last three characters would wrap 1000 back to 000).
        new_seq = int(last_pl_no.rpartition('-')[2]) + 1
        return 'PL' + today_date + f'-{new_seq:03}'

    def _schedule_anchor_workorder(self, workorder):
        """Plan ``workorder`` at this plan's start and derive the other work orders from it."""
        self.ensure_one()
        start = self.date_planned_start if self.date_planned_start else datetime.now()
        route = workorder.technology_design_id.route_id
        duration = route.time_cycle + route.reserved_duration
        # Park the start far in the past before writing the new window -
        # presumably so start <= finish holds while the two fields are
        # written one at a time; confirm against the work order constraints.
        workorder.date_planned_start = datetime.now() - timedelta(days=100)
        workorder.date_planned_finished = start + timedelta(minutes=duration)
        workorder.date_planned_start = start
        self.sudo().production_id.plan_start_processing_time = workorder.date_planned_start
        workorder.duration_expected = duration
        self.calculate_plan_time(workorder, self.production_id.workorder_ids)

    def do_production_schedule(self):
        """Schedule the selected plans and open the scheduling gantt view.

        The capacity/calendar check is run once for the whole selection using
        the first plan's planned start. For automated-line orders the first
        "CNC加工" work order is the anchor and the plan ends with the last
        "CNC加工" work order; otherwise the first work order is the anchor and
        the plan ends with the last work order.

        :raises ValidationError: when a plan has no production line.
        :raises UserError: when the start time cannot be scheduled.
        :return: window action on ``sf.production.plan``.
        """
        self.deal_processing_schedule(self[0].date_planned_start)
        for record in self:
            if not record.production_line_id:
                raise ValidationError("未选择生产线")
            workorders = record.production_id.workorder_ids
            if workorders:
                if record.production_id.production_type == '自动化产线加工':
                    # Automated line: anchor on the first CNC work order.
                    first_cnc_workorder = workorders.filtered(lambda x: x.name == 'CNC加工')[0]
                    record._schedule_anchor_workorder(first_cnc_workorder)
                    last_workorder = record.production_id.workorder_ids.filtered(
                        lambda x: x.name == 'CNC加工')[-1]
                else:
                    # Manual/offline: anchor on the first work order.
                    record._schedule_anchor_workorder(workorders[0])
                    last_workorder = record.production_id.workorder_ids[-1]
                record.date_planned_finished = last_workorder.date_planned_finished
            record.state = 'done'
            record.sudo().production_id.schedule_state = '已排'
            record.sudo().production_id.process_state = '待装夹'
            record.sudo().production_id.production_line_id = record.production_line_id.id
            if record.production_id.workorder_ids:
                record.sudo().production_id.workorder_ids.filtered(
                    lambda b: b.routing_type == "装夹预调").workpiece_delivery_ids.write(
                    {'production_line_id': record.production_line_id.id,
                     'plan_start_processing_time': record.date_planned_start})
        return {
            'name': '排程甘特图',
            'type': 'ir.actions.act_window',
            'res_model': 'sf.production.plan',
            'view_mode': 'gantt,tree,form',
            'target': 'current',
        }

    def deal_processing_schedule(self, date_planned_start):
        """Check that the selected plans can be scheduled at ``date_planned_start``.

        :param date_planned_start: requested planned start (datetime).
        :raises UserError: when the line has no work centers, none of them is
            an automatic line ("自动生产线" in its name), the start is missing
            or in the past, the start is outside every line's calendar, or
            the line has no capacity left.
        :return: ``True`` when every check passes.
        """
        count = len(self)
        workcenter_ids = self.production_line_id.mrp_workcenter_ids
        if not workcenter_ids:
            raise UserError('生产线没有配置工作中心')
        production_lines = workcenter_ids.filtered(lambda b: "自动生产线" in b.name)
        if not production_lines:
            raise UserError('生产线没有配置自动生产线')
        if not date_planned_start:
            # An empty Datetime is False; comparing it with a datetime raises TypeError.
            raise UserError('请先设置计划开始时间')
        if date_planned_start < datetime.now():
            raise UserError('计划开始时间不能小于当前时间')
        if not any(production_line.deal_with_workcenter_calendar(date_planned_start)
                   for production_line in production_lines):
            raise UserError('当前计划开始时间不能预约排程,请在工作时间内排程')
        if not production_lines.deal_available_default_capacity(date_planned_start):
            raise UserError('当前计划开始时间不能预约排程,生产线今日没有可排程的资源')
        if not production_lines.deal_available_single_machine_capacity(date_planned_start, count):
            raise UserError('当前计划开始时间不能预约排程,生产线该时间段没有可排程的资源')
        return True

    def calculate_plan_time(self, item, workorder_list):
        """Derive the planned windows of the other work orders from ``item``.

        :param item: reference work order whose window is already planned.
        :param workorder_list: ordered work orders to schedule; work orders
            before ``item`` are chained backwards from its start, work orders
            after it are chained forwards from its end. If ``item`` is not in
            the list, position 0 is used as the reference.
        """
        item_position = 0
        for index, workorder in enumerate(workorder_list):
            if workorder.id == item.id:
                item_position = index
                break

        # Backwards: each earlier work order ends where the later one starts.
        for i in range(item_position, 0, -1):
            later = workorder_list[i]
            earlier = workorder_list[i - 1]
            route = earlier.technology_design_id.route_id
            duration = route.time_cycle + route.reserved_duration
            # Park the start far in the past before writing the new window.
            earlier.date_planned_start = datetime.now() - timedelta(days=100)
            earlier.date_planned_finished = later.date_planned_start
            earlier.date_planned_start = earlier.date_planned_finished - timedelta(minutes=duration)
            earlier.duration_expected = duration

        # Forwards: each following work order starts where the previous one ends.
        for i in range(item_position, len(workorder_list) - 1):
            previous = workorder_list[i]
            following = workorder_list[i + 1]
            route = following.technology_design_id.route_id
            duration = route.time_cycle + route.reserved_duration
            following.date_planned_start = datetime.now() - timedelta(days=100)
            following.date_planned_finished = previous.date_planned_finished + timedelta(minutes=duration)
            following.date_planned_start = previous.date_planned_finished
            following.duration_expected = duration

    def calculate_plan_time_after(self, item, workorder_id_list):
        """Plan the work orders that follow ``item`` (the CNC work order).

        :param item: reference work order.
        :param workorder_id_list: list of work order ids containing ``item.id``.

        NOTE(review): following work orders are located by ``item.id + k``,
        i.e. this assumes consecutive database ids - confirm before relying
        on it. A ``ValueError`` while writing is logged and skipped.
        """
        sequence = workorder_id_list.index(item.id) - 1
        workorder_model = self.env['mrp.workorder'].sudo()
        for j in range(len(workorder_id_list) - sequence - 2):
            current_workorder_id = (item.id + (j + 1))
            current_workorder_obj = workorder_model.search(
                [('id', '=', current_workorder_id)])
            old_workorder_obj = workorder_model.search(
                [('id', '=', (current_workorder_id - 1))])
            work_order = workorder_model.search(
                [('production_id', '=', self.production_id.id), ('id', '=', current_workorder_id)])
            try:
                # Push the end far into the future before moving the start.
                work_order.date_planned_finished = datetime.now() + timedelta(days=100)
                work_order.date_planned_start = old_workorder_obj.date_planned_finished
                _logger.info('work_order.data_start %s', work_order.date_planned_start)
                time_cycle = self.env['mrp.routing.workcenter'].sudo().search(
                    [('name', '=', current_workorder_obj.name)]).time_cycle
                work_order.date_planned_finished = old_workorder_obj.date_planned_finished + timedelta(
                    minutes=time_cycle)
                work_order.duration_expected = time_cycle
            except ValueError as e:
                _logger.warning('时间设置失败,请检查是否为工序分配工作中心,%s', e)

    def cancel_production_schedule(self):
        """Reset the plan to unscheduled and flag the manufacturing order as "未排".

        The manufacturing order is looked up by name (``name`` of the plan).

        :return: the planned end after the reset (always empty).
        """
        self.ensure_one()
        self.date_planned_start = None
        self.date_planned_finished = None
        self.state = 'draft'
        self.production_line_id = None
        production = self.env['mrp.production'].sudo().search([('name', '=', self.name)])
        production.schedule_state = '未排'
        return self.date_planned_finished

    def liucheng_cs(self):
        """Debug helper: post a hard-coded process order to a local test endpoint.

        NOTE(review): uses fixed record ids (attachment 631, template 47) and
        ``http://localhost:1069`` - test scaffolding only.

        :raises UserError: when the HTTP call (or anything around it) fails.
        """
        res = {'order_number': '123', 'delivery_end_date': str(datetime.now()),
               'delivery_name': '机企猫', 'delivery_telephone': '18943919239',
               'delivery_address': '新时空大厦',
               'bfm_process_order_list': []}
        aa = self.env['ir.attachment'].search([('id', '=', 631)])
        temp = self.env['product.template'].search([('id', '=', 47)])
        val = {
            'model_long': 3,
            'model_width': 1,
            'model_height': 1,
            'model_volume': 3,
            'model_machining_precision': '0.10',
            'model_name': aa.name,
            'model_data': base64.b64encode(aa.datas).decode('utf-8'),
            'model_file': base64.b64encode(temp.model_file).decode('utf-8'),
            'texture_code': '001',
            'texture_type_code': '001001',
            'process_parameters_code': 'R',
            'price': 20,
            'number': 2,
            'total_amount': 100,
            'remark': '这只是测试',
            'barcode': 123456789,
        }
        res['bfm_process_order_list'].append(val)
        url = '/api/bfm_process_order/list'
        res['bfm_process_order_list'] = json.dumps(res['bfm_process_order_list'])
        try:
            ret = requests.post(('http://localhost:1069' + url), json={}, data=res)
            _logger.info('%s', ret)
        except Exception as e:
            raise UserError(str(e)) from e


# Machine work schedule
class machine_work_schedule(models.Model):
    _name = 'sf.machine.schedule'
    _description = '机台作业计划'

    name = fields.Char(string='机台名')


class MrpProductionInheritForPlan(models.Model):
    """Keep ``sf.production.plan`` in sync with its manufacturing order."""
    _inherit = 'mrp.production'

    def button_cancel(self):
        """Cancel the manufacturing orders and mark their plans as cancelled."""
        res = super(MrpProductionInheritForPlan, self).button_cancel()
        # `self` may hold several orders, so match on all ids rather than `self.id`.
        self.env['sf.production.plan'].search(
            [('production_id', 'in', self.ids)]).write({'state': 'cancel'})
        return res

    def toggle_active(self):
        """Toggle archiving and mirror the resulting ``active`` flag onto the plans."""
        res = super(MrpProductionInheritForPlan, self).toggle_active()
        stage_active = self.filtered('active')
        stage_inactive = self - stage_active
        # Archived plans are invisible to a default search, so disable the
        # implicit active filter; otherwise they could never be unarchived.
        plan_model = self.env['sf.production.plan'].with_context(active_test=False)
        plan_model.search(
            [('production_id', 'in', stage_active.ids)]).write(
            {'active': True})
        plan_model.search(
            [('production_id', 'in', stage_inactive.ids)]).write(
            {'active': False})
        return res