Files
test/sf_manufacturing/models/stock.py

481 lines
23 KiB
Python

# -*- coding: utf-8 -*-
import base64
import qrcode
from collections import defaultdict, namedtuple
import logging
import json
from re import split as regex_split
from re import findall as regex_findall
from datetime import datetime
import requests
from odoo import SUPERUSER_ID, _, api, fields, models
from odoo.tools import float_compare
from odoo.addons.stock.models.stock_rule import ProcurementException
from odoo.addons.sf_base.commons.common import Common
from odoo.exceptions import UserError
from io import BytesIO
from odoo.exceptions import ValidationError
class StockRule(models.Model):
    """Customise how ``stock.rule`` runs pull and manufacture procurements.

    ``_run_pull`` explodes procurements of single-manufacturing products into
    one-unit procurements before creating the stock moves, and
    ``_run_manufacture`` generates a serial number and an
    ``sf.production.plan`` record for every manufacturing order it creates.
    """

    _inherit = 'stock.rule'

    @api.model
    def _split_single_manufacturing_procurements(self, procurements):
        """Explode single-manufacturing procurements into one-unit procurements.

        A procurement is split when its truncated quantity is greater than 1
        and its product template has ``single_manufacturing`` set; every other
        entry is passed through untouched.

        :param procurements: iterable of ``(procurement, rule)`` tuples.
        :return: list of ``(procurement, rule)`` tuples in the original order.
        """
        # Built once instead of once per generated unit.
        unit_procurement = namedtuple('Procurement', ['product_id', 'product_qty',
                                                      'product_uom', 'location_id', 'name', 'origin',
                                                      'company_id',
                                                      'values'])
        result = []
        for item in procurements:
            procurement = item[0]
            # NOTE(review): int() truncates, so a fractional quantity such as
            # 2.5 yields only two one-unit procurements.
            quantity = int(procurement.product_qty)
            if quantity <= 1:
                # Nothing to split: skip the product lookups altogether.
                result.append(item)
                continue
            product = self.env['product.product'].search(
                [("id", '=', procurement.product_id.id)])
            product_tmpl = self.env['product.template'].search(
                ["&", ("id", '=', product.product_tmpl_id.id), ('single_manufacturing', "!=", False)])
            if not product_tmpl:
                result.append(item)
                continue
            unit = unit_procurement(
                product_id=procurement.product_id,
                product_qty=1.0,
                product_uom=procurement.product_uom,
                location_id=procurement.location_id,
                name=procurement.name,
                origin=procurement.origin,
                company_id=procurement.company_id,
                values=procurement.values,
            )
            # The unit tuples are immutable and identical, so one can be shared.
            result.extend([(unit,) + tuple(item[1:])] * quantity)
        return result

    @api.model
    def _run_pull(self, procurements):
        """Create and confirm the stock moves answering pull procurements.

        :param procurements: iterable of ``(procurement, rule)`` tuples.
        :return: ``True``
        :raises ProcurementException: when a rule has no source location.
        """
        moves_values_by_company = defaultdict(list)
        mtso_products_by_locations = defaultdict(list)

        # To handle the `mts_else_mto` procure method, we do a preliminary loop to
        # isolate the products we would need to read the forecasted quantity,
        # in order to batch the read. We also make a sanity check on the
        # `location_src_id` field.
        for procurement, rule in procurements:
            if not rule.location_src_id:
                msg = _('No source location defined on stock rule: %s!') % (rule.name,)
                raise ProcurementException([(procurement, msg)])
            if rule.procure_method == 'mts_else_mto':
                mtso_products_by_locations[rule.location_src_id].append(procurement.product_id.id)

        # Get the forecasted quantity for the `mts_else_mto` procurement.
        forecasted_qties_by_loc = {}
        for location, product_ids in mtso_products_by_locations.items():
            products = self.env['product.product'].browse(product_ids).with_context(location=location.id)
            forecasted_qties_by_loc[location] = {product.id: product.free_qty for product in products}

        # Prepare the move values, adapt the `procure_method` if needed.
        # The boolean key puts non-positive quantities first.
        procurements = sorted(
            procurements,
            key=lambda proc: float_compare(proc[0].product_qty, 0.0,
                                           precision_rounding=proc[0].product_uom.rounding) > 0)
        for procurement, rule in self._split_single_manufacturing_procurements(procurements):
            procure_method = rule.procure_method
            if rule.procure_method == 'mts_else_mto':
                qty_needed = procurement.product_uom._compute_quantity(procurement.product_qty,
                                                                       procurement.product_id.uom_id)
                if float_compare(qty_needed, 0, precision_rounding=procurement.product_id.uom_id.rounding) <= 0:
                    procure_method = 'make_to_order'
                    for move in procurement.values.get('group_id', self.env['procurement.group']).stock_move_ids:
                        if move.rule_id == rule and float_compare(move.product_uom_qty, 0,
                                                                  precision_rounding=move.product_uom.rounding) > 0:
                            procure_method = move.procure_method
                            break
                    forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
                elif float_compare(qty_needed, forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id],
                                   precision_rounding=procurement.product_id.uom_id.rounding) > 0:
                    procure_method = 'make_to_order'
                else:
                    forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
                    procure_method = 'make_to_stock'
            move_values = rule._get_stock_move_values(*procurement)
            move_values['procure_method'] = procure_method
            moves_values_by_company[procurement.company_id.id].append(move_values)

        for company_id, moves_values in moves_values_by_company.items():
            # create the move as SUPERUSER because the current user may not have the rights to do it (mto product
            # launched by a sale for example)
            moves = self.env['stock.move'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(moves_values)
            # Since action_confirm launch following procurement_group we should activate it.
            moves._action_confirm()
        return True

    def _create_production_plan(self, production, company_id):
        """Flag the originating sale order and create the production plan.

        :param production: a single ``mrp.production`` record.
        :param company_id: id of the company the plan is created for.
        """
        sale_order = self.env['sale.order'].sudo().search([('name', '=', production.origin)])
        if sale_order:
            sale_order.write({'schedule_status': 'to schedule'})
        self.env['sf.production.plan'].sudo().with_company(company_id).create({
            'name': production.name,
            # ``[:1]`` keeps this safe when several sale orders share the name
            # (a plain field read on a multi-record set raises).
            'order_deadline': sale_order[:1].deadline_of_delivery,
            'production_id': production.id,
            'date_planned_start': production.date_planned_start,
            'origin': production.origin,
            'product_qty': production.product_qty,
            'product_id': production.product_id.id,
            'state': 'draft',
        })

    @api.model
    def _run_manufacture(self, procurements):
        """Create manufacturing orders, work orders and production plans.

        Procurements with a non-positive quantity are ignored.

        :param procurements: iterable of ``(procurement, rule)`` tuples.
        :return: ``True``
        """
        productions_values_by_company = defaultdict(list)
        for procurement, rule in procurements:
            if float_compare(procurement.product_qty, 0, precision_rounding=procurement.product_uom.rounding) <= 0:
                # If procurement contains negative quantity, don't create a MO that would be for a negative value.
                continue
            bom = rule._get_matching_bom(procurement.product_id, procurement.company_id, procurement.values)
            productions_values_by_company[procurement.company_id.id].append(rule._prepare_mo_vals(*procurement, bom))

        for company_id, productions_values in productions_values_by_company.items():
            # create the MO as SUPERUSER because the current user may not have the rights to do it
            # (mto product launched by a sale for example)
            productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(
                productions_values)
            self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
            self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
            # Create the work orders.
            productions._create_workorder()
            productions.filtered(
                lambda p: (not p.orderpoint_id and p.move_raw_ids)
                or (p.move_dest_ids.procure_method != 'make_to_order'
                    and not p.move_raw_ids and not p.workorder_ids)
            ).action_confirm()

            for production in productions:
                # Generate the serial number as soon as the MO is created.
                production.action_generate_serial()
                # Empty-safe: yields an empty (falsy) recordset without destination moves.
                origin_production = production.move_dest_ids[:1].raw_material_production_id
                orderpoint = production.orderpoint_id
                if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
                    production.message_post(
                        body=_('This production order has been created from Replenishment Report.'),
                        message_type='comment',
                        subtype_xmlid='mail.mt_note')
                elif orderpoint:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': orderpoint},
                        subtype_id=self.env.ref('mail.mt_note').id)
                elif origin_production:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': origin_production},
                        subtype_id=self.env.ref('mail.mt_note').id)
                # Create the production plan.
                self._create_production_plan(production, company_id)
        return True
