Files
test/sf_manufacturing/models/mrp_production.py
2023-08-24 11:24:27 +08:00

392 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import re
from odoo import api, fields, models, _
class MrpProduction(models.Model):
_inherit = 'mrp.production'
_description = "制造订单"
_order = 'create_date desc'
# tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘")
maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests")
request_ids = fields.One2many('maintenance.request', 'production_id')
model_file = fields.Binary('模型文件', related='product_id.model_file')
schedule_state = fields.Selection([('未排', '未排'), ('已排', '已排')],
string='排程状态', default='未排')
# state = fields.Selection(selection_add=[('未排程', '未排程'), ('已排程', '已排程')])
# 生产线
production_line_id = fields.Many2one('sf.production.line', string='生产线')
state = fields.Selection([
('draft', 'Draft'),
('confirmed', 'Confirmed'),
('未排程', '未排程'), ('已排程', '已排程'),
('progress', 'In Progress'),
('to_close', 'To Close'),
('done', 'Done'),
('cancel', 'Cancelled')], string='State',
compute='_compute_state', copy=False, index=True, readonly=True,
store=True, tracking=True,
help=" * Draft: The MO is not confirmed yet.\n"
" * Confirmed: The MO is confirmed, the stock rules and the reordering of the components are trigerred.\n"
" * In Progress: The production has started (on the MO or on the WO).\n"
" * To Close: The production is done, the MO has to be closed.\n"
" * Done: The MO is closed, the stock moves are posted. \n"
" * Cancelled: The MO has been cancelled, can't be confirmed anymore.")
# 当不设置计划结束时间时,增加计算计划结束时间的方法,根据采购周期加缓冲期两个值来算就可以了
def action_view_production_schedule(self):
self.ensure_one()
self.state = '已排程'
# if self.plan_start_time and self.plan_end_time:
# return None
# elif self.plan_start_time and not self.plan_end_time:
# # 如果没有给出计划结束时间,则计划结束时间为计划开始时间+采购周期+缓冲期
# # 采购周期
# purchase_cycle = 3
# # 缓冲期
# buffer_period = 1
# # 计划结束时间 = 计划开始时间 + 采购周期 + 缓冲期
# self.plan_end_time = self.plan_start_time + timedelta(days=purchase_cycle) + timedelta(days=buffer_period)
# return self.plan_end_time
# else:
# return None
def cancel_plan(self):
self.ensure_one()
self.state = 'confirmed'
# self.plan_start_time = None
# self.plan_end_time = None
@api.depends('request_ids')
def _compute_maintenance_count(self):
for production in self:
production.maintenance_count = len(production.request_ids)
# 维修模块按钮
def button_maintenance_req(self):
self.ensure_one()
return {
'name': _('New Maintenance Request'),
'view_mode': 'form',
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_production_id': self.id,
},
'domain': [('production_id', '=', self.id)],
}
# 打开维修模块请求
def open_maintenance_request_mo(self):
self.ensure_one()
action = {
'name': _('Maintenance Requests'),
'view_mode': 'kanban,tree,form,pivot,graph,calendar',
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_production_id': self.id,
},
'domain': [('production_id', '=', self.id)],
}
if self.maintenance_count == 1:
production = self.env['maintenance.request'].search([('production_id', '=', self.id)])
action['view_mode'] = 'form'
action['res_id'] = production.id
return action
def action_generate_serial(self):
self.ensure_one()
iot_code = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) or self.env[
'ir.sequence'].next_by_code('stock.lot.serial')
iot_code_name = re.sub('[\u4e00-\u9fa5]', "", iot_code)
self.lot_producing_id = self.env['stock.lot'].create({
'product_id': self.product_id.id,
'company_id': self.company_id.id,
'name': iot_code_name,
})
if self.move_finished_ids.filtered(lambda m: m.product_id == self.product_id).move_line_ids:
self.move_finished_ids.filtered(
lambda m: m.product_id == self.product_id).move_line_ids.lot_id = self.lot_producing_id
if self.product_id.tracking == 'serial':
self._set_qty_producing()
# 重载根据工序生成工单的程序如果产品BOM中没有工序时
# 根据产品对应的模板类型中工序,去生成工单;
# CNC加工工序的选取规则
# 如果自动报价有带过来预分配的机床,
# 则根据设备找到工作中心;否则采用前面描述的工作中心分配机制;
# 其他规则限制: 默认只分配给工作中心状态为非故障的工作中心;
def _create_workorder3(self):
for production in self:
if not production.bom_id or not production.product_id:
continue
workorders_values = []
product_qty = production.product_uom_id._compute_quantity(production.product_qty,
production.bom_id.product_uom_id)
exploded_boms, dummy = production.bom_id.explode(production.product_id,
product_qty / production.bom_id.product_qty,
picking_type=production.bom_id.picking_type_id)
for bom, bom_data in exploded_boms:
# If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
'parent_line'].bom_id.operation_ids != bom.operation_ids)):
continue
for operation in bom.operation_ids:
if operation._skip_operation_line(bom_data['product']):
continue
workorders_values += [{
'name': operation.name,
'production_id': production.id,
'workcenter_id': operation.workcenter_id.id,
'product_uom_id': production.product_uom_id.id,
'operation_id': operation.id,
'state': 'pending',
}]
if production.product_id.categ_id.type == '成品':
# 根据加工面板的面数及对应的工序模板生成工单
i = 0
processing_panel_len = len(production.product_id.model_processing_panel.split(','))
for k in (production.product_id.model_processing_panel.split(',')):
product_routing_workcenter = self.env['sf.product.model.type.routing.sort'].search(
[('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
order='sequence asc'
)
i += 1
for route in product_routing_workcenter:
if i == 1 and route.routing_type == '获取CNC加工程序':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str('', production, route))
if route.is_repeat == True:
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str(k, production, route))
if i == processing_panel_len and route.routing_type == '解除装夹':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str(k, production, route))
# 表面工艺工序
# 获取表面工艺id
if production.product_id.model_process_parameters_ids:
surface_technics_arr = []
# 工序id
route_workcenter_arr = []
for item in production.product_id.product_model_type_id.surface_technics_routing_tmpl_ids:
surface_technics_arr.append(item.route_workcenter_id.surface_technics_id.id)
route_workcenter_arr.append(item.route_workcenter_id.id)
if surface_technics_arr:
production_process_category = self.env['sf.production.process.category'].search(
[('production_process_ids.id', 'in', surface_technics_arr)],
order='sequence asc'
)
# 用filter刷选表面工艺id'是否存在工艺类别对象里
if production_process_category:
for p in production_process_category:
production_process = p.production_process_ids.filtered(
lambda pp: pp.id in surface_technics_arr)
if production_process:
process_parameter = production.product_id.model_process_parameters_ids.filtered(
lambda pm: pm.process_id.id == production_process.id)
if process_parameter:
# 产品为表面工艺服务的供应商
product_production_process = self.env['product.template'].search(
[('server_product_process_parameters_id', '=', process_parameter.id)])
if product_production_process:
route_production_process = self.env[
'mrp.routing.workcenter'].search(
[('surface_technics_id', '=', production_process.id),
('id', 'in', route_workcenter_arr)])
if route_production_process:
workorders_values.append(
self.env[
'mrp.workorder']._json_workorder_surface_process_str(
production, route_production_process,
process_parameter,
product_production_process.seller_ids[0].partner_id.id))
elif production.product_id.categ_id.type == '坯料':
embryo_routing_workcenter = self.env['sf.embryo.model.type.routing.sort'].search(
[('embryo_model_type_id', '=', production.product_id.embryo_model_type_id.id)],
order='sequence asc'
)
for route in embryo_routing_workcenter:
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str('', production, route))
production.workorder_ids = workorders_values
process_parameter_workorder = self.env['mrp.workorder'].search(
[('surface_technics_parameters_id', '!=', False), ('production_id', '=', production.id),
('is_subcontract', '=', True)])
if process_parameter_workorder:
is_pick = False
consecutive_workorders = []
m = 0
sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.id)
for i in range(len(sorted_workorders) - 1):
if m == 0:
is_pick = False
if sorted_workorders[i].supplier_id.id == sorted_workorders[i + 1].supplier_id.id and \
sorted_workorders[i].is_subcontract == sorted_workorders[i + 1].is_subcontract and \
sorted_workorders[i].id == sorted_workorders[i + 1].id - 1:
if sorted_workorders[i] not in consecutive_workorders:
consecutive_workorders.append(sorted_workorders[i])
consecutive_workorders.append(sorted_workorders[i + 1])
m += 1
continue
else:
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders, production)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
# 当前面的连续工序生成对应的外协出入库单再生成当前工序的外协出入库单
if is_pick is False:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i], production)
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders, production)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders, production)
if is_pick is False and m == 0:
if len(sorted_workorders) == 1:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders, production)
else:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i], production)
for workorder in production.workorder_ids:
workorder.duration_expected = workorder._get_duration_expected()
# 在之前的销售单上重新生成制造订单
def create_production1_values(self, production):
production_values_str = {'origin': production.origin,
'product_id': production.product_id.id,
'product_description_variants': production.product_description_variants,
'product_qty': production.product_qty,
'product_uom_id': production.product_uom_id.id,
'location_src_id': production.location_src_id.id,
'location_dest_id': production.location_dest_id.id,
'bom_id': production.bom_id.id,
'date_deadline': production.date_deadline,
'date_planned_start': production.date_planned_start,
'date_planned_finished': production.date_planned_finished,
'procurement_group_id': False,
'propagate_cancel': production.propagate_cancel,
'orderpoint_id': production.orderpoint_id.id,
'picking_type_id': production.picking_type_id.id,
'company_id': production.company_id.id,
'move_dest_ids': production.move_dest_ids.ids,
'user_id': production.user_id.id}
return production_values_str
def _get_stock_move_values_Res(self, item, location_src_id, location_dest_id, picking_type_id):
move_values = {
'name': item.name if item.name else '/',
'company_id': item.company_id.id,
'product_id': item.bom_id.bom_line_ids.product_id.id,
'product_uom': item.bom_id.bom_line_ids.product_uom_id.id,
'product_uom_qty': 1.0,
'location_id': location_src_id,
'location_dest_id': location_dest_id,
'origin': item.origin,
'picking_type_id': picking_type_id,
}
return move_values
# 工单排序
def _reset_work_order_sequence1(self, k):
sequen = 0
for rec in self:
current_sequence = 10
for work in rec.workorder_ids:
work.sequence = current_sequence
current_sequence += 10
if work.name == '后置三元质量检测' and work.processing_panel == k:
sequen = work.sequence
for work in rec.workorder_ids:
if work.name == '后置三元质量检测(返工)' and work.processing_panel == k:
work.sequence = sequen + 2
if work.name == 'CNC加工(返工)' and work.processing_panel == k:
work.sequence = sequen + 1
# 在制造订单上新增工单
def _create_workorder1(self, k):
for production in self:
if not production.bom_id or not production.product_id:
continue
workorders_values = []
product_qty = production.product_uom_id._compute_quantity(production.product_qty,
production.bom_id.product_uom_id)
exploded_boms, dummy = production.bom_id.explode(production.product_id,
product_qty / production.bom_id.product_qty,
picking_type=production.bom_id.picking_type_id)
for bom, bom_data in exploded_boms:
# If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
'parent_line'].bom_id.operation_ids != bom.operation_ids)):
continue
for operation in bom.operation_ids:
if operation._skip_operation_line(bom_data['product']):
continue
workorders_values += [{
'name': operation.name,
'production_id': production.id,
'workcenter_id': operation.workcenter_id.id,
'product_uom_id': production.product_uom_id.id,
'operation_id': operation.id,
'state': 'pending',
}]
# 根据加工面板的面数及对应的成品工序模板生成工单
i = 0
production.product_id.model_processing_panel = k
for k in (production.product_id.model_processing_panel.split(',')):
routingworkcenter = self.env['sf.product.model.type.routing.sort'].search(
[('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
order='sequence asc'
)
i += 1
for route in routingworkcenter:
if route.routing_type == '后置三元质量检测':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str1(k, production, route)
)
if route.routing_type == 'CNC加工':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str1(k, production, route))
production.workorder_ids = workorders_values
workorder = self.env['mrp.workorder'].browse(production.workorder_ids.ids)
print(workorder)
# for item in workorder:
# workorder.duration_expected = workorder._get_duration_expected()
def _create_workorder2(self, k):
self._create_workorder1(k)
self._reset_work_order_sequence1(k)
return True
def _reset_work_order_sequence(self):
for rec in self:
current_sequence = 1
for work in rec.workorder_ids:
work.sequence = current_sequence
current_sequence += 1
if work.name == '获取CNC加工程序':
work.button_start()
#work.fetchCNC()
# 创建工单并进行排序
def _create_workorder(self):
self._create_workorder3()
self._reset_work_order_sequence()
return True