Files
test/sf_manufacturing/models/mrp_production.py
2024-06-17 16:38:14 +08:00

645 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import logging
import re
import requests
from itertools import groupby
from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
from odoo.addons.sf_base.commons.common import Common
from odoo.tools import float_compare, float_round, float_is_zero, format_datetime
# Extension of the `mrp.production` model (see `_inherit`): adds scheduling, programming
# and shop-floor tracking fields and redefines the `state` selection.
class MrpProduction(models.Model):
_inherit = 'mrp.production'
_description = "制造订单"
# Newest records first.
_order = 'create_date desc'
# Disabled field, kept for reference:
# tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘")
# Number of maintenance requests linked through `request_ids` (see _compute_maintenance_count).
maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests")
request_ids = fields.One2many('maintenance.request', 'production_id')
# Model file, read from the product (related field).
model_file = fields.Binary('模型文件', related='product_id.model_file')
# Scheduling status; the values read "unscheduled" / "scheduled" / "finished".
schedule_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')],
string='排程状态', default='未排')
# Work-order status; same three values as `schedule_state`.
work_order_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')],
string='工单状态', default='未排')
# Earlier attempt that extended the selection with `selection_add`, kept for reference:
# state = fields.Selection(selection_add=[
# ('pending_scheduling', '待排程'),
# ('pending_processing', '待加工'),
# ('completed', '已完工')
# ])
# Redefined `state`: stored, computed by `_compute_state`. Besides the draft/confirmed/
# to_close/done/cancel keys it declares pending_cam, pending_processing, pending_era_cam
# and completed; the 'progress' key carries the label "to be scheduled".
# NOTE(review): the help text below does not describe the added stages.
state = fields.Selection([
('draft', 'Draft'),
('confirmed', 'Confirmed'),
('progress', '待排程'),
('pending_cam', '待装夹'),
('pending_processing', '待加工'),
('pending_era_cam', '待解除装夹'),
('completed', '已完工'),
('to_close', 'To Close'),
('done', 'Done'),
('cancel', 'Cancelled')], string='State',
compute='_compute_state', copy=False, index=True, readonly=True,
store=True, tracking=True,
help=" * Draft: The MO is not confirmed yet.\n"
" * Confirmed: The MO is confirmed, the stock rules and the reordering of the components are trigerred.\n"
" * In Progress: The production has started (on the MO or on the WO).\n"
" * To Close: The production is done, the MO has to be closed.\n"
" * Done: The MO is closed, the stock moves are posted. \n"
" * Cancelled: The MO has been cancelled, can't be confirmed anymore.")
# Review flag toggled by action_check / action_uncheck.
check_status = fields.Boolean(string='启用状态', default=False, readonly=True)
# Toggled by archive / unarchive.
# NOTE(review): the label reads "archived" although True is the default — confirm wording.
active = fields.Boolean(string='已归档', default=True)
# Programming order number; written by fetchCNC from the remote service's response.
programming_no = fields.Char('编程单号')
# Business status text; written by updateCNC and fetchCNC.
work_state = fields.Char('业务状态')
# Programming status text (tracked).
programming_state = fields.Char('编程状态', tracking=True)
# glb model file.
glb_file = fields.Binary("glb模型文件")
# Production line (tracked).
production_line_id = fields.Many2one('sf.production.line', string='生产线', tracking=True)
# Planned start time of machining.
plan_start_processing_time = fields.Datetime('计划开始加工时间')
# On/off production line status; values read "waiting to go on line" / "on line" / "off line".
production_line_state = fields.Selection(
[('待上产线', '待上产线'), ('已上产线', '已上产线'), ('已下产线', '已下产线')],
string='上/下产线', default='待上产线', tracking=True)
# Operation (process step) state; read by `_compute_state` to derive the extra `state` stages.
# TODO: look into how this field is used
process_state = fields.Selection([
('待装夹', '待装夹'),
('待检测', '待检测'),
('待加工', '待加工'),
('待解除装夹', '待解除装夹'),
('已完工', '已完工'),
], string='工序状态', default='待装夹')
# Part drawing number
part_number = fields.Char('零件图号')
# Uploaded part drawing
part_drawing = fields.Binary('零件图纸')
# Manual-programming flag; fetchCNC sends 'manual operation' as programme_way when it is True.
manual_quotation = fields.Boolean('人工编程', default=False, readonly=True)
# Manufacturing order this one reworks.
rework_production = fields.Many2one('mrp.production', string='返工的制造订单')
@api.depends(
    'move_raw_ids.state', 'move_raw_ids.quantity_done', 'move_finished_ids.state',
    'workorder_ids.state', 'product_qty', 'qty_producing', 'schedule_state', 'process_state',
    'work_order_state')