class ProductionLot(models.Model):
    """Extend ``stock.lot`` with product-name based serial numbers and a QR code."""

    _inherit = 'stock.lot'

    # QR code image (base64-encoded PNG) of the lot name.
    qr_code_image = fields.Binary(string='二维码', compute='_generate_qr_code')

    @api.model
    def generate_lot_names1(self, display_name, first_lot, count):
        """Generate ``count`` consecutive lot names starting from ``first_lot``.

        The series is built on the last number found in ``first_lot`` (after a
        leading ``display_name`` and its one-character separator have been
        removed) and every generated name is prefixed with
        ``"<display_name>-"``.

        :param display_name: product name used as prefix of every lot name.
        :param first_lot: lot name the series starts from.
        :param count: number of names to generate.
        :return: list of ``count`` lot names; the first one reproduces
            ``first_lot``'s number.
        """
        # Only strip the product name when it really is a prefix; a substring
        # test would cut unrelated characters off the front of the lot name.
        if first_lot.startswith(display_name):
            first_lot = first_lot[(len(display_name) + 1):]
        # We look if the first lot contains at least one digit.
        caught_initial_number = regex_findall(r"\d+", first_lot)
        if not caught_initial_number:
            # No digit at all: start counting from an appended "0". Handled
            # inline so the prefix stripping above is not applied twice.
            first_lot += "0"
            caught_initial_number = ["0"]
        # We base the series on the last number found in the base lot.
        initial_number = caught_initial_number[-1]
        padding = len(initial_number)
        # We split the lot name to get the prefix and suffix.
        splitted = regex_split(initial_number, first_lot)
        # initial_number could appear several times, e.g. BAV023B00001S00001
        prefix = initial_number.join(splitted[:-1])
        suffix = splitted[-1]
        start = int(initial_number)
        return [
            '%s-%s%s%s' % (display_name, prefix, str(start + offset).zfill(padding), suffix)
            for offset in range(count)
        ]

    def get_tool_generate_lot_names1(self, company, product):
        """Return the next serial number of a cutting-tool material on purchase.

        The serial is ``"<model code without its last 12 characters>-<YYYYMMDD><NNN>"``
        where ``NNN`` continues from the most recent lot of the product whose
        name contains today's date.

        :raises ValidationError: when the product has no cutting tool model.
        """
        if not product.cutting_tool_model_id:
            raise ValidationError('该刀具物料产品的型号字段为空,请补充完整!!!')
        now = datetime.now().strftime("%Y%m%d")
        last_serial = self.env['stock.lot'].search(
            [('company_id', '=', company.id), ('product_id', '=', product.id), ('name', 'like', now)],
            limit=1, order='id DESC')
        # NOTE(review): only the last three characters are read back, so the
        # counter restarts once it grows past three digits.
        sequence = int(last_serial.name[-3:]) + 1 if last_serial else 1
        return "%s-%s%03d" % (product.cutting_tool_model_id.code[:-12], now, sequence)

    @api.model
    def _get_next_serial(self, company, product):
        """Return the next serial number to be attributed to the product.

        :return: the serial name, or ``False`` when the product is not
            tracked by serial number.
        """
        if product.tracking != "serial":
            return False
        last_serial = self.env['stock.lot'].search(
            [('company_id', '=', company.id), ('product_id', '=', product.id)],
            limit=1, order='id DESC')
        if last_serial:
            if product.categ_id.name == '刀具':
                return self.env['stock.lot'].get_tool_generate_lot_names1(company, product)
            # The first generated name repeats the last serial; take the second.
            return self.env['stock.lot'].generate_lot_names1(product.name, last_serial.name, 2)[1]
        # First serial ever created for this product.
        now = datetime.now().strftime("%Y%m%d")
        if product.cutting_tool_model_id:
            return "%s-%s%03d" % (product.cutting_tool_model_id.code[:-12], now, 1)
        return "%s-%03d" % (product.name, 1)

    @api.depends('name')
    def _generate_qr_code(self):
        """Compute ``qr_code_image`` as a base64 PNG QR code of the lot name."""
        for record in self:
            if not record.name:
                # Without a name there is nothing meaningful to encode.
                record.qr_code_image = False
                continue
            qr = qrcode.QRCode(
                version=1,
                error_correction=qrcode.constants.ERROR_CORRECT_L,
                box_size=10,
                border=4,
            )
            qr.add_data(record.name)
            qr.make(fit=True)
            qr_image = qr.make_image(fill_color="black", back_color="white")
            # Encode the image data in base64
            image_stream = BytesIO()
            qr_image.save(image_stream, format="PNG")
            record.qr_code_image = base64.b64encode(image_stream.getvalue())

    def print_qr_code(self):
        """Store the QR code as a public attachment and return its download action.

        :return: an ``ir.actions.act_url`` action pointing at the attachment.
        :raises UserError: when the record has no QR code data.
        """
        self.ensure_one()  # this method is meant to be called on a single record
        qr_code_data = self.qr_code_image
        if not qr_code_data:
            raise UserError("没有找到二维码数据。")
        attachment = self.env['ir.attachment'].sudo().create({
            'datas': qr_code_data,
            'type': 'binary',
            'description': '二维码图片',
            'name': self.name + '.png',
            'public': True,
            'mimetype': 'application/x-png',
        })
        # Return the download link of the attachment.
        download_url = '/web/content/%s?download=true' % attachment.id
        base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
        return {
            'type': 'ir.actions.act_url',
            'url': str(base_url) + download_url,
            'target': 'self',
        }
