Files
test/sf_manufacturing/models/mrp_production.py
2025-01-23 16:42:16 +08:00

1762 lines
95 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import datetime
import logging
import json
import os
import re
import requests
from itertools import groupby
from collections import defaultdict, namedtuple
from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.exceptions import UserError, ValidationError
from odoo.addons.sf_base.commons.common import Common
from odoo.tools import float_compare, float_round, float_is_zero, format_datetime
class MrpProduction(models.Model):
# Extends the existing 'mrp.production' model in place.
_inherit = 'mrp.production'
_description = "制造订单"
# Default listing order: most recently created records first.
_order = 'create_date desc'
# Sales order linked to this manufacturing order (stored, see _compute_sale_order_id).
sale_order_id = fields.Many2one('sale.order', string='销售订单', compute='_compute_sale_order_id', store=True)
# Delivery deadline (stored, see _compute_deadline_of_delivery).
deadline_of_delivery = fields.Date('订单交期', tracking=True, compute='_compute_deadline_of_delivery', store=True)
# tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘")
# Number of records in request_ids (see _compute_maintenance_count); not stored.
maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests")
# Maintenance requests whose production_id points at this manufacturing order.
request_ids = fields.One2many('maintenance.request', 'production_id')
# Model file, mirrored from the product.
model_file = fields.Binary('模型文件', related='product_id.model_file')
# Scheduling state: unscheduled (default) / scheduled / completed.
schedule_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')],
string='排程状态', default='未排')
# Work-order state, same three values as schedule_state.
work_order_state = fields.Selection([('未排', '未排'), ('已排', '已排'), ('已完成', '已完成')],
string='工单状态', default='未排')
# Detection (inspection) results whose production_id points at this manufacturing order.
detection_result_ids = fields.One2many('sf.detection.result', 'production_id', '检测报告')
# Functional-tool state: '0' normal, '1' tool missing, '2' invalid tool (stored, see _compute_tool_state).
tool_state = fields.Selection([('0', '正常'), ('1', '缺刀'), ('2', '无效刀')], string='功能刀具状态', default='0',
store=True, compute='_compute_tool_state')
# Remark for the "tool missing" case (stored, see _compute_tool_state_remark).
tool_state_remark = fields.Text(string='功能刀具状态备注(缺刀)', compute='_compute_tool_state_remark', store=True)
# Remark for the "invalid tool" case; read-only in the UI.
tool_state_remark2 = fields.Text(string='功能刀具状态备注(无效刀)', readonly=True)
@api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id')
def _compute_sale_order_id(self):
    """Link each manufacturing order to the sales order named in its product name.

    The product name, read with ``lang='zh_CN'``, is scanned for the first
    ``S<digits>`` token. The oldest ``sale.order`` (lowest id) whose name
    equals that token becomes ``sale_order_id``.

    The field is assigned for every record: it is cleared when the product
    has no name, the name holds no such token, no order matches, or the
    lookup fails. A compute method that skips records leaves stale values
    behind.
    """
    sale_order_model = self.env['sale.order']
    for production in self:
        # Empty recordset: its ``id`` is False, which clears the field.
        sale_order = sale_order_model
        # A product without a name yields False; re.search() needs a string.
        product_name = production.product_id.with_context(lang='zh_CN').name or ''
        match = re.search(r'S\d+', product_name)
        if match:
            result = match.group(0)
            try:
                sale_order = sale_order_model.search(
                    [('name', '=', result)], limit=1, order='id asc'
                )
            except Exception as e:
                logging.error("Error while fetching sale order for production {}: {}".format(production.id, str(e)))
            else:
                if not sale_order:
                    logging.warning("No sale order found for production {} with product {} (name match: {})".format(
                        production.id, production.product_id.name, result))
        production.sale_order_id = sale_order.id
@api.depends('procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id')
def _compute_deadline_of_delivery(self):
    """Copy the delivery deadline from the first related sales order.

    The sales orders are reached through the procurement group:
    group -> manufacturing orders -> destination moves -> group -> sale.
    The deadline is read with superuser rights. When there is no
    procurement group or no related sales order the field is set to False.
    """
    for production in self:
        deadline = False
        group = production.procurement_group_id
        if group:
            sale_orders = group.mrp_production_ids.mapped('move_dest_ids.group_id.sale_id')
            if sale_orders:
                # Only the first sales order is taken into account.
                deadline = sale_orders.sudo()[0].deadline_of_delivery
        production.deadline_of_delivery = deadline
def _compute_default_delivery_status(self):
    """Return the delivery status label for this manufacturing order.

    Returns:
        ``False`` when the order is cancelled or has no deadline, otherwise
        one of ``'正常'`` (on track), ``'预警'`` (warning) or ``'已逾期'``
        (overdue), based on the whole hours left until the deadline:

        * 48 hours or more: on track;
        * between 0 and 48 hours: on track if the order is done, else warning;
        * 0 hours or less: overdue.

    Any exception is logged with the record id and re-raised.
    """
    try:
        if self.state == 'cancel' or not self.deadline_of_delivery:
            return False
        hours = self.get_hours_diff()
        if hours >= 48:
            return '正常'
        if hours <= 0:
            return '已逾期'
        # Less than 48 hours left: only unfinished orders trigger a warning.
        return '正常' if self.state == 'done' else '预警'
    except Exception as e:
        logging.error("Error processing production ID {}: {}".format(self.id, e))
        raise e
@api.depends('state', 'deadline_of_delivery')
def _compute_delivery_status(self):
    """Refresh ``delivery_status`` from ``_compute_default_delivery_status``.

    A falsy result (cancelled order or no deadline) leaves the current
    value untouched; the field is only written when the label changes.
    """
    for production in self:
        new_status = production._compute_default_delivery_status()
        if not new_status:
            continue
        if production.delivery_status != new_status:
            production.delivery_status = new_status
# Delivery status: on track / warning / overdue. Stored, computed by
# _compute_delivery_status; the default is evaluated through
# _compute_default_delivery_status.
delivery_status = fields.Selection([('正常', '正常'), ('预警', '预警'), ('已逾期', '已逾期')], string='交期状态',
store=True,
compute='_compute_delivery_status',
default=lambda self: self._compute_default_delivery_status())
def get_page_all_records(self, model_name, func, domain, page_size=100):
    """Apply ``func`` to every record of ``model_name`` matching ``domain``, batch by batch.

    Records are fetched with superuser rights in ascending id order, at most
    ``page_size`` at a time, and each non-empty batch is passed to ``func``.

    Batches are delimited by the last id already seen instead of an offset.
    Offset paging without an explicit order may skip or repeat rows when the
    model's default order is not unique, or when ``func`` changes which
    records match ``domain``.

    Args:
        model_name: technical name of the model to iterate over.
        func: callable receiving one recordset per batch.
        domain: search domain (list of terms) selecting the records.
        page_size: maximum number of records per batch.
    """
    model = self.env[model_name].sudo()
    last_id = 0
    while True:
        # Extra top-level terms are implicitly AND-ed with ``domain``.
        batch = model.search(list(domain) + [('id', '>', last_id)], order='id asc', limit=page_size)
        if not batch:
            break
        # Remember the boundary before ``func`` gets a chance to alter the batch.
        last_id = batch.ids[-1]
        func(batch)
def run_compute_delivery_status(self, records):
    """Recompute the delivery status of ``records`` (batch callback for ``get_page_all_records``)."""
    records._compute_delivery_status()
def _corn_update_delivery_status(self):
    """Refresh the delivery status of all manufacturing orders that are neither done nor cancelled.

    The orders are processed in batches of 100 through ``get_page_all_records``.
    """
    need_list = [
        'draft',
        'technology_to_confirmed',
        'confirmed',
        'pending_cam',
        'progress',
        'rework',
        'scrap',
        'to_close',
    ]
    domain = [('state', 'in', need_list)]
    self.get_page_all_records('mrp.production', self.run_compute_delivery_status, domain, 100)
def get_hours_diff(self):
    """Return the hours left until 00:00:00 on the delivery deadline.

    Returns:
        ``0.0`` when no deadline is set; otherwise the difference between
        midnight of ``deadline_of_delivery`` and ``fields.Datetime.now()``,
        as an ``int`` truncated toward zero (negative once the deadline
        has passed).
    """
    if not self.deadline_of_delivery:
        return 0.0
    now = fields.Datetime.now()
    deadline_date = fields.Date.from_string(self.deadline_of_delivery)
    # A date has no time part: compare against the very start of that day.
    deadline_start = datetime.datetime.combine(deadline_date, datetime.time.min)
    remaining = deadline_start - now
    return int(remaining.total_seconds() / 3600)
@api.depends('workorder_ids.tool_state_remark')
def _compute_tool_state_remark(self):
    """Aggregate the "tool missing" remarks of the open work orders.

    Work orders in state ``rework``, ``done`` or ``cancel`` are ignored.
    The remarks of the remaining work orders whose ``tool_state`` is
    ``'1'`` are joined with newlines.

    The field is assigned for every record and set to False when there is
    nothing to report. Empty remarks are skipped, so they are no longer
    rendered as the literal text "False".
    """
    for item in self:
        lacking_tools = item.workorder_ids.filtered(
            lambda wo: wo.state not in ['rework', 'done', 'cancel'] and wo.tool_state == '1')
        remarks = [wo.tool_state_remark for wo in lacking_tools if wo.tool_state_remark]
        item.tool_state_remark = '\n'.join(remarks) or False
@api.depends('workorder_ids.tool_state')
def _compute_tool_state(self):
    """Derive the functional-tool state from the open work orders.

    Work orders in state ``rework``, ``done`` or ``cancel`` are ignored.
    The result is ``'2'`` if any remaining work order is in tool state
    ``'2'``, else ``'1'`` if any is in ``'1'``, else ``'0'``. An order
    without work orders gets ``'0'`` (the field default), so the field is
    assigned for every record.

    Side effect: when an order with work orders leaves state ``'2'``, its
    pending detection results whose detailed reason is '无效功能刀具' are
    marked as handled.
    """
    for item in self:
        if not item.workorder_ids:
            item.tool_state = '0'
            continue
        previous_state = item.tool_state
        open_workorders = item.workorder_ids.filtered(lambda a: a.state not in ['rework', 'done', 'cancel'])
        open_tool_states = set(open_workorders.mapped('tool_state'))
        if '2' in open_tool_states:
            new_state = '2'
        elif '1' in open_tool_states:
            new_state = '1'
        else:
            new_state = '0'
        item.tool_state = new_state
        if previous_state == '2' and new_state != '2':
            item.detection_result_ids.filtered(
                lambda a: a.detailed_reason == '无效功能刀具' and a.handle_result == '待处理').write(
                {'handle_result': '已处理'})