def _compute_state(self):
    """Compute the manufacturing order `state`, including the added shop-floor stages.

    The first chain derives draft / cancel / done / to_close / progress from the
    stock moves, the work orders and the produced quantity. When the result is
    'progress', it is refined from `schedule_state`, `process_state` and
    `work_order_state` into pending_cam / pending_processing / pending_era_cam /
    completed.

    `work_order_state` is declared as a dependency because the last refinement
    branch reads it; without it the stored value is not recomputed when only
    that field changes.
    """
    for production in self:
        if not production.state or not production.product_uom_id:
            production.state = 'draft'
        elif production.state == 'cancel' or (production.move_finished_ids and all(
                move.state == 'cancel' for move in production.move_finished_ids)):
            production.state = 'cancel'
        elif (
                production.state == 'done'
                or (production.move_raw_ids and all(
                    move.state in ('cancel', 'done') for move in production.move_raw_ids))
                and all(move.state in ('cancel', 'done') for move in production.move_finished_ids)
        ):
            production.state = 'done'
        elif production.workorder_ids and all(
                wo_state in ('done', 'cancel') for wo_state in production.workorder_ids.mapped('state')):
            production.state = 'to_close'
        elif not production.workorder_ids and float_compare(
                production.qty_producing, production.product_qty,
                precision_rounding=production.product_uom_id.rounding) >= 0:
            production.state = 'to_close'
        elif any(wo_state in ('progress', 'done') for wo_state in production.workorder_ids.mapped('state')):
            production.state = 'progress'
        elif production.product_uom_id and not float_is_zero(
                production.qty_producing,
                precision_rounding=production.product_uom_id.rounding):
            production.state = 'progress'
        elif any(not float_is_zero(
                move.quantity_done,
                precision_rounding=move.product_uom.rounding or move.product_id.uom_id.rounding)
                for move in production.move_raw_ids):
            production.state = 'progress'

        # Added stage logic: only an order currently in 'progress' is refined.
        if production.state != 'progress':
            continue
        scheduled = production.schedule_state == '已排'
        process_state = production.process_state
        # A single chain is equivalent to the former "if" followed by "if/elif":
        # once a branch moves the state away from 'progress', no later test can match.
        if scheduled and process_state == '待装夹':
            production.state = 'pending_cam'
        elif scheduled and process_state == '待加工':
            production.state = 'pending_processing'
        elif process_state == '待解除装夹':
            production.state = 'pending_era_cam'
        elif process_state == '已完工':
            production.state = 'completed'
        elif production.work_order_state == '已完成':
            production.state = 'completed'
def action_check(self):
    """Approve: turn the `check_status` flag on."""
    self.check_status = True
def action_uncheck(self):
    """Revoke approval: turn the `check_status` flag off."""
    self.check_status = False
def archive(self):
    """Archive the records by writing ``active = False``."""
    values = {'active': False}
    self.write(values)
def unarchive(self):
    """Restore archived records by writing ``active = True``."""
    values = {'active': True}
    self.write(values)
@api.depends('request_ids')
def _compute_maintenance_count(self):
    """Store on each order the number of maintenance requests linked to it."""
    for order in self:
        linked_requests = order.request_ids
        order.maintenance_count = len(linked_requests)