class StockPicking(models.Model):
    """Extend ``stock.picking`` with subcontracting (outsourcing) in/out pickings."""

    _inherit = 'stock.picking'

    workorder_in_id = fields.One2many('mrp.workorder', 'picking_in_id')
    workorder_out_id = fields.One2many('mrp.workorder', 'picking_out_id')

    def _get_name_Res(self, rescode):
        """Return the next name for a subcontracting in/out picking.

        The name is ``rescode`` followed by a counter padded to four digits,
        continuing from the most recently created picking whose name contains
        ``rescode``.

        :param rescode: name prefix, e.g. ``'WH/OCIN/'``.
        :return: the new picking name.
        """
        last_picking = self.sudo().search([('name', 'like', rescode)], order='create_date DESC', limit=1)
        # Lazy %-formatting: ``name`` is False when no picking exists yet and
        # string concatenation would raise a TypeError.
        logging.info('编号:%s', last_picking.name)
        next_number = 1
        if last_picking:
            # Read the whole trailing counter so that it keeps increasing past
            # 999 instead of wrapping around on a three-character slice.
            trailing_digits = regex_findall(r"\d+$", last_picking.name)
            if trailing_digits:
                next_number = int(trailing_digits[0]) + 1
        return '%s%s' % (rescode, "%04d" % next_number)

    def button_validate(self):
        """Validate the picking after checking the subcontracting work orders.

        :raises UserError: when an outgoing picking is linked to a pending
            subcontract work order, or when the outgoing picking matching an
            incoming one is not done yet.
        """
        # Outgoing picking checks.
        if self.workorder_out_id:
            workorder_in = self.workorder_out_id.filtered(lambda p: p.state == 'progress' and p.is_subcontract is True)
            if workorder_in:
                picking_in = self.sudo().search([('id', '=', workorder_in.picking_in_id.id)])
                if picking_in:
                    picking_in.write({'state': 'assigned'})
            else:
                workorder_subcontract = self.workorder_out_id.filtered(
                    lambda p: p.state == 'pending' and p.is_subcontract is True)
                if workorder_subcontract:
                    # Interpolate after the translation lookup, not before it.
                    raise UserError(
                        _('该出库单里源单据内的单号为%s的工单还未开始,不能进行验证操作!') % workorder_subcontract[
                            0].name)
        # Incoming picking checks.
        if self.workorder_in_id:
            workorder_out = self.workorder_in_id.filtered(lambda p: p.state == 'progress' and p.is_subcontract is True)
            if workorder_out:
                picking_out = self.sudo().search([('id', '=', workorder_out.picking_out_id.id)])
                if picking_out.state != 'done':
                    raise UserError(
                        _('该入库单对应的单号为%s的出库单还未完成,不能进行验证操作!') % picking_out.name)
        # Fixture / cutting-tool registration on receipt validation used to
        # live here and is currently disabled.
        return super().button_validate()

    def create_outcontract_picking(self, sorted_workorders_arr, item):
        """Create the subcontracting in/out pickings and link them to the work orders.

        One pair of pickings is created, the first time a work order without
        an existing stock move for ``item`` is met; that work order and every
        following one is then linked to the pair. Work orders met before any
        pair has been created are left untouched.

        :param sorted_workorders_arr: iterable of ``mrp.workorder`` records.
        :param item: the manufacturing order the work orders belong to.
        """
        picking_in = picking_out = None
        pickings_created = False
        for workorder in sorted_workorders_arr:
            if not pickings_created:
                outcontract_stock_move = self.env['stock.move'].search(
                    [('workorder_id', '=', workorder.id), ('production_id', '=', item.id)])
                if not outcontract_stock_move:
                    # NOTE(review): these four values have always been
                    # one-element tuples ``(id,)`` (trailing commas in the
                    # original code). They are handed unchanged to
                    # ``_get_stock_move_values_Res``, which is defined outside
                    # this class — confirm what it expects before unwrapping.
                    location_id = (self.env.ref(
                        'sf_manufacturing.stock_location_locations_virtual_outcontract').id,)
                    location_dest_id = (self.env['stock.location'].search(
                        [('barcode', '=', 'WH-PREPRODUCTION')]).id,)
                    outcontract_picking_type_in = (self.env.ref(
                        'sf_manufacturing.outcontract_picking_in').id,)
                    outcontract_picking_type_out = (self.env.ref(
                        'sf_manufacturing.outcontract_picking_out').id,)
                    moves_in = self.env['stock.move'].sudo().create(
                        item._get_stock_move_values_Res(item, location_id, location_dest_id,
                                                        outcontract_picking_type_in))
                    moves_out = self.env['stock.move'].sudo().create(
                        item._get_stock_move_values_Res(item, location_dest_id, location_id,
                                                        outcontract_picking_type_out))
                    picking_in = self.create(
                        moves_in._get_new_picking_values_Res(item, workorder, 'WH/OCIN/'))
                    picking_out = self.create(
                        moves_out._get_new_picking_values_Res(item, workorder, 'WH/OCOUT/'))
                    moves_in.write({'picking_id': picking_in.id, 'state': 'confirmed'})
                    moves_out.write({'picking_id': picking_out.id, 'state': 'confirmed'})
                    moves_in._assign_picking_post_process(new=True)
                    moves_out._assign_picking_post_process(new=True)
                    pickings_created = True
            if not pickings_created:
                # Moves already exist for this work order and no pickings were
                # created in this call: there is nothing to link yet.
                continue
            workorder.write({'picking_in_id': picking_in.id, 'picking_out_id': picking_out.id})
