# -*- coding: utf-8 -*-
import base64
import qrcode
from itertools import groupby
from collections import defaultdict, namedtuple
import logging
import io
import json
from re import split as regex_split
from re import findall as regex_findall
from datetime import datetime
import requests
from odoo import SUPERUSER_ID, _, api, fields, models
from odoo.tools import float_compare
from odoo.addons.stock.models.stock_rule import ProcurementException
from odoo.addons.sf_base.commons.common import Common
from odoo.exceptions import UserError
from io import BytesIO
from odoo.exceptions import ValidationError


class stockWarehouse(models.Model):
    """Warehouse extension holding the two surface-technology subcontracting pull rules."""
    _inherit = 'stock.warehouse'

    subcontracting_surface_technology_pull_out_id = fields.Many2one(
        'stock.rule', '表面工艺规则1')
    subcontracting_surface_technology_pull_in_id = fields.Many2one(
        'stock.rule', '表面工艺规则2'
    )

    def _get_global_route_rules_values(self):
        """Add the subcontracting "pull in" / "pull out" rule definitions.

        :return: the parent's rule dictionary updated with one entry per
            subcontracting rule field, each carrying its ``create_values``.
        """
        rules = super(stockWarehouse, self)._get_global_route_rules_values()
        # Plain ids: the previous code had a trailing comma after `.id`,
        # which silently turned both values into 1-tuples.
        location_virtual_id = self.env.ref(
            'sf_manufacturing.stock_location_locations_virtual_outcontract').id
        location_pre_id = self.env['stock.location'].search(
            [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id
        rules.update({
            'subcontracting_surface_technology_pull_in_id': {
                'create_values': {
                    'action': 'pull',
                    'picking_type_id': self.env.ref('sf_manufacturing.outcontract_picking_in').id,
                    'group_propagation_option': 'none',
                    'company_id': self.company_id.id,
                    'location_src_id': location_virtual_id,
                    'location_dest_id': location_pre_id,
                    'route_id': self._find_global_route(
                        'sf_manufacturing.route_surface_technology_outsourcing', _('表面工艺外协')).id,
                }
            },
            'subcontracting_surface_technology_pull_out_id': {
                'create_values': {
                    'action': 'pull',
                    'picking_type_id': self.env.ref('sf_manufacturing.outcontract_picking_out').id,
                    'group_propagation_option': 'none',
                    'company_id': self.company_id.id,
                    'location_src_id': location_pre_id,
                    'location_dest_id': location_virtual_id,
                    'route_id': self._find_global_route(
                        'sf_manufacturing.route_surface_technology_outsourcing', _('表面工艺外协')).id,
                }
            }
        })
        return rules


class StockRule(models.Model):
    """Stock rule overrides for pull and manufacture procurements."""
    _inherit = 'stock.rule'

    @api.model
    def _run_pull(self, procurements):
        """Create and confirm the stock moves of pull procurements.

        Procurements whose product template has ``single_manufacturing`` set
        and whose integer quantity is greater than one are split into that
        many unit procurements before the moves are prepared.

        :param procurements: iterable of ``(procurement, rule)`` pairs.
        :return: True
        :raises ProcurementException: when a rule has no source location.
        """
        logging.info(procurements)
        moves_values_by_company = defaultdict(list)
        mtso_products_by_locations = defaultdict(list)

        # To handle the `mts_else_mto` procure method, we do a preliminary loop to
        # isolate the products we would need to read the forecasted quantity,
        # in order to batch the read. We also make a sanity check on the
        # `location_src_id` field.
        for procurement, rule in procurements:
            if not rule.location_src_id:
                msg = _('No source location defined on stock rule: %s!') % (rule.name,)
                raise ProcurementException([(procurement, msg)])

            if rule.procure_method == 'mts_else_mto':
                mtso_products_by_locations[rule.location_src_id].append(procurement.product_id.id)

        # Get the forecasted quantity for the `mts_else_mto` procurement.
        forecasted_qties_by_loc = {}
        for location, product_ids in mtso_products_by_locations.items():
            products = self.env['product.product'].browse(product_ids).with_context(location=location.id)
            forecasted_qties_by_loc[location] = {product.id: product.free_qty for product in products}

        # Prepare the move values, adapt the `procure_method` if needed.
        procurements = sorted(
            procurements,
            key=lambda proc: float_compare(
                proc[0].product_qty, 0.0, precision_rounding=proc[0].product_uom.rounding) > 0)

        # Built once instead of once per generated unit procurement.
        unit_procurement = namedtuple(
            'Procurement',
            ['product_id', 'product_qty', 'product_uom', 'location_id',
             'name', 'origin', 'company_id', 'values'])
        expanded_procurements = []
        for item in procurements:
            source = item[0]
            num = int(source.product_qty)
            product = self.env['product.product'].search(
                [("id", '=', source.product_id.id)])
            product_tmpl = self.env['product.template'].search(
                ["&", ("id", '=', product.product_tmpl_id.id),
                 ('single_manufacturing', "!=", False)])
            if not (product_tmpl and num > 1):
                expanded_procurements.append(item)
                continue
            # One procurement of quantity 1 per requested unit.
            for _index in range(num):
                single = unit_procurement(
                    product_id=source.product_id,
                    product_qty=1.0,
                    product_uom=source.product_uom,
                    location_id=source.location_id,
                    name=source.name,
                    origin=source.origin,
                    company_id=source.company_id,
                    values=source.values,
                )
                expanded_procurements.append((single,) + tuple(item[1:]))

        for procurement, rule in expanded_procurements:
            procure_method = rule.procure_method
            if rule.procure_method == 'mts_else_mto':
                qty_needed = procurement.product_uom._compute_quantity(
                    procurement.product_qty, procurement.product_id.uom_id)
                if float_compare(qty_needed, 0,
                                 precision_rounding=procurement.product_id.uom_id.rounding) <= 0:
                    procure_method = 'make_to_order'
                    for move in procurement.values.get('group_id', self.env['procurement.group']).stock_move_ids:
                        if move.rule_id == rule and float_compare(
                                move.product_uom_qty, 0,
                                precision_rounding=move.product_uom.rounding) > 0:
                            procure_method = move.procure_method
                            break
                    forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
                elif float_compare(qty_needed,
                                   forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id],
                                   precision_rounding=procurement.product_id.uom_id.rounding) > 0:
                    procure_method = 'make_to_order'
                else:
                    forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
                    procure_method = 'make_to_stock'

            move_values = rule._get_stock_move_values(*procurement)
            move_values['procure_method'] = procure_method
            moves_values_by_company[procurement.company_id.id].append(move_values)

        for company_id, moves_values in moves_values_by_company.items():
            # create the move as SUPERUSER because the current user may not have the rights to do it
            # (mto product launched by a sale for example)
            moves = self.env['stock.move'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(
                moves_values)
            # Since action_confirm launch following procurement_group we should activate it.
            moves._action_confirm()
        return True

    def attachment_update(self, name, res_id, res_field):
        """Rename the first attachment matching ``res_id`` and ``res_field``.

        :param name: new attachment name.
        :param res_id: id of the record the attachment belongs to.
        :param res_field: name of the binary field backing the attachment.
        """
        attachment_info = self.env['ir.attachment'].sudo().search(
            [('res_id', '=', res_id), ('res_field', '=', res_field)], limit=1)
        attachment_info.write({'name': name})

    @api.model
    def _run_manufacture(self, procurements):
        """Create manufacturing orders for the procurements and prepare them.

        For every company this creates the orders, gives repeated products
        their own procurement group, confirms eligible orders, redirects
        blank pickings to the blank stock area, generates serial numbers,
        creates a production plan per order, requests CNC programming and
        builds the technology design lines.

        :param procurements: iterable of ``(procurement, rule)`` pairs.
        :return: True
        """
        productions_values_by_company = defaultdict(list)
        errors = []
        for procurement, rule in procurements:
            if float_compare(procurement.product_qty, 0,
                             precision_rounding=procurement.product_uom.rounding) <= 0:
                # If procurement contains negative quantity, don't create a MO that would be for a negative value.
                continue
            bom = rule._get_matching_bom(procurement.product_id, procurement.company_id, procurement.values)
            productions_values_by_company[procurement.company_id.id].append(
                rule._prepare_mo_vals(*procurement, bom))

        if errors:
            raise ProcurementException(errors)

        for company_id, productions_values in productions_values_by_company.items():
            # create the MO as SUPERUSER because the current user may not have the rights to do it
            # (mto product launched by a sale for example)
            productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
                company_id).create(productions_values)

            # The first order of each product keeps its procurement group;
            # every further order of that product gets a group of its own.
            product_group_id = {}
            for index, production in enumerate(productions):
                if production.product_id.id not in product_group_id:
                    product_group_id[production.product_id.id] = production.procurement_group_id.id
                else:
                    productions_values[index].update({'name': production.name})
                    procurement_group_vals = production._prepare_procurement_group_vals(
                        productions_values[index])
                    production.procurement_group_id = self.env["procurement.group"].create(
                        procurement_group_vals).id

            productions.filtered(
                lambda p: (not p.orderpoint_id and p.move_raw_ids) or (
                    p.move_dest_ids.procure_method != 'make_to_order'
                    and not p.move_raw_ids and not p.workorder_ids)).action_confirm()

            # Pickings generated from the MO for blank products: point them at
            # the blank stock area instead of their default location.
            for production in productions:
                if not production.picking_ids:
                    continue
                product_type_id = production.picking_ids[0].move_ids[0].product_id.categ_id
                if product_type_id.name != '坯料':
                    continue
                location_id = self.env['stock.location'].search([('name', '=', '坯料存货区')])
                if not location_id:
                    logging.info(f'没有搜索到【坯料存货区】: {location_id}')
                    break
                for picking_id in production.picking_ids:
                    if picking_id.picking_type_id.name == '内部调拨':
                        if picking_id.location_dest_id.product_type != product_type_id:
                            picking_id.location_dest_id = location_id.id
                    elif picking_id.picking_type_id.name == '生产发料':
                        if picking_id.location_id.product_type != product_type_id:
                            picking_id.location_id = location_id.id

            for production in productions:
                # Generate the serial number as soon as the MO is created.
                production.action_generate_serial()
                origin_production = production.move_dest_ids and production.move_dest_ids[
                    0].raw_material_production_id or False
                orderpoint = production.orderpoint_id
                if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
                    production.message_post(
                        body=_('This production order has been created from Replenishment Report.'),
                        message_type='comment',
                        subtype_xmlid='mail.mt_note')
                elif orderpoint:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': orderpoint},
                        subtype_id=self.env.ref('mail.mt_note').id)
                elif origin_production:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': origin_production},
                        subtype_id=self.env.ref('mail.mt_note').id)

                # Create the production plan. `limit=1` because the single
                # record is dereferenced below.
                sale_order = self.env['sale.order'].sudo().search(
                    [('name', '=', production.origin)], limit=1)
                if not sale_order:
                    # No sale order matches the origin: use the sale order of
                    # the originating manufacturing order instead.
                    mrp_production = self.env['mrp.production'].sudo().search(
                        [('name', '=', production.origin)], limit=1)
                    if mrp_production:
                        sale_order = self.env['sale.order'].sudo().search(
                            [('name', '=', mrp_production.origin)], limit=1)
                else:
                    mrp_production = production
                self.env['sf.production.plan'].sudo().with_company(company_id).create({
                    'name': production.name,
                    'order_deadline': sale_order.deadline_of_delivery,
                    'production_id': production.id,
                    'date_planned_start': production.date_planned_start,
                    'origin': mrp_production.origin,
                    'product_qty': production.product_qty,
                    'product_id': production.product_id.id,
                    'state': 'draft',
                })

            # Names of all orders per product. A plain grouping is used because
            # itertools.groupby only merges adjacent items and would drop
            # names when orders of one product are not contiguous.
            production_names_by_product = defaultdict(list)
            for grouped_production in productions:
                production_names_by_product[grouped_production.product_id.id].append(
                    grouped_production.name)

            technology_design_values = []
            for production_item in productions:
                production_programming = self.env['mrp.production'].search(
                    [('product_id.id', '=', production_item.product_id.id),
                     ('origin', '=', production_item.origin)],
                    limit=1, order='id asc')
                # Several orders of one product share one programming order:
                # fetchCNC receives all their names joined in one string.
                # The type test now reads the order being processed; it used to
                # read the variable left over from the previous loop.
                if not production_item.programming_no and production_item.production_type == '自动化产线加工':
                    if not production_programming.programming_no:
                        production_item.fetchCNC(
                            ', '.join(production_names_by_product[production_item.product_id.id]))
                    else:
                        production_item.write({'programming_no': production_programming.programming_no,
                                               'programming_state': '编程中'})
                # NOTE(review): nesting reconstructed from a whitespace-mangled
                # source; the design lines are assumed to be built once, from
                # the first order of the batch - confirm against history.
                if not technology_design_values:
                    technology_design_values = self._prepare_technology_design_values(production_item)

            productions.technology_design_ids = technology_design_values
            productions.write({'state': 'technology_to_confirmed'})
        return True

    def _prepare_technology_design_values(self, production_item):
        """Return the technology design line values built from one manufacturing order."""
        design_model = self.env['sf.technology.design']
        product = production_item.product_id
        values = []
        i = 0
        if product.categ_id.type == '成品':
            # Routing template of the finished product, per processing panel
            # for automated-line orders.
            if production_item.production_type == '自动化产线加工':
                model = 'sf.product.model.type.routing.sort'
                domain = [('product_model_type_id', '=', product.product_model_type_id.id)]
            else:
                model = 'sf.manual.product.model.type.routing.sort'
                domain = [('manual_product_model_type_id', '=', product.product_model_type_id.id)]
            product_routing_workcenter = self.env[model].search(domain, order='sequence asc')
            if production_item.production_type == '自动化产线加工':
                for k in product.model_processing_panel.split(','):
                    for route in product_routing_workcenter:
                        i += 1
                        values.append(design_model.json_technology_design_str(k, route, i, False))
            else:
                for route in product_routing_workcenter:
                    i += 1
                    values.append(design_model.json_technology_design_str(False, route, i, False))
        elif product.categ_id.type == '坯料':
            embryo_routing_workcenter = self.env['sf.embryo.model.type.routing.sort'].search(
                [('embryo_model_type_id', '=', product.embryo_model_type_id.id)],
                order='sequence asc')
            for route_embryo in embryo_routing_workcenter:
                i += 1
                values.append(design_model.json_technology_design_str(False, route_embryo, i, False))

        # Surface-technology routings whose process matches one of the
        # product's process parameters.
        surface_technics_arr = []
        route_workcenter_arr = []
        for item in product.product_model_type_id.surface_technics_routing_tmpl_ids:
            if item.route_workcenter_id.surface_technics_id.id:
                for process_param in product.model_process_parameters_ids:
                    if item.route_workcenter_id.surface_technics_id == process_param.process_id:
                        surface_technics_arr.append(item.route_workcenter_id.surface_technics_id.id)
                        route_workcenter_arr.append(item.route_workcenter_id.id)
        if surface_technics_arr:
            production_process = self.env['sf.production.process'].search(
                [('id', 'in', surface_technics_arr)], order='sequence asc')
            for p in production_process:
                logging.info('production_process:%s' % p.name)
                process_parameter = product.model_process_parameters_ids.filtered(
                    lambda pm: pm.process_id.id == p.id)
                if process_parameter:
                    i += 1
                    route_production_process = self.env['mrp.routing.workcenter'].search(
                        [('surface_technics_id', '=', p.id),
                         ('id', 'in', route_workcenter_arr)])
                    values.append(design_model.json_technology_design_str(
                        False, route_production_process, i, process_parameter))
        return values