# Manufacturing order scrapped: update the programming order
def updateCNC(self):
    """Notify the remote programming service about this order's programming order.

    Posts the order name, programming number and origin to
    ``/api/intelligent_programming/update_intelligent_programmings`` using the URL,
    token and secret key read from ``res.config.settings``. When the response
    ``status`` is 1, ``work_state`` is set to '已编程'.

    Raises:
        UserError: for any failure (connection error, unexpected response, or a
            non-1 status). The user always sees the same generic message; the
            underlying error is logged with its traceback and chained as the cause.
    """
    try:
        res = {'production_no': self.name, 'programming_no': self.programming_no,
               'order_no': self.origin}
        logging.info('res=%s:', res)
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        url = '/api/intelligent_programming/update_intelligent_programmings'
        config_url = configsettings['sf_url'] + url
        res['token'] = configsettings['token']
        # NOTE(review): no timeout is passed to requests.post — confirm whether
        # an unbounded wait is acceptable here.
        ret = requests.post(config_url, json={}, data=res, headers=config_header)
        ret = ret.json()
        logging.info('updateCNC-ret:%s', ret)
        if ret['status'] == 1:
            self.write({'work_state': '已编程'})
        else:
            raise UserError(ret['message'])
    except Exception as e:
        # Log at ERROR level with the traceback so the real cause is not lost
        # behind the generic user-facing message.
        logging.exception('updateCNC error:%s', e)
        raise UserError("更新程单失败,请联系管理员") from e
# Fetch the CNC program: create a programming order on the remote service
def fetchCNC(self, production_names):
    """Create a programming order for this manufacturing order on the remote service.

    The order is re-read by id. The programming mode is 'manual operation' when
    ``manual_quotation`` is True or when a ``quick.easy.order`` exists whose name
    equals the product's ``default_code`` with its last ``-`` suffix removed;
    otherwise it is 'auto'. The payload is posted to
    ``/api/intelligent_programming/create``. When the response ``status`` is 1,
    ``programming_no``, ``programming_state`` and ``work_state`` are written.

    Args:
        production_names: value sent as ``production_no``.

    Raises:
        UserError: for any failure while building or sending the request, or a
            non-1 status. The message is generic; the underlying error is logged
            with its traceback and chained as the cause.
    """
    cnc = self.env['mrp.production'].search([('id', '=', self.id)])
    # NOTE(review): assumes the product has a default_code; a falsy value raises
    # AttributeError here, outside the try block — confirm.
    quick_order = self.env['quick.easy.order'].search(
        [('name', '=', cnc.product_id.default_code.rsplit('-', 1)[0])])
    if cnc.manual_quotation is True or quick_order:
        programme_way = 'manual operation'
    else:
        programme_way = 'auto'
    try:
        product = cnc.product_id
        # Product(s) on the BoM lines; the dimensions sent are read from them.
        blank = product.bom_ids.bom_line_ids.product_id
        res = {
            'production_no': production_names,
            'machine_tool_code': '',
            'product_name': product.name,
            'remanufacture_type': '',
            'model_code': product.model_code,
            'material_code': self.env['sf.production.materials'].search(
                [('id', '=', product.materials_id.id)]).materials_no,
            'material_type_code': self.env['sf.materials.model'].search(
                [('id', '=', product.materials_type_id.id)]).materials_no,
            'machining_processing_panel': product.model_processing_panel,
            'machining_precision': product.model_machining_precision,
            'embryo_long': blank.length,
            'embryo_height': blank.height,
            'embryo_width': blank.width,
            'order_no': cnc.origin,
            'model_order_no': product.default_code,
            'user': cnc.env.user.name,
            'programme_way': programme_way,
            'model_file': '' if not product.model_file else base64.b64encode(
                product.model_file).decode('utf-8')
        }
        # Log every key/value pair except the model file content.
        for key, value in res.items():
            if key != 'model_file':
                logging.info('%s: %s', key, value)
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        url = '/api/intelligent_programming/create'
        config_url = configsettings['sf_url'] + url
        res['token'] = configsettings['token']
        # NOTE(review): no timeout is passed to requests.post — confirm whether
        # an unbounded wait is acceptable here.
        ret = requests.post(config_url, json={}, data=res, headers=config_header)
        ret = ret.json()
        logging.info('fetchCNC-ret:%s', ret)
        if ret['status'] == 1:
            self.write(
                {'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'})
        else:
            raise UserError(ret['message'])
    except Exception as e:
        # Log at ERROR level with the traceback so the real cause is not lost
        # behind the generic user-facing message.
        logging.exception('fetchCNC error:%s', e)
        raise UserError("cnc程序获取编程单失败,请联系管理员") from e