# state = fields.Selection(selection_add=[
# ('pending_scheduling', '待排程'),
# ('pending_processing', '待加工'),
# ('completed', '已完工')
# ])
# Manufacturing order state (stored, computed by _compute_state). On top of
# draft / confirmed / progress / to_close / done / cancel it declares the
# values technology_to_confirmed, pending_cam, rework and scrap.
state = fields.Selection([
('draft', '草稿'),
('technology_to_confirmed', '待工艺确认'),
('confirmed', '待排程'),
('pending_cam', '待加工'),
('progress', '加工中'),
('rework', '返工'),
('scrap', '报废'),
('to_close', 'To Close'),
('done', 'Done'),
('cancel', '已取消')], string='State',
compute='_compute_state', copy=False, index=True, readonly=True,
store=True, tracking=True,
help=" * Draft: The MO is not confirmed yet.\n"
" * Confirmed: The MO is confirmed, the stock rules and the reordering of the components are trigerred.\n"
" * In Progress: The production has started (on the MO or on the WO).\n"
" * To Close: The production is done, the MO has to be closed.\n"
" * Done: The MO is closed, the stock moves are posted. \n"
" * Cancelled: The MO has been cancelled, can't be confirmed anymore.")
# Review flag toggled by action_check / action_uncheck.
check_status = fields.Boolean(string='启用状态', default=False, readonly=True)
# Archive flag toggled by archive / unarchive.
active = fields.Boolean(string='已归档', default=True)
# Programming order number (written by fetchCNC from the remote service reply).
programming_no = fields.Char('编程单号')
# Free-text business state.
work_state = fields.Char('业务状态')
# Programming state: programming / programmed / programmed but not dispatched / dispatched.
programming_state = fields.Selection(
[('编程中', '编程中'), ('已编程', '已编程'), ('已编程未下发', '已编程未下发'), ('已下发', '已下发')],
string='编程状态',
tracking=True)
# glb model file.
glb_file = fields.Binary("glb模型文件")
# Production line.
production_line_id = fields.Many2one('sf.production.line', string='生产线', tracking=True)
# Planned start time of processing.
plan_start_processing_time = fields.Datetime('计划开始加工时间')
# Whether the order is being reworked.
is_rework = fields.Boolean(string='是否返工', default=False)
# production_line_state = fields.Selection(
# [('待上产线', '待上产线'), ('已上产线', '已上产线'), ('已下产线', '已下产线')],
# string='上/下产线', default='待上产线', tracking=True)
# Operation (process step) state
# TODO: investigate how this field is used
process_state = fields.Selection([
('待装夹', '待装夹'),
('待检测', '待检测'),
('待加工', '待加工'),
('待解除装夹', '待解除装夹'),
('已完工', '已完工'),
], string='工序状态', default='待装夹')
# Part drawing number, mirrored from the product
part_number = fields.Char('零件图号', related='product_id.part_number', readonly=True)
# Part drawing, mirrored from the product's machining drawings
part_drawing = fields.Binary('零件图纸', related='product_id.machining_drawings', readonly=True)
# Quality inspection standard, mirrored from the product
quality_standard = fields.Binary('质检标准', related='product_id.quality_standard', readonly=True)
# Part name, mirrored from the product
part_name = fields.Char(string='零件名称', related='product_id.part_name', readonly=True)
@api.depends('product_id.manual_quotation')
def _compute_manual_quotation(self):
    """Mirror the product's ``manual_quotation`` flag on the manufacturing order."""
    for production in self:
        product = production.product_id
        production.manual_quotation = product.manual_quotation
# Manual programming flag (stored, computed by _compute_manual_quotation from the product).
manual_quotation = fields.Boolean('人工编程', default=False, compute=_compute_manual_quotation, store=True)
# Whether the order is scrapped.
is_scrap = fields.Boolean('是否报废', default=False)
# Whether the order is a remanufacture.
is_remanufacture = fields.Boolean('是否重新制造', default=False)
# Number of remanufacture orders (see _compute_remanufacture_production_ids); not stored.
remanufacture_count = fields.Integer("重新制造订单数量", compute='_compute_remanufacture_production_ids')
# Manufacturing order counted by remanufacture_count and opened by
# action_view_remanufacture_productions.
remanufacture_production_id = fields.Many2one('mrp.production', string='')
# Technology (process) design lines whose production_id points at this order.
technology_design_ids = fields.One2many('sf.technology.design', 'production_id', string='工艺设计')
# Whether the order has been sent back for adjustment (read by _compute_state).
is_adjust = fields.Boolean('是否退回调整', default=False)
@api.depends('remanufacture_production_id')
def _compute_remanufacture_production_ids(self):
    """Count the remanufacture order linked to each manufacturing order.

    The count is the number of ``mrp.production`` records found by a
    search on the id of ``remanufacture_production_id``, hence 0 or 1.

    The field is assigned for every record. It is not stored, so leaving
    it unassigned (which happened when a link was set but the search found
    nothing) makes reading the field fail.
    """
    production_model = self.env['mrp.production']
    for production in self:
        linked = production.remanufacture_production_id
        if linked:
            production.remanufacture_count = production_model.search_count([('id', '=', linked.id)])
        else:
            production.remanufacture_count = 0
def action_view_remanufacture_productions(self):
    """Return a window action opening the linked remanufacture order in form view."""
    self.ensure_one()
    target_domain = [('id', '=', self.remanufacture_production_id.id)]
    mrp_production = self.env['mrp.production'].search(target_domain)
    return {
        'res_model': 'mrp.production',
        'type': 'ir.actions.act_window',
        'view_mode': 'form',
        'res_id': mrp_production.id,
    }
@api.depends(
    'move_raw_ids.state', 'move_raw_ids.quantity_done', 'move_finished_ids.state', 'tool_state',
    'workorder_ids.state', 'product_qty', 'qty_producing', 'schedule_state', 'programming_state', 'is_adjust')
def _compute_state(self):
    """Compute the manufacturing order state.

    For each record the state is set in two passes:

    1. an if/elif chain based on the current state, the finished moves,
       the work-order states and the produced / consumed quantities;
    2. a sequence of project-specific adjustments based on
       ``schedule_state``, ``is_adjust``, ``tool_state``, ``is_rework``,
       ``programming_state``, the work orders' test results and the
       detection results.

    The statements run in order and later ones override earlier ones, so
    their order is significant.

    NOTE(review): the indentation of this block was reconstructed from a
    whitespace-stripped copy; confirm the nesting against the repository.
    """
    for production in self:
        # Pass 1: base state from moves, work orders and quantities.
        if not production.state or not production.product_uom_id:
            production.state = 'draft'
        elif production.state == 'cancel' or (production.move_finished_ids and all(
                move.state == 'cancel' for move in production.move_finished_ids)):
            production.state = 'cancel'
        elif production.workorder_ids and all(
                wo_state in ('done', 'cancel') for wo_state in production.workorder_ids.mapped('state')):
            production.state = 'to_close'
        elif not production.workorder_ids and float_compare(production.qty_producing, production.product_qty,
                                                            precision_rounding=production.product_uom_id.rounding) >= 0:
            production.state = 'to_close'
        elif any(wo_state in ('progress', 'done') for wo_state in production.workorder_ids.mapped('state')):
            production.state = 'progress'
        elif production.product_uom_id and not float_is_zero(production.qty_producing,
                                                             precision_rounding=production.product_uom_id.rounding):
            production.state = 'progress'
        elif any(not float_is_zero(move.quantity_done,
                                   precision_rounding=move.product_uom.rounding or move.product_id.uom_id.rounding)
                 for move in production.move_raw_ids if move.product_id):
            production.state = 'progress'
        # Pass 2: additional, project-specific state logic.
        # Unscheduled orders fall back to technology confirmation or scheduling.
        if production.state in ['to_close', 'progress',
                                'technology_to_confirmed'] and production.schedule_state == '未排':
            if not production.workorder_ids or production.is_adjust is True:
                production.state = 'technology_to_confirmed'
            else:
                if production.is_adjust is True:
                    production.state = 'technology_to_confirmed'
                else:
                    production.state = 'confirmed'
        elif production.state == 'pending_cam' and production.schedule_state == '未排':
            production.state = 'confirmed'
        elif production.state == 'to_close' and production.schedule_state == '已排':
            production.state = 'pending_cam'
        elif production.state == 'confirmed' and production.is_adjust is True:
            production.state = 'technology_to_confirmed'
        # A scheduled, confirmed order is waiting for processing.
        if production.state == 'confirmed' and production.schedule_state == '已排':
            production.state = 'pending_cam'
        # A rework order with normal tools, scheduled and no longer flagged
        # as rework, goes back to waiting for processing.
        if (production.state == 'rework' and production.tool_state == '0'
                and production.schedule_state == '已排' and production.is_rework is False):
            production.state = 'pending_cam'
        # Rework: a finished work order requires it (under the programming
        # conditions below) or the order itself is flagged as rework.
        if any((wo.test_results == '返工' and wo.state == 'done' and
                (production.programming_state in ['已编程'] or wo.individuation_page_PTD is True))
               or (wo.is_rework is True and wo.state == 'done' and production.programming_state in ['编程中', '已编程'])
               for wo in production.workorder_ids) or production.is_rework is True:
            production.state = 'rework'
        # Scrap: a finished work order has a "scrap" test result.
        if any(wo.test_results == '报废' and wo.state == 'done' for wo in production.workorder_ids):
            production.state = 'scrap'
        # Cancel: a "scrap" detection result has already been handled.
        if any(dr.test_results == '报废' and dr.handle_result == '已处理' for dr in
               production.detection_result_ids):
            production.state = 'cancel'
        # Done once every work order is done, reworked or cancelled, unless
        # the order is scrapped, in rework or cancelled; otherwise a "done"
        # order with unfinished work orders goes back to "progress".
        if production.workorder_ids and all(
                wo_state in ('done', 'rework', 'cancel') for wo_state in production.workorder_ids.mapped('state')):
            if production.state not in ['scrap', 'rework', 'cancel']:
                production.state = 'done'
        elif production.state == 'done':
            production.state = 'progress'
# 退回调整
def technology_back_adjust(self):
    """Open the "send back for adjustment" wizard for this manufacturing order.

    Confirmed orders with the same origin are looked up (for automated
    production line orders also with the same programming number). Every
    process parameter used by their technology design must be referenced
    by at least one product template.

    Returns:
        A window action for ``sf.production.technology.re_adjust.wizard``:
        the selection form when several confirmed orders share this
        origin and product, the confirmation form otherwise. ``None``
        when no confirmed order matches.

    Raises:
        UserError: the programming state cannot be read, the programming
            files are currently being sent, or a process parameter has no
            matching product.
    """
    domain = [('state', '=', 'confirmed'), ('origin', '=', self.origin)]
    if self.production_type == '自动化产线加工':
        cloud_programming = self._cron_get_programming_state()
        # The helper swallows its own errors and then returns None;
        # subscripting that result would fail with a bare TypeError.
        if not cloud_programming:
            raise UserError(_("获取编程单状态失败,请稍后重试"))
        if cloud_programming['send_state'] == 'sending':
            raise UserError(_("编程文件正在下发中,请稍后重试"))
        domain += [('programming_no', '=', self.programming_no)]
    # Manufacturing orders waiting for scheduling
    production_confirmed = self.env['mrp.production'].search(domain)
    product_template = self.env['product.template']
    process_parameters = []
    for special in production_confirmed.technology_design_ids:
        parameter = special.process_parameters_id
        if not parameter:
            continue
        if product_template.search([('server_product_process_parameters_id', '=', parameter.id)], limit=1):
            continue
        # The list holds display names, so de-duplicate on the name
        # (comparing the record itself never matched).
        if parameter.display_name not in process_parameters:
            process_parameters.append(parameter.display_name)
    if process_parameters:
        raise UserError(_("【工艺设计】-【参数】为%s的在【产品】中不存在,请先创建", ", ".join(process_parameters)))
    if not production_confirmed:
        return None
    production_count = self.env['mrp.production'].search_count([
        ('origin', '=', self.origin),
        ('product_id', '=', self.product_id.id),
        ('state', '=', 'confirmed')
    ])
    if production_count > 1:
        view_xmlid = 'sf_manufacturing.sf_production_technology_re_adjust_wizard_form_view'
    else:
        view_xmlid = 'sf_manufacturing.sf_production_technology_re_adjust_wizard_confirm_form_view'
    return {
        'name': _('退回调整'),
        'type': 'ir.actions.act_window',
        'views': [(self.env.ref(view_xmlid).id, 'form')],
        'res_model': 'sf.production.technology.re_adjust.wizard',
        'target': 'new',
        'context': {
            'default_production_id': self.id,
            'default_origin': self.origin,
        }}