class ProductionLot(models.Model): _name = 'stock.lot' _inherit = ['stock.lot', 'printing.utils'] rfid = fields.Char('Rfid', readonly=True) product_specification = fields.Char('规格', compute='_compute_product_specification', store=True) def search_lot_put_rfid(self): # 使用SQL将所有刀柄Rfid不满十位的值在前方补零 self.env.cr.execute( '''UPDATE stock_lot SET rfid = LPAD(rfid, 10, '0') WHERE rfid IS NOT NULL AND LENGTH(rfid) < 10''' ) self.env.cr.commit() @api.depends('product_id') def _compute_product_specification(self): for stock in self: if stock: if stock.product_id: if stock.product_id.categ_id.name in '刀具': stock.product_specification = stock.product_id.specification_id.name elif stock.product_id.categ_id.name in '夹具': stock.product_specification = stock.product_id.specification_fixture_id.name else: stock.product_specification = stock.product_id.default_code @api.model def generate_lot_names1(self, display_name, first_lot, count): """Generate `lot_names` from a string.""" if first_lot.__contains__(display_name): first_lot = first_lot[(len(display_name) + 1):] # We look if the first lot contains at least one digit. caught_initial_number = regex_findall(r"\d+", first_lot) if not caught_initial_number: return self.generate_lot_names1(display_name, first_lot + "0", count) # We base the series on the last number found in the base lot. initial_number = caught_initial_number[-1] padding = len(initial_number) # We split the lot name to get the prefix and suffix. splitted = regex_split(initial_number, first_lot) # initial_number could appear several times, e.g. 
BAV023B00001S00001 prefix = initial_number.join(splitted[:-1]) suffix = splitted[-1] initial_number = int(initial_number) lot_names = [] for i in range(0, count): lot_names.append('%s-%s%s%s' % ( display_name, prefix, str(initial_number + i).zfill(padding), suffix )) return lot_names def get_tool_generate_lot_names1(self, company, product): """ 采购时生成刀具物料序列号 """ now = datetime.now().strftime("%Y%m%d") last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id)], limit=1, order='id DESC') if product.cutting_tool_model_id: split_codes = product.cutting_tool_model_id.code.split('-') if not last_serial: return "%s-T-%s-%s-%03d" % (split_codes[0], now, product.specification_id.name, 1) else: return "%s-T-%s-%s-%03d" % ( split_codes[0], now, product.specification_id.name, int(last_serial.name[-3:]) + 1) else: raise ValidationError('该刀具物料产品的型号字段为空,请补充完整!!!') @api.model def _get_next_serial(self, company, product): """Return the next serial number to be attributed to the product.""" if product.tracking == "serial": last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id)], limit=1, order='name desc') if last_serial: if product.categ_id.name == '刀具': return self.env['stock.lot'].get_tool_generate_lot_names1(company, product) else: # 对last_serial的name进行检测,如果不是以产品名称+数字的形式的就重新搜索 if product.name.split('[')[0] not in last_serial.name: last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id), ('name', 'ilike', product.name.split('[')[0])], limit=1, order='name desc') if not last_serial: return "%s-%03d" % (product.name, 1) return self.env['stock.lot'].generate_lot_names1(product.name, last_serial.name, 2)[1] now = datetime.now().strftime("%Y%m%d") if product.cutting_tool_model_id: split_codes = product.cutting_tool_model_id.code.split('-') return "%s-T-%s-%s-%03d" % (split_codes[0], now, product.specification_id.name, 1) return 
"%s-%03d" % (product.name, 1) qr_code_image = fields.Binary(string='二维码', compute='_generate_qr_code') @api.depends('name') def _generate_qr_code(self): for record in self: # Generate QR code qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(record.name) qr.make(fit=True) qr_image = qr.make_image(fill_color="black", back_color="white") # Encode the image data in base64 image_stream = BytesIO() qr_image.save(image_stream, format="PNG") encoded_image = base64.b64encode(image_stream.getvalue()) record.qr_code_image = encoded_image def print_single_method(self): print('self.name========== %s' % self.name) self.ensure_one() qr_code_data = self.qr_code_image if not qr_code_data: raise UserError("没有找到二维码数据。") lot_name = self.name # host = "192.168.50.110" # 可以根据实际情况修改 # port = 9100 # 可以根据实际情况修改 # 获取默认打印机配置 printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1) if not printer_config: raise UserError('请先配置打印机') host = printer_config.printer_id.ip_address port = printer_config.printer_id.port self.print_qr_code(lot_name, host, port) # self.ensure_one() # 确保这个方法只为一个记录调用 # # if not self.lot_id: # # raise UserError("没有找到序列号。") # # 假设_lot_qr_code方法已经生成了二维码并保存在字段中 # qr_code_data = self.qr_code_image # if not qr_code_data: # raise UserError("没有找到二维码数据。") # # # 生成下载链接或直接触发下载 # # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例 # attachment = self.env['ir.attachment'].sudo().create({ # 'datas': self.qr_code_image, # 'type': 'binary', # 'description': '二维码图片', # 'name': self.name + '.png', # # 'res_id': invoice.id, # # 'res_model': 'stock.picking', # 'public': True, # 'mimetype': 'application/x-png', # # 'model_name': 'stock.picking', # }) # # 返回附件的下载链接 # download_url = '/web/content/%s?download=true' % attachment.id # base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') # return { # 'type': 'ir.actions.act_url', # 'url': str(base_url) + download_url, # 'target': 
'self', # } @api.model_create_multi def create(self, vals_list): for vals in vals_list: if vals.get('rfid'): lots = self.env['stock.lot'].search([('rfid', '=', vals['rfid'])]) if lots: for lot in lots: raise ValidationError('Rfid【%s】已被序列号为【%s】的【%s】产品占用!' % ( lot.rfid, lot.name, lot.product_id.name)) records = super(ProductionLot, self).create(vals_list) return records class StockPicking(models.Model): _inherit = 'stock.picking' surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数") person_of_delivery = fields.Char('收货人', compute='_compute_move_ids', store=True) telephone_of_delivery = fields.Char('电话号码', compute='_compute_move_ids', store=True) address_of_delivery = fields.Char('联系地址', compute='_compute_move_ids', store=True) retrospect_ref = fields.Char('追溯参考', compute='_compute_move_ids', store=True) picking_type_sequence_code = fields.Char(related='picking_type_id.sequence_code') @api.depends('move_ids', 'move_ids.product_id') def _compute_move_ids(self): for item in self: if item.move_ids: if item.picking_type_id.sequence_code == 'DL': sale_name = item.move_ids[0].product_id.name.split('-')[1] if 'S' in sale_name: sale_id = self.env['sale.order'].sudo().search([('name', '=', sale_name)]) item.person_of_delivery = sale_id.person_of_delivery item.telephone_of_delivery = sale_id.telephone_of_delivery item.address_of_delivery = sale_id.address_of_delivery else: raise ValidationError('坯料名称格式错误,正确格式为[R-S???-?]!!!') move_ids = [] for move_id in item.move_ids: move_ids.append(move_id.product_id.id) boms = self.env['mrp.bom'].sudo().search([('bom_line_ids.product_id', 'in', move_ids)]) if boms: codes_list = [] for bom in boms: if bom.product_tmpl_id.default_code: code_list = bom.product_tmpl_id.default_code.split('-') if len(code_list) >= 4: code = '-'.join(code_list[:4]) if code not in codes_list: codes_list.append(code) else: raise ValidationError('坯料成品的内部参考值格式错误') item.retrospect_ref = ','.join(codes_list) elif 
item.picking_type_id.sequence_code in ['INT', 'PC']: pass # 设置外协出入单的名称 def _get_name_Res(self, rescode): last_picking = self.sudo().search([('name', 'ilike', rescode)], order='create_date desc,id desc', limit=1) if not last_picking: num = "%04d" % 1 else: logging.info('编号:' + last_picking.name) m = int(last_picking.name[-3:]) + 1 num = "%04d" % m return '%s%s' % (rescode, num) def button_validate(self): if self.picking_type_id.barcode == 'OCOUT': move_out = self.env['stock.move'].search( [('location_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id), ('location_dest_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'VL-SPOC')]).id), ('origin', '=', self.origin)]) move_in = self.env['stock.move'].search( [('location_dest_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id), ('location_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'VL-SPOC')]).id), ('origin', '=', self.origin), ('picking_id', '=', self.id)]) if self.location_id == move_in.location_id and self.location_dest_id == move_in.location_dest_id: if move_out.origin == move_in.origin: move_in.write({'production_id': False}) if move_out.picking_id.state != 'done': raise UserError( _('该入库单对应的单号为%s的出库单还未完成,不能进行验证操作!' 
% move_out.picking_id.name)) res = super().button_validate() if res is True and self.picking_type_id.barcode == 'OCIN': if self.id == move_out.picking_id.id: # if move_out.move_line_ids.workorder_id.state == 'progress': move_in = self.env['stock.move'].search( [('location_dest_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id), ('location_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'VL-SPOC')]).id), ('origin', '=', self.origin)]) production = self.env['mrp.production'].search([('name', '=', self.origin)]) if move_in.state != 'done': move_in.write({'state': 'assigned'}) self.env['stock.move.line'].create(move_in.get_move_line(production, None)) return res # 创建 外协出库入单 def create_outcontract_picking(self, sorted_workorders_arr, item): domain = [('origin', '=', item.name), ('name', 'ilike', 'OCOUT')] if len(sorted_workorders_arr) > 1: sorted_workorders_arr = sorted_workorders_arr[0] else: domain += [ ('surface_technics_parameters_id', '=', sorted_workorders_arr[0].surface_technics_parameters_id.id)] stock_picking = self.env['stock.picking'].search(domain) if not stock_picking: for sorted_workorders in sorted_workorders_arr: # pick_ids = [] if not sorted_workorders.picking_ids: # outcontract_stock_move = self.env['stock.move'].search([('production_id', '=', item.id)]) # if not outcontract_stock_move: new_picking = True location_id = self.env['stock.location'].search( [('barcode', 'ilike', 'VL-SPOC')]).id, location_dest_id = self.env['stock.location'].search( [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id, outcontract_picking_type_in = self.env.ref( 'sf_manufacturing.outcontract_picking_in').id, outcontract_picking_type_out = self.env.ref( 'sf_manufacturing.outcontract_picking_out').id, moves_out = self.env['stock.move'].sudo().create( self.env['stock.move']._get_stock_move_values_Res(item, location_dest_id, location_id, outcontract_picking_type_out)) picking_out = self.create( 
moves_out._get_new_picking_values_Res(item, sorted_workorders, 'WH/OCOUT/')) # pick_ids.append(picking_out.id) moves_out.write( {'picking_id': picking_out.id, 'state': 'waiting'}) moves_out._assign_picking_post_process(new=new_picking) moves_in = self.env['stock.move'].sudo().create( self.env['stock.move']._get_stock_move_values_Res(item, location_id, location_dest_id, outcontract_picking_type_in)) picking_in = self.create( moves_in._get_new_picking_values_Res(item, sorted_workorders, 'WH/OCIN/')) # pick_ids.append(picking_in.id) moves_in.write( {'picking_id': picking_in.id, 'state': 'waiting'}) moves_in._assign_picking_post_process(new=new_picking) class ReStockMove(models.Model): _inherit = 'stock.move' materiel_length = fields.Float(string='物料长度', digits=(16, 4)) materiel_width = fields.Float(string='物料宽度', digits=(16, 4)) materiel_height = fields.Float(string='物料高度', digits=(16, 4)) def _get_stock_move_values_Res(self, item, location_src_id, location_dest_id, picking_type_id): route = self.env['stock.route'].sudo().search([('name', '=', '表面工艺外协')]) move_values = { 'name': '推', 'company_id': item.company_id.id, 'product_id': item.bom_id.bom_line_ids.product_id.id, 'product_uom': item.bom_id.bom_line_ids.product_uom_id.id, 'product_uom_qty': 1.0, 'location_id': location_src_id, 'location_dest_id': location_dest_id, 'origin': item.name, # 'route_ids': False if not route else [(4, route.id)], 'date_deadline': datetime.now(), 'picking_type_id': picking_type_id, } return move_values def _get_new_picking_values_Res(self, item, sorted_workorders, rescode): picking_type_id = self.mapped('picking_type_id').id if rescode == 'WH/OCOUT/': picking_type_id = self.env.ref('sf_manufacturing.outcontract_picking_out').id elif rescode == 'WH/OCIN/': picking_type_id = self.env.ref('sf_manufacturing.outcontract_picking_in').id return { 'name': self.env['stock.picking']._get_name_Res(rescode), 'origin': item.name, 'surface_technics_parameters_id': 
sorted_workorders.surface_technics_parameters_id.id, 'company_id': self.mapped('company_id').id, 'user_id': False, 'move_type': self.mapped('group_id').move_type or 'direct', 'partner_id': sorted_workorders.supplier_id.id, 'picking_type_id': picking_type_id, 'location_id': self.mapped('location_id').id, 'location_dest_id': self.mapped('location_dest_id').id, 'state': 'confirmed', } def get_move_line(self, production_id, sorted_workorders): return { 'move_id': self.id, 'product_id': self.product_id.id, 'product_uom_id': self.product_uom.id, 'location_id': self.picking_id.location_id.id, 'location_dest_id': self.picking_id.location_dest_id.id, 'picking_id': self.picking_id.id, 'reserved_uom_qty': 1.0, 'lot_id': production_id.move_line_raw_ids.lot_id.id, 'company_id': self.env.company.id, # 'workorder_id': '' if not sorted_workorders else sorted_workorders.id, # 'production_id': '' if not sorted_workorders else sorted_workorders.production_id.id, 'state': 'assigned', } def print_serial_numbers(self): if not self.next_serial: raise UserError(_("请先分配序列号再进行打印")) label_data = [] for item in self.move_line_ids: label_data.append({ 'item_id': item.id, }) if label_data: report_template = self.env.ref('stock.label_package_template') res = report_template.report_action(label_data) res['id'] = report_template.id return res else: raise UserError(_("没有可打印的标签数据")) def action_show_details(self): """ Returns an action that will open a form view (in a popup) allowing to work on all the move lines of a particular move. This form view is used when "show operations" is not checked on the picking type. """ self.ensure_one() # If "show suggestions" is not checked on the picking type, we have to filter out the # reserved move lines. We do this by displaying `move_line_nosuggest_ids`. We use # different views to display one field or another so that the webclient doesn't have to # fetch both. 
if self.picking_type_id.show_reserved: view = self.env.ref('stock.view_stock_move_operations') else: view = self.env.ref('stock.view_stock_move_nosuggest_operations') if self.state == "assigned": if self.product_id.tracking == "serial": if self.product_id.categ_id.name == '刀具': self.next_serial = self._get_tool_next_serial(self.company_id, self.product_id, self.origin) else: self.next_serial = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) if self.picking_type_id.sequence_code == 'DL' and not self.move_line_nosuggest_ids: self.action_assign_serial_show_details() elif self.product_id.tracking == "lot": self._put_tool_lot(self.company_id, self.product_id, self.origin) return { 'name': _('Detailed Operations'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'stock.move', 'views': [(view.id, 'form')], 'view_id': view.id, 'target': 'new', 'res_id': self.id, 'context': dict( self.env.context, show_owner=self.picking_type_id.code != 'incoming', show_lots_m2o=self.has_tracking != 'none' and ( self.picking_type_id.use_existing_lots or self.state == 'done' or self.origin_returned_move_id.id), # able to create lots, whatever the value of ` use_create_lots`. 
show_lots_text=self.has_tracking != 'none' and self.picking_type_id.use_create_lots and not self.picking_type_id.use_existing_lots and self.state != 'done' and not self.origin_returned_move_id.id, show_source_location=self.picking_type_id.code != 'incoming', show_destination_location=self.picking_type_id.code != 'outgoing', show_package=not self.location_id.usage == 'supplier', show_reserved_quantity=self.state != 'done' and not self.picking_id.immediate_transfer and self.picking_type_id.code != 'incoming' ), } def put_move_line(self): """ 确认订单时,自动分配序列号 """ if self.product_id.tracking == "serial": if self.product_id.categ_id.name == '刀具': self.next_serial = self._get_tool_next_serial(self.company_id, self.product_id, self.origin) else: self.next_serial = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) self._generate_serial_numbers() for item in self.move_line_nosuggest_ids: if item.lot_name: lot_name = item.lot_name if item.product_id.categ_id.name == '坯料': lot_name = lot_name.split('[', 1)[0] item.lot_qr_code = self.compute_lot_qr_code(lot_name) def _put_tool_lot(self, company, product, origin): if product.tracking == "lot" and self.product_id.categ_id.name == '刀具': if not self.move_line_nosuggest_ids: lot_code = '%s-%s-%s' % ('%s-T-DJWL-%s' % ( product.cutting_tool_model_id.code.split('-')[0], product.cutting_tool_material_id.code), datetime.now().strftime("%Y%m%d"), origin) move_line_ids = self.env['stock.move.line'].sudo().search([('lot_name', 'like', lot_code)], limit=1, order='id desc') if not move_line_ids: lot_code = '%s-001' % lot_code else: lot_code = '%s-%03d' % (lot_code, int(move_line_ids.lot_name[-3:]) + 1) lot_names = self.env['stock.lot'].generate_lot_names(lot_code, 1) move_lines_commands = self._generate_serial_move_line_commands_tool_lot(lot_names) self.write({'move_line_nosuggest_ids': move_lines_commands}) for item in self.move_line_nosuggest_ids: if item.lot_name: item.lot_qr_code = 
def compute_lot_qr_code(self, lot_name):
    """Return a QR code for ``lot_name`` as a base64-encoded PNG string.

    :param lot_name: text to encode in the QR code
    :return: base64 text (``str``, not ``bytes``) of the PNG image
    """
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(lot_name)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    with io.BytesIO() as buffer:
        img.save(buffer, format="PNG")
        # Decode so the caller gets a string rather than bytes.
        return base64.b64encode(buffer.getvalue()).decode()