# Maintenance module button
def button_maintenance_req(self):
    """Return a window action opening a new maintenance request form for this order.

    The action context pre-fills the company and the manufacturing order; the
    domain restricts to requests of this order. Requires a single record.
    """
    self.ensure_one()
    defaults = {
        'default_company_id': self.company_id.id,
        'default_production_id': self.id,
    }
    action = dict(
        name=_('New Maintenance Request'),
        view_mode='form',
        res_model='maintenance.request',
        type='ir.actions.act_window',
        context=defaults,
        domain=[('production_id', '=', self.id)],
    )
    return action
# Open the maintenance requests of this manufacturing order
def open_maintenance_request_mo(self):
    """Return a window action listing the maintenance requests of this order.

    When exactly one request exists (``maintenance_count == 1``), the action
    opens that request directly in form view. Requires a single record.
    """
    self.ensure_one()
    own_requests_domain = [('production_id', '=', self.id)]
    action = dict(
        name=_('Maintenance Requests'),
        view_mode='kanban,tree,form,pivot,graph,calendar',
        res_model='maintenance.request',
        type='ir.actions.act_window',
        context={
            'default_company_id': self.company_id.id,
            'default_production_id': self.id,
        },
        domain=own_requests_domain,
    )
    if self.maintenance_count != 1:
        return action
    only_request = self.env['maintenance.request'].search([('production_id', '=', self.id)])
    action.update(view_mode='form', res_id=only_request.id)
    return action
def action_generate_serial(self):
    """Create the lot/serial for the product being produced and attach it to the order.

    The name comes from ``stock.lot._get_next_serial`` or, failing that, from the
    'stock.lot.serial' sequence, with every character in the range U+4E00-U+9FA5
    removed. The new lot is set as ``lot_producing_id`` and on the move lines of
    the finished moves for this product. For serial-tracked products
    ``_set_qty_producing`` is called. Requires a single record.
    """
    self.ensure_one()
    iot_code = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id)
    if not iot_code:
        iot_code = self.env['ir.sequence'].next_by_code('stock.lot.serial')
    iot_code_name = re.sub('[\u4e00-\u9fa5]', "", iot_code)
    lot_values = {
        'product_id': self.product_id.id,
        'company_id': self.company_id.id,
        'name': iot_code_name,
    }
    self.lot_producing_id = self.env['stock.lot'].create(lot_values)
    finished_lines = self.move_finished_ids.filtered(
        lambda m: m.product_id == self.product_id).move_line_ids
    if finished_lines:
        finished_lines.lot_id = self.lot_producing_id
    if self.product_id.tracking == 'serial':
        self._set_qty_producing()