# 工艺确认
def technology_confirm(self):
    """Validate the technology design and open the confirmation wizard.

    Checks, in this order:

    1. design steps removed since the work orders were generated must not
       have purchase orders in a state other than draft / cancel;
    2. manual special steps of type '表面工艺' must have a process parameter;
    3. every process parameter of those steps must be referenced by at
       least one product template;
    4. standard steps, sorted by sequence, must be ordered consistently
       within each processing panel.

    Returns:
        A window action for ``sf.production.technology.wizard`` (selection
        form when several orders of this origin and product await
        technology confirmation, confirmation form otherwise), or ``True``
        when there are no standard steps.

    Raises:
        UserError: one of the checks above fails.

    NOTE(review): the indentation of this block was reconstructed from a
    whitespace-stripped copy. The wizard branch is taken to be the ``else``
    of the "not the last step" test; confirm against the repository.
    """
    process_parameters = []
    purchase_orders = []
    parameters_not = []
    product_template = self.env['product.template']
    # Steps of the existing work orders that were removed from the design
    origin_designs = self.workorder_ids.technology_design_id
    deleted_designs = origin_designs - self.technology_design_ids
    for deleted_design in deleted_designs:
        workorder = self.env['mrp.workorder'].search([('technology_design_id', '=', deleted_design.id)])
        # Several purchase orders may come back; reading ``state`` on the
        # whole set would raise a singleton error.
        for purchase in workorder._get_surface_technics_purchase_ids():
            if purchase.state not in ['cancel', 'draft', False]:
                purchase_orders.append(purchase.name)
    special_design = self.technology_design_ids.filtered(
        lambda a: a.routing_tag == 'special' and a.is_auto is False)
    for special in special_design:
        parameter = special.process_parameters_id
        if special.route_id.routing_type == '表面工艺' and not parameter:
            parameters_not.append(special.route_id.name)
        if parameter:
            product_production_process = product_template.search(
                [('server_product_process_parameters_id', '=', parameter.id)], limit=1)
            # The list holds display names, so de-duplicate on the name
            # (comparing the record itself never matched).
            if not product_production_process and parameter.display_name not in process_parameters:
                process_parameters.append(parameter.display_name)
    if purchase_orders:
        raise UserError(_("请联系工厂生产经理对该(%s)采购订单进行取消", ", ".join(purchase_orders)))
    if parameters_not:
        raise UserError(_("【工艺设计】-【工序】为%s未选择参数,请选择", ", ".join(parameters_not)))
    if process_parameters:
        raise UserError(_("【工艺设计】-【参数】为%s的在【产品】中不存在,请先创建", ", ".join(process_parameters)))
    # Check that the standard steps of each processing panel are in order
    error_panel = []
    technology_design = self.technology_design_ids.filtered(lambda a: a.routing_tag == 'standard').sorted(
        key=lambda m: m.sequence)
    last_index = len(technology_design) - 1
    for index, design in enumerate(technology_design):
        routing_type = design.route_id.routing_type
        if index < last_index:
            next_design = technology_design[index + 1]
            next_design_routing_type = next_design.route_id.routing_type
            if design.panel is not False:
                if design.panel != next_design.panel:
                    # A panel may only change right after an unclamping
                    # step, and never after the very first step.
                    if index == 0:
                        raise UserError('【加工面】为%s的标准工序里含有其他加工面的工序,请调整后重试' % design.panel)
                    if routing_type not in ['解除装夹']:
                        raise UserError('【加工面】为%s的标准工序顺序有误,请调整后重试' % design.panel)
                if design.panel == next_design.panel:
                    if (routing_type == '装夹预调' and next_design_routing_type == '解除装夹') or (
                            routing_type == 'CNC加工' and next_design_routing_type == '装夹预调'):
                        if design.panel not in error_panel:
                            error_panel.append(design.panel)
        else:
            # Last standard step: open the wizard when nothing is wrong.
            # Missing parameters have already raised above.
            if not error_panel:
                production_count = self.env['mrp.production'].search_count([
                    ('origin', '=', self.origin),
                    ('product_id', '=', self.product_id.id),
                    ('state', '=', 'technology_to_confirmed')
                ])
                if production_count > 1:
                    view_xmlid = 'sf_manufacturing.sf_production_technology_wizard_form_view'
                else:
                    view_xmlid = 'sf_manufacturing.sf_production_technology_wizard_confirm_form_view'
                return {
                    'name': _('工艺确认'),
                    'type': 'ir.actions.act_window',
                    'views': [(self.env.ref(view_xmlid).id, 'form')],
                    'res_model': 'sf.production.technology.wizard',
                    'target': 'new',
                    'context': {
                        'default_production_id': self.id,
                        'default_origin': self.origin,
                    }}
    if error_panel:
        raise UserError(_("【加工面】为%s的标准工序顺序有误,请调整后重试", ", ".join(error_panel)))
    return True
def action_check(self):
    """Review action: switch ``check_status`` on."""
    self.check_status = True
def action_uncheck(self):
    """Review action: switch ``check_status`` off."""
    self.check_status = False
def archive(self):
    """Archive the records by writing ``active = False``."""
    values = {'active': False}
    self.write(values)
def unarchive(self):
    """Restore archived records by writing ``active = True``."""
    values = {'active': True}
    self.write(values)
@api.depends('request_ids')
def _compute_maintenance_count(self):
    """Set ``maintenance_count`` to the number of linked maintenance requests."""
    for production in self:
        linked_requests = production.request_ids
        production.maintenance_count = len(linked_requests)
# 获取cloud编程单的状态
def _cron_get_programming_state(self):
    """Fetch the state of programming orders from the remote programming service.

    * Called on an empty recordset: all rework orders that are still
      being programmed are looked up, and each one whose programming
      number appears in the reply has its ``programming_state`` updated
      ('已编程未下发' when the remote state is '已编程', else '编程中').
    * Called on records: the first entry of the returned programming list
      is handed back to the caller.

    The call is best-effort: any failure, including an error status from
    the service, is logged and swallowed.

    Returns:
        The first programming entry (a dict) when called on records and
        the service returned one; ``None`` otherwise.
    """
    try:
        if self:
            reproduction = self
        else:
            reproduction = self.env['mrp.production'].search(
                [('state', '=', 'rework'), ('programming_state', '=', '编程中'), ('is_rework', '=', True)])
        if not reproduction:
            return None
        programming_no_set = {str(item.programming_no) for item in reproduction}
        res = {'programming_no': ','.join(programming_no_set)}
        logging.info('res=%s:' % res)
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        config_url = configsettings['sf_url'] + '/api/intelligent_programming/get_state'
        # Without a timeout an unresponsive server blocks the worker forever.
        ret = requests.post(config_url, json=res, data=None, headers=config_header, timeout=60)
        ret = ret.json()
        result = json.loads(ret['result'])
        if result['status'] != 1:
            raise UserError(ret['message'])
        for item in result['programming_list']:
            if self:
                return item
            for rp in reproduction:
                if rp.programming_no == item['programming_no']:
                    rp.write({'programming_state': '已编程未下发' if item[
                        'programming_state'] == '已编程' else '编程中'})
    except Exception as e:
        # Logged with its traceback; deliberately not re-raised.
        logging.exception('cron_get_programming_state error:%s' % e)
    return None
# 编程单更新
# 增加触发时间参数
def update_programming_state(self, trigger_time=None, reprogramming_reason=None):
    """Ask the remote programming service to reset this order's programming state.

    The manufacturing type sent along is 'scrap' when the order is
    scrapped, else 'invalid_tool_rework' when ``tool_state`` is '2', else
    'rework' when the order is flagged as rework, else ``None``. On
    success the order is flagged with ``is_rework = True``.

    Args:
        trigger_time: trigger time forwarded to the service.
        reprogramming_reason: reason forwarded to the service.

    Raises:
        UserError: the request fails or the service reports an error. The
            original error is logged and chained as the cause.
    """
    try:
        if self.is_scrap:
            manufacturing_type = 'scrap'
        elif self.tool_state == '2':
            manufacturing_type = 'invalid_tool_rework'
        elif self.is_rework:
            manufacturing_type = 'rework'
        else:
            manufacturing_type = None
        res = {'programming_no': self.programming_no,
               'manufacturing_type': manufacturing_type,
               'trigger_time': trigger_time,
               'reprogramming_reason': reprogramming_reason}
        logging.info('res=%s:' % res)
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        config_url = configsettings['sf_url'] + '/api/intelligent_programming/reset_state_again'
        # Without a timeout an unresponsive server blocks the worker forever.
        ret = requests.post(config_url, json=res, data=None, headers=config_header, timeout=60)
        ret = ret.json()
        result = json.loads(ret['result'])
        logging.info('update_programming_state-ret:%s' % result)
        if result['status'] != 1:
            raise UserError(ret['message'])
        self.write({'is_rework': True})
        # A follow-up step that created one programming record per
        # manufacturing type (rework / scrap / invalid tool) from the
        # data returned by _cron_get_programming_state() is disabled.
    except Exception as e:
        logging.exception('update_programming_state error:%s' % e)
        raise UserError("更新编程单状态失败,请联系管理员") from e
# cnc程序获取
def fetchCNC(self, production_names):
    """Create a programming order for this manufacturing order on the remote service.

    The product's identification, material, blank dimensions, model file
    and machining drawings are posted to the programming service. The
    programming way is 'manual operation' when the order is flagged for
    manual programming or a quick order matches the product code without
    its last '-' suffix; it is 'auto' otherwise. On success the returned
    programming number is stored and both ``programming_state`` and
    ``work_state`` are set to '编程中'.

    Args:
        production_names: value sent as ``production_no``.

    Raises:
        UserError: the request fails or the service reports an error. The
            original error is logged and chained as the cause.
    """
    cnc = self.env['mrp.production'].search([('id', '=', self.id)])
    product = cnc.product_id
    quick_order = False
    if product.default_code:
        quick_order = self.env['quick.easy.order'].search(
            [('name', '=', product.default_code.rsplit('-', 1)[0])])
    if cnc.manual_quotation is True or quick_order:
        programme_way = 'manual operation'
    else:
        programme_way = 'auto'
    try:
        res = {
            'production_no': production_names,
            'machine_tool_code': '',
            'product_name': product.name,
            'remanufacture_type': '',
            'model_code': product.model_code,
            'material_code': self.env['sf.production.materials'].search(
                [('id', '=', product.materials_id.id)]).materials_no,
            'material_type_code': self.env['sf.materials.model'].search(
                [('id', '=', product.materials_type_id.id)]).materials_no,
            'machining_processing_panel': product.model_processing_panel,
            'machining_precision': '',
            'embryo_long': product.bom_ids.bom_line_ids.product_id.length,
            'embryo_height': product.bom_ids.bom_line_ids.product_id.height,
            'embryo_width': product.bom_ids.bom_line_ids.product_id.width,
            'order_no': cnc.origin,
            'model_order_no': product.default_code,
            'user': cnc.env.user.name,
            'programme_way': programme_way,
            'model_file': '' if not product.model_file else base64.b64encode(
                product.model_file).decode('utf-8'),
            'part_name': product.part_name,
            'part_number': product.part_number,
            'machining_drawings': base64.b64encode(product.machining_drawings).decode(
                'utf-8') if product.machining_drawings else '',
            'machining_drawings_name': product.machining_drawings_name,
            'machining_drawings_mimetype': product.machining_drawings_mimetype,
        }
        # Log the payload without the two base64-encoded file contents.
        for key, value in res.items():
            if key not in ('model_file', 'machining_drawings'):
                logging.info('%s: %s' % (key, value))
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        config_url = configsettings['sf_url'] + '/api/intelligent_programming/create'
        res['token'] = configsettings['token']
        # Without a timeout an unresponsive server blocks the worker forever.
        ret = requests.post(config_url, json={}, data=res, headers=config_header, timeout=120)
        ret = ret.json()
        logging.info('fetchCNC-ret:%s' % ret)
        if ret['status'] != 1:
            raise UserError(ret['message'])
        self.write(
            {'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'})
    except Exception as e:
        logging.exception('fetchCNC error:%s' % e)
        raise UserError("cnc程序获取编程单失败,请联系管理员") from e