class ReStockMove(models.Model):
    """Extend ``stock.move`` with material dimensions and subcontracting helpers."""

    _inherit = 'stock.move'

    # Material dimensions (length / width / height), stored with 4 decimals.
    materiel_length = fields.Float(string='物料长度', digits=(16, 4))
    materiel_width = fields.Float(string='物料宽度', digits=(16, 4))
    materiel_height = fields.Float(string='物料高度', digits=(16, 4))

    def _get_new_picking_values_Res(self, item, sorted_workorders, rescode):
        """Build the creation values of a subcontracting picking for these moves.

        :param item: record whose ``name`` becomes the picking origin.
        :param sorted_workorders: work order whose supplier becomes the partner.
        :param rescode: name prefix passed to ``stock.picking._get_name_Res``.
        :return: dict of ``stock.picking`` values in state ``confirmed``.
        """
        picking_name = self.env['stock.picking']._get_name_Res(rescode)
        company = self.mapped('company_id')
        procurement_group = self.mapped('group_id')
        picking_type = self.mapped('picking_type_id')
        source_location = self.mapped('location_id')
        destination_location = self.mapped('location_dest_id')
        return {
            'name': picking_name,
            'origin': item.name,
            'company_id': company.id,
            'user_id': False,
            'move_type': procurement_group.move_type or 'direct',
            'partner_id': sorted_workorders.supplier_id.id,
            'picking_type_id': picking_type.id,
            'location_id': source_location.id,
            'location_dest_id': destination_location.id,
            'state': 'confirmed',
        }

    def print_serial_numbers(self):
        """Return the package-label report action for the move lines.

        :raises UserError: when no serial number has been assigned yet, or
            when the move has no move line to print.
        """
        if not self.next_serial:
            raise UserError(_("请先分配序列号再进行打印"))
        label_data = [{'item_id': move_line.id} for move_line in self.move_line_ids]
        if not label_data:
            raise UserError(_("没有可打印的标签数据"))
        report_template = self.env.ref('stock.label_package_template')
        action = report_template.report_action(label_data)
        action['id'] = report_template.id
        return action