# Original notes (translated): override of work-order generation from operations. When
# the product BoM has no operations, work orders are generated from the operations of
# the product's template (model) type.
# Selection rule for the CNC machining operation: if the automatic quotation carried a
# pre-assigned machine tool, the work center is found from that equipment; otherwise the
# work-center assignment mechanism described earlier applies.
# Other restriction: by default only work centers that are not faulty are assigned.
# NOTE(review): the work-center selection described above is not visible in this
# method — presumably implemented in the mrp.workorder helpers called here; confirm.
def _create_workorder3(self):
    """Build and assign the work orders of each manufacturing order in ``self``.

    For every order with a BoM and a product:

    1. Work-order values are created from the operations of the exploded BoMs.
    2. For products whose category type is '成品': a programming order is
       requested through ``fetchCNC`` (or copied from the earliest order with
       the same product and origin), then work orders are generated per
       processing panel from the product model type's routing, followed by the
       surface-process work orders.
    3. For products whose category type is '坯料': work orders are generated
       from the embryo model type's routing.

    Finally the values are assigned to ``workorder_ids`` and each work order's
    expected duration is computed.
    """
    # Names of all orders in `self`, per product id. Built with setdefault so
    # orders of the same product are collected even when they are not adjacent
    # in the recordset (itertools.groupby only groups consecutive items, and a
    # dict built from it keeps just the last run of each key).
    product_id_to_production_names = {}
    for production in self:
        product_id_to_production_names.setdefault(production.product_id.id, []).append(production.name)

    workorder_model = self.env['mrp.workorder']
    for production in self:
        if not production.bom_id or not production.product_id:
            continue
        workorders_values = []
        product_qty = production.product_uom_id._compute_quantity(production.product_qty,
                                                                  production.bom_id.product_uom_id)
        exploded_boms, dummy = production.bom_id.explode(production.product_id,
                                                         product_qty / production.bom_id.product_qty,
                                                         picking_type=production.bom_id.picking_type_id)
        for bom, bom_data in exploded_boms:
            # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
            if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
                    'parent_line'].bom_id.operation_ids != bom.operation_ids)):
                continue
            for operation in bom.operation_ids:
                if operation._skip_operation_line(bom_data['product']):
                    continue
                workorders_values += [{
                    'name': operation.name,
                    'production_id': production.id,
                    'workcenter_id': operation.workcenter_id.id,
                    'product_uom_id': production.product_uom_id.id,
                    'operation_id': operation.id,
                    'state': 'pending',
                }]

        product = production.product_id
        if product.categ_id.type == '成品':
            # Several orders of the same product share one programming order:
            # fetchCNC receives the names of all of them as one string.
            if not production.programming_no:
                production_programming = self.search(
                    [('product_id.id', '=', product.id), ('origin', '=', production.origin)],
                    limit=1, order='id asc')
                if not production_programming.programming_no:
                    production.fetchCNC(', '.join(product_id_to_production_names[product.id]))
                else:
                    production.write({'programming_no': production_programming.programming_no,
                                      'programming_state': '编程中'})

            # Work orders per processing panel, from the model type's routing.
            panels = product.model_processing_panel.split(',')
            processing_panel_len = len(panels)
            # The routing does not depend on the panel, so it is searched once.
            product_routing_workcenter = self.env['sf.product.model.type.routing.sort'].search(
                [('product_model_type_id', '=', product.product_model_type_id.id)],
                order='sequence asc'
            )
            for i, k in enumerate(panels, start=1):
                for route in product_routing_workcenter:
                    if route.is_repeat is True:
                        workorders_values.append(
                            workorder_model.json_workorder_str(k, production, route))
                    # The '解除装夹' route is added once, on the last panel.
                    if i == processing_panel_len and route.routing_type == '解除装夹':
                        workorders_values.append(
                            workorder_model.json_workorder_str(k, production, route))

            # Surface-process work orders.
            if product.model_process_parameters_ids:
                # Surface technics ids and routing work-center ids of the model type.
                surface_technics_arr = []
                route_workcenter_arr = []
                for item in product.product_model_type_id.surface_technics_routing_tmpl_ids:
                    surface_technics_arr.append(item.route_workcenter_id.surface_technics_id.id)
                    route_workcenter_arr.append(item.route_workcenter_id.id)
                if surface_technics_arr:
                    production_process_category = self.env['sf.production.process.category'].search(
                        [('production_process_ids.id', 'in', surface_technics_arr)],
                        order='sequence asc'
                    )
                    for p in production_process_category:
                        # Keep only the category's processes that the model type uses.
                        production_process = p.production_process_ids.filtered(
                            lambda pp: pp.id in surface_technics_arr)
                        if not production_process:
                            continue
                        process_parameter = product.model_process_parameters_ids.filtered(
                            lambda pm: pm.process_id.id == production_process.id)
                        if not process_parameter:
                            continue
                        # Service product for this surface process; its first
                        # seller is passed to the helper as the partner.
                        product_production_process = self.env['product.template'].search(
                            [('server_product_process_parameters_id', '=', process_parameter.id)])
                        if not product_production_process:
                            continue
                        route_production_process = self.env['mrp.routing.workcenter'].search(
                            [('surface_technics_id', '=', production_process.id),
                             ('id', 'in', route_workcenter_arr)])
                        if not route_production_process:
                            continue
                        workorders_values.append(
                            workorder_model._json_workorder_surface_process_str(
                                production, route_production_process,
                                process_parameter,
                                product_production_process.seller_ids[0].partner_id.id))
        elif product.categ_id.type == '坯料':
            embryo_routing_workcenter = self.env['sf.embryo.model.type.routing.sort'].search(
                [('embryo_model_type_id', '=', product.embryo_model_type_id.id)],
                order='sequence asc'
            )
            for route in embryo_routing_workcenter:
                workorders_values.append(
                    workorder_model.json_workorder_str('', production, route))

        production.workorder_ids = workorders_values
        for workorder in production.workorder_ids:
            workorder.duration_expected = workorder._get_duration_expected()