# 维修模块按钮
def button_maintenance_req(self):
    """Return a window action opening a new maintenance request form for this order.

    The company and the manufacturing order are passed as context defaults.
    """
    self.ensure_one()
    defaults = {
        'default_company_id': self.company_id.id,
        'default_production_id': self.id,
    }
    action = {
        'name': _('New Maintenance Request'),
        'view_mode': 'form',
        'res_model': 'maintenance.request',
        'type': 'ir.actions.act_window',
        'context': defaults,
        'domain': [('production_id', '=', self.id)],
    }
    return action
# 打开维修模块请求
def open_maintenance_request_mo(self):
    """Return a window action listing the maintenance requests of this order.

    When ``maintenance_count`` is exactly 1 the action opens the request
    found by the search directly in form view.
    """
    self.ensure_one()
    request_domain = [('production_id', '=', self.id)]
    action = {
        'name': _('Maintenance Requests'),
        'view_mode': 'kanban,tree,form,pivot,graph,calendar',
        'res_model': 'maintenance.request',
        'type': 'ir.actions.act_window',
        'context': {
            'default_company_id': self.company_id.id,
            'default_production_id': self.id,
        },
        'domain': request_domain,
    }
    if self.maintenance_count == 1:
        production = self.env['maintenance.request'].search([('production_id', '=', self.id)])
        action.update(view_mode='form', res_id=production.id)
    return action
def action_generate_serial(self):
    """Create the lot being produced and attach it to the finished move lines.

    The lot name is the next serial for the company and product (falling
    back to the 'stock.lot.serial' sequence) with every character in the
    range U+4E00..U+9FA5 removed.
    """
    self.ensure_one()
    lot_model = self.env['stock.lot']
    iot_code = lot_model._get_next_serial(self.company_id, self.product_id) or self.env[
        'ir.sequence'].next_by_code('stock.lot.serial')
    iot_code_name = re.sub('[\u4e00-\u9fa5]', "", iot_code)
    self.lot_producing_id = lot_model.create({
        'product_id': self.product_id.id,
        'company_id': self.company_id.id,
        'name': iot_code_name,
    })
    finished_lines = self.move_finished_ids.filtered(lambda m: m.product_id == self.product_id).move_line_ids
    if finished_lines:
        finished_lines.lot_id = self.lot_producing_id
    # The _set_qty_producing() call for serial-tracked products is disabled here.
# 重载根据工序生成工单的程序如果产品BOM中没有工序时
# 根据产品对应的模板类型中工序,去生成工单;
# CNC加工工序的选取规则
# 如果自动报价有带过来预分配的机床,
# 则根据设备找到工作中心;否则采用前面描述的工作中心分配机制;
# 其他规则限制: 默认只分配给工作中心状态为非故障的工作中心;
def _create_workorder3(self, item):
    """Generate the work orders of each manufacturing order.

    Work-order values are collected from two sources and then assigned to
    ``workorder_ids``:

    * the operations of the exploded bill of materials;
    * for products whose category type is '成品' or '坯料', the technology
      design steps (sorted by sequence) that have no work order yet on
      this order.

    Orders without a bill of materials or a product are skipped. The
    ``item`` parameter is not used in this method.

    NOTE(review): the indentation of this block was reconstructed from a
    whitespace-stripped copy; confirm the nesting against the repository.
    """
    for production in self:
        if not production.bom_id or not production.product_id:
            continue
        workorders_values = []
        # Quantity expressed in the unit of measure of the bill of materials.
        product_qty = production.product_uom_id._compute_quantity(production.product_qty,
                                                                  production.bom_id.product_uom_id)
        exploded_boms, dummy = production.bom_id.explode(production.product_id,
                                                         product_qty / production.bom_id.product_qty,
                                                         picking_type=production.bom_id.picking_type_id)
        for bom, bom_data in exploded_boms:
            # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
            if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
                    'parent_line'].bom_id.operation_ids != bom.operation_ids)):
                continue
            for operation in bom.operation_ids:
                if operation._skip_operation_line(bom_data['product']):
                    continue
                workorders_values += [{
                    'name': operation.name,
                    'production_id': production.id,
                    'workcenter_id': operation.workcenter_id.id,
                    'product_uom_id': production.product_uom_id.id,
                    'operation_id': operation.id,
                    'state': 'pending',
                }]
        if production.product_id.categ_id.type in ['成品', '坯料']:
            # Generate work orders from the technology (process) design
            technology_design_ids = sorted(production.technology_design_ids, key=lambda x: x.sequence)
            for route in technology_design_ids:
                # Skip design steps that already have a work order on this order.
                workorder_has = self.env['mrp.workorder'].search(
                    [('technology_design_id', '=', route.id), ('production_id', '=', production.id)])
                if not workorder_has:
                    if route.route_id.routing_type not in ['表面工艺']:
                        workorders_values.append(
                            self.env['mrp.workorder'].json_workorder_str(production, route))
                    else:
                        # Surface-process step: the partner of the first seller
                        # of the matching product is passed along.
                        # NOTE(review): seller_ids[0] raises IndexError when no
                        # product or no seller is found - confirm callers
                        # guarantee both.
                        product_production_process = self.env['product.template'].search(
                            [('server_product_process_parameters_id', '=', route.process_parameters_id.id)])
                        workorders_values.append(
                            self.env[
                                'mrp.workorder']._json_workorder_surface_process_str(
                                production, route, product_production_process.seller_ids[0].partner_id.id))
        production.workorder_ids = workorders_values
        for workorder in production.workorder_ids:
            workorder.duration_expected = workorder._get_duration_expected()
# 外协出入库单处理
def get_subcontract_pick_purchase(self):
    """Create the subcontracting pickings and purchase orders of the surface-process work orders.

    For each manufacturing order (in id order) the non-cancelled,
    subcontracted work orders that carry surface-technics parameters are
    looked up. Their existing subcontract moves and pickings are set to
    'cancel'. Then, from the last work order to the first, an outgoing
    subcontract picking and a purchase order are created.

    The production names are grouped per product with a plain dict
    accumulation. ``itertools.groupby`` only merges adjacent items, and
    the records are sorted by id, not by product, so it dropped names
    whenever products were interleaved.

    NOTE(review): the indentation of this block was reconstructed from a
    whitespace-stripped copy. As reconstructed, ``sorted_workorders``
    keeps the value of a previous order when the current one has no
    matching work order, and the ``return`` stops processing the
    remaining orders. Both are kept as in the original; confirm.
    """
    production_all = self.sorted(lambda x: x.id)
    names_by_product = defaultdict(list)
    for production in production_all:
        names_by_product[production.product_id.id].append(production.name)
    # Hand a plain dict to the callee, as before.
    product_id_to_production_names = dict(names_by_product)
    sorted_workorders = None
    for production in production_all:
        process_parameter_workorder = self.env['mrp.workorder'].search(
            [('surface_technics_parameters_id', '!=', False), ('production_id', '=', production.id),
             ('is_subcontract', '=', True), ('state', '!=', 'cancel')], order='sequence asc')
        if process_parameter_workorder:
            # Invalidate the moves and pickings of these surface-process work orders
            for workorder in process_parameter_workorder:
                workorder.move_subcontract_workorder_ids.write({'state': 'cancel'})
                workorder.move_subcontract_workorder_ids.picking_id.write({'state': 'cancel'})
            sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.sequence)
        if not sorted_workorders:
            return
        for workorders in reversed(sorted_workorders):
            self.env['stock.picking'].create_outcontract_picking(workorders, production, sorted_workorders)
            self.env['purchase.order'].get_purchase_order(workorders, production, product_id_to_production_names)
# 工单排序
def _reset_work_order_sequence1(self, k):
    """Re-sequence the work orders after a CNC rework work order was added.

    For every work order of each manufacturing order:

    * a work order named "CNC加工" on processing panel ``k`` places the
      rework work orders (named "CNC加工(返工)") right after itself;
    * any other work order that is neither of routing type '装夹预调' nor
      a rework work order is shifted by one position.

    Work orders are matched by name and by membership, so the method
    also works when an order has several CNC or rework work orders.
    Reading ``.name`` on several records, or comparing one record to a
    multi-record set with ``!=``, did not. The debugging ``print`` was
    removed for the same reason.

    Args:
        k: processing panel whose CNC work order anchors the rework step.
    """
    for rec in self:
        cnc_back_workorder = rec.workorder_ids.filtered(lambda wo: wo.name == "CNC加工(返工)")
        for work in rec.workorder_ids:
            if work.name == "CNC加工" and work.processing_panel == k:
                cnc_back_workorder.write({'sequence': work.sequence + 1})
            elif work.routing_type not in ['装夹预调'] and work not in cnc_back_workorder:
                work.sequence += 1
