Files
test/sf_manufacturing/models/mrp_production.py
jinling.yang 5f9c5961a5 报废
2024-08-09 10:39:48 +08:00

1092 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import logging
import json
import os
import re
import requests
from itertools import groupby
from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
from odoo.addons.sf_base.commons.common import Common
from odoo.tools import float_compare, float_round, float_is_zero, format_datetime
class MrpProduction(models.Model):
_inherit = 'mrp.production'
_description = "制造订单"
_order = 'create_date desc'
# tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘")
maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests")
request_ids = fields.One2many('maintenance.request', 'production_id')
model_file = fields.Binary('模型文件', related='product_id.model_file')
schedule_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')],
string='排程状态', default='未排')
work_order_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')],
string='工单状态', default='未排')
detection_result_ids = fields.One2many('sf.detection.result', 'production_id', '检测报告')
tool_state = fields.Selection([('0', '正常'), ('1', '缺刀'), ('2', '无效刀')], string='功能刀具状态', default='0',
store=True, compute='_compute_tool_state')
tool_state_remark = fields.Text(string='功能刀具状态备注(缺刀)', compute='_compute_tool_state_remark', store=True)
tool_state_remark2 = fields.Text(string='功能刀具状态备注(无效刀)', readonly=True)
@api.depends('workorder_ids.tool_state_remark')
def _compute_tool_state_remark(self):
for item in self:
if item.workorder_ids:
workorder_ids = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel'])
if workorder_ids.filtered(lambda a: a.tool_state == '1'):
work_ids = workorder_ids.filtered(lambda a: a.tool_state == '1')
tool_state_remark = ''
for work_id in work_ids:
if tool_state_remark == '':
tool_state_remark = f'{work_id.tool_state_remark}'
else:
tool_state_remark = f"{tool_state_remark}\n{work_id.tool_state_remark}"
item.tool_state_remark = tool_state_remark
else:
item.tool_state_remark = False
@api.depends('workorder_ids.tool_state')
def _compute_tool_state(self):
for item in self:
if item.workorder_ids:
tool_state = item.tool_state
workorder_ids = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel'])
if workorder_ids.filtered(lambda a: a.tool_state == '2'):
item.tool_state = '2'
elif workorder_ids.filtered(lambda a: a.tool_state == '1'):
item.tool_state = '1'
else:
item.tool_state = '0'
if tool_state == '2' and item.tool_state != '2':
item.detection_result_ids.filtered(
lambda a: a.detailed_reason == '无效功能刀具' and a.handle_result == '待处理').write(
{'handle_result': '已处理'})
# state = fields.Selection(selection_add=[
# ('pending_scheduling', '待排程'),
# ('pending_processing', '待加工'),
# ('completed', '已完工')
# ])
state = fields.Selection([
('draft', '草稿'),
('confirmed', '待排程'),
('pending_cam', '待加工'),
('progress', '加工中'),
('rework', '返工'),
('scrap', '报废'),
('to_close', 'To Close'),
('done', 'Done'),
('cancel', '已取消')], string='State',
compute='_compute_state', copy=False, index=True, readonly=True,
store=True, tracking=True,
help=" * Draft: The MO is not confirmed yet.\n"
" * Confirmed: The MO is confirmed, the stock rules and the reordering of the components are trigerred.\n"
" * In Progress: The production has started (on the MO or on the WO).\n"
" * To Close: The production is done, the MO has to be closed.\n"
" * Done: The MO is closed, the stock moves are posted. \n"
" * Cancelled: The MO has been cancelled, can't be confirmed anymore.")
check_status = fields.Boolean(string='启用状态', default=False, readonly=True)
active = fields.Boolean(string='已归档', default=True)
programming_no = fields.Char('编程单号')
work_state = fields.Char('业务状态')
programming_state = fields.Selection(
[('编程中', '编程中'), ('已编程', '已编程'), ('已编程未下发', '已编程未下发'), ('已下发', '已下发')],
string='编程状态',
tracking=True)
glb_file = fields.Binary("glb模型文件")
production_line_id = fields.Many2one('sf.production.line', string='生产线', tracking=True)
plan_start_processing_time = fields.Datetime('计划开始加工时间')
is_rework = fields.Boolean(string='是否返工', default=False)
# production_line_state = fields.Selection(
# [('待上产线', '待上产线'), ('已上产线', '已上产线'), ('已下产线', '已下产线')],
# string='上/下产线', default='待上产线', tracking=True)
# 工序状态
# Todo 研究下用法
process_state = fields.Selection([
('待装夹', '待装夹'),
('待检测', '待检测'),
('待加工', '待加工'),
('待解除装夹', '待解除装夹'),
('已完工', '已完工'),
], string='工序状态', default='待装夹')
# 零件图号
part_number = fields.Char('零件图号')
# 上传零件图纸
part_drawing = fields.Binary('零件图纸')
manual_quotation = fields.Boolean('人工编程', default=False, readonly=True)
is_scrap = fields.Boolean('是否报废', default=False)
@api.depends(
'move_raw_ids.state', 'move_raw_ids.quantity_done', 'move_finished_ids.state', 'tool_state',
'workorder_ids.state', 'product_qty', 'qty_producing', 'schedule_state')
def _compute_state(self):
for production in self:
if not production.state or not production.product_uom_id:
production.state = 'draft'
elif production.state == 'cancel' or (production.move_finished_ids and all(
move.state == 'cancel' for move in production.move_finished_ids)):
production.state = 'cancel'
elif (
production.state == 'done'
or (production.move_raw_ids and all(
move.state in ('cancel', 'done') for move in production.move_raw_ids))
and all(move.state in ('cancel', 'done') for move in production.move_finished_ids)
):
production.state = 'done'
elif production.workorder_ids and all(
wo_state in ('done', 'cancel') for wo_state in production.workorder_ids.mapped('state')):
production.state = 'to_close'
elif not production.workorder_ids and float_compare(production.qty_producing, production.product_qty,
precision_rounding=production.product_uom_id.rounding) >= 0:
production.state = 'to_close'
elif any(wo_state in ('progress', 'done') for wo_state in production.workorder_ids.mapped('state')):
production.state = 'progress'
elif production.product_uom_id and not float_is_zero(production.qty_producing,
precision_rounding=production.product_uom_id.rounding):
production.state = 'progress'
elif any(not float_is_zero(move.quantity_done,
precision_rounding=move.product_uom.rounding or move.product_id.uom_id.rounding)
for move in production.move_raw_ids if move.product_id):
production.state = 'progress'
# # 新添加的状态逻辑
if (
production.state == 'to_close' or production.state == 'progress') and production.schedule_state == '未排':
production.state = 'confirmed'
elif production.state == 'pending_cam' and production.schedule_state == '未排':
production.state = 'confirmed'
elif production.state == 'to_close' and production.schedule_state == '已排':
production.state = 'pending_cam'
if production.state == 'progress':
if all(wo_state not in ('progress', 'done', 'rework', 'scrap') for wo_state in
production.workorder_ids.mapped('state')):
production.state = 'pending_cam'
if production.is_rework is True:
production.state = 'rework'
# if production.state == 'pending_cam':
# if all(wo_state in 'done' for wo_state in production.workorder_ids.mapped('state')):
# production.state = 'done'
if any(
(
wo.test_results == '返工' and wo.state == 'done' and production.programming_state in [
'已编程']) or (
wo.state == 'rework' and production.programming_state == '编程中') or (
wo.is_rework is True and wo.state == 'done' and production.programming_state in ['编程中',
'已编程'])
for wo in
production.workorder_ids):
production.state = 'rework'
if any(wo.test_results == '报废' and wo.state == 'done' for wo in production.workorder_ids):
production.state = 'scrap'
if any(dr.test_results == '报废' and dr.handle_result == '已处理' for dr in
production.detection_result_ids):
production.state = 'cancel'
# 如果制造订单的功能刀具为【无效刀】则制造订单状态改为返工
if production.tool_state == '2':
production.state = 'rework'
def action_check(self):
"""
审核启用
"""
self.check_status = True
def action_uncheck(self):
"""
审核禁用
"""
self.check_status = False
def archive(self):
"""
归档
"""
self.write({'active': False})
def unarchive(self):
"""
取消归档
"""
self.write({'active': True})
@api.depends('request_ids')
def _compute_maintenance_count(self):
for production in self:
production.maintenance_count = len(production.request_ids)
# 获取cloud编程单的状态
def _cron_get_programming_state(self):
try:
if not self:
reproduction = self.env['mrp.production'].search(
[('state', '=', 'rework'), ('programming_state', '=', '编程中'), ('is_rework', '=', True)])
else:
reproduction = self
if reproduction:
programming_no_set = set([str(item.programming_no) for item in reproduction])
programming_no = list(programming_no_set)
programming_no_str = ','.join(programming_no)
res = {'programming_no': programming_no_str}
logging.info('res=%s:' % res)
configsettings = self.env['res.config.settings'].get_values()
config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
url = '/api/intelligent_programming/get_state'
config_url = configsettings['sf_url'] + url
ret = requests.post(config_url, json=res, data=None, headers=config_header)
ret = ret.json()
result = json.loads(ret['result'])
if result['status'] == 1:
for item in result['programming_list']:
if not self:
for rp in reproduction:
if rp.programming_no == item['programming_no']:
rp.write({'programming_state': '已编程未下发' if item[
'programming_state'] == '已编程' else '编程中'})
else:
return item
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('cron_get_programming_state error:%s' % e)
# 编程单更新
def update_programming_state(self):
try:
res = {'programming_no': self.programming_no,
'manufacturing_type': 'rework' if self.is_scrap is False else 'scrap'}
logging.info('res=%s:' % res)
configsettings = self.env['res.config.settings'].get_values()
config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
url = '/api/intelligent_programming/reset_state_again'
config_url = configsettings['sf_url'] + url
ret = requests.post(config_url, json=res, data=None, headers=config_header)
ret = ret.json()
result = json.loads(ret['result'])
logging.info('update_programming_state-ret:%s' % result)
if result['status'] == 1:
self.write({'is_rework': True})
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('update_programming_state error:%s' % e)
raise UserError("更新编程单状态失败,请联系管理员")
# cnc程序获取
def fetchCNC(self, production_names):
cnc = self.env['mrp.production'].search([('id', '=', self.id)])
quick_order = self.env['quick.easy.order'].search(
[('name', '=', cnc.product_id.default_code.rsplit('-', 1)[0])])
programme_way = False
if cnc.manual_quotation is True:
programme_way = 'manual operation'
else:
programme_way = 'auto'
if quick_order:
programme_way = 'manual operation'
try:
res = {
'production_no': production_names,
'machine_tool_code': '',
'product_name': cnc.product_id.name,
'remanufacture_type': '',
'model_code': cnc.product_id.model_code,
'material_code': self.env['sf.production.materials'].search(
[('id', '=', cnc.product_id.materials_id.id)]).materials_no,
'material_type_code': self.env['sf.materials.model'].search(
[('id', '=', cnc.product_id.materials_type_id.id)]).materials_no,
'machining_processing_panel': cnc.product_id.model_processing_panel,
'machining_precision': '',
'embryo_long': cnc.product_id.bom_ids.bom_line_ids.product_id.length,
'embryo_height': cnc.product_id.bom_ids.bom_line_ids.product_id.height,
'embryo_width': cnc.product_id.bom_ids.bom_line_ids.product_id.width,
'order_no': cnc.origin,
'model_order_no': cnc.product_id.default_code,
'user': cnc.env.user.name,
'programme_way': programme_way,
'model_file': '' if not cnc.product_id.model_file else base64.b64encode(
cnc.product_id.model_file).decode('utf-8')
}
# 打印出除了 model_file 之外的所有键值对
for key, value in res.items():
if key != 'model_file':
logging.info('%s: %s' % (key, value))
configsettings = self.env['res.config.settings'].get_values()
config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
url = '/api/intelligent_programming/create'
config_url = configsettings['sf_url'] + url
res['token'] = configsettings['token']
# res_str = json.dumps(res)
ret = requests.post(config_url, json={}, data=res, headers=config_header)
ret = ret.json()
logging.info('fetchCNC-ret:%s' % ret)
if ret['status'] == 1:
self.write(
{'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'})
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('fetchCNC error:%s' % e)
raise UserError("cnc程序获取编程单失败,请联系管理员")
# 维修模块按钮
def button_maintenance_req(self):
self.ensure_one()
return {
'name': _('New Maintenance Request'),
'view_mode': 'form',
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_production_id': self.id,
},
'domain': [('production_id', '=', self.id)],
}
# 打开维修模块请求
def open_maintenance_request_mo(self):
self.ensure_one()
action = {
'name': _('Maintenance Requests'),
'view_mode': 'kanban,tree,form,pivot,graph,calendar',
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_production_id': self.id,
},
'domain': [('production_id', '=', self.id)],
}
if self.maintenance_count == 1:
production = self.env['maintenance.request'].search([('production_id', '=', self.id)])
action['view_mode'] = 'form'
action['res_id'] = production.id
return action
def action_generate_serial(self):
self.ensure_one()
iot_code = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) or self.env[
'ir.sequence'].next_by_code('stock.lot.serial')
iot_code_name = re.sub('[\u4e00-\u9fa5]', "", iot_code)
self.lot_producing_id = self.env['stock.lot'].create({
'product_id': self.product_id.id,
'company_id': self.company_id.id,
'name': iot_code_name,
})
if self.move_finished_ids.filtered(lambda m: m.product_id == self.product_id).move_line_ids:
self.move_finished_ids.filtered(
lambda m: m.product_id == self.product_id).move_line_ids.lot_id = self.lot_producing_id
if self.product_id.tracking == 'serial':
self._set_qty_producing()
# 重载根据工序生成工单的程序如果产品BOM中没有工序时
# 根据产品对应的模板类型中工序,去生成工单;
# CNC加工工序的选取规则
# 如果自动报价有带过来预分配的机床,
# 则根据设备找到工作中心;否则采用前面描述的工作中心分配机制;
# 其他规则限制: 默认只分配给工作中心状态为非故障的工作中心;
def _create_workorder3(self, item):
for production in self:
if not production.bom_id or not production.product_id:
continue
workorders_values = []
product_qty = production.product_uom_id._compute_quantity(production.product_qty,
production.bom_id.product_uom_id)
exploded_boms, dummy = production.bom_id.explode(production.product_id,
product_qty / production.bom_id.product_qty,
picking_type=production.bom_id.picking_type_id)
for bom, bom_data in exploded_boms:
# If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
'parent_line'].bom_id.operation_ids != bom.operation_ids)):
continue
for operation in bom.operation_ids:
if operation._skip_operation_line(bom_data['product']):
continue
workorders_values += [{
'name': operation.name,
'production_id': production.id,
'workcenter_id': operation.workcenter_id.id,
'product_uom_id': production.product_uom_id.id,
'operation_id': operation.id,
'state': 'pending',
}]
if production.product_id.categ_id.type == '成品':
# # 根据加工面板的面数及对应的工序模板生成工单
i = 0
processing_panel_len = len(production.product_id.model_processing_panel.split(','))
for k in (production.product_id.model_processing_panel.split(',')):
product_routing_workcenter = self.env['sf.product.model.type.routing.sort'].search(
[('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
order='sequence asc'
)
i += 1
for route in product_routing_workcenter:
if route.is_repeat is True:
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str(k, production, route, item))
# if i == processing_panel_len and route.routing_type == '解除装夹':
# workorders_values.append(
# self.env['mrp.workorder'].json_workorder_str(k, production, route))
# 表面工艺工序
# 获取表面工艺id
if production.product_id.model_process_parameters_ids:
logging.info('model_process_parameters_ids:%s' % production.product_id.model_process_parameters_ids)
surface_technics_arr = []
# 工序id
route_workcenter_arr = []
for item in production.product_id.product_model_type_id.surface_technics_routing_tmpl_ids:
surface_technics_arr.append(item.route_workcenter_id.surface_technics_id.id)
route_workcenter_arr.append(item.route_workcenter_id.id)
if surface_technics_arr:
production_process_category = self.env['sf.production.process.category'].search(
[('production_process_ids.id', 'in', surface_technics_arr)],
order='sequence asc'
)
# 用filter刷选表面工艺id'是否存在工艺类别对象里
if production_process_category:
for p in production_process_category:
logging.info('production_process_category:%s' % p.name)
production_process = p.production_process_ids.filtered(
lambda pp: pp.id in surface_technics_arr)
if production_process:
process_parameter = production.product_id.model_process_parameters_ids.filtered(
lambda pm: pm.process_id.id == production_process.id)
if process_parameter:
# 产品为表面工艺服务的供应商
product_production_process = self.env['product.template'].search(
[('server_product_process_parameters_id', '=', process_parameter.id)])
if product_production_process:
route_production_process = self.env[
'mrp.routing.workcenter'].search(
[('surface_technics_id', '=', production_process.id),
('id', 'in', route_workcenter_arr)])
if route_production_process:
workorders_values.append(
self.env[
'mrp.workorder']._json_workorder_surface_process_str(
production, route_production_process,
process_parameter,
product_production_process.seller_ids[0].partner_id.id))
elif production.product_id.categ_id.type == '坯料':
embryo_routing_workcenter = self.env['sf.embryo.model.type.routing.sort'].search(
[('embryo_model_type_id', '=', production.product_id.embryo_model_type_id.id)],
order='sequence asc'
)
for route in embryo_routing_workcenter:
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str('', production, route))
production.workorder_ids = workorders_values
# for production_item in productions:
process_parameter_workorder = self.env['mrp.workorder'].search(
[('surface_technics_parameters_id', '!=', False), ('production_id', '=', production.id),
('is_subcontract', '=', True)])
if process_parameter_workorder:
is_pick = False
consecutive_workorders = []
m = 0
sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.id)
for i in range(len(sorted_workorders) - 1):
if m == 0:
is_pick = False
if sorted_workorders[i].supplier_id.id == sorted_workorders[i + 1].supplier_id.id and \
sorted_workorders[i].is_subcontract == sorted_workorders[i + 1].is_subcontract and \
sorted_workorders[i].id == sorted_workorders[i + 1].id - 1:
if sorted_workorders[i] not in consecutive_workorders:
consecutive_workorders.append(sorted_workorders[i])
consecutive_workorders.append(sorted_workorders[i + 1])
m += 1
continue
else:
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders,
production)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
# 当前面的连续工序生成对应的外协出入库单再生成当前工序的外协出入库单
if is_pick is False:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i],
production)
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders,
production)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders, production)
if is_pick is False and m == 0:
if len(sorted_workorders) == 1:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders, production)
else:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i], production)
for workorder in production.workorder_ids:
workorder.duration_expected = workorder._get_duration_expected()
# 工单排序
def _reset_work_order_sequence1(self, k):
for rec in self:
cnc_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工")
cnc_back_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工(返工)")
for work in rec.workorder_ids:
if work.name == cnc_workorder.name and work.processing_panel == k:
cnc_back_workorder.write({'sequence': work.sequence + 1})
print(cnc_back_workorder.sequence)
elif work.routing_type not in ['装夹预调'] and work != cnc_back_workorder:
work.sequence += 1
# 在制造订单上新增工单
def _create_workorder1(self, k):
for production in self:
if not production.bom_id or not production.product_id:
continue
workorders_values = []
product_qty = production.product_uom_id._compute_quantity(production.product_qty,
production.bom_id.product_uom_id)
exploded_boms, dummy = production.bom_id.explode(production.product_id,
product_qty / production.bom_id.product_qty,
picking_type=production.bom_id.picking_type_id)
for bom, bom_data in exploded_boms:
# If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
'parent_line'].bom_id.operation_ids != bom.operation_ids)):
continue
for operation in bom.operation_ids:
if operation._skip_operation_line(bom_data['product']):
continue
workorders_values += [{
'name': operation.name,
'production_id': production.id,
'workcenter_id': operation.workcenter_id.id,
'product_uom_id': production.product_uom_id.id,
'operation_id': operation.id,
'state': 'pending',
}]
# 根据加工面板的面数及对应的成品工序模板生成工单
i = 0
production.product_id.model_processing_panel = k
for k in (production.product_id.model_processing_panel.split(',')):
routingworkcenter = self.env['sf.product.model.type.routing.sort'].search(
[('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
order='sequence asc'
)
i += 1
for route in routingworkcenter:
if route.routing_type == 'CNC加工':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str1(k, production, route))
production.workorder_ids = workorders_values
workorder = self.env['mrp.workorder'].browse(production.workorder_ids.ids)
print(workorder)
# for item in workorder:
# workorder.duration_expected = workorder._get_duration_expected()
def _create_workorder2(self, k):
self._create_workorder1(k)
self._reset_work_order_sequence1(k)
return True
def _reset_work_order_sequence(self):
for rec in self:
workorder_ids = rec.workorder_ids.filtered(lambda item: item.state in ('返工', 'rework'))
# 产品模型类型
model_type_id = rec.product_id.product_model_type_id
# 产品加工面板
model_processing_panel = rec.product_id.model_processing_panel
if not workorder_ids:
sequence_list = {}
if model_type_id:
if model_processing_panel:
tmpl_num = 1
panel_list = model_processing_panel.split(',')
for panel in panel_list:
panel_sequence_list = {}
# 成品工序
product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids
if product_routing_tmpl_ids:
for tmpl_id in product_routing_tmpl_ids:
panel_sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num})
tmpl_num += 1
sequence_list.update({panel: panel_sequence_list})
# 表面工艺工序
# 模型类型的表面工艺工序模版
surface_tmpl_ids = model_type_id.surface_technics_routing_tmpl_ids
# 产品选择的表面工艺
model_process_parameters_ids = rec.product_id.model_process_parameters_ids
process_dict = {}
if model_process_parameters_ids:
for process_parameters_id in model_process_parameters_ids:
process_id = process_parameters_id.process_id
for surface_tmpl_id in surface_tmpl_ids:
if process_id == surface_tmpl_id.route_workcenter_id.surface_technics_id:
surface_tmpl_name = surface_tmpl_id.route_workcenter_id.name
process_dict.update({int(process_id.category_id.code): '%s-%s' % (
surface_tmpl_name, process_parameters_id.name)})
process_list = sorted(process_dict.keys())
for process_num in process_list:
sequence_list.update({process_dict.get(process_num): tmpl_num})
tmpl_num += 1
# 坯料工序
tmpl_num = 1
embryo_routing_tmpl_ids = model_type_id.embryo_routing_tmpl_ids
if embryo_routing_tmpl_ids:
for tmpl_id in embryo_routing_tmpl_ids:
sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num})
tmpl_num += 1
else:
raise ValidationError('该产品【加工面板】为空!')
else:
raise ValidationError('该产品没有选择【模版类型】!')
for work in rec.workorder_ids:
if sequence_list.get(work.name):
work.sequence = sequence_list[work.name]
elif sequence_list.get(work.processing_panel):
processing_panel = sequence_list.get(work.processing_panel)
if processing_panel.get(work.name):
work.sequence = processing_panel[work.name]
else:
raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name)
else:
raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name)
# 当单个面触发返工时,将新生成的工单插入到返工工单下方,并且后面的所以工单工序重排
elif rec.workorder_ids.filtered(lambda item: item.sequence == 0):
# 获取新增的返工工单
work_ids = rec.workorder_ids.filtered(lambda item: item.sequence == 0)
# 获取当前返工面最后一个工单工序
sequence_max = sorted(
rec.workorder_ids.filtered(lambda item: item.processing_panel == work_ids[0].processing_panel),
key=lambda item: item.sequence, reverse=True)[0].sequence
# 对当前返工工单之后的工单工序进行重排
work_order_ids = rec.workorder_ids.filtered(lambda item: item.sequence > sequence_max)
for work_id in work_order_ids:
work_id.sequence = work_id.sequence + 3
# 生成新增的返工工单的工序
# 成品工序
panel_sequence_list = {}
product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids
if product_routing_tmpl_ids:
for tmpl_id in product_routing_tmpl_ids:
sequence_max += 1
panel_sequence_list.update({tmpl_id.route_workcenter_id.name: sequence_max})
for work_id in work_ids:
if panel_sequence_list.get(work_id.name):
work_id.sequence = panel_sequence_list[work_id.name]
# 创建工单并进行排序
def _create_workorder(self, item):
self._create_workorder3(item)
self._reset_work_order_sequence()
return True
# 修改标记已完成方法
def button_mark_done1(self):
if not self.workorder_ids.filtered(lambda w: w.routing_type not in ['表面工艺']):
self._button_mark_done_sanity_checks()
if not self.env.context.get('button_mark_done_production_ids'):
self = self.with_context(button_mark_done_production_ids=self.ids)
res = self._pre_button_mark_done()
if res is not True:
return res
if self.env.context.get('mo_ids_to_backorder'):
productions_to_backorder = self.browse(self.env.context['mo_ids_to_backorder'])
productions_not_to_backorder = self - productions_to_backorder
else:
productions_not_to_backorder = self
productions_to_backorder = self.env['mrp.production']
backorders = productions_to_backorder and productions_to_backorder._split_productions()
backorders = backorders - productions_to_backorder
productions_not_to_backorder._post_inventory(cancel_backorder=True)
productions_to_backorder._post_inventory(cancel_backorder=True)
# if completed products make other confirmed/partially_available moves available, assign them
done_move_finished_ids = (
productions_to_backorder.move_finished_ids | productions_not_to_backorder.move_finished_ids).filtered(
lambda m: m.state == 'done')
done_move_finished_ids._trigger_assign()
# Moves without quantity done are not posted => set them as done instead of canceling. In
# case the user edits the MO later on and sets some consumed quantity on those, we do not
# want the move lines to be canceled.
(productions_not_to_backorder.move_raw_ids | productions_not_to_backorder.move_finished_ids).filtered(
lambda x: x.state not in ('done', 'cancel')).write({
'state': 'done',
'product_uom_qty': 0.0,
})
for production in self:
logging.info('qty_produced:%s' % production.qty_produced)
if production.qty_produced == 0.0:
production.qty_produced = 1.0
production.write({
'date_finished': fields.Datetime.now(),
'product_qty': production.qty_produced,
'priority': '0',
'is_locked': True,
'state': 'done',
})
for workorder in self.workorder_ids.filtered(lambda w: w.state not in ('done', 'cancel')):
workorder.duration_expected = workorder._get_duration_expected()
if not backorders:
if self.env.context.get('from_workorder'):
return {
'type': 'ir.actions.act_window',
'res_model': 'mrp.production',
'views': [[self.env.ref('mrp.mrp_production_form_view').id, 'form']],
'res_id': self.id,
'target': 'main',
}
if self.user_has_groups(
'mrp.group_mrp_reception_report') and self.picking_type_id.auto_show_reception_report:
lines = self.move_finished_ids.filtered(lambda
m: m.product_id.type == 'product' and m.state != 'cancel' and m.quantity_done and not m.move_dest_ids)
if lines:
if any(mo.show_allocation for mo in self):
action = self.action_view_reception_report()
return action
logging.info('last-product_qty:%s' % production.product_qty)
return True
context = self.env.context.copy()
context = {k: v for k, v in context.items() if not k.startswith('default_')}
for k, v in context.items():
if k.startswith('skip_'):
context[k] = False
action = {
'res_model': 'mrp.production',
'type': 'ir.actions.act_window',
'context': dict(context, mo_ids_to_backorder=None, button_mark_done_production_ids=None)
}
if len(backorders) == 1:
action.update({
'view_mode': 'form',
'res_id': backorders[0].id,
})
else:
action.update({
'name': _("Backorder MO"),
'domain': [('id', 'in', backorders.ids)],
'view_mode': 'tree,form',
})
return action
# 返工
def button_rework(self):
cloud_programming = None
if self.programming_state in ['已编程']:
cloud_programming = self._cron_get_programming_state()
return {
'name': _('返工'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'sf.rework.wizard',
'target': 'new',
'context': {
'default_production_id': self.id,
'default_reprogramming_num': cloud_programming['reprogramming_num'],
'default_programming_state': cloud_programming['programming_state'],
'default_is_reprogramming': True if cloud_programming['programming_state'] in ['已下发'] else False
}
}
# 更新程序
def do_update_program(self):
program_production = self
if len(program_production) >= 1:
# same_product_id = None
# is_not_same_product = 0
for item in program_production:
# if same_product_id is None:
# same_product_id = item.product_id
# if item.product_id != same_product_id:
# is_not_same_product += 1
if item.state != "rework" and item.programming_state != "已编程未下发":
raise UserError("请选择状态为返工且已编程未下发的制造订单")
# if is_not_same_product >= 1:
# raise UserError("您选择的记录中含有其他产品的制造订单,请选择同一产品的制造订单")
grouped_program_ids = {k: list(g) for k, g in groupby(program_production, key=lambda x: x.programming_no)}
program_to_production_names = {}
for programming_no, program_production in grouped_program_ids.items():
program_to_production_names[programming_no] = [production.name for production in program_production]
for production in self:
if production.programming_no in program_to_production_names:
productions_not_delivered = self.env['mrp.production'].search(
[('programming_no', '=', production.programming_no), ('programming_state', '=', '已编程未下发')])
rework_workorder = production.workorder_ids.filtered(lambda m: m.state == 'rework')
if rework_workorder:
for rework_item in rework_workorder:
pending_workorder = production.workorder_ids.filtered(
lambda m1: m1.state in [
'pending'] and m1.processing_panel == rework_item.processing_panel and m1.routing_type == 'CNC加工')
if not pending_workorder.cnc_ids:
production.get_new_program(rework_item.processing_panel)
# production.write({'state': 'progress', 'programming_state': '已编程', 'is_rework': False})
productions_not_delivered.write(
{'state': 'progress', 'programming_state': '已编程', 'is_rework': False})
# 从cloud获取重新编程过的最新程序
def get_new_program(self, processing_panel):
try:
res = {'programming_no': self.programming_no, 'processing_panel': processing_panel}
configsettings = self.env['res.config.settings'].get_values()
config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
url = '/api/intelligent_programming/get_new_program'
config_url = configsettings['sf_url'] + url
r = requests.post(config_url, json=res, data=None, headers=config_header)
r = r.json()
result = json.loads(r['result'])
if result['status'] == 1:
program_path_tmp_panel = os.path.join('/tmp', result['folder_name'], 'return', processing_panel)
if os.path.exists(program_path_tmp_panel):
files_r = os.listdir(program_path_tmp_panel)
if files_r:
for file_name in files_r:
file_path = os.path.join(program_path_tmp_panel, file_name)
os.remove(file_path)
download_state = self.env['sf.cnc.processing'].download_file_tmp(result['folder_name'],
processing_panel)
if download_state is False:
raise UserError('编程单号为%s的CNC程序文件从FTP拉取失败' % (self.programming_no))
productions = self.env['mrp.production'].search(
[('programming_no', '=', self.programming_no), ('state', 'not in', ('cancel', 'done'))])
if productions:
for production in productions:
panel_workorder = production.workorder_ids.filtered(lambda
pw: pw.processing_panel == processing_panel and pw.routing_type == 'CNC加工' and pw.state not in (
'rework', 'done'))
if panel_workorder:
if panel_workorder.cmm_ids:
panel_workorder.cmm_ids.sudo().unlink()
if panel_workorder.cnc_ids:
panel_workorder.cnc_ids.sudo().unlink()
self.env['sf.cam.work.order.program.knife.plan'].sudo().unlink_cam_plan(
production)
# program_path_tmp_panel = os.path.join('C://Users//43484//Desktop//fsdownload//test',
# processing_panel)
logging.info('program_path_tmp_panel:%s' % program_path_tmp_panel)
files_panel = os.listdir(program_path_tmp_panel)
if files_panel:
for file in files_panel:
file_extension = os.path.splitext(file)[1]
if file_extension.lower() == '.pdf':
panel_file_path = os.path.join(program_path_tmp_panel, file)
logging.info('panel_file_path:%s' % panel_file_path)
panel_workorder.write(
{'cnc_ids': panel_workorder.cnc_ids.sudo()._json_cnc_processing(processing_panel,
result),
'cmm_ids': panel_workorder.cmm_ids.sudo()._json_cmm_program(processing_panel, result),
'cnc_worksheet': base64.b64encode(open(panel_file_path, 'rb').read())})
logging.info('len(cnc_worksheet):%s' % len(panel_workorder.cnc_worksheet))
pre_workorder = production.workorder_ids.filtered(lambda
ap: ap.routing_type == '装夹预调' and ap.processing_panel == processing_panel and ap.state not in (
'rework', 'done'))
if pre_workorder:
pre_workorder.write(
{'processing_drawing': base64.b64encode(open(panel_file_path, 'rb').read())})
# if production.state == 'rework' and production.programming_state == '已编程未下发':
# production.write(
# {'state': 'progress', 'programming_state': '已编程', 'is_rework': False})
# logging.info('返工含有已编程未下发的程序更新完成:%s' % production.name)
else:
raise UserError(result['message'])
except Exception as e:
logging.info('get_new_program error:%s' % e)
raise UserError("从云平台获取最新程序失败,请联系管理员")
def recreateManufacturing(self):
"""
重新生成制造订单
"""
if self.is_scrap is True:
sale_order = self.env['sale.order'].sudo().search([('name', '=', productions.origin)])
values = self.env['mrp.production'].create_production1_values(self.production_id)
productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
self.production_id.company_id).create(
values)
# self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
productions._create_workorder()
productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \
(
p.move_dest_ids.procure_method != 'make_to_order' and
not p.move_raw_ids and not p.workorder_ids)).action_confirm()
for production_item in productions:
process_parameter_workorder = self.env['mrp.workorder'].search(
[('surface_technics_parameters_id', '!=', False), ('production_id', '=', production_item.id),
('is_subcontract', '=', True)])
if process_parameter_workorder:
is_pick = False
consecutive_workorders = []
m = 0
sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.id)
for i in range(len(sorted_workorders) - 1):
if m == 0:
is_pick = False
if sorted_workorders[i].supplier_id.id == sorted_workorders[i + 1].supplier_id.id and \
sorted_workorders[i].is_subcontract == sorted_workorders[i + 1].is_subcontract and \
sorted_workorders[i].id == sorted_workorders[i + 1].id - 1:
if sorted_workorders[i] not in consecutive_workorders:
consecutive_workorders.append(sorted_workorders[i])
consecutive_workorders.append(sorted_workorders[i + 1])
m += 1
continue
else:
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders,
production_item)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
# 当前面的连续工序生成对应的外协出入库单再生成当前工序的外协出入库单
if is_pick is False:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i],
production_item)
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders,
production_item)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders, production_item)
if is_pick is False and m == 0:
if len(sorted_workorders) == 1:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders, production_item)
else:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i], production_item)
for production in productions:
origin_production = production.move_dest_ids and production.move_dest_ids[
0].raw_material_production_id or False
orderpoint = production.orderpoint_id
if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
production.message_post(
body=_('This production order has been created from Replenishment Report.'),
message_type='comment',
subtype_xmlid='mail.mt_note')
elif orderpoint:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': orderpoint},
subtype_id=self.env.ref('mail.mt_note').id)
elif origin_production:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': origin_production},
subtype_id=self.env.ref('mail.mt_note').id)
'''
创建生产计划
'''
# 工单耗时
workorder_duration = 0
for workorder in productions.workorder_ids:
workorder_duration += workorder.duration_expected
if sale_order:
sale_order.mrp_production_ids |= productions
# sale_order.write({'schedule_status': 'to schedule'})
self.env['sf.production.plan'].sudo().with_company(self.production_id.company_id).create({
'name': productions.name,
'order_deadline': sale_order.deadline_of_delivery,
'production_id': productions.id,
'date_planned_start': productions.date_planned_start,
'origin': productions.origin,
'product_qty': productions.product_qty,
'product_id': productions.product_id.id,
'state': 'draft',
})
# 在之前的销售单上重新生成制造订单
def create_production1_values(self, production, sale_order):
production_values_str = {'origin': production.origin,
'product_id': production.product_id.id,
'product_description_variants': production.product_description_variants,
'product_qty': production.product_qty,
'product_uom_id': production.product_uom_id.id,
'location_src_id': production.location_src_id.id,
'location_dest_id': production.location_dest_id.id,
'bom_id': production.bom_id.id,
'date_deadline': production.date_deadline,
'date_planned_start': production.date_planned_start,
'date_planned_finished': production.date_planned_finished,
'procurement_group_id': sale_order.id,
'propagate_cancel': production.propagate_cancel,
'orderpoint_id': production.orderpoint_id.id,
'picking_type_id': production.picking_type_id.id,
'company_id': production.company_id.id,
'move_dest_ids': production.move_dest_ids.ids,
'user_id': production.user_id.id}
return production_values_str
class sf_detection_result(models.Model):
_name = 'sf.detection.result'
_description = "检测结果"
production_id = fields.Many2one('mrp.production')
processing_panel = fields.Char('加工面')
routing_type = fields.Selection([
('装夹预调', '装夹预调'),
('CNC加工', 'CNC加工')], string="工序类型")
rework_reason = fields.Selection(
[("programming", "编程"), ("cutter", "刀具"), ("clamping", "装夹"),
("operate computer", "操机"),
("technology", "工艺"), ("customer redrawing", "客户改图")], string="原因", tracking=True)
detailed_reason = fields.Text('详细原因')
test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")],
string="检测结果", tracking=True)
test_report = fields.Binary('检测报告', readonly=True)
handle_result = fields.Selection([("待处理", "待处理"), ("已处理", "已处理")], default='', string="处理结果",
tracking=True)
# 查看检测报告
def button_look_test_report(self):
return {
'res_model': 'sf.detection.result',
'type': 'ir.actions.act_window',
'res_id': self.id,
'views': [(self.env.ref('sf_manufacturing.sf_test_report_form').id, 'form')],
# 'view_mode': 'form',
# 'context': {
# 'default_id': self.id
# },
'target': 'new'
}
class sf_processing_panel(models.Model):
_name = 'sf.processing.panel'
_description = "加工面"
name = fields.Char('加工面')
active = fields.Boolean('有效', default=True)