# -*- coding: utf-8 -*- import base64 import random import re import traceback import qrcode from itertools import groupby from collections import defaultdict, namedtuple import logging import io import json from re import split as regex_split from re import findall as regex_findall from datetime import datetime import requests from odoo import SUPERUSER_ID, _, api, fields, models from odoo.tools import float_compare from odoo.addons.stock.models.stock_rule import ProcurementException from odoo.addons.sf_base.commons.common import Common from odoo.exceptions import UserError from io import BytesIO from odoo.exceptions import ValidationError class stockWarehouse(models.Model): _inherit = 'stock.warehouse' subcontracting_surface_technology_pull_out_id = fields.Many2one( 'stock.rule', '表面工艺规则1') subcontracting_surface_technology_pull_in_id = fields.Many2one( 'stock.rule', '表面工艺规则2' ) def _get_global_route_rules_values(self): rules = super(stockWarehouse, self)._get_global_route_rules_values() location_virtual_id = self.env.ref( 'sf_manufacturing.stock_location_locations_virtual_outcontract').id, location_pre_id = self.env['stock.location'].search( [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id, rules.update({ 'subcontracting_surface_technology_pull_in_id': { 'create_values': { 'action': 'pull', 'picking_type_id': self.env.ref('sf_manufacturing.outcontract_picking_in').id, 'group_propagation_option': 'none', 'company_id': self.company_id.id, 'location_src_id': location_virtual_id, 'location_dest_id': location_pre_id, 'route_id': self._find_global_route('sf_manufacturing.route_surface_technology_outsourcing', _('表面工艺外协')).id, } }, 'subcontracting_surface_technology_pull_out_id': { 'create_values': { 'action': 'pull', 'picking_type_id': self.env.ref('sf_manufacturing.outcontract_picking_out').id, 'group_propagation_option': 'none', 'company_id': self.company_id.id, 'location_src_id': location_pre_id, 'location_dest_id': location_virtual_id, 'route_id': self._find_global_route('sf_manufacturing.route_surface_technology_outsourcing', _('表面工艺外协')).id, } } }) return rules class StockRule(models.Model): _inherit = 'stock.rule' @api.model def _run_pull(self, procurements): logging.info(procurements) moves_values_by_company = defaultdict(list) mtso_products_by_locations = defaultdict(list) for procurement, rule in procurements: if not rule.location_src_id: msg = _('No source location defined on stock rule: %s!') % (rule.name,) raise ProcurementException([(procurement, msg)]) if rule.procure_method == 'mts_else_mto': mtso_products_by_locations[rule.location_src_id].append(procurement.product_id.id) # Get the forecasted quantity for the `mts_else_mto` procurement. forecasted_qties_by_loc = {} for location, product_ids in mtso_products_by_locations.items(): products = self.env['product.product'].browse(product_ids).with_context(location=location.id) forecasted_qties_by_loc[location] = {product.id: product.free_qty for product in products} # Prepare the move values, adapt the `procure_method` if needed. procurements = sorted(procurements, key=lambda proc: float_compare(proc[0].product_qty, 0.0, precision_rounding=proc[ 0].product_uom.rounding) > 0) list2 = [] for item in procurements: num = int(item[0].product_qty) product = self.env['product.product'].search( [("id", '=', item[0].product_id.id)]) product_tmpl = self.env['product.template'].search( ["&", ("id", '=', product.product_tmpl_id.id), ('single_manufacturing', "!=", False)]) if product_tmpl: if num > 1: for no in range(1, num + 1): Procurement = namedtuple('Procurement', ['product_id', 'product_qty', 'product_uom', 'location_id', 'name', 'origin', 'company_id', 'values']) s = Procurement(product_id=item[0].product_id, product_qty=1.0, product_uom=item[0].product_uom, location_id=item[0].location_id, name=item[0].name, origin=item[0].origin, company_id=item[0].company_id, values=item[0].values, ) item1 = list(item) item1[0] = s list2.append(tuple(item1)) else: list2.append(item) else: list2.append(item) for procurement, rule in list2: procure_method = rule.procure_method if rule.procure_method == 'mts_else_mto': qty_needed = procurement.product_uom._compute_quantity(procurement.product_qty, procurement.product_id.uom_id) if float_compare(qty_needed, 0, precision_rounding=procurement.product_id.uom_id.rounding) <= 0: procure_method = 'make_to_order' for move in procurement.values.get('group_id', self.env['procurement.group']).stock_move_ids: if move.rule_id == rule and float_compare(move.product_uom_qty, 0, precision_rounding=move.product_uom.rounding) > 0: procure_method = move.procure_method break forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed elif float_compare(qty_needed, forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id], precision_rounding=procurement.product_id.uom_id.rounding) > 0: procure_method = 'make_to_order' else: forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed procure_method = 'make_to_stock' move_values = rule._get_stock_move_values(*procurement) move_values['procure_method'] = procure_method moves_values_by_company[procurement.company_id.id].append(move_values) for company_id, moves_values in moves_values_by_company.items(): # create the move as SUPERUSER because the current user may not have the rights to do it (mto product # launched by a sale for example) moves = self.env['stock.move'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create( moves_values) # Since action_confirm launch following procurement_group we should activate it. moves._action_confirm() return True def attachment_update(self, name, res_id, res_field): attachment_info = self.env['ir.attachment'].sudo().search( [('res_id', '=', res_id), ('res_field', '=', res_field)], limit=1) attachment_info.write({'name': name}) @api.model def _run_manufacture(self, procurements): productions_values_by_company = defaultdict(list) errors = [] for procurement, rule in procurements: if float_compare(procurement.product_qty, 0, precision_rounding=procurement.product_uom.rounding) <= 0: # If procurement contains negative quantity, don't create a MO that would be for a negative value. continue bom = rule._get_matching_bom(procurement.product_id, procurement.company_id, procurement.values) productions_values_by_company[procurement.company_id.id].append(rule._prepare_mo_vals(*procurement, bom)) if errors: raise ProcurementException(errors) for company_id, productions_values in productions_values_by_company.items(): # create the MO as SUPERUSER because the current user may not have the rights to do it # (mto product launched by a sale for example) '''创建制造订单''' productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create( productions_values) # 将这一批制造订单的采购组根据成品设置为不同的采购组 # product_group_id = {} # for index, production in enumerate(productions): # if production.product_id.id not in product_group_id.keys(): # product_group_id[production.product_id.id] = production.procurement_group_id.id # else: # productions_values[index].update({'name': production.name}) # procurement_group_vals = production._prepare_procurement_group_vals(productions_values[index]) # production.procurement_group_id = self.env["procurement.group"].create(procurement_group_vals).id # self.env['stock.move'].sudo().create(productions._get_moves_raw_values()) # self.env['stock.move'].sudo().create(productions._get_moves_finished_values()) ''' 创建工单 ''' # productions._create_workorder() # # self.env['stock.move'].sudo().create(productions._get_moves_finished_values()) productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \ ( p.move_dest_ids.procure_method != 'make_to_order' and not p.move_raw_ids and not p.workorder_ids)).action_confirm() # 处理 根据制造订单生成的采购单坯料入库时到原材料库,手动将原材料位置该为坯料存货区 for production in productions: if production.picking_ids: product_type_id = production.picking_ids[0].move_ids[0].product_id.categ_id if product_type_id.name == '坯料': location_id = self.env['stock.location'].search([('name', '=', '坯料存货区')]) if not location_id: logging.info(f'没有搜索到【坯料存货区】: {location_id}') break for picking_id in production.picking_ids: if picking_id.picking_type_id.name == '内部调拨': if picking_id.location_dest_id.product_type != product_type_id: picking_id.location_dest_id = location_id.id elif picking_id.picking_type_id.name == '生产发料': if picking_id.location_id.product_type != product_type_id: picking_id.location_id = location_id.id for production in productions: ''' 创建制造订单时生成序列号 ''' production.action_generate_serial() origin_production = production.move_dest_ids and production.move_dest_ids[ 0].raw_material_production_id or False orderpoint = production.orderpoint_id if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual': production.message_post( body=_('This production order has been created from Replenishment Report.'), message_type='comment', subtype_xmlid='mail.mt_note') elif orderpoint: production.message_post_with_view( 'mail.message_origin_link', values={'self': production, 'origin': orderpoint}, subtype_id=self.env.ref('mail.mt_note').id) elif origin_production: production.message_post_with_view( 'mail.message_origin_link', values={'self': production, 'origin': origin_production}, subtype_id=self.env.ref('mail.mt_note').id) ''' 创建生产计划 ''' # 工单耗时 workorder_duration = 0 for workorder in production.workorder_ids: workorder_duration += workorder.duration_expected sale_order = self.env['sale.order'].sudo().search([('name', '=', production.origin)]) # 如果订单为空,则获取来源制造订单的销售单 if not sale_order: mrp_production = self.env['mrp.production'].sudo().search([('name', '=', production.origin)], limit=1) if mrp_production: sale_order = self.env['sale.order'].sudo().search([('name', '=', mrp_production.origin)]) else: mrp_production = production # if sale_order: # sale_order.write({'schedule_status': 'to schedule'}) self.env['sf.production.plan'].sudo().with_company(company_id).create({ 'name': production.name, 'order_deadline': sale_order.deadline_of_delivery, 'production_id': production.id, 'date_planned_start': production.date_planned_start, 'origin': mrp_production.origin, 'product_qty': production.product_qty, 'product_id': production.product_id.id, 'state': 'draft', }) all_production = productions grouped_product_ids = {k: list(g) for k, g in groupby(all_production, key=lambda x: x.product_id.id)} # 初始化一个字典来存储每个product_id对应的生产订单名称列表 product_id_to_production_names = {} # 对于每个product_id,获取其所有生产订单的名称 for product_id, all_production in grouped_product_ids.items(): # 为同一个product_id创建一个生产订单名称列表 product_id_to_production_names[product_id] = [production.name for production in all_production] for production_item in productions: technology_design_values = [] production_programming = self.env['mrp.production'].search( [('product_id.id', '=', production_item.product_id.id), ('origin', '=', production_item.origin)], limit=1, order='id asc') if production_item.product_id.id in product_id_to_production_names: # 同一个产品多个制造订单对应一个编程单和模型库 # 只调用一次fetchCNC,并将所有生产订单的名称作为字符串传递 if not production_item.programming_no and production_item.production_type in ['自动化产线加工', '人工线下加工']: if not production_programming.programming_no: production_item.fetchCNC( ', '.join(product_id_to_production_names[production_item.product_id.id])) else: production_item.write({'programming_no': production_programming.programming_no, 'programming_state': '编程中'}) i = 0 if production_item.product_id.categ_id.type == '成品': # 根据加工面板的面数及成品工序模板生成工序设计 if production_item.production_type == '自动化产线加工': model = 'sf.product.model.type.routing.sort' domain = [ ('product_model_type_id', '=', production_item.product_id.product_model_type_id.id)] else: model = 'sf.manual.product.model.type.routing.sort' domain = [('manual_product_model_type_id', '=', production_item.product_id.product_model_type_id.id)] product_routing_workcenter = self.env[model].search(domain, order='sequence asc') if production_item.production_type == '自动化产线加工': for k in (production_item.product_id.model_processing_panel.split(',')): for route in product_routing_workcenter: i += 1 technology_design_values.append( self.env['sf.technology.design'].json_technology_design_str(k, route, i, False)) elif production_item.production_type == '人工线下加工': for route in product_routing_workcenter: i += 1 technology_design_values.append( self.env['sf.technology.design'].json_technology_design_str('ZM', route, i, False)) else: for route in product_routing_workcenter: i += 1 technology_design_values.append( self.env['sf.technology.design'].json_technology_design_str(False, route, i, False)) elif production_item.product_id.categ_id.type == '坯料': embryo_routing_workcenter = self.env['sf.embryo.model.type.routing.sort'].search( [('embryo_model_type_id', '=', production_item.product_id.embryo_model_type_id.id)], order='sequence asc' ) for route_embryo in embryo_routing_workcenter: i += 1 technology_design_values.append( self.env['sf.technology.design'].json_technology_design_str(False, route_embryo, i, False)) surface_technics_arr = [] route_workcenter_arr = [] for item in production_item.product_id.product_model_type_id.surface_technics_routing_tmpl_ids: if item.route_workcenter_id.surface_technics_id.id: for process_param in production_item.product_id.model_process_parameters_ids: if item.route_workcenter_id.surface_technics_id == process_param.process_id: surface_technics_arr.append( item.route_workcenter_id.surface_technics_id.id) route_workcenter_arr.append(item.route_workcenter_id.id) if surface_technics_arr: production_process = self.env['sf.production.process'].search( [('id', 'in', surface_technics_arr)], order='sequence asc' ) for p in production_process: logging.info('production_process:%s' % p.name) process_parameters = production_item.product_id.model_process_parameters_ids.filtered( lambda pm: pm.process_id.id == p.id) for process_parameter in process_parameters: i += 1 route_production_process = self.env[ 'mrp.routing.workcenter'].search( [('surface_technics_id', '=', p.id), ('id', 'in', route_workcenter_arr)]) technology_design_values.append( self.env['sf.technology.design'].json_technology_design_str(False, route_production_process, i, process_parameter)) production_item.technology_design_ids = technology_design_values productions.write({'state': 'technology_to_confirmed'}) return True class ProductionLot(models.Model): _name = 'stock.lot' _inherit = ['stock.lot', 'printing.utils'] rfid = fields.Char('Rfid', readonly=True) product_specification = fields.Char('规格', compute='_compute_product_specification', store=True) def search_lot_put_rfid(self): # 使用SQL将所有刀柄Rfid不满十位的值在前方补零 self.env.cr.execute( '''UPDATE stock_lot SET rfid = LPAD(rfid, 10, '0') WHERE rfid IS NOT NULL AND LENGTH(rfid) < 10''' ) self.env.cr.commit() @api.depends('product_id') def _compute_product_specification(self): for stock in self: if stock: if stock.product_id: if stock.product_id.categ_id.name in '刀具': stock.product_specification = stock.product_id.specification_id.name elif stock.product_id.categ_id.name in '夹具': stock.product_specification = stock.product_id.specification_fixture_id.name else: stock.product_specification = stock.product_id.default_code @api.model def generate_lot_names1(self, display_name, first_lot, count): """Generate `lot_names` from a string.""" if first_lot.__contains__(display_name): first_lot = first_lot[(len(display_name) + 1):] else: first_lot = first_lot[-3:] # We look if the first lot contains at least one digit. caught_initial_number = regex_findall(r"\d+", first_lot) if not caught_initial_number: return self.generate_lot_names1(display_name, first_lot + "0", count) # We base the series on the last number found in the base lot. initial_number = caught_initial_number[-1] padding = len(initial_number) # We split the lot name to get the prefix and suffix. splitted = regex_split(initial_number, first_lot) # initial_number could appear several times, e.g. BAV023B00001S00001 prefix = initial_number.join(splitted[:-1]) suffix = splitted[-1] initial_number = int(initial_number) lot_names = [] for i in range(0, count): lot_names.append('%s-%s%s%s' % ( display_name, prefix, str(initial_number + i).zfill(padding), suffix )) return lot_names def get_tool_generate_lot_names1(self, company, product): """ 采购时生成刀具物料序列号 """ now = datetime.now().strftime("%Y%m%d") last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id)], limit=1, order='id DESC') if product.cutting_tool_model_id: split_codes = product.cutting_tool_model_id.code.split('-') if not last_serial: return "%s-T-%s-%s-%03d" % (split_codes[0], now, product.specification_id.name, 1) else: return "%s-T-%s-%s-%03d" % ( split_codes[0], now, product.specification_id.name, int(last_serial.name[-3:]) + 1) else: raise ValidationError('该刀具物料产品的型号字段为空,请补充完整!!!') @api.model def _get_next_serial(self, company, product): """Return the next serial number to be attributed to the product.""" if product.tracking == "serial": last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id), ('name', 'ilike', product.name)], limit=1, order='name desc') move_line_id = self.env['stock.move.line'].sudo().search( [('company_id', '=', company.id), ('product_id', '=', product.id), ('lot_name', 'ilike', product.name)], limit=1, order='lot_name desc') if last_serial or move_line_id: return self.env['stock.lot'].generate_lot_names1(product.name, last_serial.name if ( not move_line_id or (last_serial and last_serial.name > move_line_id.lot_name)) else move_line_id.lot_name, 2)[1] return "%s-%03d" % (product.name, 1) qr_code_image = fields.Binary(string='二维码', compute='_generate_qr_code') @api.depends('name') def _generate_qr_code(self): for record in self: # Generate QR code qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(record.name) qr.make(fit=True) qr_image = qr.make_image(fill_color="black", back_color="white") # Encode the image data in base64 image_stream = BytesIO() qr_image.save(image_stream, format="PNG") encoded_image = base64.b64encode(image_stream.getvalue()) record.qr_code_image = encoded_image def print_single_method(self): print('self.name========== %s' % self.name) self.ensure_one() qr_code_data = self.qr_code_image if not qr_code_data: raise UserError("没有找到二维码数据。") lot_name = self.name # host = "192.168.50.110" # 可以根据实际情况修改 # port = 9100 # 可以根据实际情况修改 # 获取默认打印机配置 printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1) if not printer_config: raise UserError('请先配置打印机') host = printer_config.printer_id.ip_address port = printer_config.printer_id.port self.print_qr_code(lot_name, host, port) # self.ensure_one() # 确保这个方法只为一个记录调用 # # if not self.lot_id: # # raise UserError("没有找到序列号。") # # 假设_lot_qr_code方法已经生成了二维码并保存在字段中 # qr_code_data = self.qr_code_image # if not qr_code_data: # raise UserError("没有找到二维码数据。") # # # 生成下载链接或直接触发下载 # # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例 # attachment = self.env['ir.attachment'].sudo().create({ # 'datas': self.qr_code_image, # 'type': 'binary', # 'description': '二维码图片', # 'name': self.name + '.png', # # 'res_id': invoice.id, # # 'res_model': 'stock.picking', # 'public': True, # 'mimetype': 'application/x-png', # # 'model_name': 'stock.picking', # }) # # 返回附件的下载链接 # download_url = '/web/content/%s?download=true' % attachment.id # base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') # return { # 'type': 'ir.actions.act_url', # 'url': str(base_url) + download_url, # 'target': 'self', # } @api.model_create_multi def create(self, vals_list): for vals in vals_list: if vals.get('rfid'): lots = self.env['stock.lot'].search([('rfid', '=', vals['rfid'])]) if lots: for lot in lots: raise ValidationError('Rfid【%s】已被序列号为【%s】的【%s】产品占用!' % ( lot.rfid, lot.name, lot.product_id.name)) records = super(ProductionLot, self).create(vals_list) return records class StockPicking(models.Model): _inherit = 'stock.picking' surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数") person_of_delivery = fields.Char('收货人', compute='_compute_move_ids', store=True) telephone_of_delivery = fields.Char('电话号码', compute='_compute_move_ids', store=True) address_of_delivery = fields.Char('联系地址', compute='_compute_move_ids', store=True) retrospect_ref = fields.Char('追溯参考', compute='_compute_move_ids', store=True) sale_order_id = fields.Many2one('sale.order', '销售单号', compute='_compute_move_ids', store=True) picking_type_sequence_code = fields.Char(related='picking_type_id.sequence_code') part_numbers = fields.Char(string="零件图号", compute='_compute_part_info', store=True, index=True) part_names = fields.Char(string="零件名称", compute='_compute_part_info', store=True, index=True) @api.depends('move_ids_without_package.part_number', 'move_ids_without_package.part_name') def _compute_part_info(self): for picking in self: # 聚合所有关联行的 part_number 和 part_name part_numbers = picking.move_ids_without_package.mapped('part_number') part_names = picking.move_ids_without_package.mapped('part_name') picking.part_numbers = ','.join(filter(None, part_numbers)) picking.part_names = ','.join(filter(None, part_names)) @api.depends('move_ids', 'move_ids.product_id') def _compute_move_ids(self): for item in self: if item.move_ids: product_id = item.move_ids[0].product_id if product_id: sale_info = None if product_id.categ_id.type == '坯料' and product_id.name.startswith('R-S'): parts = product_id.name.split('-') if len(parts) >= 3: sale_name = parts[1] sale_info = self.env['sale.order'].sudo().search( [('name', '=', sale_name)]) else: sale_order_line = self.env['sale.order.line'].sudo().search( [('product_id', '=', product_id.id)]) if sale_order_line: sale_info = sale_order_line[0].order_id if sale_info: item.sale_order_id = sale_info.id item.retrospect_ref = sale_info.order_code if item.picking_type_id.sequence_code == 'DL': item.person_of_delivery = sale_info.person_of_delivery item.telephone_of_delivery = sale_info.telephone_of_delivery item.address_of_delivery = sale_info.address_of_delivery # 设置外协出入单的名称 def _get_name_Res(self, rescode, sequence): last_picking = self.sudo().search([('name', 'ilike', rescode)], order='name desc', limit=1) sequence_id = sequence.next_by_id() name_without_prefix = last_picking.name.removeprefix(rescode) try: name_value = int(name_without_prefix) # 假设 name 是一个数字字符串 except ValueError: name_value = 0 if name_value >= int(sequence_id.removeprefix(rescode)): sequence.write({ 'number_next': name_value + 1, }) return sequence.next_by_id() else: return sequence_id def button_validate(self): # 校验“收料入库单、客供料入库单”是否已经分配序列号,如果没有分配则自动分配 if self.picking_type_id.use_existing_lots is False and self.picking_type_id.use_create_lots is True: for move in self.move_ids: if not move.move_line_nosuggest_ids: move.action_show_details() else: # 对已经生成的序列号做唯一性校验,如果重复则重新生成新的序列号 line_lot_name = [line_id.lot_name for line_id in move.move_line_nosuggest_ids] lot_ids = self.env['stock.lot'].sudo().search([('name', 'in', line_lot_name)]) if lot_ids: move.action_clear_lines_show_details() move.action_show_details() res = super().button_validate() lot_ids = None product_ids = self.move_ids.mapped('product_id') if not self.move_ids[0].product_id.single_manufacturing and self.move_ids[0].product_id.tracking == 'none': lot_ids = self.move_ids.move_line_ids.mapped('lot_id') production_ids = self.sale_order_id.mrp_production_ids if self.sale_order_id else self.env['mrp.production'] if res and self.location_id.name == '外协收料区' and self.location_dest_id.name == '制造前': # 如果是最后一张外协入库单,则设置库存位置的预留数量 for production_id in production_ids: if lot_ids: lot_id = production_id.move_raw_ids.move_line_ids.lot_id # picking_ids = production_id.picking_ids.filtered( # lambda wk: wk.location_id.name == '外协收料区' and wk.location_dest_id.name == '制造前') if lot_id in lot_ids: workorder_id = production_id.workorder_ids.filtered( lambda a: a.state == 'progress' and a.is_subcontract) if not workorder_id: continue workorder_id.button_finish() else: workorder_id = production_id.workorder_ids.filtered(lambda a: a.state == 'progress' and a.is_subcontract) if not workorder_id: continue workorder_id.button_finish() # lot_id = workorder.production_id.move_raw_ids.move_line_ids.lot_id # picking_ids = workorder.production_id.picking_ids.filtered( # lambda wk: wk.location_id.name == '外协收料区' and wk.location_dest_id.name == '制造前') # if move_in: # workorder = move_in.subcontract_workorder_id # workorders = workorder.production_id.workorder_ids # subcontract_workorders = workorders.filtered( # lambda wo: wo.is_subcontract == True and wo.state != 'cancel').sorted('sequence') # # if workorder == subcontract_workorders[-1]: # # self.env['stock.quant']._update_reserved_quantity( # # move_in.product_id, move_in.location_dest_id, move_in.product_uom_qty, # # lot_id=move_in.move_line_ids.lot_id, # # package_id=False, owner_id=False, strict=False # # ) # workorder.button_finish() if res and self.location_id.name == '制造前' and self.location_dest_id.name == '外协加工区': for production_id in production_ids: if lot_ids: lot_id = production_id.move_raw_ids.move_line_ids.lot_id # picking_ids = production_id.picking_ids.filtered( # lambda wk: wk.location_id.name == '外协收料区' and wk.location_dest_id.name == '制造前') if lot_id in lot_ids: workorder_id = production_id.workorder_ids.filtered( lambda a: a.state == 'progress' and a.is_subcontract) if not workorder_id: continue workorder_id.button_finish() else: workorder_id = production_id.workorder_ids.filtered(lambda a: a.state == 'ready' and a.is_subcontract) if not workorder_id: continue workorder_id.button_start() if self.location_id.name == '成品存货区' and self.location_dest_id.name == '客户': sale_id = self.env['sale.order'].sudo().search( [('name', '=', self.origin)]) stock_picking_list = self.env['stock.picking'].sudo().search( [('id', 'in', sale_id.picking_ids.ids)]) stock_picking = stock_picking_list.filtered(lambda p: p.state not in ("done", "cancel")) if sale_id and not stock_picking: sale_id.write({'state': 'delivered'}) if self.location_dest_id.name == '成品存货区' and self.state == 'done': for move in self.move_ids: for production in self.sale_order_id.mrp_production_ids: moves = self.env['stock.move'].search([ ('name', '=', production.name), ('state', '!=', 'cancel') ]) finish_move = next((move for move in moves if move.location_dest_id.name == '制造后'), None) if finish_move.id in move.move_orig_ids.ids and finish_move.state == 'done': production.workorder_ids.write({'back_button_display': False}) return res # 创建 外协出库入单 def create_outcontract_picking(self, workorders, item, sorted_workorders): production = workorders[0].production_id for workorder in workorders: if workorder.move_subcontract_workorder_ids: workorder.move_subcontract_workorder_ids.write({'state': 'cancel'}) workorder.move_subcontract_workorder_ids.picking_id.write({'state': 'cancel'}) # 创建一个新的补货组 procurement_group_id = self.env['procurement.group'].create({ 'name': workorder.name, 'partner_id': self.partner_id.id, }) move_dest_id = False # 如果当前工单是是制造订单的最后一个工艺外协工单 if workorder == next((workorder for workorder in reversed(sorted_workorders) if workorder.is_subcontract), None): if item.move_raw_ids: move_dest_id = item.move_raw_ids[0].id else: # 从sorted_workorders中找到上一工单的move if len(sorted_workorders) > 1: move_dest_id = \ sorted_workorders[sorted_workorders.index(workorder) + 1].move_subcontract_workorder_ids[1].id new_picking = True outcontract_picking_type_in = self.env.ref( 'sf_manufacturing.outcontract_picking_in').id, outcontract_picking_type_out = self.env.ref( 'sf_manufacturing.outcontract_picking_out').id, context = dict(self.env.context) context.update({ 'default_production_id': item.id, # 添加额外信息到 context 中 }) moves_in = self.env['stock.move'].sudo().with_context(context).create( self.env['stock.move']._get_stock_move_values_Res(item, outcontract_picking_type_in, procurement_group_id.id, move_dest_id, production.product_uom_qty)) picking_in = self.create( moves_in._get_new_picking_values_Res(item, workorder, 'WH/OCIN/')) # pick_ids.append(picking_in.id) moves_in.write({'picking_id': picking_in.id}) moves_in._action_confirm() moves_in._assign_picking_post_process(new=new_picking) # self.env.context.get('default_production_id') moves_out = self.env['stock.move'].sudo().with_context(context).create( self.env['stock.move']._get_stock_move_values_Res(item, outcontract_picking_type_out, procurement_group_id.id, moves_in.id, production.product_uom_qty)) workorder.write({'move_subcontract_workorder_ids': [(6, 0, [moves_in.id, moves_out.id])]}) picking_out = self.create( moves_out._get_new_picking_values_Res(item, workorder, 'WH/OCOUT/')) # pick_ids.append(picking_out.id) moves_out.write({'picking_id': picking_out.id}) moves_out._action_confirm() moves_out._assign_picking_post_process(new=new_picking) @api.depends('move_type', 'immediate_transfer', 'move_ids.state', 'move_ids.picking_id') def _compute_state(self): super(StockPicking, self)._compute_state() for picking in self: # 外协出库单根据工单状态,采购单状态来确定 picking_type_id = self.env.ref('sf_manufacturing.outcontract_picking_out').id if picking.picking_type_id.id == picking_type_id: if picking.move_ids: workorder = picking.move_ids[0].subcontract_workorder_id if picking.state == 'assigned': if workorder.state in ['pending', 'waiting'] or workorder._get_surface_technics_purchase_ids().state in [ 'draft', 'sent']: picking.state = 'waiting' @api.constrains('state', 'move_ids_without_package') def _check_move_ids_without_package(self): """ 凡库存调拨单的【作业类型】=“收料入库、客供料入库”,且其产品行的【产品_库存_追溯】="按唯一序列号/按批次”的,当调拨单的【状态】=就绪时 自动生成预分配序列号 """ for sp in self: if (sp.picking_type_id.use_existing_lots is False and sp.picking_type_id.use_create_lots is True and sp.state == 'assigned'): if sp.move_ids_without_package: for move_id in sp.move_ids_without_package: if move_id.product_id.tracking in ['serial', 'lot'] and not move_id.move_line_nosuggest_ids: move_id.action_show_details() @api.model def read_group(self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True): aggregate_field = 'create_date:max' if aggregate_field not in fields: fields.append(aggregate_field) orderby = "create_date desc" return super(StockPicking, self).read_group( domain, fields, groupby, offset=offset, limit=limit, orderby=orderby, lazy=lazy ) class ReStockMove(models.Model): _inherit = 'stock.move' materiel_length = fields.Float(string='物料长度', digits=(16, 4)) materiel_width = fields.Float(string='物料宽度', digits=(16, 4)) materiel_height = fields.Float(string='物料高度', digits=(16, 4)) part_number = fields.Char(string='零件图号', compute='_compute_part_info', store=True) part_name = fields.Char(string='零件名称', compute='_compute_part_info', store=True) @api.depends('product_id') def _compute_part_info(self): try: for move in self: if move.product_id.categ_id.type == '成品': move.part_number = move.product_id.part_number move.part_name = move.product_id.part_name elif move.product_id.categ_id.type == '坯料': product_name = '' match = re.search(r'(S\d{5}-\d)', move.product_id.name) # 如果匹配成功,提取结果 if match: product_name = match.group(0) if move.picking_id.sale_order_id: sale_order = move.picking_id.sale_order_id else: sale_order_name = '' match = re.search(r'(S\d+)', move.product_id.name) if match: sale_order_name = match.group(0) sale_order = self.env['sale.order'].sudo().search( [('name', '=', sale_order_name)]) filtered_order_line = sale_order.order_line.filtered( lambda production: re.search(f'{product_name}$', production.product_id.name) ) if filtered_order_line: move.part_number = filtered_order_line.part_number move.part_name = filtered_order_line.part_name elif move.product_id.categ_id.type == '原材料': production_id = move.production_id or move.raw_material_production_id if not production_id: if not move.origin: continue logging.info('制造订单的调拨单 %s', move.origin) production_id = self.env['mrp.production'].sudo().search( [('name', '=', move.origin.split(',')[0] if move.origin else '')], limit=1) if not production_id: continue product_name = '' logging.info('制造订单的产品 %s', production_id.product_id.name) match = re.search(r'(S\d{5}-\d)', production_id.product_id.name) # 如果匹配成功,提取结果 if match: product_name = match.group(0) if move.picking_id.sale_order_id: sale_order = move.picking_id.sale_order_id else: sale_order_name = '' match = re.search(r'(S\d+)', production_id.product_id.name) if match: sale_order_name = match.group(0) sale_order = self.env['sale.order'].sudo().search( [('name', '=', sale_order_name)]) filtered_order_line = sale_order.order_line.filtered( lambda production: re.search(f'{product_name}$', production.product_id.name) ) if filtered_order_line: move.part_number = filtered_order_line.part_number move.part_name = filtered_order_line.part_name except Exception as e: traceback_error = traceback.format_exc() logging.error("零件图号 零件名称获取失败:%s" % traceback_error) def _get_stock_move_values_Res(self, item, picking_type_id, group_id, move_dest_ids=False, product_uom_qty=1.0): route_id = self.env.ref('sf_manufacturing.route_surface_technology_outsourcing').id stock_rule = self.env['stock.rule'].sudo().search( [('route_id', '=', route_id), ('picking_type_id', '=', picking_type_id)]) move_values = { 'name': '推', 'company_id': item.company_id.id, 'product_id': item.bom_id.bom_line_ids.product_id.id, 'product_uom': item.bom_id.bom_line_ids.product_uom_id.id, 'product_uom_qty': product_uom_qty, 'location_id': stock_rule.location_src_id.id, 'location_dest_id': stock_rule.location_dest_id.id, 'origin': item.name, 'group_id': group_id, 'move_dest_ids': [(6, 0, [move_dest_ids])] if move_dest_ids else False, # 'production_id': item.id, # 'route_ids': False if not route else [(4, route.id)], 'date_deadline': datetime.now(), 'picking_type_id': picking_type_id, # 'is_subcontract': True, } return move_values def _get_new_picking_values_Res(self, item, sorted_workorders, rescode): picking_type_id = self.mapped('picking_type_id').id sequence = False if rescode == 'WH/OCOUT/': picking_type_id = self.env.ref('sf_manufacturing.outcontract_picking_out').id sequence = self.env.ref('sf_manufacturing.sequence_stock_picking_out') elif rescode == 'WH/OCIN/': picking_type_id = self.env.ref('sf_manufacturing.outcontract_picking_in').id sequence = self.env.ref('sf_manufacturing.sequence_stock_picking_in') return { 'name': self.env['stock.picking']._get_name_Res(rescode, sequence), 'origin': item.name, 'surface_technics_parameters_id': sorted_workorders.surface_technics_parameters_id.id, 'company_id': self.mapped('company_id').id, 'user_id': False, 'move_type': self.mapped('group_id').move_type or 'direct', 'partner_id': sorted_workorders.supplier_id.id, 'picking_type_id': picking_type_id, 'location_id': self.mapped('location_id').id, 'location_dest_id': self.mapped('location_dest_id').id, 'state': 'waiting', } def get_move_line(self, production_id, sorted_workorders): # if not self.move_ids[0].product_id.single_manufacturing and self.move_ids[0].product_id.tracking == 'none': qty = production_id.product_qty return { 'move_id': self.id, 'product_id': self.product_id.id, 'product_uom_id': self.product_uom.id, 'location_id': self.picking_id.location_id.id, 'location_dest_id': self.picking_id.location_dest_id.id, 'picking_id': self.picking_id.id, 'reserved_uom_qty': qty, 'lot_id': production_id.move_line_raw_ids.lot_id.id, 'company_id': self.env.company.id, # 'workorder_id': '' if not sorted_workorders else sorted_workorders.id, # 'production_id': '' if not sorted_workorders else sorted_workorders.production_id.id, 'state': 'assigned', } def print_serial_numbers(self): if not self.next_serial: raise UserError(_("请先分配序列号再进行打印")) label_data = [] for item in self.move_line_ids: label_data.append({ 'item_id': item.id, }) if label_data: report_template = self.env.ref('stock.label_package_template') res = report_template.report_action(label_data) res['id'] = report_template.id return res else: raise UserError(_("没有可打印的标签数据")) def action_show_details(self): """ Returns an action that will open a form view (in a popup) allowing to work on all the move lines of a particular move. This form view is used when "show operations" is not checked on the picking type. """ self.ensure_one() # If "show suggestions" is not checked on the picking type, we have to filter out the # reserved move lines. We do this by displaying `move_line_nosuggest_ids`. We use # different views to display one field or another so that the webclient doesn't have to # fetch both. if self.picking_type_id.show_reserved: view = self.env.ref('stock.view_stock_move_operations') else: view = self.env.ref('stock.view_stock_move_nosuggest_operations') if self.state == "assigned": if self.product_id.tracking == "serial": if self.product_id.categ_id.name == '刀具': self.next_serial = self._get_tool_next_serial(self.company_id, self.product_id, self.origin) else: self.next_serial = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) if (self.picking_type_id.use_existing_lots is False and self.picking_type_id.use_create_lots is True and not self.move_line_nosuggest_ids): self.action_assign_serial_show_details() elif self.product_id.tracking == "lot": if self.product_id.categ_id.name == '刀具': self._put_tool_lot(self.company_id, self.product_id, self.origin) return { 'name': _('Detailed Operations'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'stock.move', 'views': [(view.id, 'form')], 'view_id': view.id, 'target': 'new', 'res_id': self.id, 'context': dict( self.env.context, show_owner=self.picking_type_id.code != 'incoming', show_lots_m2o=self.has_tracking != 'none' and ( self.picking_type_id.use_existing_lots or self.state == 'done' or self.origin_returned_move_id.id), # able to create lots, whatever the value of ` use_create_lots`. show_lots_text=self.has_tracking != 'none' and self.picking_type_id.use_create_lots and not self.picking_type_id.use_existing_lots and self.state != 'done' and not self.origin_returned_move_id.id, show_source_location=self.picking_type_id.code != 'incoming', show_destination_location=self.picking_type_id.code != 'outgoing', show_package=not self.location_id.usage == 'supplier', show_reserved_quantity=self.state != 'done' and not self.picking_id.immediate_transfer and self.picking_type_id.code != 'incoming' ), } def _put_tool_lot(self, company, product, origin): if product.tracking == "lot" and self.product_id.categ_id.name == '刀具': if not self.move_line_nosuggest_ids: lot_code = '%s-%s-%s' % ('%s-T-DJWL-%s' % ( product.cutting_tool_model_id.code.split('-')[0], product.cutting_tool_material_id.code), datetime.now().strftime("%Y%m%d"), origin) move_line_ids = self.env['stock.move.line'].sudo().search( [('company_id', '=', company.id), ('lot_name', 'like', lot_code)], limit=1, order='id desc') if not move_line_ids: lot_code = '%s-001' % lot_code else: lot_code = '%s-%03d' % (lot_code, int(move_line_ids.lot_name[-3:]) + 1) lot_names = self.env['stock.lot'].generate_lot_names(lot_code, 1) move_lines_commands = self._generate_serial_move_line_commands_tool_lot(lot_names) for move_lines_command in move_lines_commands: move_lines_command[2]['qty_done'] = self.product_uom_qty self.write({'move_line_nosuggest_ids': move_lines_commands}) for item in self.move_line_nosuggest_ids: if item.lot_name: item.lot_qr_code = self.compute_lot_qr_code(item.lot_name) def compute_lot_qr_code(self, lot_name): qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(lot_name) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = io.BytesIO() img.save(buffer, format="PNG") binary_data = buffer.getvalue() data = base64.b64encode(binary_data).decode() # 确保返回的是字符串形式的数据 return data def _get_tool_next_serial(self, company, product, origin): """Return the next serial number to be attributed to the product.""" if product.tracking == "serial": last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id), ('name', 'ilike', origin)], limit=1, order='id DESC') move_line_id = self.env['stock.move.line'].sudo().search( [('company_id', '=', company.id), ('product_id', '=', product.id), ('lot_name', 'ilike', origin)], limit=1, order='lot_name desc') split_codes = product.cutting_tool_model_id.code.split('-') if last_serial or move_line_id: return "%s-T-%s-%s-%03d" % ( split_codes[0], origin, product.specification_id.name, int(last_serial.name[-3:] if (not move_line_id or (last_serial and last_serial.name > move_line_id.lot_name)) else move_line_id.lot_name[-3:]) + 1) else: return "%s-T-%s-%s-%03d" % (split_codes[0], origin, product.specification_id.name, 1) def _generate_serial_move_line_commands_tool_lot(self, lot_names, origin_move_line=None): """Return a list of commands to update the move lines (write on existing ones or create new ones). Called when user want to create and assign multiple serial numbers in one time (using the button/wizard or copy-paste a list in the field). :param lot_names: A list containing all serial number to assign. :type lot_names: list :param origin_move_line: A move line to duplicate the value from, default to None :type origin_move_line: record of :class:`stock.move.line` :return: A list of commands to create/update :class:`stock.move.line` :rtype: list """ self.ensure_one() # Select the right move lines depending of the picking type configuration. move_lines = self.env['stock.move.line'] if self.picking_type_id.show_reserved: move_lines = self.move_line_ids.filtered(lambda ml: not ml.lot_id and not ml.lot_name) else: move_lines = self.move_line_nosuggest_ids.filtered(lambda ml: not ml.lot_id and not ml.lot_name) loc_dest = origin_move_line and origin_move_line.location_dest_id move_line_vals = { 'picking_id': self.picking_id.id, 'location_id': self.location_id.id, 'product_id': self.product_id.id, 'product_uom_id': self.product_id.uom_id.id, 'qty_done': self.product_uom_qty, } if origin_move_line: # `owner_id` and `package_id` are taken only in the case we create # new move lines from an existing move line. Also, updates the # `qty_done` because it could be usefull for products tracked by lot. move_line_vals.update({ 'owner_id': origin_move_line.owner_id.id, 'package_id': origin_move_line.package_id.id, 'qty_done': origin_move_line.qty_done or 1, }) move_lines_commands = [] qty_by_location = defaultdict(float) for lot_name in lot_names: # We write the lot name on an existing move line (if we have still one)... if move_lines: move_lines_commands.append((1, move_lines[0].id, { 'lot_name': lot_name, 'qty_done': 1, })) qty_by_location[move_lines[0].location_dest_id.id] += 1 move_lines = move_lines[1:] # ... or create a new move line with the serial name. else: loc = loc_dest or self.location_dest_id._get_putaway_strategy(self.product_id, quantity=1, packaging=self.product_packaging_id, additional_qty=qty_by_location) move_line_cmd = dict(move_line_vals, lot_name=lot_name, location_dest_id=loc.id) move_lines_commands.append((0, 0, move_line_cmd)) qty_by_location[loc.id] += 1 return move_lines_commands def _merge_moves_fields(self): """ 合并制造订单的完成move单据 """ res = super(ReStockMove, self)._merge_moves_fields() if self[0].origin and self.picking_type_id.name in ['生产发料', '内部调拨', '生产入库', '客供料入库']: production = self.env['mrp.production'].search([('name', '=', self[0].origin)], limit=1, order='id asc') productions = self.env['mrp.production'].search( [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)]) res['origin'] = ','.join(productions.mapped('name')) if self.picking_type_id.name == '客供料入库': self.picking_id.sudo().write( {'origin': res['origin'] if res.get('origin') else self[0].picking_id.origin}) return res def _get_new_picking_values(self): """ 创建调拨单时,在此新增或修改调拨单的数据 """ res = super(ReStockMove, self)._get_new_picking_values() ## 制造订单报废生成的新制造订单不走合并 if not self.env.context.get('is_remanufacture_flag'): if self[0].origin and self.picking_type_id.name in ['生产发料', '内部调拨']: production = self.env['mrp.production'].search([('name', '=', self[0].origin)], limit=1, order='id asc') productions = self.env['mrp.production'].search( [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)]) if productions.mapped('name'): res['origin'] = ','.join(productions.mapped('name')) res['retrospect_ref'] = production.product_id.name return res subcontract_workorder_id = fields.Many2one('mrp.workorder', '外协工单组件', check_company=True, index='btree_not_null') def button_update_the_sequence_number(self): """ 更新序列号 功能按钮 """ self.move_line_nosuggest_ids.unlink() if self.state != 'assigned': self.state = 'assigned' return self.action_show_details() def _prepare_move_line_vals(self, quantity=None, reserved_quant=None): res = super(ReStockMove, self)._prepare_move_line_vals(quantity, reserved_quant) if self.subcontract_workorder_id: if self.subcontract_workorder_id.production_id.move_raw_ids.move_line_ids: res['lot_id'] = self.subcontract_workorder_id.production_id.move_raw_ids.move_line_ids[0].lot_id.id return res class ReStockQuant(models.Model): _inherit = 'stock.quant' # def action_apply_inventory(self): # inventory_diff_quantity = self.inventory_diff_quantity # super(ReStockQuant, self).action_apply_inventory() # if inventory_diff_quantity >= 1: # stock = self.env['stock.move'].search([('product_id', '=', self.product_id.id), ('is_inventory', '=', True), # ('reference', '=', '更新的产品数量'), ('state', '=', 'done')], # limit=1, order='id desc') # if self.product_id.categ_type == '夹具': # stock._register_fixture() # elif self.product_id.categ_type == '刀具': # stock._register_cutting_tool() # return True