def _get_tool_next_serial(self, company, product, origin):
    """Return the next serial number to be attributed to the product.

    The serial has the form
    ``{model prefix}-T-{origin}-{specification name}-{counter}``, where the
    model prefix is the part of the cutting tool model code before its first
    '-', and the counter is zero-padded to at least three digits. The counter
    continues from the most recently created lot of this company and product
    whose name matches ``origin`` (``ilike``), or starts at 1.

    :param company: company record used to filter existing lots
    :param product: product record; only serial-tracked products get a value
    :param origin: source document reference embedded in the serial
    :return: the next serial as a string, or ``None`` when the product is not
        tracked by serial number
    :raises ValueError: if the most recent matching lot name does not end
        with digits
    """
    if product.tracking != "serial":
        return None
    last_serial = self.env['stock.lot'].search(
        [('company_id', '=', company.id), ('product_id', '=', product.id), ('name', 'ilike', origin)],
        limit=1, order='id DESC')
    model_prefix = product.cutting_tool_model_id.code.split('-')[0]
    counter = 1
    if last_serial:
        last_name = last_serial.name
        # Read the whole trailing digit run rather than a fixed three
        # characters: with a fixed slice a name ending in "-1000" yields
        # "000" and the counter wraps back to 001, duplicating a serial.
        counter = int(last_name[len(last_name.rstrip('0123456789')):]) + 1
    return "%s-T-%s-%s-%03d" % (model_prefix, origin, product.specification_id.name, counter)
