Files
test/sf_manufacturing/models/stock.py
2024-06-17 16:16:22 +08:00

823 lines
41 KiB
Python

# -*- coding: utf-8 -*-
import base64
import qrcode
from collections import defaultdict, namedtuple
import logging
import io
import json
from re import split as regex_split
from re import findall as regex_findall
from datetime import datetime
import requests
from odoo import SUPERUSER_ID, _, api, fields, models
from odoo.tools import float_compare
from odoo.addons.stock.models.stock_rule import ProcurementException
from odoo.addons.sf_base.commons.common import Common
from odoo.exceptions import UserError
from io import BytesIO
from odoo.exceptions import ValidationError
class stockWarehouse(models.Model):
_inherit = 'stock.warehouse'
subcontracting_surface_technology_pull_out_id = fields.Many2one(
'stock.rule', '表面工艺规则1')
subcontracting_surface_technology_pull_in_id = fields.Many2one(
'stock.rule', '表面工艺规则2'
)
def _get_global_route_rules_values(self):
rules = super(stockWarehouse, self)._get_global_route_rules_values()
location_virtual_id = self.env.ref(
'sf_manufacturing.stock_location_locations_virtual_outcontract').id,
location_pre_id = self.env['stock.location'].search(
[('barcode', 'ilike', 'WH-PREPRODUCTION')]).id,
rules.update({
'subcontracting_surface_technology_pull_in_id': {
'create_values': {
'action': 'pull',
'picking_type_id': self.env.ref('sf_manufacturing.outcontract_picking_in').id,
'group_propagation_option': 'none',
'company_id': self.company_id.id,
'location_src_id': location_virtual_id,
'location_dest_id': location_pre_id,
'route_id': self._find_global_route('sf_manufacturing.route_surface_technology_outsourcing',
_('表面工艺外协')).id,
}
},
'subcontracting_surface_technology_pull_out_id': {
'create_values': {
'action': 'pull',
'picking_type_id': self.env.ref('sf_manufacturing.outcontract_picking_out').id,
'group_propagation_option': 'none',
'company_id': self.company_id.id,
'location_src_id': location_pre_id,
'location_dest_id': location_virtual_id,
'route_id': self._find_global_route('sf_manufacturing.route_surface_technology_outsourcing',
_('表面工艺外协')).id,
}
}
})
return rules
class StockRule(models.Model):
_inherit = 'stock.rule'
@api.model
def _run_pull(self, procurements):
moves_values_by_company = defaultdict(list)
mtso_products_by_locations = defaultdict(list)
# To handle the `mts_else_mto` procure method, we do a preliminary loop to
# isolate the products we would need to read the forecasted quantity,
# in order to to batch the read. We also make a sanitary check on the
# `location_src_id` field.
# list1 = []
# for item in procurements:
# num = int(item[0].product_qty)
# if num > 1:
# for no in range(1, num+1):
#
# Procurement = namedtuple('Procurement', ['product_id', 'product_qty',
# 'product_uom', 'location_id', 'name', 'origin',
# 'company_id',
# 'values'])
# s = Procurement(product_id=item[0].product_id,product_qty=1.0,product_uom=item[0].product_uom,
# location_id=item[0].location_id,
# name=item[0].name,
# origin=item[0].origin,
# company_id=item[0].company_id,
# values=item[0].values,
# )
# item1 = list(item)
# item1[0]=s
#
# list1.append(tuple(item1))
# else:
# list1.append(item)
for procurement, rule in procurements:
if not rule.location_src_id:
msg = _('No source location defined on stock rule: %s!') % (rule.name,)
raise ProcurementException([(procurement, msg)])
if rule.procure_method == 'mts_else_mto':
mtso_products_by_locations[rule.location_src_id].append(procurement.product_id.id)
# Get the forecasted quantity for the `mts_else_mto` procurement.
forecasted_qties_by_loc = {}
for location, product_ids in mtso_products_by_locations.items():
products = self.env['product.product'].browse(product_ids).with_context(location=location.id)
forecasted_qties_by_loc[location] = {product.id: product.free_qty for product in products}
# Prepare the move values, adapt the `procure_method` if needed.
procurements = sorted(procurements, key=lambda proc: float_compare(proc[0].product_qty, 0.0,
precision_rounding=proc[
0].product_uom.rounding) > 0)
list2 = []
for item in procurements:
num = int(item[0].product_qty)
product = self.env['product.product'].search(
[("id", '=', item[0].product_id.id)])
product_tmpl = self.env['product.template'].search(
["&", ("id", '=', product.product_tmpl_id.id), ('single_manufacturing', "!=", False)])
if product_tmpl:
if num > 1:
for no in range(1, num + 1):
Procurement = namedtuple('Procurement', ['product_id', 'product_qty',
'product_uom', 'location_id', 'name', 'origin',
'company_id',
'values'])
s = Procurement(product_id=item[0].product_id, product_qty=1.0, product_uom=item[0].product_uom,
location_id=item[0].location_id,
name=item[0].name,
origin=item[0].origin,
company_id=item[0].company_id,
values=item[0].values,
)
item1 = list(item)
item1[0] = s
list2.append(tuple(item1))
else:
list2.append(item)
else:
list2.append(item)
for procurement, rule in list2:
procure_method = rule.procure_method
if rule.procure_method == 'mts_else_mto':
qty_needed = procurement.product_uom._compute_quantity(procurement.product_qty,
procurement.product_id.uom_id)
if float_compare(qty_needed, 0, precision_rounding=procurement.product_id.uom_id.rounding) <= 0:
procure_method = 'make_to_order'
for move in procurement.values.get('group_id', self.env['procurement.group']).stock_move_ids:
if move.rule_id == rule and float_compare(move.product_uom_qty, 0,
precision_rounding=move.product_uom.rounding) > 0:
procure_method = move.procure_method
break
forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
elif float_compare(qty_needed, forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id],
precision_rounding=procurement.product_id.uom_id.rounding) > 0:
procure_method = 'make_to_order'
else:
forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
procure_method = 'make_to_stock'
move_values = rule._get_stock_move_values(*procurement)
move_values['procure_method'] = procure_method
moves_values_by_company[procurement.company_id.id].append(move_values)
for company_id, moves_values in moves_values_by_company.items():
# create the move as SUPERUSER because the current user may not have the rights to do it (mto product
# launched by a sale for example)
moves = self.env['stock.move'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(moves_values)
# Since action_confirm launch following procurement_group we should activate it.
moves._action_confirm()
return True
@api.model
def _run_manufacture(self, procurements):
productions_values_by_company = defaultdict(list)
errors = []
for procurement, rule in procurements:
if float_compare(procurement.product_qty, 0, precision_rounding=procurement.product_uom.rounding) <= 0:
# If procurement contains negative quantity, don't create a MO that would be for a negative value.
continue
bom = rule._get_matching_bom(procurement.product_id, procurement.company_id, procurement.values)
productions_values_by_company[procurement.company_id.id].append(rule._prepare_mo_vals(*procurement, bom))
if errors:
raise ProcurementException(errors)
for company_id, productions_values in productions_values_by_company.items():
# create the MO as SUPERUSER because the current user may not have the rights to do it
# (mto product launched by a sale for example)
'''创建制造订单'''
productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(
productions_values)
# self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
'''
创建工单
'''
productions._create_workorder()
productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \
(
p.move_dest_ids.procure_method != 'make_to_order' and not
p.move_raw_ids and not p.workorder_ids)).action_confirm()
for production_item in productions:
process_parameter_workorder = self.env['mrp.workorder'].search(
[('surface_technics_parameters_id', '!=', False), ('production_id', '=', production_item.id),
('is_subcontract', '=', True)])
if process_parameter_workorder:
is_pick = False
consecutive_workorders = []
m = 0
sorted_workorders = sorted(process_parameter_workorder, key=lambda w: w.id)
for i in range(len(sorted_workorders) - 1):
if m == 0:
is_pick = False
if sorted_workorders[i].supplier_id.id == sorted_workorders[i + 1].supplier_id.id and \
sorted_workorders[i].is_subcontract == sorted_workorders[i + 1].is_subcontract and \
sorted_workorders[i].id == sorted_workorders[i + 1].id - 1:
if sorted_workorders[i] not in consecutive_workorders:
consecutive_workorders.append(sorted_workorders[i])
consecutive_workorders.append(sorted_workorders[i + 1])
m += 1
continue
else:
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders,
production_item)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
# 当前面的连续工序生成对应的外协出入库单再生成当前工序的外协出入库单
if is_pick is False:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i],
production_item)
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders,
production_item)
if sorted_workorders[i] in consecutive_workorders:
is_pick = True
consecutive_workorders = []
m = 0
if m == len(consecutive_workorders) - 1 and m != 0:
self.env['stock.picking'].create_outcontract_picking(consecutive_workorders, production_item)
if is_pick is False and m == 0:
if len(sorted_workorders) == 1:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders, production_item)
else:
self.env['stock.picking'].create_outcontract_picking(sorted_workorders[i], production_item)
for production in productions:
'''
创建制造订单时生成序列号
'''
production.action_generate_serial()
origin_production = production.move_dest_ids and production.move_dest_ids[
0].raw_material_production_id or False
orderpoint = production.orderpoint_id
if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
production.message_post(
body=_('This production order has been created from Replenishment Report.'),
message_type='comment',
subtype_xmlid='mail.mt_note')
elif orderpoint:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': orderpoint},
subtype_id=self.env.ref('mail.mt_note').id)
elif origin_production:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': origin_production},
subtype_id=self.env.ref('mail.mt_note').id)
'''
创建生产计划
'''
# 工单耗时
workorder_duration = 0
for workorder in production.workorder_ids:
workorder_duration += workorder.duration_expected
sale_order = self.env['sale.order'].sudo().search([('name', '=', production.origin)])
if sale_order:
# sale_order.write({'schedule_status': 'to schedule'})
self.env['sf.production.plan'].sudo().with_company(company_id).create({
'name': production.name,
'order_deadline': sale_order.deadline_of_delivery,
'production_id': production.id,
'date_planned_start': production.date_planned_start,
'origin': production.origin,
'product_qty': production.product_qty,
'product_id': production.product_id.id,
'state': 'draft',
})
return True
class ProductionLot(models.Model):
_name = 'stock.lot'
_inherit = ['stock.lot', 'printing.utils']
rfid = fields.Char('Rfid', readonly=True)
product_specification = fields.Char('规格', compute='_compute_product_specification', store=True)
def search_lot_put_rfid(self):
# 使用SQL将所有刀柄Rfid不满十位的值在前方补零
self.env.cr.execute(
'''UPDATE stock_lot SET rfid = LPAD(rfid, 10, '0') WHERE rfid IS NOT NULL AND LENGTH(rfid) < 10'''
)
self.env.cr.commit()
@api.depends('product_id')
def _compute_product_specification(self):
for stock in self:
if stock:
if stock.product_id:
if stock.product_id.categ_id.name in '刀具':
stock.product_specification = stock.product_id.specification_id.name
elif stock.product_id.categ_id.name in '夹具':
stock.product_specification = stock.product_id.specification_fixture_id.name
else:
stock.product_specification = stock.product_id.default_code
@api.model
def generate_lot_names1(self, display_name, first_lot, count):
"""Generate `lot_names` from a string."""
if first_lot.__contains__(display_name):
first_lot = first_lot[(len(display_name) + 1):]
# We look if the first lot contains at least one digit.
caught_initial_number = regex_findall(r"\d+", first_lot)
if not caught_initial_number:
return self.generate_lot_names1(display_name, first_lot + "0", count)
# We base the series on the last number found in the base lot.
initial_number = caught_initial_number[-1]
padding = len(initial_number)
# We split the lot name to get the prefix and suffix.
splitted = regex_split(initial_number, first_lot)
# initial_number could appear several times, e.g. BAV023B00001S00001
prefix = initial_number.join(splitted[:-1])
suffix = splitted[-1]
initial_number = int(initial_number)
lot_names = []
for i in range(0, count):
lot_names.append('%s-%s%s%s' % (
display_name,
prefix,
str(initial_number + i).zfill(padding),
suffix
))
return lot_names
def get_tool_generate_lot_names1(self, company, product):
"""
采购时生成刀具物料序列号
"""
now = datetime.now().strftime("%Y%m%d")
last_serial = self.env['stock.lot'].search(
[('company_id', '=', company.id), ('product_id', '=', product.id)],
limit=1, order='id DESC')
if product.cutting_tool_model_id:
split_codes = product.cutting_tool_model_id.code.split('-')
if not last_serial:
return "%s-T-%s-%s-%03d" % (split_codes[0], now, product.specification_id.name, 1)
else:
return "%s-T-%s-%s-%03d" % (
split_codes[0], now, product.specification_id.name, int(last_serial.name[-3:]) + 1)
else:
raise ValidationError('该刀具物料产品的型号字段为空,请补充完整!!!')
@api.model
def _get_next_serial(self, company, product):
"""Return the next serial number to be attributed to the product."""
if product.tracking == "serial":
last_serial = self.env['stock.lot'].search(
[('company_id', '=', company.id), ('product_id', '=', product.id)],
limit=1, order='id DESC')
if last_serial:
if product.categ_id.name == '刀具':
return self.env['stock.lot'].get_tool_generate_lot_names1(company, product)
else:
return self.env['stock.lot'].generate_lot_names1(product.name, last_serial.name, 2)[1]
now = datetime.now().strftime("%Y%m%d")
if product.cutting_tool_model_id:
split_codes = product.cutting_tool_model_id.code.split('-')
return "%s-T-%s-%s-%03d" % (split_codes[0], now, product.specification_id.name, 1)
return "%s-%03d" % (product.name, 1)
qr_code_image = fields.Binary(string='二维码', compute='_generate_qr_code')
@api.depends('name')
def _generate_qr_code(self):
for record in self:
# Generate QR code
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(record.name)
qr.make(fit=True)
qr_image = qr.make_image(fill_color="black", back_color="white")
# Encode the image data in base64
image_stream = BytesIO()
qr_image.save(image_stream, format="PNG")
encoded_image = base64.b64encode(image_stream.getvalue())
record.qr_code_image = encoded_image
def print_single_method(self):
self.ensure_one()
qr_code_data = self.qr_code_image
if not qr_code_data:
raise UserError("没有找到二维码数据。")
lot_name = self.name
# host = "192.168.50.110" # 可以根据实际情况修改
# port = 9100 # 可以根据实际情况修改
# 获取默认打印机配置
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
if not printer_config:
raise UserError('请先配置打印机')
host = printer_config.printer_id.ip_address
port = printer_config.printer_id.port
self.print_qr_code(lot_name, host, port)
# self.ensure_one() # 确保这个方法只为一个记录调用
# # if not self.lot_id:
# # raise UserError("没有找到序列号。")
# # 假设_lot_qr_code方法已经生成了二维码并保存在字段中
# qr_code_data = self.qr_code_image
# if not qr_code_data:
# raise UserError("没有找到二维码数据。")
#
# # 生成下载链接或直接触发下载
# # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例
# attachment = self.env['ir.attachment'].sudo().create({
# 'datas': self.qr_code_image,
# 'type': 'binary',
# 'description': '二维码图片',
# 'name': self.name + '.png',
# # 'res_id': invoice.id,
# # 'res_model': 'stock.picking',
# 'public': True,
# 'mimetype': 'application/x-png',
# # 'model_name': 'stock.picking',
# })
# # 返回附件的下载链接
# download_url = '/web/content/%s?download=true' % attachment.id
# base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
# return {
# 'type': 'ir.actions.act_url',
# 'url': str(base_url) + download_url,
# 'target': 'self',
# }
@api.model_create_multi
def create(self, vals_list):
for vals in vals_list:
if vals.get('rfid'):
lots = self.env['stock.lot'].search([('rfid', '=', vals['rfid'])])
if lots:
for lot in lots:
raise ValidationError('Rfid【%s】已被序列号为【%s】的【%s】产品占用!' % (
lot.rfid, lot.name, lot.product_id.name))
records = super(ProductionLot, self).create(vals_list)
return records
class StockPicking(models.Model):
_inherit = 'stock.picking'
# workorder_in_id = fields.One2many('mrp.workorder', 'picking_in_id')
# workorder_out_id = fields.One2many('mrp.workorder', 'picking_out_id')
# 设置外协出入单的名称
def _get_name_Res(self, rescode):
last_picking = self.sudo().search([('name', 'like', rescode)], order='create_date desc,id desc', limit=1)
if not last_picking:
num = "%04d" % 1
else:
logging.info('编号:' + last_picking.name)
m = int(last_picking.name[-3:]) + 1
num = "%04d" % m
return '%s%s' % (rescode, num)
def button_validate(self):
move_out = self.env['stock.move'].search(
[('location_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'WH-PREPRODUCTION')]).id),
('location_dest_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'VL-SPOC')]).id),
('origin', '=', self.origin)])
if self.id == move_out.picking_id.id:
if move_out.move_line_ids.workorder_id.state not in ['progress']:
raise UserError(
_('该出库单里源单据内的单号为%s的工单还未开始,不能进行验证操作!' % move_out.move_line_ids.workorder_id.name))
# 入库单验证
move_in = self.env['stock.move'].search(
[('location_dest_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'WH-PREPRODUCTION')]).id),
('location_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'VL-SPOC')]).id),
('origin', '=', self.origin)])
if self.location_id == move_in.location_id and self.location_dest_id == move_in.location_dest_id:
if move_out.origin == move_in.origin:
if move_out.picking_id.state != 'done':
raise UserError(
_('该入库单对应的单号为%s的出库单还未完成,不能进行验证操作!' % move_out.picking_id.name))
res = super().button_validate()
if res is True:
if self.id == move_out.picking_id.id:
if move_out.move_line_ids.workorder_id.state == 'progress':
move_in = self.env['stock.move'].search(
[('location_dest_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'WH-PREPRODUCTION')]).id),
('location_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'VL-SPOC')]).id),
('origin', '=', self.origin)])
# purchase = self.env['purchase.order'].search([('origin', '=', self.origin)])
if move_in:
move_in.write({'state': 'assigned'})
purchase = self.env['purchase.order'].search([('origin', '=', self.origin)])
self.env['stock.move.line'].create(move_in.get_move_line(purchase, None))
return res
# 创建 外协出库入单
def create_outcontract_picking(self, sorted_workorders_arr, item):
m = 0
for sorted_workorders in sorted_workorders_arr:
pick_ids = []
if m == 0:
outcontract_stock_move = self.env['stock.move'].search(
[('workorder_id', '=', sorted_workorders.id), ('production_id', '=', item.id)])
if not outcontract_stock_move:
new_picking = True
location_id = self.env['stock.location'].search(
[('barcode', 'ilike', 'VL-SPOC')]).id,
location_dest_id = self.env['stock.location'].search(
[('barcode', 'ilike', 'WH-PREPRODUCTION')]).id,
outcontract_picking_type_in = self.env.ref(
'sf_manufacturing.outcontract_picking_in').id,
outcontract_picking_type_out = self.env.ref(
'sf_manufacturing.outcontract_picking_out').id,
moves_out = self.env['stock.move'].sudo().create(
self.env['stock.move']._get_stock_move_values_Res(item, location_dest_id, location_id,
outcontract_picking_type_out))
picking_out = self.create(
moves_out._get_new_picking_values_Res(item, sorted_workorders, 'WH/OCOUT/'))
pick_ids.append(picking_out.id)
moves_out.write(
{'picking_id': picking_out.id, 'state': 'waiting', 'workorder_id': sorted_workorders.id})
moves_out._assign_picking_post_process(new=new_picking)
moves_in = self.env['stock.move'].sudo().create(
self.env['stock.move']._get_stock_move_values_Res(item, location_id, location_dest_id,
outcontract_picking_type_in))
picking_in = self.create(
moves_in._get_new_picking_values_Res(item, sorted_workorders, 'WH/OCIN/'))
pick_ids.append(picking_in.id)
moves_in.write(
{'picking_id': picking_in.id, 'state': 'waiting', 'workorder_id': sorted_workorders.id})
moves_in._assign_picking_post_process(new=new_picking)
m += 1
sorted_workorders.write({'picking_ids': [(6, 0, pick_ids)]})
class ReStockMove(models.Model):
_inherit = 'stock.move'
materiel_length = fields.Float(string='物料长度', digits=(16, 4))
materiel_width = fields.Float(string='物料宽度', digits=(16, 4))
materiel_height = fields.Float(string='物料高度', digits=(16, 4))
def _get_stock_move_values_Res(self, item, location_src_id, location_dest_id, picking_type_id):
route = self.env['stock.route'].sudo().search([('name', '=', '表面工艺外协')])
move_values = {
'name': '',
'company_id': item.company_id.id,
'product_id': item.bom_id.bom_line_ids.product_id.id,
'product_uom': item.bom_id.bom_line_ids.product_uom_id.id,
'product_uom_qty': 1.0,
'location_id': location_src_id,
'location_dest_id': location_dest_id,
'origin': item.name,
# 'route_ids': False if not route else [(4, route.id)],
'date_deadline': datetime.now(),
'picking_type_id': picking_type_id,
}
return move_values
def _get_new_picking_values_Res(self, item, sorted_workorders, rescode):
return {
'name': self.env['stock.picking']._get_name_Res(rescode),
'origin': item.name,
'company_id': self.mapped('company_id').id,
'user_id': False,
'move_type': self.mapped('group_id').move_type or 'direct',
'partner_id': sorted_workorders.supplier_id.id,
'picking_type_id': self.mapped('picking_type_id').id,
'location_id': self.mapped('location_id').id,
'location_dest_id': self.mapped('location_dest_id').id,
'state': 'confirmed',
}
def get_move_line(self, purchase, sorted_workorders):
return {
'move_id': self.id,
'product_id': self.product_id.id,
'product_uom_id': self.product_uom.id,
'location_id': self.picking_id.location_id.id,
'location_dest_id': self.picking_id.location_dest_id.id,
'picking_id': self.picking_id.id,
'reserved_uom_qty': 1.0,
'lot_id': purchase.picking_ids.move_line_ids.lot_id.id,
'company_id': self.company_id.id,
# 'workorder_id': '' if not sorted_workorders else sorted_workorders.id,
# 'production_id': '' if not sorted_workorders else sorted_workorders.production_id.id,
'state': 'assigned',
}
def print_serial_numbers(self):
if not self.next_serial:
raise UserError(_("请先分配序列号再进行打印"))
label_data = []
for item in self.move_line_ids:
label_data.append({
'item_id': item.id,
})
if label_data:
report_template = self.env.ref('stock.label_package_template')
res = report_template.report_action(label_data)
res['id'] = report_template.id
return res
else:
raise UserError(_("没有可打印的标签数据"))
def action_show_details(self):
""" Returns an action that will open a form view (in a popup) allowing to work on all the
move lines of a particular move. This form view is used when "show operations" is not
checked on the picking type.
"""
self.ensure_one()
# If "show suggestions" is not checked on the picking type, we have to filter out the
# reserved move lines. We do this by displaying `move_line_nosuggest_ids`. We use
# different views to display one field or another so that the webclient doesn't have to
# fetch both.
if self.picking_type_id.show_reserved:
view = self.env.ref('stock.view_stock_move_operations')
else:
view = self.env.ref('stock.view_stock_move_nosuggest_operations')
if self.state == "assigned":
if self.product_id.tracking == "serial":
if self.product_id.categ_id.name == '刀具':
self.next_serial = self._get_tool_next_serial(self.company_id, self.product_id, self.origin)
else:
self.next_serial = self.env['stock.lot']._get_next_serial(self.company_id, self.product_id)
elif self.product_id.tracking == "lot":
self._put_tool_lot(self.company_id, self.product_id, self.origin)
return {
'name': _('Detailed Operations'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'stock.move',
'views': [(view.id, 'form')],
'view_id': view.id,
'target': 'new',
'res_id': self.id,
'context': dict(
self.env.context,
show_owner=self.picking_type_id.code != 'incoming',
show_lots_m2o=self.has_tracking != 'none' and (
self.picking_type_id.use_existing_lots or self.state == 'done' or self.origin_returned_move_id.id),
# able to create lots, whatever the value of ` use_create_lots`.
show_lots_text=self.has_tracking != 'none' and self.picking_type_id.use_create_lots and not self.picking_type_id.use_existing_lots and self.state != 'done' and not self.origin_returned_move_id.id,
show_source_location=self.picking_type_id.code != 'incoming',
show_destination_location=self.picking_type_id.code != 'outgoing',
show_package=not self.location_id.usage == 'supplier',
show_reserved_quantity=self.state != 'done' and not self.picking_id.immediate_transfer and self.picking_type_id.code != 'incoming'
),
}
def _put_tool_lot(self, company, product, origin):
if product.tracking == "lot" and self.product_id.categ_id.name == '刀具':
if not self.move_line_nosuggest_ids:
lot_names = self.env['stock.lot'].generate_lot_names(
'%s-%s-%s' % ('%s-T-DJWL-%s' % (
product.cutting_tool_model_id.code.split('-')[0], product.cutting_tool_material_id.code),
datetime.now().strftime("%Y%m%d"), origin), 1)
move_lines_commands = self._generate_serial_move_line_commands_tool_lot(lot_names)
self.write({'move_line_nosuggest_ids': move_lines_commands})
for item in self.move_line_nosuggest_ids:
if item.lot_name:
item.lot_qr_code = self.compute_lot_qr_code(item.lot_name)
def compute_lot_qr_code(self, lot_name):
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(lot_name)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = io.BytesIO()
img.save(buffer, format="PNG")
binary_data = buffer.getvalue()
data = base64.b64encode(binary_data).decode() # 确保返回的是字符串形式的数据
return data
def _get_tool_next_serial(self, company, product, origin):
"""Return the next serial number to be attributed to the product."""
if product.tracking == "serial":
last_serial = self.env['stock.lot'].search(
[('company_id', '=', company.id), ('product_id', '=', product.id), ('name', 'ilike', origin)],
limit=1, order='id DESC')
split_codes = product.cutting_tool_model_id.code.split('-')
if last_serial:
return "%s-T-%s-%s-%03d" % (
split_codes[0], origin, product.specification_id.name, int(last_serial.name[-3:]) + 1)
else:
return "%s-T-%s-%s-%03d" % (split_codes[0], origin, product.specification_id.name, 1)
def _generate_serial_move_line_commands_tool_lot(self, lot_names, origin_move_line=None):
"""Return a list of commands to update the move lines (write on
existing ones or create new ones).
Called when user want to create and assign multiple serial numbers in
one time (using the button/wizard or copy-paste a list in the field).
:param lot_names: A list containing all serial number to assign.
:type lot_names: list
:param origin_move_line: A move line to duplicate the value from, default to None
:type origin_move_line: record of :class:`stock.move.line`
:return: A list of commands to create/update :class:`stock.move.line`
:rtype: list
"""
self.ensure_one()
# Select the right move lines depending of the picking type configuration.
move_lines = self.env['stock.move.line']
if self.picking_type_id.show_reserved:
move_lines = self.move_line_ids.filtered(lambda ml: not ml.lot_id and not ml.lot_name)
else:
move_lines = self.move_line_nosuggest_ids.filtered(lambda ml: not ml.lot_id and not ml.lot_name)
loc_dest = origin_move_line and origin_move_line.location_dest_id
move_line_vals = {
'picking_id': self.picking_id.id,
'location_id': self.location_id.id,
'product_id': self.product_id.id,
'product_uom_id': self.product_id.uom_id.id,
'qty_done': self.product_uom_qty,
}
if origin_move_line:
# `owner_id` and `package_id` are taken only in the case we create
# new move lines from an existing move line. Also, updates the
# `qty_done` because it could be usefull for products tracked by lot.
move_line_vals.update({
'owner_id': origin_move_line.owner_id.id,
'package_id': origin_move_line.package_id.id,
'qty_done': origin_move_line.qty_done or 1,
})
move_lines_commands = []
qty_by_location = defaultdict(float)
for lot_name in lot_names:
# We write the lot name on an existing move line (if we have still one)...
if move_lines:
move_lines_commands.append((1, move_lines[0].id, {
'lot_name': lot_name,
'qty_done': 1,
}))
qty_by_location[move_lines[0].location_dest_id.id] += 1
move_lines = move_lines[1:]
# ... or create a new move line with the serial name.
else:
loc = loc_dest or self.location_dest_id._get_putaway_strategy(self.product_id, quantity=1,
packaging=self.product_packaging_id,
additional_qty=qty_by_location)
move_line_cmd = dict(move_line_vals, lot_name=lot_name, location_dest_id=loc.id)
move_lines_commands.append((0, 0, move_line_cmd))
qty_by_location[loc.id] += 1
return move_lines_commands
class ReStockQuant(models.Model):
_inherit = 'stock.quant'
# def action_apply_inventory(self):
# inventory_diff_quantity = self.inventory_diff_quantity
# super(ReStockQuant, self).action_apply_inventory()
# if inventory_diff_quantity >= 1:
# stock = self.env['stock.move'].search([('product_id', '=', self.product_id.id), ('is_inventory', '=', True),
# ('reference', '=', '更新的产品数量'), ('state', '=', 'done')],
# limit=1, order='id desc')
# if self.product_id.categ_type == '夹具':
# stock._register_fixture()
# elif self.product_id.categ_type == '刀具':
# stock._register_cutting_tool()
# return True