class ReStockQuant(models.Model):
    """Extend ``stock.quant`` to register fixtures / cutting tools on inventory gains."""

    _inherit = 'stock.quant'

    def action_apply_inventory(self):
        """Apply the inventory, then register newly counted fixtures and tools.

        For every quant whose counted difference was at least 1, the latest
        done inventory move of its product is looked up and registered as a
        fixture or a cutting tool according to the product's ``categ_type``.

        :return: the action returned by the parent implementation when it
            returns one, ``True`` otherwise.
        """
        # Snapshot the differences per quant before the parent call, which the
        # original code also read beforehand; a per-record read also works
        # when several quants are applied at once.
        diff_by_quant = {quant.id: quant.inventory_diff_quantity for quant in self}
        result = super(ReStockQuant, self).action_apply_inventory()
        if result:
            # The parent returned something to hand back to the client (e.g. an
            # action); pass it on untouched and do not register anything.
            return result
        for quant in self:
            if not diff_by_quant[quant.id] >= 1:
                continue
            stock = self.env['stock.move'].search(
                [('product_id', '=', quant.product_id.id), ('is_inventory', '=', True),
                 ('reference', '=', '更新的产品数量'), ('state', '=', 'done')],
                limit=1, order='id desc')
            if quant.product_id.categ_type == '夹具':
                stock._register_fixture()
            elif quant.product_id.categ_type == '刀具':
                stock._register_cutting_tool()
        return True