# 在制造订单上新增工单
def _create_workorder1(self, k):
    """Add work orders to each manufacturing order for the given panels.

    Work-order values are first derived from the exploded BoM operations,
    then one CNC work order per processing panel is appended using the
    routing sort template of the product's model type.

    Args:
        k: comma-separated processing panel string; it is also written to
            ``product_id.model_processing_panel`` of every order.
    """
    for production in self:
        if not production.bom_id or not production.product_id:
            continue
        workorders_values = []
        product_qty = production.product_uom_id._compute_quantity(production.product_qty,
                                                                  production.bom_id.product_uom_id)
        exploded_boms, dummy = production.bom_id.explode(production.product_id,
                                                         product_qty / production.bom_id.product_qty,
                                                         picking_type=production.bom_id.picking_type_id)
        for bom, bom_data in exploded_boms:
            # If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
            if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
                    'parent_line'].bom_id.operation_ids != bom.operation_ids)):
                continue
            for operation in bom.operation_ids:
                if operation._skip_operation_line(bom_data['product']):
                    continue
                workorders_values += [{
                    'name': operation.name,
                    'production_id': production.id,
                    'workcenter_id': operation.workcenter_id.id,
                    'product_uom_id': production.product_uom_id.id,
                    'operation_id': operation.id,
                    'state': 'pending',
                }]
        # Generate work orders from the number of processing panels and the
        # matching finished-product routing template.
        production.product_id.model_processing_panel = k
        # The template lookup does not depend on the panel, so run it once.
        routingworkcenter = self.env['sf.product.model.type.routing.sort'].search(
            [('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
            order='sequence asc'
        )
        # A dedicated loop variable is used here: re-using ``k`` overwrote the
        # parameter, so the next production received only the last panel.
        for panel in production.product_id.model_processing_panel.split(','):
            for route in routingworkcenter:
                if route.routing_type == 'CNC加工':
                    workorders_values.append(
                        self.env['mrp.workorder'].json_workorder_str1(panel, production, route))
        production.workorder_ids = workorders_values
        logging.info('work orders of %s: %s', production.name, production.workorder_ids)
def _create_workorder2(self, k):
    """Create the work orders for panel string ``k`` and then re-sequence them.

    Always returns True.
    """
    for step in (self._create_workorder1, self._reset_work_order_sequence1):
        step(k)
    return True
# 需对不连续工单对应的采购单和外协出入库单做处理
def _reset_subcontract_pick_purchase(self):
    """Clean up pickings / purchase lines of non-consecutive surface work orders.

    For every manufacturing order (ascending ``id``), the work orders with
    routing type '表面工艺' are walked in order. Whenever a work order's
    sequence is not exactly the previous one's sequence + 1, its pickings
    are cleared and the draft consignment purchase orders whose origin is
    the manufacturing order name are inspected for lines to remove.

    The per-product name map the original built with ``groupby`` was never
    used, so it has been dropped.
    """
    for item in self.sorted(lambda x: x.id):
        workorder_sf = item.workorder_ids.filtered(lambda sf: sf.routing_type == '表面工艺')
        for i, workorder in enumerate(workorder_sf):
            # The first work order has no predecessor; consecutive ones need no reset.
            if i == 0 or workorder.sequence == workorder_sf[i - 1].sequence + 1:
                continue
            # workorder.picking_ids.move_ids = False
            workorder.picking_ids = False
            purchase_order = self.env['purchase.order'].search(
                [('state', '=', 'draft'), ('origin', '=', item.name),
                 ('purchase_type', '=', 'consignment')])
            server_template = self.env['product.template'].search(
                [('server_product_process_parameters_id', '=',
                  workorder.surface_technics_parameters_id.id),
                 ('detailed_type', '=', 'service')])
            for po in purchase_order:
                for line in po.order_line:
                    if line.product_id == server_template.product_variant_id:
                        continue
                    if server_template.server_product_process_parameters_id != line.product_id.server_product_process_parameters_id:
                        # NOTE(review): this search requires the line's product to be
                        # the template variant, which the ``continue`` above already
                        # excluded, so it looks unable to match - confirm the intent.
                        purchase_order_line = self.env['purchase.order.line'].search(
                            [('product_id', '=', server_template.product_variant_id.id), ('id', '=', line.id),
                             ('product_qty', '=', 1)], limit=1, order='id desc')
                        if purchase_order_line:
                            line.unlink()
def _reset_work_order_sequence(self):
    """
    Work-order sequencing method (new).

    Per manufacturing order:
    - if any work order is in state '返工'/'rework', every work order still
      at sequence 0 is inserted right after the highest-sequence work order
      with the same name and processing panel, and the following work
      orders are shifted by one;
    - otherwise each work order takes the sequence of its matching
      technology-design line, and cancelled work orders are moved after the
      highest non-cancelled sequence.
    """
    for rec in self:
        workorder_ids = rec.workorder_ids
        technology_design_ids = rec.technology_design_ids
        if workorder_ids.filtered(lambda item: item.state in ('返工', 'rework')):
            # Work orders newly generated by the rework (sequence still 0).
            work_ids = workorder_ids.filtered(lambda item: item.sequence == 0)
            # Insert the new work orders one by one.
            for work_id in work_ids:
                order_rework_ids = rec.workorder_ids.filtered(
                    lambda item: (item.sequence > 0 and work_id.name == item.name
                                  and work_id.processing_panel == item.processing_panel))
                order_rework_ids = sorted(order_rework_ids, key=lambda item: item.sequence, reverse=True)
                # NOTE(review): raises IndexError when no already-sequenced work
                # order shares this name and panel.
                work_id.sequence = order_rework_ids[0].sequence + 1
                # Shift every work order at or after the insertion point by one.
                work_order_ids = rec.workorder_ids.filtered(
                    lambda item: item.sequence >= work_id.sequence and item.id != work_id.id)
                for work in work_order_ids:
                    work.sequence = work.sequence + 1
        else:
            # Copy the sequence of the matching technology-design line onto the work order.
            for work in workorder_ids:
                td_ids = technology_design_ids.filtered(
                    lambda item: (item.route_id.name in work.name and item.process_parameters_id
                                  and item.process_parameters_id == work.surface_technics_parameters_id) or
                                 (item.route_id.name == work.name and item.panel
                                  and item.panel == work.processing_panel))
                # Manual offline work orders are matched on the route name only.
                if work.name == '人工线下加工':
                    td_ids = technology_design_ids.filtered(lambda item: (item.route_id.name in work.name))
                if td_ids:
                    work.sequence = td_ids[0].sequence
            cancel_work_ids = workorder_ids.filtered(lambda item: item.state in ('已取消', 'cancel'))
            if cancel_work_ids:
                # NOTE(review): max() raises ValueError if every work order is cancelled.
                sequence = max(workorder_ids.filtered(lambda item: item.state not in ('已取消', 'cancel')),
                               key=lambda w: w.sequence).sequence
                # All cancelled work orders share the same trailing sequence.
                for cw in cancel_work_ids:
                    cw.sequence = sequence + 1
def _reset_work_order_sequence_1(self):
    """
    Work-order sequencing method (old).

    Without rework work orders, a name -> sequence table is built from the
    routing templates of the product's model type (per panel for finished
    product routings, then surface technics, then blank routings) and
    applied to every work order. With rework work orders and new work
    orders at sequence 0, the new ones are inserted after the last work
    order of the reworked panel.

    Raises:
        ValidationError: the product has no model type, no processing
            panel, or a work order name is missing from the table.
    """
    for rec in self:
        workorder_ids = rec.workorder_ids.filtered(lambda item: item.state in ('返工', 'rework'))
        # Product model type
        model_type_id = rec.product_id.product_model_type_id
        # Product processing panels
        model_processing_panel = rec.product_id.model_processing_panel
        if not workorder_ids:
            sequence_list = {}
            if model_type_id:
                if model_processing_panel:
                    tmpl_num = 1
                    panel_list = model_processing_panel.split(',')
                    for panel in panel_list:
                        panel_sequence_list = {}
                        # Finished-product routings
                        product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids
                        if product_routing_tmpl_ids:
                            for tmpl_id in product_routing_tmpl_ids:
                                panel_sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num})
                                tmpl_num += 1
                        sequence_list.update({panel: panel_sequence_list})
                    # Surface-technics routings
                    # Surface-technics routing templates of the model type
                    surface_tmpl_ids = model_type_id.surface_technics_routing_tmpl_ids
                    # Surface-technics parameters selected on the product
                    model_process_parameters_ids = rec.product_id.model_process_parameters_ids
                    process_dict = {}
                    if model_process_parameters_ids:
                        for process_parameters_id in model_process_parameters_ids:
                            process_id = process_parameters_id.process_id
                            for surface_tmpl_id in surface_tmpl_ids:
                                if process_id == surface_tmpl_id.route_workcenter_id.surface_technics_id:
                                    surface_tmpl_name = surface_tmpl_id.route_workcenter_id.name
                                    # Keyed by the process sequence; a later entry with the
                                    # same sequence overwrites the earlier one.
                                    process_dict.update({int(process_id.sequence): '%s-%s' % (
                                        surface_tmpl_name, process_parameters_id.name)})
                    process_list = sorted(process_dict.keys())
                    for process_num in process_list:
                        sequence_list.update({process_dict.get(process_num): tmpl_num})
                        tmpl_num += 1
                    # Blank (embryo) routings; numbering restarts at 1
                    tmpl_num = 1
                    embryo_routing_tmpl_ids = model_type_id.embryo_routing_tmpl_ids
                    if embryo_routing_tmpl_ids:
                        for tmpl_id in embryo_routing_tmpl_ids:
                            sequence_list.update({tmpl_id.route_workcenter_id.name: tmpl_num})
                            tmpl_num += 1
                else:
                    raise ValidationError('该产品【加工面板】为空!')
            else:
                raise ValidationError('该产品没有选择【模版类型】!')
            logging.info('sequence_list: %s' % sequence_list)
            for work in rec.workorder_ids:
                work_name = work.name
                logging.info(work_name)
                # Names found at the top level carry their sequence directly;
                # otherwise look inside the per-panel table.
                if sequence_list.get(work_name):
                    work.sequence = sequence_list[work_name]
                elif sequence_list.get(work.processing_panel):
                    processing_panel = sequence_list.get(work.processing_panel)
                    if processing_panel.get(work_name):
                        work.sequence = processing_panel[work_name]
                    else:
                        raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name)
                else:
                    raise ValidationError('工序【%s】在产品选择的模版类型中不存在!' % work.name)
        # When a single panel triggers a rework, insert the newly generated work
        # orders below the reworked one and re-sequence everything after it.
        elif rec.workorder_ids.filtered(lambda item: item.sequence == 0):
            # Newly added rework work orders
            work_ids = rec.workorder_ids.filtered(lambda item: item.sequence == 0)
            # Sequence of the last work order of the reworked panel
            sequence_max = sorted(
                rec.workorder_ids.filtered(lambda item: item.processing_panel == work_ids[0].processing_panel),
                key=lambda item: item.sequence, reverse=True)[0].sequence
            # Shift the work orders that follow the reworked panel by a fixed 3
            work_order_ids = rec.workorder_ids.filtered(lambda item: item.sequence > sequence_max)
            for work_id in work_order_ids:
                work_id.sequence = work_id.sequence + 3
            # Build the sequences for the new rework work orders
            # Finished-product routings
            panel_sequence_list = {}
            product_routing_tmpl_ids = model_type_id.product_routing_tmpl_ids
            if product_routing_tmpl_ids:
                for tmpl_id in product_routing_tmpl_ids:
                    sequence_max += 1
                    panel_sequence_list.update({tmpl_id.route_workcenter_id.name: sequence_max})
            for work_id in work_ids:
                work_name = work_id.name
                if panel_sequence_list.get(work_name):
                    work_id.sequence = panel_sequence_list[work_name]
# 创建工单并进行排序
def _create_workorder(self, item):
    """Create the work orders via ``_create_workorder3`` and then re-sequence them.

    ``item`` is forwarded unchanged. Always returns True.
    """
    create_step = self._create_workorder3
    resequence_step = self._reset_work_order_sequence
    create_step(item)
    resequence_step()
    return True
def production_process(self, pro_plan):
    """Plan start/end times of every work order of this manufacturing order.

    Starting from ``pro_plan.date_planned_start``, each work order is asked
    for its planned window according to the order's ``production_type`` and
    the result is stored with ``update_work_start_end``.

    Args:
        pro_plan: record exposing ``date_planned_start``.
    """
    type_map = {'装夹预调': False, 'CNC加工': False, '解除装夹': False}
    # End time of the most recently planned operation.
    last_time = pro_plan.date_planned_start
    for index, work in enumerate(self.workorder_ids):
        count = type_map.get(work.routing_type)
        date_planned_start = None
        date_planned_end = None
        if self.production_type == '自动化产线加工':
            date_planned_start, date_planned_end, last_time = work.auto_production_process(last_time, count,
                                                                                           type_map)
        # The manual branch used to test for '' which is not a value of the
        # production_type selection, so it could never run and manual orders
        # were written with empty start/end dates.
        elif self.production_type == '人工线下加工':
            date_planned_start, date_planned_end, last_time = work.manual_offline_process(last_time, index)
        work.update_work_start_end(date_planned_start, date_planned_end)
# def
def process_range_time(self):
    """Plan work-order times for every order that has a production plan.

    Orders without an ``sf.production.plan`` record or without a
    ``production_type`` are skipped.
    """
    for production in self:
        pro_plan = self.env['sf.production.plan'].search([('production_id', '=', production.id)], limit=1)
        if pro_plan and production.production_type:
            production.production_process(pro_plan)