def _generate_serial_move_line_commands_tool_lot(self, lot_names, origin_move_line=None):
    """Build the x2many commands that attach ``lot_names`` to this move's lines.

    Each name is first written onto an existing move line that has neither a
    lot nor a lot name; once those are used up, a new move line is created
    per remaining name.

    :param lot_names: lot/serial names to assign, in order
    :type lot_names: list
    :param origin_move_line: optional move line whose destination location,
        owner, package and done quantity are copied onto created lines
    :type origin_move_line: record of :class:`stock.move.line`
    :return: ``(1, id, vals)`` update commands and ``(0, 0, vals)`` create
        commands for :class:`stock.move.line`
    :rtype: list

    NOTE(review): reused lines always get ``qty_done`` 1 while created lines
    default to the move's ``product_uom_qty``. Confirm this is intended for
    lot-tracked products.
    """
    self.ensure_one()

    def _has_no_lot(ml):
        return not ml.lot_id and not ml.lot_name

    # The picking type configuration decides which set of lines is reused.
    if self.picking_type_id.show_reserved:
        candidates = self.move_line_ids
    else:
        candidates = self.move_line_nosuggest_ids
    reusable = iter(candidates.filtered(_has_no_lot))

    forced_dest = origin_move_line and origin_move_line.location_dest_id
    base_vals = {
        'picking_id': self.picking_id.id,
        'location_id': self.location_id.id,
        'product_id': self.product_id.id,
        'product_uom_id': self.product_id.uom_id.id,
        'qty_done': self.product_uom_qty,
    }
    if origin_move_line:
        # Owner and package are only copied when duplicating an existing
        # line; its done quantity (or 1) replaces the default as well.
        base_vals['owner_id'] = origin_move_line.owner_id.id
        base_vals['package_id'] = origin_move_line.package_id.id
        base_vals['qty_done'] = origin_move_line.qty_done or 1

    commands = []
    qty_by_location = defaultdict(float)
    for lot_name in lot_names:
        line = next(reusable, None)
        if line is not None:
            # Reuse a line that is still free of any lot information.
            commands.append((1, line.id, {'lot_name': lot_name, 'qty_done': 1}))
            qty_by_location[line.location_dest_id.id] += 1
            continue
        # No free line left: create one, letting the putaway strategy pick
        # the destination unless the origin line dictates it.
        dest = forced_dest or self.location_dest_id._get_putaway_strategy(
            self.product_id, quantity=1, packaging=self.product_packaging_id,
            additional_qty=qty_by_location)
        commands.append((0, 0, dict(base_vals, lot_name=lot_name, location_dest_id=dest.id)))
        qty_by_location[dest.id] += 1
    return commands