# Re-create a manufacturing order on the earlier sales order
def create_production1_values(self, production):
    """Return the values dict for a new manufacturing order copied from ``production``.

    Plain fields are copied as-is, relational fields by id (``move_dest_ids`` as
    a list of ids), and ``procurement_group_id`` is always False.
    """
    return dict(
        origin=production.origin,
        product_id=production.product_id.id,
        product_description_variants=production.product_description_variants,
        product_qty=production.product_qty,
        product_uom_id=production.product_uom_id.id,
        location_src_id=production.location_src_id.id,
        location_dest_id=production.location_dest_id.id,
        bom_id=production.bom_id.id,
        date_deadline=production.date_deadline,
        date_planned_start=production.date_planned_start,
        date_planned_finished=production.date_planned_finished,
        procurement_group_id=False,
        propagate_cancel=production.propagate_cancel,
        orderpoint_id=production.orderpoint_id.id,
        picking_type_id=production.picking_type_id.id,
        company_id=production.company_id.id,
        move_dest_ids=production.move_dest_ids.ids,
        user_id=production.user_id.id,
    )
# Work-order ordering (after a rework work order was added)
def _reset_work_order_sequence1(self, k):
    """Place the rework CNC work order right after the CNC work order of panel ``k``.

    For each order, the work orders named "CNC加工(返工)" get the sequence of the
    "CNC加工" work order whose ``processing_panel`` equals ``k``, plus one. Every
    other work order, except those with routing type '装夹预调' and the rework
    ones, has its sequence increased by one.

    Args:
        k: processing panel identifying which CNC work order was reworked.
    """
    for rec in self:
        cnc_back_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工(返工)")
        for work in rec.workorder_ids:
            # Compare against the literal name: reading `.name` on the filtered
            # CNC recordset fails as soon as the order has several CNC work
            # orders (one per panel).
            if work.name == "CNC加工" and work.processing_panel == k:
                cnc_back_workorder.write({'sequence': work.sequence + 1})
            # Membership test so the exclusion also holds when there is more
            # than one rework work order.
            elif work.routing_type not in ['装夹预调'] and work not in cnc_back_workorder:
                work.sequence += 1