# 修改标记已完成方法
def button_mark_done1(self):
    """Mark the manufacturing order(s) as done (customised mark-done flow).

    Posts inventory for the orders, closes remaining raw/finished moves,
    writes the final state on each order and returns either True or a
    window action (the order form, the reception report, or the
    backorder(s)).
    """
    # The sanity checks only run when every work order is a '表面工艺' one
    # (or when there is no work order at all).
    if not self.workorder_ids.filtered(lambda w: w.routing_type not in ['表面工艺']):
        self._button_mark_done_sanity_checks()
    if not self.env.context.get('button_mark_done_production_ids'):
        self = self.with_context(button_mark_done_production_ids=self.ids)
    res = self._pre_button_mark_done()
    # Anything other than True is returned to the caller as-is.
    if res is not True:
        return res
    if self.env.context.get('mo_ids_to_backorder'):
        productions_to_backorder = self.browse(self.env.context['mo_ids_to_backorder'])
        productions_not_to_backorder = self - productions_to_backorder
    else:
        productions_not_to_backorder = self
        productions_to_backorder = self.env['mrp.production']
    backorders = productions_to_backorder and productions_to_backorder._split_productions()
    backorders = backorders - productions_to_backorder
    productions_not_to_backorder._post_inventory(cancel_backorder=True)
    # Find the last work order and complete the stock-in operation
    # if self.workorder_ids.filtered(lambda w: w.routing_type in ['表面工艺']):
    # move_finish = self.env['stock.move'].search([('created_production_id', '=', self.id)])
    # if move_finish:
    # move_finish._action_assign()
    productions_to_backorder._post_inventory(cancel_backorder=True)
    # if completed products make other confirmed/partially_available moves available, assign them
    done_move_finished_ids = (
            productions_to_backorder.move_finished_ids | productions_not_to_backorder.move_finished_ids).filtered(
        lambda m: m.state == 'done')
    done_move_finished_ids._trigger_assign()
    # Moves without quantity done are not posted => set them as done instead of canceling. In
    # case the user edits the MO later on and sets some consumed quantity on those, we do not
    # want the move lines to be canceled.
    (productions_not_to_backorder.move_raw_ids | productions_not_to_backorder.move_finished_ids).filtered(
        lambda x: x.state not in ('done', 'cancel')).write({
        'state': 'done',
        'product_uom_qty': 0.0,
    })
    for production in self:
        logging.info('qty_produced:%s' % production.qty_produced)
        # A produced quantity of zero is forced to 1 before it is copied to product_qty.
        if production.qty_produced == 0.0:
            production.qty_produced = 1.0
        production.write({
            'date_finished': fields.Datetime.now(),
            'product_qty': production.qty_produced,
            'priority': '0',
            'is_locked': True,
            'state': 'done',
        })
    for workorder in self.workorder_ids.filtered(lambda w: w.state not in ('done', 'cancel')):
        workorder.duration_expected = workorder._get_duration_expected()
    if not backorders:
        if self.env.context.get('from_workorder'):
            return {
                'type': 'ir.actions.act_window',
                'res_model': 'mrp.production',
                'views': [[self.env.ref('mrp.mrp_production_form_view').id, 'form']],
                'res_id': self.id,
                'target': 'main',
            }
        if self.user_has_groups(
                'mrp.group_mrp_reception_report') and self.picking_type_id.auto_show_reception_report:
            lines = self.move_finished_ids.filtered(lambda
                                                        m: m.product_id.type == 'product' and m.state != 'cancel' and m.quantity_done and not m.move_dest_ids)
            if lines:
                if any(mo.show_allocation for mo in self):
                    action = self.action_view_reception_report()
                    return action
        # NOTE(review): ``production`` is the variable left over from the loop
        # above; it is undefined when ``self`` is empty.
        logging.info('last-product_qty:%s' % production.product_qty)
        return True
    # Backorders exist: strip ``default_*`` keys and switch off ``skip_*``
    # flags before opening them.
    context = self.env.context.copy()
    context = {k: v for k, v in context.items() if not k.startswith('default_')}
    for k, v in context.items():
        if k.startswith('skip_'):
            context[k] = False
    action = {
        'res_model': 'mrp.production',
        'type': 'ir.actions.act_window',
        'context': dict(context, mo_ids_to_backorder=None, button_mark_done_production_ids=None)
    }
    if len(backorders) == 1:
        action.update({
            'view_mode': 'form',
            'res_id': backorders[0].id,
        })
    else:
        action.update({
            'name': _("Backorder MO"),
            'domain': [('id', 'in', backorders.ids)],
            'view_mode': 'tree,form',
        })
    return action
# 报废
def button_scrap_new(self):
    """Open the scrap wizard (``sf.production.wizard``) for this order.

    The wizard defaults are filled from the programming state returned by
    ``_cron_get_programming_state``; re-programming is pre-selected when
    that state is '已下发'.
    """
    cloud_state = self._cron_get_programming_state()
    wizard_action = {
        'name': _('报废'),
        'type': 'ir.actions.act_window',
        'view_mode': 'form',
        'res_model': 'sf.production.wizard',
        'target': 'new',
    }
    wizard_action['context'] = {
        'default_mrp_production_id': self.id,
        'is_remanufacture_flag': True,
        'default_reprogramming_num': cloud_state['reprogramming_num'],
        'default_programming_states': cloud_state['programming_state'],
        'default_is_reprogramming': cloud_state['programming_state'] in ['已下发'],
    }
    return wizard_action
# 返工
def button_rework(self):
    """Open the rework wizard (``sf.rework.wizard``) for this order.

    The wizard receives:
    - the work orders tagged 'standard' that are neither reworked nor
      cancelled (all work orders when there is none);
    - a comma-separated list of the done work orders matching detection
      results still flagged '待处理';
    - the cloud programming state when the order is '已编程'.
    """
    cloud_programming = None
    if self.programming_state in ['已编程']:
        cloud_programming = self._cron_get_programming_state()
    elif self.programming_state is False:
        cloud_programming = {}
    result_ids = self.detection_result_ids.filtered(lambda dr: dr.handle_result == '待处理')
    # Collect ``.ids`` instead of ``.id``: a result matching several work
    # orders raised a singleton error, and a result matching none put
    # ``False`` into the id list handed to the wizard.
    work_id_list = []
    for result_id in result_ids:
        work_id_list.extend(self.workorder_ids.filtered(
            lambda wk: (wk.name == result_id.routing_type and wk.processing_panel == result_id.processing_panel
                        and wk.state == 'done')).ids)
    workorder_ids = self.workorder_ids.filtered(
        lambda wk: wk.technology_design_id.routing_tag == 'standard' and wk.state not in ['rework', 'cancel'])
    logging.info('标准工艺工单【%s' % workorder_ids)
    return {
        'name': _('返工'),
        'type': 'ir.actions.act_window',
        'view_mode': 'form',
        'res_model': 'sf.rework.wizard',
        'target': 'new',
        'context': {
            'default_production_id': self.id,
            'default_workorder_ids': workorder_ids.ids or self.workorder_ids.ids,
            'default_hidden_workorder_ids': ','.join(map(str, work_id_list)),
            'default_reprogramming_num': cloud_programming.get('reprogramming_num') if cloud_programming else '',
            'default_programming_state': cloud_programming.get('programming_state') if cloud_programming else '',
            'default_is_reprogramming': bool(
                cloud_programming and cloud_programming.get('programming_state') in ['已下发']),
        }
    }
# 更新程序
def do_update_program(self):
    """Pull the re-programmed CNC programs for reworked manufacturing orders.

    Every selected order must be in state ``rework`` and have programming
    state '已编程未下发'. For each order, panels whose pending CNC work order
    has no program yet are refreshed through ``get_new_program``; then all
    orders sharing the programming number that are still '已编程未下发' are
    moved back to ``progress`` / '已编程', and the CNC tool check is run on
    the open orders of that programming number.

    Raises:
        UserError: a selected order does not satisfy both conditions.
    """
    for item in self:
        # Both conditions are required (as the message states); the previous
        # ``and`` only rejected orders that failed both of them.
        if item.state != "rework" or item.programming_state != "已编程未下发":
            raise UserError("请选择状态为返工且已编程未下发的制造订单")
    for production in self:
        productions_not_delivered = self.env['mrp.production'].search(
            [('programming_no', '=', production.programming_no), ('programming_state', '=', '已编程未下发')])
        productions = self.env['mrp.production'].search(
            [('programming_no', '=', production.programming_no), ('state', 'not in', ('cancel', 'done'))])
        rework_workorder = production.workorder_ids.filtered(lambda m: m.state == 'rework')
        for rework_item in rework_workorder:
            pending_workorder = production.workorder_ids.filtered(
                lambda m1: m1.state in [
                    'pending'] and m1.processing_panel == rework_item.processing_panel and m1.routing_type == 'CNC加工')
            # Only fetch a program for panels that do not have one yet.
            if not pending_workorder.cnc_ids:
                production.get_new_program(rework_item.processing_panel)
        productions_not_delivered.write(
            {'state': 'progress', 'programming_state': '已编程', 'is_rework': False})
        # Validate the tools used by the CNC programs of every panel. This is
        # best effort: a failure is logged and does not abort the update.
        try:
            logging.info(f'已更新制造订单:{productions_not_delivered}')
            productions.production_cnc_tool_checkout()
        except Exception as e:
            logging.info(f'对cnc工单的程序用刀进行校验报错{e}')