def _merge_moves_fields(self):
    """Merge the finished moves of manufacturing orders.

    Extends the parent result: when the first move has an origin and the
    picking type is named '生产发料', '内部调拨' or '生产入库', ``origin`` is
    replaced by the comma-joined names of all manufacturing orders that share
    the origin and product of the order named by that first move's origin.

    :return: dict of merged move values
    """
    res = super(ReStockMove, self)._merge_moves_fields()
    if self[0].origin and self.picking_type_id.name in ['生产发料', '内部调拨', '生产入库']:
        production = self.env['mrp.production'].search(
            [('name', '=', self[0].origin)], limit=1, order='id asc')
        # Without a matching manufacturing order the sibling search would run
        # on empty values and 'origin' would be overwritten with an empty
        # string, so the parent's value is kept in that case.
        if production:
            productions = self.env['mrp.production'].search(
                [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)])
            res['origin'] = ','.join(productions.mapped('name'))
    return res


def _get_new_picking_values(self):
    """Add or modify the values of a transfer at the time it is created.

    For picking types named '生产发料' or '内部调拨', ``origin`` is replaced by
    the comma-joined names of the manufacturing orders sharing the origin and
    product of the order named by the first move's origin, and
    ``retrospect_ref`` is set to that order's product name. This is skipped
    when the parent's ``origin`` names a manufacturing order flagged
    ``is_remanufacture``.

    :return: dict of values for the new picking
    """
    res = super(ReStockMove, self)._get_new_picking_values()
    # Manufacturing orders regenerated from a scrapped order are not merged.
    production_remanufacture = None
    if 'origin' in res:
        if self.picking_type_id.name in ['生产发料', '内部调拨']:
            production_remanufacture = self.env['mrp.production'].search(
                [('name', '=', res['origin']), ('is_remanufacture', '=', True)])
    # NOTE(review): indentation was lost in the source text; this check is
    # assumed to sit at method level rather than inside "'origin' in res".
    if not production_remanufacture:
        if self[0].origin and self.picking_type_id.name in ['生产发料', '内部调拨']:
            production = self.env['mrp.production'].search(
                [('name', '=', self[0].origin)], limit=1, order='id asc')
            # Leave the parent's values untouched when the origin does not
            # name a manufacturing order, instead of blanking 'origin'.
            if production:
                productions = self.env['mrp.production'].search(
                    [('origin', '=', production.origin), ('product_id', '=', production.product_id.id)])
                res['origin'] = ','.join(productions.mapped('name'))
                res['retrospect_ref'] = production.product_id.name
    return res
('is_inventory', '=', True), # ('reference', '=', '更新的产品数量'), ('state', '=', 'done')], # limit=1, order='id desc') # if self.product_id.categ_type == '夹具': # stock._register_fixture() # elif self.product_id.categ_type == '刀具': # stock._register_cutting_tool() # return True