# Add work orders to an existing manufacturing order
def _create_workorder1(self, k):
    """Append work orders to each manufacturing order for processing panel(s) ``k``.

    For every order with a BoM and a product, work-order values are built from
    the exploded BoM operations. Then ``k`` is written to the product's
    ``model_processing_panel``, and for each comma-separated panel one work
    order is added per 'CNC加工' route of the product model type (via
    ``mrp.workorder.json_workorder_str1``). The values are assigned to
    ``workorder_ids``.

    Args:
        k: processing panel value (comma-separated when several) stored on the
            product and used to generate the CNC work orders.
    """
    for production in self:
        if not production.bom_id or not production.product_id:
            continue
        workorders_values = []
        product_qty = production.product_uom_id._compute_quantity(production.product_qty,
                                                                  production.bom_id.product_uom_id)
        exploded_boms, dummy = production.bom_id.explode(production.product_id,
                                                         product_qty / production.bom_id.product_qty,
                                                         picking_type=production.bom_id.picking_type_id)
        for bom, bom_data in exploded_boms:
            # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
            if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
                    'parent_line'].bom_id.operation_ids != bom.operation_ids)):
                continue
            for operation in bom.operation_ids:
                if operation._skip_operation_line(bom_data['product']):
                    continue
                workorders_values += [{
                    'name': operation.name,
                    'production_id': production.id,
                    'workcenter_id': operation.workcenter_id.id,
                    'product_uom_id': production.product_uom_id.id,
                    'operation_id': operation.id,
                    'state': 'pending',
                }]
        # Work orders per processing panel, from the finished-product routing.
        # The loop variable is named `panel` so the parameter `k` keeps its
        # value for the next order in `self`.
        production.product_id.model_processing_panel = k
        panels = production.product_id.model_processing_panel.split(',')
        # The routing does not depend on the panel, so it is searched once.
        routingworkcenter = self.env['sf.product.model.type.routing.sort'].search(
            [('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
            order='sequence asc'
        )
        for panel in panels:
            for route in routingworkcenter:
                if route.routing_type == 'CNC加工':
                    workorders_values.append(
                        self.env['mrp.workorder'].json_workorder_str1(panel, production, route))
        production.workorder_ids = workorders_values
def _create_workorder2(self, k):
    """Add the work orders for panel ``k``, then re-sequence them; always returns True."""
    for step in (self._create_workorder1, self._reset_work_order_sequence1):
        step(k)
    return True
def _reset_work_order_sequence(self):
    """Set each work order's sequence from the product's model-type routing templates.

    A name -> sequence map is built per order: finished-product routing
    templates are numbered from 1 in iteration order; the surface-process
    entries selected on the product continue that numbering, ordered by the
    integer category code of their process; embryo routing templates are
    numbered again from 1. Each work order then takes the sequence registered
    under its name.

    Raises:
        ValidationError: when the product has no model type, or when a work
            order's name has no entry in the map.
    """
    for rec in self:
        model_type = rec.product_id.product_model_type_id
        if not model_type:
            raise ValidationError('该产品没有选择【模版类型】!')

        name_to_sequence = {}
        position = 0
        # Finished-product operations.
        for tmpl in model_type.product_routing_tmpl_ids:
            position += 1
            name_to_sequence[tmpl.route_workcenter_id.name] = position

        # Surface-process operations: templates of the model type matched with
        # the surface processes selected on the product, keyed by category code.
        surface_tmpls = model_type.surface_technics_routing_tmpl_ids
        label_by_code = {}
        for parameter in rec.product_id.model_process_parameters_ids:
            process = parameter.process_id
            for surface_tmpl in surface_tmpls:
                if process == surface_tmpl.route_workcenter_id.surface_technics_id:
                    surface_tmpl_name = surface_tmpl.route_workcenter_id.name
                    label_by_code[int(process.category_id.code)] = '%s-%s' % (
                        surface_tmpl_name, parameter.name)
        for code in sorted(label_by_code):
            position += 1
            name_to_sequence[label_by_code[code]] = position

        # Embryo operations: numbering restarts at 1.
        for position, tmpl in enumerate(model_type.embryo_routing_tmpl_ids, start=1):
            name_to_sequence[tmpl.route_workcenter_id.name] = position

        for work in rec.workorder_ids:
            sequence = name_to_sequence.get(work.name)
            if not sequence:
                raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name)
            work.sequence = sequence
# Create the work orders, then sort them
def _create_workorder(self):
    """Generate the work orders and then reset their sequences; always returns True."""
    for step in (self._create_workorder3, self._reset_work_order_sequence):
        step()
    return True
# Modified version of the "mark as done" method.
# Posts inventory for the orders, forces any remaining moves to done, writes the final
# values on each order and returns either True or a window action.
def button_mark_done1(self):
# Guard: true only when no work order has a routing_type other than '表面工艺'
# (i.e. every work order, if any, is of that type).
# NOTE(review): confirm how much of the body below is meant to run only under this guard.
if not self.workorder_ids.filtered(lambda w: w.routing_type not in ['表面工艺']):
self._button_mark_done_sanity_checks()
# Remember in the context which orders started this call, unless already set.
if not self.env.context.get('button_mark_done_production_ids'):
self = self.with_context(button_mark_done_production_ids=self.ids)
# Anything other than True is returned to the caller as-is.
res = self._pre_button_mark_done()
if res is not True:
return res
# Split the orders into those listed in context key `mo_ids_to_backorder` and the rest.
if self.env.context.get('mo_ids_to_backorder'):
productions_to_backorder = self.browse(self.env.context['mo_ids_to_backorder'])
productions_not_to_backorder = self - productions_to_backorder
else:
productions_not_to_backorder = self
productions_to_backorder = self.env['mrp.production']
# Keep only the records returned by the split that are not the original orders.
backorders = productions_to_backorder and productions_to_backorder._split_productions()
backorders = backorders - productions_to_backorder
productions_not_to_backorder._post_inventory(cancel_backorder=True)
productions_to_backorder._post_inventory(cancel_backorder=True)
# if completed products make other confirmed/partially_available moves available, assign them
done_move_finished_ids = (
productions_to_backorder.move_finished_ids | productions_not_to_backorder.move_finished_ids).filtered(
lambda m: m.state == 'done')
done_move_finished_ids._trigger_assign()
# Moves without quantity done are not posted => set them as done instead of canceling. In
# case the user edits the MO later on and sets some consumed quantity on those, we do not
# want the move lines to be canceled.
(productions_not_to_backorder.move_raw_ids | productions_not_to_backorder.move_finished_ids).filtered(
lambda x: x.state not in ('done', 'cancel')).write({
'state': 'done',
'product_uom_qty': 0.0,
})
# Final values: finish date, quantity set to what was produced, priority reset, locked, done.
for production in self:
production.write({
'date_finished': fields.Datetime.now(),
'product_qty': production.qty_produced,
'priority': '0',
'is_locked': True,
'state': 'done',
})
# Recompute the expected duration of work orders that are neither done nor cancelled.
for workorder in self.workorder_ids.filtered(lambda w: w.state not in ('done', 'cancel')):
workorder.duration_expected = workorder._get_duration_expected()
# Without backorders: return the MO form (when called from a work order), the reception
# report action (when the group/picking-type settings and the finished moves call for it),
# or True.
if not backorders:
if self.env.context.get('from_workorder'):
return {
'type': 'ir.actions.act_window',
'res_model': 'mrp.production',
'views': [[self.env.ref('mrp.mrp_production_form_view').id, 'form']],
'res_id': self.id,
'target': 'main',
}
if self.user_has_groups(
'mrp.group_mrp_reception_report') and self.picking_type_id.auto_show_reception_report:
lines = self.move_finished_ids.filtered(lambda
m: m.product_id.type == 'product' and m.state != 'cancel' and m.quantity_done and not m.move_dest_ids)
if lines:
if any(mo.show_allocation for mo in self):
action = self.action_view_reception_report()
return action
return True
# With backorders: build an action on them, using a context stripped of `default_*` keys
# and with every `skip_*` key set to False.
context = self.env.context.copy()
context = {k: v for k, v in context.items() if not k.startswith('default_')}
for k, v in context.items():
if k.startswith('skip_'):
context[k] = False
action = {
'res_model': 'mrp.production',
'type': 'ir.actions.act_window',
'context': dict(context, mo_ids_to_backorder=None, button_mark_done_production_ids=None)
}
# One backorder opens in form view; several open as a list.
if len(backorders) == 1:
action.update({
'view_mode': 'form',
'res_id': backorders[0].id,
})
else:
action.update({
'name': _("Backorder MO"),
'domain': [('id', 'in', backorders.ids)],
'view_mode': 'tree,form',
})
return action