# 从cloud获取重新编程过的最新程序
def get_new_program(self, processing_panel):
    """Fetch the latest re-programmed program of one panel from the cloud.

    Posts the programming number and panel to the cloud endpoint, clears
    the local temporary folder of that panel, downloads the program files
    and rewrites the CNC / CMM program lines and worksheet of the matching
    open work orders of every order sharing the programming number.

    Args:
        processing_panel: name of the panel whose program is refreshed.

    Raises:
        UserError: always the generic message below - every exception
            raised inside the ``try`` (including the specific UserErrors)
            is caught and replaced by it.
    """
    try:
        res = {'programming_no': self.programming_no, 'processing_panel': processing_panel}
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        url = '/api/intelligent_programming/get_new_program'
        config_url = configsettings['sf_url'] + url
        # NOTE(review): no timeout is passed to requests.post.
        r = requests.post(config_url, json=res, data=None, headers=config_header)
        r = r.json()
        result = json.loads(r['result'])
        if result['status'] == 1:
            program_path_tmp_panel = os.path.join('/tmp', result['folder_name'], 'return', processing_panel)
            # Remove files left over from a previous download of this panel.
            if os.path.exists(program_path_tmp_panel):
                files_r = os.listdir(program_path_tmp_panel)
                if files_r:
                    for file_name in files_r:
                        file_path = os.path.join(program_path_tmp_panel, file_name)
                        os.remove(file_path)
            download_state = self.env['sf.cnc.processing'].download_file_tmp(result['folder_name'],
                                                                             processing_panel)
            if download_state is False:
                raise UserError('编程单号为%s的CNC程序文件从FTP拉取失败' % (self.programming_no))
            productions = self.env['mrp.production'].search(
                [('programming_no', '=', self.programming_no), ('state', 'not in', ('cancel', 'done'))])
            if productions:
                for production in productions:
                    # Open CNC work order(s) of the requested panel.
                    panel_workorder = production.workorder_ids.filtered(lambda
                                                                            pw: pw.processing_panel == processing_panel and pw.routing_type == 'CNC加工' and pw.state not in (
                        'rework', 'done'))
                    if panel_workorder:
                        # Drop the old program lines before writing the new ones.
                        if panel_workorder.cmm_ids:
                            panel_workorder.cmm_ids.sudo().unlink()
                        if panel_workorder.cnc_ids:
                            panel_workorder.cnc_ids.sudo().unlink()
                        # self.env['sf.cam.work.order.program.knife.plan'].sudo().unlink_cam_plan(
                        # production)
                        # program_path_tmp_panel = os.path.join('C://Users//43484//Desktop//fsdownload//test',
                        # processing_panel)
                        logging.info('program_path_tmp_panel:%s' % program_path_tmp_panel)
                        files_panel = os.listdir(program_path_tmp_panel)
                        if files_panel:
                            # The last '.pdf' file found becomes the worksheet.
                            for file in files_panel:
                                file_extension = os.path.splitext(file)[1]
                                if file_extension.lower() == '.pdf':
                                    panel_file_path = os.path.join(program_path_tmp_panel, file)
                                    logging.info('panel_file_path:%s' % panel_file_path)
                        # NOTE(review): ``panel_file_path`` is unbound when the folder
                        # holds no PDF, and the file opened here is never closed
                        # explicitly.
                        panel_workorder.write(
                            {'cnc_ids': panel_workorder.cnc_ids.sudo()._json_cnc_processing(processing_panel,
                                                                                           result),
                             'cmm_ids': panel_workorder.cmm_ids.sudo()._json_cmm_program(processing_panel, result),
                             'cnc_worksheet': base64.b64encode(open(panel_file_path, 'rb').read())})
                        logging.info('len(cnc_worksheet):%s' % len(panel_workorder.cnc_worksheet))
                        # The open clamping pre-adjustment work order of the panel gets
                        # the same PDF as its processing drawing.
                        pre_workorder = production.workorder_ids.filtered(lambda
                                                                              ap: ap.routing_type == '装夹预调' and ap.processing_panel == processing_panel and ap.state not in (
                            'rework', 'done'))
                        if pre_workorder:
                            pre_workorder.write(
                                {'processing_drawing': base64.b64encode(open(panel_file_path, 'rb').read())})
                    # if production.state == 'rework' and production.programming_state == '已编程未下发':
                    # production.write(
                    # {'state': 'progress', 'programming_state': '已编程', 'is_rework': False})
                    # logging.info('返工含有已编程未下发的程序更新完成:%s' % production.name)
        else:
            raise UserError(result['message'])
    except Exception as e:
        logging.info('get_new_program error:%s' % e)
        raise UserError("从云平台获取最新程序失败,请联系管理员")
def recreateManufacturing(self, item):
    """
    Re-generate a manufacturing order for a scrapped one.

    When ``is_scrap`` is True, a procurement is run for the outgoing move of
    the originating sale order, the newest manufacturing order of that
    origin is tagged as a remanufacture with the same programming number,
    its blank pickings are re-pointed to the blank stock location, purchase
    order origins are extended, and the programming state is set from
    ``item['is_reprogramming']``.

    NOTE(review): the nesting below was reconstructed; everything is assumed
    to sit under the ``is_scrap`` test, so nothing is returned otherwise.
    """
    if self.is_scrap is True:
        procurement_requests = []
        sale_order = self.env['sale.order'].sudo().search([('name', '=', self.origin)])
        # Look up the outgoing stock move
        out_picking = self.env['stock.picking'].search(
            [('origin', '=', sale_order.name), ('name', 'ilike', 'WH/OUT/')])
        move = out_picking.move_ids.filtered(lambda pd: pd.product_id == self.product_id)
        move_values = {'product_description_variants': '',
                       'date_planned': fields.Datetime.now(),
                       'date_deadline': fields.Datetime.now(),
                       'move_dest_ids': move,
                       'group_id': move.group_id,
                       'route_ids': [],
                       'warehouse_id': self.warehouse_id,
                       'priority': 0,
                       'orderpoint_id': False,
                       'product_packaging_id': False}
        # A single unit is procured for the move's product.
        procurement_requests.append(self.env['procurement.group'].Procurement(
            move.product_id, 1.0, move.product_uom,
            move.location_id, move.rule_id and move.rule_id.name or "/",
            sale_order.name, move.company_id, move_values))
        self.env['procurement.group'].run(procurement_requests,
                                          raise_user_error=not self.env.context.get('from_orderpoint'))
        # The newest order with this origin is taken to be the one just created.
        productions = self.env['mrp.production'].sudo().search(
            [('origin', '=', self.origin)], order='id desc', limit=1)
        productions.write({'programming_no': self.programming_no, 'is_remanufacture': True})
        # mo_move = self.env['stock.move'].search(
        # [('origin', '=', sale_order.name), ('reference', 'ilike', 'WH/MO/')])
        # if mo_move:
        # sfp_move = self.env['stock.move'].search(
        # [('origin', '=', sale_order.name), ('reference', 'ilike', 'WH/SFP/')], limit=1)
        # mo_move.write({'reference': sfp_move.reference, 'partner_id': sfp_move.partner_id.id,
        # 'picking_id': sfp_move.picking_id.id, 'picking_type_id': sfp_move.picking_type_id.id,
        # 'production_id': False})
        # productions.procurement_group_id.mrp_production_ids.move_dest_ids.write(
        # {'group_id': self.env['procurement.group'].search([('name', '=', sale_order.name)])})
        stock_picking_remanufacture = self.env['stock.picking'].search([('origin', '=', productions.name)])
        for pick in stock_picking_remanufacture:
            if pick.name.startswith('WH/PC/') or pick.name.startswith('WH/INT/'):
                if pick.move_ids:
                    product_type_id = pick.move_ids[0].product_id.categ_id
                    if product_type_id.name == '坯料':
                        location_id = self.env['stock.location'].search([('name', '=', '坯料存货区')])
                        # Without the blank stock location no picking can be fixed; stop.
                        if not location_id:
                            logging.info(f'没有搜索到【坯料存货区】: {location_id}')
                            break
                        # Internal transfers get their destination replaced,
                        # production issues their source.
                        if pick.picking_type_id.name == '内部调拨':
                            if pick.location_dest_id.product_type != product_type_id:
                                pick.location_dest_id = location_id.id
                        elif pick.picking_type_id.name == '生产发料':
                            if pick.location_id.product_type != product_type_id:
                                pick.location_id = location_id.id
        scarp_process_parameter_workorder = self.env['mrp.workorder'].search(
            [('surface_technics_parameters_id', '!=', False), ('production_id', '=', self.id),
             ('is_subcontract', '=', True)])
        if scarp_process_parameter_workorder:
            production_programming = self.env['mrp.production'].search(
                [('programming_no', '=', self.programming_no), ('id', '!=', productions.id)], order='name asc')
            production_list = [production.name for production in production_programming]
            purchase_orders = self.env['purchase.order'].search([('origin', 'ilike', ','.join(production_list))])
            for purchase_item in purchase_orders.order_line:
                for process_item in scarp_process_parameter_workorder:
                    if purchase_item.product_id.categ_type == '表面工艺':
                        if purchase_item.product_id.server_product_process_parameters_id == process_item.surface_technics_parameters_id:
                            # NOTE(review): ``purchase_orders.origin`` is read on the whole
                            # search result; this presumably expects a single purchase
                            # order - confirm.
                            if purchase_orders.origin.find(productions.name) == -1:
                                purchase_orders.origin += ',' + productions.name
        if item['is_reprogramming'] is False:
            productions.programming_state = '已编程'
        else:
            productions.programming_state = '编程中'
        return productions
# 在之前的销售单上重新生成制造订单
def create_production1_values(self, production):
    """Return creation values that clone ``production`` onto the same origin.

    Relational fields are reduced to ids and the programming state is fixed
    to '已编程'.
    """
    return dict(
        origin=production.origin,
        product_id=production.product_id.id,
        programming_state='已编程',
        product_description_variants=production.product_description_variants,
        product_qty=production.product_qty,
        product_uom_id=production.product_uom_id.id,
        location_src_id=production.location_src_id.id,
        location_dest_id=production.location_dest_id.id,
        bom_id=production.bom_id.id,
        date_deadline=production.date_deadline,
        date_planned_start=production.date_planned_start,
        date_planned_finished=production.date_planned_finished,
        # 'procurement_group_id' is deliberately not copied.
        propagate_cancel=production.propagate_cancel,
        orderpoint_id=production.orderpoint_id.id,
        picking_type_id=production.picking_type_id.id,
        company_id=production.company_id.id,
        move_dest_ids=production.move_dest_ids.ids,
        user_id=production.user_id.id,
    )
# 增加制造订单类型
# Manufacturing type of the order: automated production line or manual
# offline processing. Stored, and filled by _compute_production_type.
production_type = fields.Selection(
    [('自动化产线加工', '自动化产线加工'), ('人工线下加工', '人工线下加工')],
    string='制造类型',
    compute='_compute_production_type',
    store=True
)
@api.depends('product_id.is_manual_processing')
def _compute_production_type(self):
    """Set the manufacturing type from the product's manual-processing flag."""
    for production in self:
        if production.product_id.is_manual_processing:
            production.production_type = '人工线下加工'
        else:
            production.production_type = '自动化产线加工'
# ``sale_order_id`` is what this method actually reads, so it is declared as
# a dependency as well; without it the count was not refreshed when only
# that field changed.
@api.depends('sale_order_id',
             'procurement_group_id.mrp_production_ids.move_dest_ids.group_id.sale_id')
def _compute_sale_order_count(self):
    """Set the sale order count to 1 when the order has a sale order, else 0."""
    for production in self:
        production.sale_order_count = 1 if production.sale_order_id else 0
def action_view_sale_orders(self):
    """Return a window action opening the linked sale order in form view.

    Returns None when the order has no sale order.
    """
    if not self.sale_order_id:
        return None
    return {
        'res_model': 'sale.order',
        'type': 'ir.actions.act_window',
        'view_mode': 'form',
        'res_id': self.sale_order_id.id,
    }
@api.model_create_multi
def create(self, vals_list):
    """
    Override of manufacturing-order creation. For single-manufacturing
    products, only one procurement group is created per finished product;
    it is then used when the follow-up documents are created.

    Each ``vals`` gets a name from the picking type's sequence when missing,
    and a ``procurement_group_id`` when none was supplied.

    NOTE(review): the pairing of the final ``else`` with the
    ``single_manufacturing`` test was reconstructed - confirm.
    """
    # product id -> procurement group id shared by that product in this batch
    product_group_id = {}
    is_custemer_group_id = {}  # customer-supplied vs. non-customer-supplied material
    for vals in vals_list:
        if not vals.get('name', False) or vals['name'] == _('New'):
            picking_type_id = vals.get('picking_type_id')
            if not picking_type_id:
                picking_type_id = self._get_default_picking_type_id(vals.get('company_id', self.env.company.id))
                vals['picking_type_id'] = picking_type_id
            vals['name'] = self.env['stock.picking.type'].browse(picking_type_id).sequence_id.next_by_id()
        product_id = self.env['product.product'].browse(vals['product_id'])
        # NOTE(review): despite its name this is truthy when the material's
        # ``gain_way`` is set and is NOT '自加工'.
        is_self_process = product_id.materials_type_id and product_id.materials_type_id.gain_way and product_id.materials_type_id.gain_way != '自加工'
        is_customer_provided = product_id.is_customer_provided
        key = f"{is_self_process}_{is_customer_provided}"
        # One group per (gain-way flag, customer-provided flag) combination,
        # created as soon as the combination is first seen.
        if not is_custemer_group_id.get(key):
            is_custemer_group_id[key] = self.env["procurement.group"].create({'name': vals.get('name')}).id
        # if not (is_first_customer or is_first_not_customer) and is_self_process:
        # is_first = True
        # group_id = self.env["procurement.group"].create({'name': vals.get('name')}).id
        if not vals.get('procurement_group_id'):
            if product_id.product_tmpl_id.single_manufacturing:
                # Finished products ('成品') use the per-key group.
                if product_id.categ_id.name == '成品':
                    vals['procurement_group_id'] = is_custemer_group_id[key]
                    continue
                # Other products share one group per product within the batch.
                if product_id.id not in product_group_id.keys():
                    procurement_group_vals = self._prepare_procurement_group_vals(vals)
                    procurement_group_id = self.env["procurement.group"].create(procurement_group_vals).id
                    vals['procurement_group_id'] = procurement_group_id
                    product_group_id[product_id.id] = procurement_group_id
                else:
                    vals['procurement_group_id'] = product_group_id[product_id.id]
            else:
                vals['procurement_group_id'] = is_custemer_group_id[key]
    return super(MrpProduction, self).create(vals_list)
