# -*- coding: utf-8 -*- import base64 import qrcode from collections import defaultdict, namedtuple import logging import json from re import split as regex_split from re import findall as regex_findall from datetime import datetime import requests from odoo import SUPERUSER_ID, _, api, fields, models from odoo.tools import float_compare from odoo.addons.stock.models.stock_rule import ProcurementException from odoo.addons.sf_base.commons.common import Common from odoo.exceptions import UserError from io import BytesIO class StockRule(models.Model): _inherit = 'stock.rule' @api.model def _run_pull(self, procurements): moves_values_by_company = defaultdict(list) mtso_products_by_locations = defaultdict(list) # To handle the `mts_else_mto` procure method, we do a preliminary loop to # isolate the products we would need to read the forecasted quantity, # in order to to batch the read. We also make a sanitary check on the # `location_src_id` field. # list1 = [] # for item in procurements: # num = int(item[0].product_qty) # if num > 1: # for no in range(1, num+1): # # Procurement = namedtuple('Procurement', ['product_id', 'product_qty', # 'product_uom', 'location_id', 'name', 'origin', # 'company_id', # 'values']) # s = Procurement(product_id=item[0].product_id,product_qty=1.0,product_uom=item[0].product_uom, # location_id=item[0].location_id, # name=item[0].name, # origin=item[0].origin, # company_id=item[0].company_id, # values=item[0].values, # ) # item1 = list(item) # item1[0]=s # # list1.append(tuple(item1)) # else: # list1.append(item) for procurement, rule in procurements: if not rule.location_src_id: msg = _('No source location defined on stock rule: %s!') % (rule.name,) raise ProcurementException([(procurement, msg)]) if rule.procure_method == 'mts_else_mto': mtso_products_by_locations[rule.location_src_id].append(procurement.product_id.id) # Get the forecasted quantity for the `mts_else_mto` procurement. forecasted_qties_by_loc = {} for location, product_ids in mtso_products_by_locations.items(): products = self.env['product.product'].browse(product_ids).with_context(location=location.id) forecasted_qties_by_loc[location] = {product.id: product.free_qty for product in products} # Prepare the move values, adapt the `procure_method` if needed. procurements = sorted(procurements, key=lambda proc: float_compare(proc[0].product_qty, 0.0, precision_rounding=proc[ 0].product_uom.rounding) > 0) list2 = [] for item in procurements: num = int(item[0].product_qty) product = self.env['product.product'].search( [("id", '=', item[0].product_id.id)]) product_tmpl = self.env['product.template'].search( ["&", ("id", '=', product.product_tmpl_id.id), ('single_manufacturing', "!=", False)]) if product_tmpl: if num > 1: for no in range(1, num + 1): Procurement = namedtuple('Procurement', ['product_id', 'product_qty', 'product_uom', 'location_id', 'name', 'origin', 'company_id', 'values']) s = Procurement(product_id=item[0].product_id, product_qty=1.0, product_uom=item[0].product_uom, location_id=item[0].location_id, name=item[0].name, origin=item[0].origin, company_id=item[0].company_id, values=item[0].values, ) item1 = list(item) item1[0] = s list2.append(tuple(item1)) else: list2.append(item) else: list2.append(item) for procurement, rule in list2: procure_method = rule.procure_method if rule.procure_method == 'mts_else_mto': qty_needed = procurement.product_uom._compute_quantity(procurement.product_qty, procurement.product_id.uom_id) if float_compare(qty_needed, 0, precision_rounding=procurement.product_id.uom_id.rounding) <= 0: procure_method = 'make_to_order' for move in procurement.values.get('group_id', self.env['procurement.group']).stock_move_ids: if move.rule_id == rule and float_compare(move.product_uom_qty, 0, precision_rounding=move.product_uom.rounding) > 0: procure_method = move.procure_method break forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed elif float_compare(qty_needed, forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id], precision_rounding=procurement.product_id.uom_id.rounding) > 0: procure_method = 'make_to_order' else: forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed procure_method = 'make_to_stock' move_values = rule._get_stock_move_values(*procurement) move_values['procure_method'] = procure_method moves_values_by_company[procurement.company_id.id].append(move_values) for company_id, moves_values in moves_values_by_company.items(): # create the move as SUPERUSER because the current user may not have the rights to do it (mto product # launched by a sale for example) moves = self.env['stock.move'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(moves_values) # Since action_confirm launch following procurement_group we should activate it. moves._action_confirm() return True @api.model def _run_manufacture(self, procurements): productions_values_by_company = defaultdict(list) errors = [] for procurement, rule in procurements: if float_compare(procurement.product_qty, 0, precision_rounding=procurement.product_uom.rounding) <= 0: # If procurement contains negative quantity, don't create a MO that would be for a negative value. continue bom = rule._get_matching_bom(procurement.product_id, procurement.company_id, procurement.values) productions_values_by_company[procurement.company_id.id].append(rule._prepare_mo_vals(*procurement, bom)) if errors: raise ProcurementException(errors) for company_id, productions_values in productions_values_by_company.items(): # create the MO as SUPERUSER because the current user may not have the rights to do it # (mto product launched by a sale for example) '''创建制造订单''' productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create( productions_values) self.env['stock.move'].sudo().create(productions._get_moves_raw_values()) self.env['stock.move'].sudo().create(productions._get_moves_finished_values()) ''' 创建工单 ''' productions._create_workorder() productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \ ( p.move_dest_ids.procure_method != 'make_to_order' and not p.move_raw_ids and not p.workorder_ids)).action_confirm() for production in productions: ''' 创建制造订单时生成序列号 ''' production.action_generate_serial() origin_production = production.move_dest_ids and production.move_dest_ids[ 0].raw_material_production_id or False orderpoint = production.orderpoint_id if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual': production.message_post( body=_('This production order has been created from Replenishment Report.'), message_type='comment', subtype_xmlid='mail.mt_note') elif orderpoint: production.message_post_with_view( 'mail.message_origin_link', values={'self': production, 'origin': orderpoint}, subtype_id=self.env.ref('mail.mt_note').id) elif origin_production: production.message_post_with_view( 'mail.message_origin_link', values={'self': production, 'origin': origin_production}, subtype_id=self.env.ref('mail.mt_note').id) ''' 创建生产计划 ''' # 工单耗时 workorder_duration = 0 for workorder in production.workorder_ids: workorder_duration += workorder.duration_expected sale_order = self.env['sale.order'].sudo().search([('name', '=', production.origin)]) if sale_order: sale_order.write({'schedule_status': 'to schedule'}) self.env['sf.production.plan'].sudo().with_company(company_id).create({ 'name': production.name, 'order_deadline': sale_order.deadline_of_delivery, 'production_id': production.id, 'date_planned_start': production.date_planned_start, 'origin': production.origin, 'product_qty': production.product_qty, 'product_id': production.product_id.id, 'state': 'draft', }) return True class ProductionLot(models.Model): _inherit = 'stock.lot' @api.model def generate_lot_names1(self, display_name, first_lot, count): """Generate `lot_names` from a string.""" if first_lot.__contains__(display_name): first_lot = first_lot[(len(display_name) + 1):] # We look if the first lot contains at least one digit. caught_initial_number = regex_findall(r"\d+", first_lot) if not caught_initial_number: return self.generate_lot_names1(display_name, first_lot + "0", count) # We base the series on the last number found in the base lot. initial_number = caught_initial_number[-1] padding = len(initial_number) # We split the lot name to get the prefix and suffix. splitted = regex_split(initial_number, first_lot) # initial_number could appear several times, e.g. BAV023B00001S00001 prefix = initial_number.join(splitted[:-1]) suffix = splitted[-1] initial_number = int(initial_number) lot_names = [] for i in range(0, count): lot_names.append('%s-%s%s%s' % ( display_name, prefix, str(initial_number + i).zfill(padding), suffix )) return lot_names @api.model def _get_next_serial(self, company, product): """Return the next serial number to be attributed to the product.""" if product.tracking == "serial": last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id)], limit=1, order='id DESC') if last_serial: return self.env['stock.lot'].generate_lot_names1(product.name, last_serial.name, 2)[ 1] now = datetime.now().strftime("%Y-%m-%d") # formatted_date = now.strftime("%Y-%m-%d") if product.cutting_tool_model_id: return "%s-%s-%03d" % (product.cutting_tool_model_id.code, now, 1) return "%s-%03d" % (product.name, 1) qr_code_image = fields.Binary(string='二维码', compute='_generate_qr_code') @api.depends('name') def _generate_qr_code(self): for record in self: # Generate QR code qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(record.name) qr.make(fit=True) qr_image = qr.make_image(fill_color="black", back_color="white") # Encode the image data in base64 image_stream = BytesIO() qr_image.save(image_stream, format="PNG") encoded_image = base64.b64encode(image_stream.getvalue()) record.qr_code_image = encoded_image def print_qr_code(self): self.ensure_one() # 确保这个方法只为一个记录调用 # if not self.lot_id: # raise UserError("没有找到序列号。") # 假设_lot_qr_code方法已经生成了二维码并保存在字段中 qr_code_data = self.qr_code_image if not qr_code_data: raise UserError("没有找到二维码数据。") # 生成下载链接或直接触发下载 # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例 attachment = self.env['ir.attachment'].sudo().create({ 'datas': self.qr_code_image, 'type': 'binary', 'description': '二维码图片', 'name': self.name + '.png', # 'res_id': invoice.id, # 'res_model': 'stock.picking', 'public': True, 'mimetype': 'application/x-png', # 'model_name': 'stock.picking', }) # 返回附件的下载链接 download_url = '/web/content/%s?download=true' % attachment.id base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') return { 'type': 'ir.actions.act_url', 'url': str(base_url) + download_url, 'target': 'self', } class StockPicking(models.Model): _inherit = 'stock.picking' workorder_in_id = fields.One2many('mrp.workorder', 'picking_in_id') workorder_out_id = fields.One2many('mrp.workorder', 'picking_out_id') # 设置外协出入单的名称 def _get_name_Res(self, rescode): last_picking = self.sudo().search([('name', 'like', rescode)], order='create_date DESC', limit=1) logging.info('编号:' + last_picking.name) if not last_picking: num = "%04d" % 1 else: m = int(last_picking.name[-3:]) + 1 num = "%04d" % m return '%s%s' % (rescode, num) def button_validate(self): # 出库单验证 if self.workorder_out_id: workorder_in = self.workorder_out_id.filtered(lambda p: p.state == 'progress' and p.is_subcontract is True) if workorder_in: picking_in = self.sudo().search([('id', '=', workorder_in.picking_in_id.id)]) if picking_in: picking_in.write({'state': 'assigned'}) else: workorder_subcontract = self.workorder_out_id.filtered( lambda p: p.state == 'pending' and p.is_subcontract is True) if workorder_subcontract: raise UserError( _('该出库单里源单据内的单号为%s的工单还未开始,不能进行验证操作!' % workorder_subcontract[ 0].name)) # 入库单验证 if self.workorder_in_id: workorder_out = self.workorder_in_id.filtered(lambda p: p.state == 'progress' and p.is_subcontract is True) if workorder_out: picking_out = self.sudo().search([('id', '=', workorder_out.picking_out_id.id)]) if picking_out.state != 'done': raise UserError( _('该入库单对应的单号为%s的出库单还未完成,不能进行验证操作!' % picking_out.name)) res = super().button_validate() # 采购单验证(夹具) # for item in self.move_ids_without_package: # if item.quantity_done > 0: # if item.product_id.categ_type == '夹具': # item._register_fixture() # elif item.product_id.categ_type == '刀具': # item._register_cutting_tool() return res # 创建 外协出库入单 def create_outcontract_picking(self, sorted_workorders_arr, item): m = 0 for sorted_workorders in sorted_workorders_arr: if m == 0: outcontract_stock_move = self.env['stock.move'].search( [('workorder_id', '=', sorted_workorders.id), ('production_id', '=', item.id)]) if not outcontract_stock_move: location_id = self.env.ref( 'sf_manufacturing.stock_location_locations_virtual_outcontract').id, location_dest_id = self.env['stock.location'].search( [('barcode', '=', 'WH-PREPRODUCTION')]).id, outcontract_picking_type_in = self.env.ref( 'sf_manufacturing.outcontract_picking_in').id, outcontract_picking_type_out = self.env.ref( 'sf_manufacturing.outcontract_picking_out').id, moves_in = self.env['stock.move'].sudo().create( item._get_stock_move_values_Res(item, location_id, location_dest_id, outcontract_picking_type_in)) moves_out = self.env['stock.move'].sudo().create( item._get_stock_move_values_Res(item, location_dest_id, location_id, outcontract_picking_type_out)) new_picking = True picking_in = self.create( moves_in._get_new_picking_values_Res(item, sorted_workorders, 'WH/OCIN/')) picking_out = self.create( moves_out._get_new_picking_values_Res(item, sorted_workorders, 'WH/OCOUT/')) moves_in.write({'picking_id': picking_in.id, 'state': 'confirmed'}) moves_out.write({'picking_id': picking_out.id, 'state': 'confirmed'}) moves_in._assign_picking_post_process(new=new_picking) moves_out._assign_picking_post_process(new=new_picking) m += 1 sorted_workorders.write({'picking_in_id': picking_in.id, 'picking_out_id': picking_out.id}) class ReStockMove(models.Model): _inherit = 'stock.move' materiel_length = fields.Float(string='物料长度', digits=(16, 4)) materiel_width = fields.Float(string='物料宽度', digits=(16, 4)) materiel_height = fields.Float(string='物料高度', digits=(16, 4)) def _get_new_picking_values_Res(self, item, sorted_workorders, rescode): return { 'name': self.env['stock.picking']._get_name_Res(rescode), 'origin': item.name, 'company_id': self.mapped('company_id').id, 'user_id': False, 'move_type': self.mapped('group_id').move_type or 'direct', 'partner_id': sorted_workorders.supplier_id.id, 'picking_type_id': self.mapped('picking_type_id').id, 'location_id': self.mapped('location_id').id, 'location_dest_id': self.mapped('location_dest_id').id, 'state': 'confirmed', } def print_serial_numbers(self): if not self.next_serial: raise UserError(_("请先分配序列号再进行打印")) label_data = [] for item in self.move_line_ids: label_data.append({ 'item_id': item.id, }) if label_data: report_template = self.env.ref('stock.label_package_template') res = report_template.report_action(label_data) res['id'] = report_template.id return res else: raise UserError(_("没有可打印的标签数据")) class ReStockQuant(models.Model): _inherit = 'stock.quant' def action_apply_inventory(self): inventory_diff_quantity = self.inventory_diff_quantity super(ReStockQuant, self).action_apply_inventory() if inventory_diff_quantity >= 1: stock = self.env['stock.move'].search([('product_id', '=', self.product_id.id), ('is_inventory', '=', True), ('reference', '=', '更新的产品数量'), ('state', '=', 'done')], limit=1, order='id desc') if self.product_id.categ_type == '夹具': stock._register_fixture() elif self.product_id.categ_type == '刀具': stock._register_cutting_tool() return True