@api.depends('procurement_group_id.stock_move_ids.created_purchase_line_id.order_id',
             'procurement_group_id.stock_move_ids.move_orig_ids.purchase_line_id.order_id')
def _compute_purchase_order_count(self):
    """Count the purchase orders reachable from the order's procurement group.

    For single-manufacturing products the group of the first order (lowest
    id) with the same origin and product is used instead of the order's own.
    """
    for production in self:
        group_owner = production
        if production.product_id.product_tmpl_id.single_manufacturing == True:
            # Use the procurement group of the first order of this origin.
            group_owner = self.env['mrp.production'].search(
                [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)],
                limit=1, order='id asc')
        group_moves = group_owner.procurement_group_id.stock_move_ids
        purchase_orders = (group_moves.created_purchase_line_id.order_id
                           | group_moves.move_orig_ids.purchase_line_id.order_id)
        production.purchase_order_count = len(purchase_orders)
@api.depends('procurement_group_id', 'procurement_group_id.stock_move_ids.group_id')
def _compute_picking_ids(self):
    """Set the pickings and delivery count from the relevant procurement group.

    Single-manufacturing orders that are not remanufactures use the group
    (and picking count) of the first order with the same origin and
    product; every other order uses its own.
    """
    Picking = self.env['stock.picking']
    for order in self:
        group_owner = order
        if order.product_id.product_tmpl_id.single_manufacturing == True and not order.is_remanufacture:
            group_owner = self.env['mrp.production'].search(
                [('origin', '=', order.origin), ('product_id', '=', order.product_id.id)], limit=1, order='id asc')
        order.picking_ids = Picking.search([
            ('group_id', '=', group_owner.procurement_group_id.id), ('group_id', '!=', False),
        ])
        order.delivery_count = len(group_owner.picking_ids)
def action_view_purchase_orders(self):
    """Return a window action showing the purchase orders of this order.

    Non-remanufacture orders of single-manufacturing products look at the
    procurement group of the first order with the same origin and product.
    One purchase order opens in form view, otherwise a filtered list.
    """
    self.ensure_one()
    production = self
    if not self.is_remanufacture and self.product_id.product_tmpl_id.single_manufacturing == True:
        production = self.env['mrp.production'].search(
            [('origin', '=', self.origin), ('product_id', '=', self.product_id.id)], limit=1, order='id asc')
    group_moves = production.procurement_group_id.stock_move_ids
    purchase_order_ids = (
            group_moves.created_purchase_line_id.order_id | group_moves.move_orig_ids.purchase_line_id.order_id).ids
    if len(purchase_order_ids) == 1:
        return {
            'res_model': 'purchase.order',
            'type': 'ir.actions.act_window',
            'view_mode': 'form',
            'res_id': purchase_order_ids[0],
        }
    return {
        'res_model': 'purchase.order',
        'type': 'ir.actions.act_window',
        'name': _("Purchase Order generated from %s", self.name),
        'domain': [('id', 'in', purchase_order_ids)],
        'view_mode': 'tree,form',
    }
def _subcontract_sanity_check(self):
    """Check that tracked products and components carry lot/serial numbers.

    Returns:
        True when every order passes.

    Raises:
        UserError: a tracked finished product has no producing lot, or a
            tracked component line has no lot. In the latter case the
            message lists the order's unfinished pickings when there are
            any.
    """
    for production in self:
        # Read the lot on the record being checked: ``self.lot_producing_id``
        # raised a singleton error as soon as several orders were checked.
        if production.product_tracking != 'none' and not production.lot_producing_id:
            raise UserError(_('You must enter a serial number for %s') % production.product_id.name)
        for sml in production.move_raw_ids.move_line_ids:
            if sml.tracking != 'none' and not sml.lot_id:
                picking_ids = production.picking_ids.filtered(
                    lambda p: p.state not in ['done', 'cancel'])
                picking_num = len(picking_ids)
                picking_info = ', '.join(
                    ['%s:%s' % (picking.picking_type_id.name, picking.name) for picking in picking_ids]
                )
                if picking_info:
                    raise UserError(_('You have %s incomplete supplies: %s') % (
                        picking_num, picking_info))
                raise UserError(
                    _('You must enter a serial number for each line of %s') % sml.product_id.display_name)
    return True
# Number of times the order has been re-programmed; starts at 0.
reprogramming_count = fields.Integer(string='重新编程次数', default=0)
# 申请编程
def action_apply_programming(self):
    """Open the re-programming request wizard for one manufacturing order.

    Preconditions: exactly one order is selected, its state is
    ``confirmed`` or ``pending_cam``, its programming state is '已编程', and
    the cloud does not report the programming order as '待编程', '已编程'
    or '编程中'.

    Raises:
        UserError: any precondition is not met.
    """
    # Was a bare print(); route it through the logger like the rest of the file.
    logging.info('申请编程')
    if len(self) > 1:
        raise UserError('仅支持选择单个制造订单进行编程申请,请重新选择')
    for production in self:
        if production.state not in ['confirmed', 'pending_cam'] or production.programming_state != '已编程':
            raise UserError('不可操作。所选制造订单必须同时满足如下条件:\n1、制造订单状态待排程 或 待加工;\n2、制造订单编程状态已编程。\n请检查!')
        cloud_programming = production._cron_get_programming_state()
        if cloud_programming['programming_state'] in ['待编程', '已编程', '编程中']:
            raise UserError("当前编程单正在重新编程,请注意查看当前制造订单的“编程记录”确认进度!")
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'sf.programming.reason',
        'view_mode': 'form',
        'target': 'new',
        'context': {
            'default_production_id': self.id,
            'active_id': self.id,
            # Pass the current time as the request time.
            'default_apply_time': fields.Datetime.now(),
        },
        'view_id': self.env.ref('sf_manufacturing.sf_programming_reason_form_view').id,
    }
# 编程记录
# Programming records (sf.programming.record) linked to this order through
# their production_id field.
programming_record_ids = fields.One2many('sf.programming.record', 'production_id')
# 编程单更新
def re_programming_update_programming_state(self):
    """
    Notify the remote programming service about a re-programming request.

    POSTs this order's ``programming_no`` (with an empty
    ``manufacturing_type``) as JSON to
    ``<sf_url>/api/intelligent_programming/reset_state_again``, using the URL,
    token and secret key read from ``res.config.settings``.

    NOTE: the response body is not inspected. Earlier code that parsed
    ``result['status']`` and flagged the order as rework was disabled, so only
    the HTTP status code is logged here.

    :raises UserError: when building or sending the request fails for any
        reason (including a timeout).
    """
    try:
        res = {'programming_no': self.programming_no,
               'manufacturing_type': ''}
        logging.info('res=%s:', res)
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        url = '/api/intelligent_programming/reset_state_again'
        config_url = configsettings['sf_url'] + url
        # Without a timeout requests waits forever on a stalled server and
        # would block the Odoo worker handling this call.
        response = requests.post(config_url, json=res, data=None, headers=config_header, timeout=30)
        logging.info('update_programming_state status:%s', response.status_code)
    except Exception as e:
        # Keep the traceback in the server log; the user only gets a generic message.
        logging.exception('update_programming_state error:%s', e)
        raise UserError("更新编程单状态失败,请联系管理员") from e
# 编程记录
class sf_programming_record(models.Model):
    # One row per (re-)programming request of a manufacturing order.
    _name = 'sf.programming.record'
    _description = "编程记录"

    # Manufacturing order this record is attached to.
    production_id = fields.Many2one(comodel_name='mrp.production')

    # Record number, re-programming reason, programming method, current
    # programming count, target manufacturing order number, apply time and
    # dispatch time.
    number = fields.Char(string='编号')
    reason = fields.Text(string='重新编程原因')
    programming_method = fields.Selection(
        selection=[
            ('auto', '自动'),
            ('manual operation', '人工'),
        ],
        string="编程方式",
    )
    current_programming_count = fields.Integer(string='当前编程次数')
    target_production_id = fields.Char(string='目标制造单号')
    apply_time = fields.Datetime(string='申请时间')
    send_time = fields.Datetime(string='下发时间')
class sf_detection_result(models.Model):
    # Detection (inspection) result of a manufacturing order, listed with the
    # most recently handled results first.
    _name = 'sf.detection.result'
    _description = "检测结果"
    _order = 'handle_result_date desc, id asc'

    # Manufacturing order the result belongs to.
    production_id = fields.Many2one(comodel_name='mrp.production')
    processing_panel = fields.Char(string='加工面')
    routing_type = fields.Selection(
        selection=[
            ('装夹预调', '装夹预调'),
            ('CNC加工', 'CNC加工'),
            ('解除装夹', '解除装夹'),
            ('切割', '切割'),
            ('表面工艺', '表面工艺'),
            ('线切割', '线切割'),
            ('人工线下加工', '人工线下加工'),
        ],
        string="工序类型",
    )
    rework_reason = fields.Selection(
        selection=[
            ("programming", "编程"),
            ("cutter", "刀具"),
            ("clamping", "装夹"),
            ("operate computer", "操机"),
            ("technology", "工艺"),
            ("customer redrawing", "客户改图"),
        ],
        string="原因",
        tracking=True,
    )
    detailed_reason = fields.Text(string='详细原因')
    test_results = fields.Selection(
        selection=[("合格", "合格"), ("返工", "返工"), ("报废", "报废")],
        string="检测结果",
        tracking=True,
    )
    test_report = fields.Binary(string='检测报告', readonly=True)
    handle_result = fields.Selection(
        selection=[("待处理", "待处理"), ("已处理", "已处理")],
        default='',
        string="处理结果",
        tracking=True,
    )
    handle_result_date = fields.Datetime(string='处理时间')
    handle_result_user = fields.Many2one(comodel_name='res.users', string='处理人')

    def button_look_test_report(self):
        """Return an action that opens this result's test report form in a dialog."""
        record_id = self.id
        report_form = self.env.ref('sf_manufacturing.sf_test_report_form')
        return {
            'res_model': 'sf.detection.result',
            'type': 'ir.actions.act_window',
            'res_id': record_id,
            'views': [(report_form.id, 'form')],
            'target': 'new',
        }

    def write(self, vals):
        """Stamp the handling time and handling user when the result is set to '已处理'."""
        if vals.get('handle_result') == '已处理':
            # The stamp is written into the incoming vals dict itself.
            vals.update(
                handle_result_date=fields.Datetime.now(),
                handle_result_user=self.env.user.id,
            )
        return super().write(vals)
class sf_processing_panel(models.Model):
    # Processing panel records; each has a name and an ``active`` flag that
    # defaults to True.
    _name = 'sf.processing.panel'
    _description = "加工面"

    name = fields.Char(string='加工面')
    active = fields.Boolean(string='有效', default=True)