1197 lines
52 KiB
Python
1197 lines
52 KiB
Python
# -*- coding: utf-8 -*-
|
||
import re
|
||
|
||
import datetime
|
||
import logging
|
||
import base64
|
||
import qrcode
|
||
import io
|
||
|
||
import requests
|
||
|
||
from odoo import api, fields, models, _
|
||
from odoo.osv import expression
|
||
from odoo.exceptions import UserError, ValidationError
|
||
|
||
|
||
class SfLocation(models.Model):
|
||
_inherit = 'stock.location'
|
||
|
||
# 重写字段定义
|
||
name = fields.Char('Location Name', required=True, size=20)
|
||
barcode = fields.Char('Barcode', copy=False, size=15)
|
||
|
||
# 仓库类别(selection:库区、库位、货位)
|
||
# location_type = fields.Selection([
|
||
# ('库区', '库区'),
|
||
# ('货架', '货架'),
|
||
# ('货位', '货位')
|
||
# ], string='存储类型')
|
||
location_type = fields.Selection([
|
||
('库区', '库区')
|
||
], string='存储类型')
|
||
# 库区类型(selection:拣货区、存货区、收货区、退货区、次品区)
|
||
area_type = fields.Selection([
|
||
('拣货区', '拣货区'),
|
||
('存货区', '存货区'),
|
||
('收货区', '收货区'),
|
||
('退货区', '退货区'),
|
||
('次品区', '次品区')
|
||
], string='库区类型')
|
||
# 当前位置
|
||
current_location_id = fields.Many2one('sf.shelf.location', string='当前位置')
|
||
# 目的位置
|
||
destination_location_id = fields.Many2one('sf.shelf.location', string='目的位置')
|
||
# 存储类型(selection:库区、货架)
|
||
# storage_type = fields.Selection([
|
||
# ('库区', '库区'),
|
||
# ('货架', '货架')
|
||
# ], string='存储类型')
|
||
# 产品类别 (关联:product.category)
|
||
product_type = fields.Many2many('product.category', string='产品类别')
|
||
# 货架独有字段:通道、方向、货架高度(m)、货架层数、层数容量
|
||
channel = fields.Char(string='通道')
|
||
direction = fields.Selection([
|
||
('R', 'R'),
|
||
('L', 'L')
|
||
], string='方向')
|
||
shelf_height = fields.Float(string='货架高度(m)')
|
||
shelf_layer = fields.Integer(string='货架层数')
|
||
layer_capacity = fields.Integer(string='层数容量')
|
||
|
||
# 货位独有字段:货位状态、产品(关联产品对象)、产品序列号(关联产品序列号对象)
|
||
location_status = fields.Selection([
|
||
('空闲', '空闲'),
|
||
('占用', '占用'),
|
||
('禁用', '禁用')
|
||
], string='货位状态', default='空闲')
|
||
# product_id = fields.Many2one('product.template', string='产品')
|
||
product_id = fields.Many2one('product.product', string='产品', compute='_compute_product_id', readonly=True)
|
||
product_sn_id = fields.Many2one('stock.lot', string='产品序列号')
|
||
# time_test = fields.Char(string='time')
|
||
# 添加SQL约束
|
||
# _sql_constraints = [
|
||
# ('name_uniq', 'unique(name)', '位置名称必须唯一!'),
|
||
# ]
|
||
|
||
hide_location_type = fields.Boolean(compute='_compute_hide_what', string='隐藏仓库')
|
||
hide_area = fields.Boolean(compute='_compute_hide_what', string='隐藏库区')
|
||
hide_shelf = fields.Boolean(compute='_compute_hide_what', string='隐藏货架')
|
||
hide_location = fields.Boolean(compute='_compute_hide_what', string='隐藏货位')
|
||
|
||
# @api.model
|
||
# def create(self, vals):
|
||
# """
|
||
# 重写create方法,添加自定义的约束
|
||
# """
|
||
# print('create', vals)
|
||
# if vals.get('location_id'):
|
||
# location = self.env['stock.location'].browse(vals.get('location_id'))
|
||
# if location.storage_type == '库区':
|
||
# raise UserError('库区不能作为父级仓库')
|
||
# return super().create(vals)
|
||
#
|
||
# @api.onchange('location_id')
|
||
# def _onchange_location_id(self):
|
||
# """
|
||
# 重写onchange方法,添加自定义的约束
|
||
# """
|
||
# if self.location_id:
|
||
# if self.location_id.storage_type == '库区':
|
||
# raise UserError('库区不能作为父级仓库')
|
||
|
||
# @api.constrains('shelf_height')
|
||
# def _check_shelf_height(self):
|
||
# for record in self:
|
||
# if not (0 <= record.shelf_height < 1000): # 限制字段值在0到999之间
|
||
# raise UserError('shelf_height的值必须在0到1000之间')
|
||
#
|
||
# @api.constrains('shelf_layer')
|
||
# def _check_shelf_layer(self):
|
||
# for record in self:
|
||
# if not (0 < record.shelf_layer < 1000):
|
||
# raise UserError('shelf_layer的值必须在0到999之间,且不能为0')
|
||
#
|
||
# @api.constrains('layer_capacity')
|
||
# def _check_layer_capacity(self):
|
||
# for record in self:
|
||
# if not (0 <= record.layer_capacity < 1000):
|
||
# raise UserError('layer_capacity的值必须在0到999之间,且不能为0')
|
||
|
||
@api.depends('product_sn_id')
|
||
def _compute_product_id(self):
|
||
"""
|
||
根据产品序列号,获取产品
|
||
"""
|
||
for record in self:
|
||
if record.product_sn_id:
|
||
record.product_id = record.product_sn_id.product_id
|
||
# record.location_status = '占用'
|
||
else:
|
||
record.product_id = False
|
||
# record.location_status = '空闲'
|
||
|
||
@api.depends('location_type')
|
||
def _compute_hide_what(self):
|
||
"""
|
||
根据仓库类别,隐藏不需要的字段
|
||
:return:
|
||
"""
|
||
for record in self:
|
||
record.hide_location_type = False
|
||
record.hide_area = False
|
||
record.hide_shelf = False
|
||
record.hide_location = False
|
||
if record.location_type and record.location_type == '仓库':
|
||
record.hide_location_type = True
|
||
elif record.location_type and record.location_type == '库区':
|
||
record.hide_area = True
|
||
elif record.location_type and record.location_type == '货架':
|
||
record.hide_shelf = True
|
||
elif record.location_type and record.location_type == '货位':
|
||
record.hide_location = True
|
||
else:
|
||
pass
|
||
|
||
# # 添加Python约束
|
||
# @api.constrains('name', 'barcode')
|
||
# def _check_len(self):
|
||
# for rec in self:
|
||
# if len(rec.name) > 20:
|
||
# raise ValidationError("Location Name length must be less equal than 20!")
|
||
# if len(rec.barcode) > 15:
|
||
# raise ValidationError("Barcode length must be less equal than 15!")
|
||
|
||
# @api.model
|
||
# def default_get(self, fields):
|
||
# print('fields:', fields)
|
||
# res = super(SfLocation, self).default_get(fields)
|
||
# print('res:', res)
|
||
# if 'barcode' in fields and 'barcode' not in res:
|
||
# # 这里是你生成barcode的代码
|
||
# pass
|
||
# # res['barcode'] = self.generate_barcode() # 假设你有一个方法generate_barcode来生成barcode
|
||
# return res
|
||
# @api.model
|
||
# def create(self, vals):
|
||
# """
|
||
# 重写create方法,当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量
|
||
# """
|
||
# res = super(SfLocation, self).create(vals)
|
||
# if res.location_type == '货架':
|
||
# for i in range(res.shelf_layer):
|
||
# for j in range(res.layer_capacity):
|
||
# self.create({
|
||
# 'name': res.name + '-' + str(i+1) + '-' + str(j+1),
|
||
# 'location_id': res.id,
|
||
# 'location_type': '货位',
|
||
# 'barcode': self.generate_barcode(res, i, j),
|
||
# 'location_status': '空闲'
|
||
# })
|
||
# return res
|
||
|
||
# 生成货位
|
||
|
||
def create_location(self):
|
||
"""
|
||
当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量
|
||
"""
|
||
pass
|
||
# if self.location_type == '货架':
|
||
# for i in range(self.shelf_layer):
|
||
# for j in range(self.layer_capacity):
|
||
# self.create({
|
||
# 'name': self.name + '-' + str(i + 1) + '层' + '-' + str(j + 1) + '位置',
|
||
# 'location_id': self.id,
|
||
# 'location_type': '货位',
|
||
# 'barcode': self.generate_barcode(i, j),
|
||
# 'location_status': '空闲'
|
||
# })
|
||
|
||
def generate_barcode(self, i, j):
|
||
"""
|
||
生成货位条码
|
||
"""
|
||
pass
|
||
# # 这里是你生成barcode的代码
|
||
# # area_type_barcode = self.location_id.barcode
|
||
# area_type_barcode = self.barcode
|
||
# i_str = str(i + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0
|
||
# j_str = str(j + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0
|
||
# return area_type_barcode + self.channel + self.direction + '-' + self.barcode + '-' + i_str + '-' + j_str
|
||
|
||
|
||
class SfShelf(models.Model):
|
||
_name = 'sf.shelf'
|
||
_inherit = ['printing.utils']
|
||
_description = '货架'
|
||
_order = 'create_date desc'
|
||
|
||
name = fields.Char('货架名称', required=True, size=20)
|
||
active = fields.Boolean("有效", default=True)
|
||
barcode = fields.Char('编码', copy=False, size=15, required=True)
|
||
|
||
# 货位
|
||
location_ids = fields.One2many('sf.shelf.location', 'shelf_id', string='货位')
|
||
|
||
check_state = fields.Selection([
|
||
('enable', '启用'),
|
||
('close', '关闭')
|
||
], string='审核状态', default='close')
|
||
|
||
def action_check(self):
|
||
self.check_state = 'enable'
|
||
|
||
# 绑定库区
|
||
shelf_location_id = fields.Many2one('stock.location', string='所属库区')
|
||
|
||
# 货架独有字段:通道、方向、货架高度(m)、货架层数、层数容量
|
||
channel = fields.Char(string='通道', required=True, size=10)
|
||
direction = fields.Selection([
|
||
('R', 'R'),
|
||
('L', 'L')
|
||
], string='方向', required=True)
|
||
shelf_height = fields.Float(string='货架高度(m)')
|
||
shelf_layer = fields.Integer(string='货架层数')
|
||
layer_capacity = fields.Integer(string='层数容量')
|
||
|
||
# 是否有货位
|
||
is_there_area = fields.Boolean(string='是否有货位', compute='_compute_is_there_area', default=False, store=True)
|
||
|
||
@api.depends('location_ids')
|
||
def _compute_is_there_area(self):
|
||
for record in self:
|
||
record.is_there_area = bool(record.location_ids)
|
||
|
||
@api.onchange('shelf_location_id')
|
||
def _onchange_shelf_location_id(self):
|
||
"""
|
||
根据货架的所属库区修改货位的所属库区
|
||
"""
|
||
if self.name:
|
||
all_location = self.env['sf.shelf.location'].search([('name', 'ilike', self.name)])
|
||
for record in self:
|
||
for location in all_location:
|
||
location.location_id = record.shelf_location_id.id
|
||
|
||
def create_location(self):
|
||
"""
|
||
当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量
|
||
"""
|
||
area_obj = self.env['sf.shelf.location']
|
||
for i in range(self.shelf_layer):
|
||
for j in range(self.layer_capacity):
|
||
location_name = self.name + '-' + str(i + 1) + '层' + '-' + str(j + 1) + '位置'
|
||
# 检查是否已经有同名的位置存在
|
||
existing_location = area_obj.search([('name', '=', location_name)])
|
||
if not existing_location:
|
||
area_obj.create({
|
||
'name': location_name,
|
||
'location_id': self.shelf_location_id.id,
|
||
'barcode': self.generate_barcode(i, j),
|
||
'location_status': '空闲',
|
||
'shelf_id': self.id
|
||
})
|
||
|
||
def generate_barcode(self, i, j):
|
||
"""
|
||
生成货位条码
|
||
"""
|
||
# 这里是你生成barcode的代码
|
||
# area_type_barcode = self.location_id.barcode
|
||
area_type_barcode = self.barcode
|
||
i_str = str(i + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0
|
||
j_str = str(j + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0
|
||
return area_type_barcode + self.channel + self.direction + '-' + self.barcode + '-' + i_str + '-' + j_str
|
||
|
||
def print_all_location_barcode(self):
|
||
"""
|
||
打印所有货位编码
|
||
"""
|
||
print('=======打印货架所有货位编码=========')
|
||
for record in self.location_ids:
|
||
print('record', record)
|
||
if not record.barcode:
|
||
continue
|
||
record.ensure_one()
|
||
# qr_code_data = record.lot_qr_code
|
||
# if not qr_code_data:
|
||
# raise UserError("没有找到二维码数据。")
|
||
barcode = record.barcode
|
||
# todo 待控制
|
||
if not barcode:
|
||
raise ValidationError("请先分配序列号")
|
||
# host = "192.168.50.110" # 可以根据实际情况修改
|
||
# port = 9100 # 可以根据实际情况修改
|
||
|
||
# 获取默认打印机配置
|
||
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
|
||
if not printer_config:
|
||
raise UserError('请先配置打印机')
|
||
host = printer_config.printer_id.ip_address
|
||
port = printer_config.printer_id.port
|
||
self.print_qr_code(barcode, host, port)
|
||
|
||
|
||
class ShelfLocation(models.Model):
|
||
_name = 'sf.shelf.location'
|
||
_inherit = ['printing.utils']
|
||
_description = '货位'
|
||
_rec_name = 'barcode'
|
||
_order = 'id asc, create_date asc'
|
||
|
||
# current_location_id = fields.Many2one('sf.shelf.location', string='当前位置')
|
||
# # 目的位置
|
||
# destination_location_id = fields.Many2one('sf.shelf.location', string='目的位置')
|
||
current_move_ids = fields.One2many('stock.move.line', 'current_location_id', '当前位置调拨单')
|
||
destination_move_ids = fields.One2many('stock.move.line', 'destination_location_id', '目标位置调拨单')
|
||
storage_time = fields.Datetime('入库时间', compute='_compute_location_status')
|
||
production_id = fields.Many2one('mrp.production', string='制造订单')
|
||
active = fields.Boolean("有效", default=True)
|
||
|
||
@api.depends('location_status')
|
||
def _compute_location_status(self):
|
||
for record in self:
|
||
if record.location_status == '占用':
|
||
record.storage_time = datetime.datetime.now()
|
||
if record.location_status == '空闲':
|
||
record.storage_time = False
|
||
if record.location_status == '禁用':
|
||
record.storage_time = False
|
||
|
||
name = fields.Char('货位名称', required=True, size=20)
|
||
barcode = fields.Char('货位编码', copy=False, size=50)
|
||
qr_code = fields.Binary(string='二维码', compute='_compute_location_qr_code', store=True)
|
||
|
||
# 货架
|
||
shelf_id = fields.Many2one('sf.shelf', string='货架')
|
||
|
||
check_state = fields.Selection([
|
||
('enable', '启用'),
|
||
('close', '关闭')
|
||
], string='审核状态', default='close')
|
||
|
||
def action_check(self):
|
||
self.check_state = 'enable'
|
||
|
||
@api.depends('barcode')
|
||
def _compute_location_qr_code(self):
|
||
for record in self:
|
||
if record.barcode:
|
||
# 创建一个QRCode对象
|
||
qr = qrcode.QRCode(
|
||
version=1, # 设置版本, 1-40,控制二维码的大小
|
||
error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级
|
||
box_size=10, # 设置每个格子的像素大小
|
||
border=4, # 设置边框的格子宽度
|
||
)
|
||
# 添加数据
|
||
qr.add_data(record.barcode)
|
||
qr.make(fit=True)
|
||
# 创建二维码图像
|
||
img = qr.make_image(fill_color="black", back_color="white")
|
||
# 创建一个内存文件
|
||
buffer = io.BytesIO()
|
||
img.save(buffer, format="PNG") # 将图像保存到内存文件中
|
||
# 获取二进制数据
|
||
binary_data = buffer.getvalue()
|
||
# 使用Base64编码这些二进制数据
|
||
data = base64.b64encode(binary_data)
|
||
self.qr_code = data
|
||
else:
|
||
record.qr_code = False
|
||
|
||
def print_single_location_qr_code(self):
|
||
self.ensure_one()
|
||
qr_code_data = self.qr_code
|
||
if not qr_code_data:
|
||
raise UserError("没有找到二维码数据。")
|
||
barcode = self.barcode
|
||
# host = "192.168.50.110" # 可以根据实际情况修改
|
||
# port = 9100 # 可以根据实际情况修改
|
||
# 获取默认打印机配置
|
||
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
|
||
if not printer_config:
|
||
raise UserError('请先配置打印机')
|
||
host = printer_config.printer_id.ip_address
|
||
port = printer_config.printer_id.port
|
||
self.print_qr_code(barcode, host, port)
|
||
# 获取当前wizard的视图ID或其他标识信息
|
||
view_id = self.env.context.get('view_id')
|
||
# 构造返回wizard页面的action字典
|
||
action = {
|
||
'type': 'ir.actions.act_window',
|
||
'name': '返回 Wizard',
|
||
'res_model': 'sf.shelf', # 替换为你的wizard模型名称
|
||
'view_mode': 'form',
|
||
'view_id': view_id, # 如果需要基于特定的视图返回
|
||
'target': 'new', # 如果需要在新的窗口或标签页打开
|
||
'res_id': self.shelf_id, # 如果你想要返回当前记录的视图
|
||
}
|
||
return action
|
||
|
||
# # 仓库类别(selection:库区、库位、货位)
|
||
# location_type = fields.Selection([
|
||
# ('货架', '货架'),
|
||
# ('货位', '货位')
|
||
# ], string='存储类型')
|
||
# 绑定库区
|
||
# shelf_location_id = fields.Many2one('stock.location', string='所属库区', domain=[('location_type', '=', '库区')])
|
||
location_id = fields.Many2one('stock.location', string='所属库区')
|
||
# 产品类别 (关联:product.category)
|
||
# product_type = fields.Many2many('product.category', string='产品类别')
|
||
|
||
# picking_product_type = fields.Many2many('stock.picking', string='调拨产品类别', related='location_dest_id.product_type')
|
||
|
||
# 货位独有字段:货位状态、产品(关联产品对象)、产品序列号(关联产品序列号对象)
|
||
location_status = fields.Selection([
|
||
('空闲', '空闲'),
|
||
('占用', '占用'),
|
||
('禁用', '禁用')
|
||
], string='货位状态', default='空闲', compute='_compute_product_num', store=True)
|
||
# product_id = fields.Many2one('product.template', string='产品')
|
||
product_id = fields.Many2one('product.product', string='产品', compute='_compute_product_id', store=True)
|
||
product_sn_id = fields.Many2one('stock.lot', string='产品序列号')
|
||
product_sn_ids = fields.One2many('sf.shelf.location.lot', 'shelf_location_id', string='产品批次号')
|
||
# 产品数量
|
||
product_num = fields.Integer('总数量', compute='_compute_number', store=True)
|
||
|
||
@api.depends('product_num')
|
||
def _compute_product_num(self):
|
||
for record in self:
|
||
if record.product_num > 0:
|
||
record.location_status = '占用'
|
||
elif record.product_num == 0:
|
||
record.location_status = '空闲'
|
||
|
||
@api.depends('product_sn_ids.qty')
|
||
def _compute_number(self):
|
||
for item in self:
|
||
if item.product_sn_ids:
|
||
qty = 0
|
||
for product_sn_id in item.product_sn_ids:
|
||
qty += product_sn_id.qty
|
||
item.product_num = qty
|
||
|
||
# 修改货位状态为禁用
|
||
def action_location_status_disable(self):
|
||
self.location_status = '禁用'
|
||
|
||
# 修改货位状态为空闲
|
||
def action_location_status_enable(self):
|
||
self.location_status = '空闲'
|
||
|
||
@api.depends('product_sn_id', 'product_sn_ids')
|
||
def _compute_product_id(self):
|
||
"""
|
||
根据产品序列号,获取产品
|
||
"""
|
||
for record in self:
|
||
if record.product_sn_id:
|
||
try:
|
||
record.sudo().product_id = record.product_sn_id.product_id
|
||
# record.sudo().location_status = '占用'
|
||
record.sudo().product_num = 1
|
||
except Exception as e:
|
||
print('eeeeeee占用', e)
|
||
elif record.product_sn_ids:
|
||
return True
|
||
else:
|
||
try:
|
||
record.sudo().product_id = False
|
||
# record.sudo().location_status = '空闲'
|
||
record.sudo().product_num = 0
|
||
except Exception as e:
|
||
print('eeeeeee空闲', e)
|
||
|
||
# 调取获取货位信息接口
|
||
def get_sf_shelf_location_info(self):
|
||
|
||
config = self.env['res.config.settings'].get_values()
|
||
headers = {'Authorization': config['center_control_Authorization']}
|
||
crea_url = config['center_control_url'] + "/AutoDeviceApi/GetLocationInfos"
|
||
|
||
params = {'DeviceId': 'Cabinet-AL'}
|
||
r = requests.get(crea_url, params=params, headers=headers)
|
||
|
||
ret = r.json()
|
||
|
||
print(ret)
|
||
if ret['Succeed'] == True:
|
||
return ret['Datas']
|
||
else:
|
||
raise UserError("该库位无产品")
|
||
|
||
@api.model_create_multi
|
||
def create(self, vals_list):
|
||
# 编码重复校验
|
||
barcode_list = []
|
||
for val in vals_list:
|
||
location = self.search([('barcode', '=', val['barcode'])])
|
||
if location:
|
||
barcode_list.append(val['name'])
|
||
if barcode_list:
|
||
raise UserError("货位编码【%s】存在重复" % barcode_list)
|
||
records = super(ShelfLocation, self).create(vals_list)
|
||
return records
|
||
|
||
|
||
class SfShelfLocationLot(models.Model):
|
||
_name = 'sf.shelf.location.lot'
|
||
_description = '批次数量'
|
||
|
||
name = fields.Char('名称', related='lot_id.name')
|
||
shelf_location_id = fields.Many2one('sf.shelf.location', string="货位")
|
||
lot_id = fields.Many2one('stock.lot', string='批次号')
|
||
qty = fields.Integer('数量')
|
||
qty_num = fields.Integer('变更数量')
|
||
|
||
@api.onchange('qty_num')
|
||
def _onchange_qty_num(self):
|
||
for item in self:
|
||
if item.qty_num > item.qty:
|
||
raise ValidationError('变更数量不能比库存数量大!!!')
|
||
|
||
|
||
class SfStockMoveLine(models.Model):
|
||
_name = 'stock.move.line'
|
||
_inherit = ['stock.move.line', 'printing.utils']
|
||
|
||
current_location_id = fields.Many2one(
|
||
'sf.shelf.location', string='当前货位', compute='_compute_current_location_id', store=True)
|
||
# location_dest_id = fields.Many2one('stock.location', string='目标库位')
|
||
location_dest_id_product_type = fields.Many2many(related='location_dest_id.product_type')
|
||
location_dest_id_value = fields.Integer(compute='_compute_location_dest_id_value', store=True)
|
||
# lot_qr_code = fields.Binary(string='二维码', compute='_compute_lot_qr_code', store=True)
|
||
lot_qr_code = fields.Binary(string='二维码', compute='_compute_lot_qr_code', store=True)
|
||
current_product_id = fields.Integer(compute='_compute_location_dest_id_value', store=True)
|
||
there_is_no_sn = fields.Boolean('是否有序列号', default=False)
|
||
|
||
rfid = fields.Char('Rfid')
|
||
rfid_barcode = fields.Char('Rfid', compute='_compute_rfid')
|
||
|
||
@api.depends('lot_id')
|
||
def _compute_rfid(self):
|
||
for item in self:
|
||
item.rfid_barcode = item.lot_id.rfid
|
||
|
||
def action_revert_inventory(self):
|
||
# 检查用户是否有执行操作的权限
|
||
if not self.env.user.has_group('sf_warehouse.group_sf_stock_user'):
|
||
raise UserError(_('抱歉,只有库管人员可以执行此动作'))
|
||
|
||
# 如果用户有权限,调用父类方法
|
||
return super().action_revert_inventory()
|
||
|
||
@api.depends('lot_name')
|
||
def _compute_lot_qr_code(self):
|
||
for record in self:
|
||
if record.lot_id:
|
||
# record.lot_qr_code = record.lot_id.lot_qr_code
|
||
# 创建一个QRCode对象
|
||
qr = qrcode.QRCode(
|
||
version=1, # 设置版本, 1-40,控制二维码的大小
|
||
error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级
|
||
box_size=10, # 设置每个格子的像素大小
|
||
border=4, # 设置边框的格子宽度
|
||
)
|
||
|
||
# 添加数据
|
||
qr.add_data(record.lot_id.name)
|
||
qr.make(fit=True)
|
||
|
||
# 创建二维码图像
|
||
img = qr.make_image(fill_color="black", back_color="white")
|
||
|
||
# 创建一个内存文件
|
||
buffer = io.BytesIO()
|
||
img.save(buffer, format="PNG") # 将图像保存到内存文件中
|
||
|
||
# 获取二进制数据
|
||
binary_data = buffer.getvalue()
|
||
|
||
# 使用Base64编码这些二进制数据
|
||
data = base64.b64encode(binary_data)
|
||
self.lot_qr_code = data
|
||
else:
|
||
record.lot_qr_code = False
|
||
|
||
def print_single_method(self):
|
||
self.ensure_one()
|
||
qr_code_data = self.lot_qr_code
|
||
if not qr_code_data:
|
||
raise UserError("没有找到二维码数据。")
|
||
lot_name = self.lot_name
|
||
# host = "192.168.50.110" # 可以根据实际情况修改
|
||
# port = 9100 # 可以根据实际情况修改
|
||
|
||
# 获取默认打印机配置
|
||
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
|
||
if not printer_config:
|
||
raise UserError('请先配置打印机')
|
||
host = printer_config.printer_id.ip_address
|
||
port = printer_config.printer_id.port
|
||
self.print_qr_code(lot_name, host, port)
|
||
|
||
# 返回当前wizard页面
|
||
# base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
|
||
# return {
|
||
# 'type': 'ir.actions.act_url',
|
||
# 'url': str(base_url) + download_url,
|
||
# 'target': 'self',
|
||
# }
|
||
# 获取当前wizard的视图ID或其他标识信息
|
||
view_id = self.env.context.get('view_id')
|
||
# 构造返回wizard页面的action字典
|
||
action = {
|
||
'type': 'ir.actions.act_window',
|
||
'name': '返回 Wizard',
|
||
'res_model': 'stock.move', # 替换为你的wizard模型名称
|
||
'view_mode': 'form',
|
||
'view_id': view_id, # 如果需要基于特定的视图返回
|
||
'target': 'new', # 如果需要在新的窗口或标签页打开
|
||
'res_id': self.id, # 如果你想要返回当前记录的视图
|
||
}
|
||
return action
|
||
|
||
# def generate_zpl_code(self, code):
|
||
# # 初始化ZPL代码字符串
|
||
# zpl_code = "^XA\n"
|
||
# zpl_code += "^CW1,E:SIMSUN.TTF^FS\n"
|
||
# zpl_code += "^CI28\n"
|
||
#
|
||
# # 设置二维码位置
|
||
# zpl_code += "^FO50,50\n" # 调整二维码位置,使其与资产编号在同一行
|
||
# zpl_code += f"^BQN,2,6^FDLM,B0093{code}^FS\n"
|
||
#
|
||
# # 设置资产编号文本位置
|
||
# zpl_code += "^FO300,60\n" # 资产编号文本的位置,与二维码在同一行
|
||
# zpl_code += "^A1N,45,45^FD编码名称: ^FS\n"
|
||
#
|
||
# # 设置{code}文本位置
|
||
# # 假设{code}文本需要位于资产编号和二维码下方,中间位置
|
||
# # 设置{code}文本位置并启用自动换行
|
||
# zpl_code += "^FO300,120\n" # {code}文本的起始位置
|
||
# zpl_code += "^FB500,4,0,L,0\n" # 定义一个宽度为500点的文本框,最多4行,左对齐
|
||
# zpl_code += f"^A1N,40,40^FD{code}^FS\n"
|
||
#
|
||
# # 在{code}文本框周围绘制线框
|
||
# # 假设线框的外部尺寸为宽度500点,高度200点
|
||
# # zpl_code += "^FO300,110^GB500,200,2^FS\n" # 绘制线框,边框粗细为2点
|
||
#
|
||
# zpl_code += "^PQ1,0,1,Y\n"
|
||
# zpl_code += "^XZ\n"
|
||
# return zpl_code
|
||
#
|
||
# def send_to_printer(self, host, port, zpl_code):
|
||
# # 将ZPL代码转换为字节串
|
||
# print('zpl_code', zpl_code)
|
||
# zpl_bytes = zpl_code.encode('utf-8')
|
||
# print(zpl_bytes)
|
||
#
|
||
# # 创建socket对象
|
||
# mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
# try:
|
||
# mysocket.connect((host, port)) # 连接到打印机
|
||
# mysocket.send(zpl_bytes) # 发送ZPL代码
|
||
# print("ZPL code sent to printer successfully.")
|
||
# except Exception as e:
|
||
# print(f"Error with the connection: {e}")
|
||
# finally:
|
||
# mysocket.close() # 关闭连接
|
||
#
|
||
# def print_qr_code(self):
|
||
# self.ensure_one() # 确保这个方法只为一个记录调用
|
||
# # if not self.lot_id:
|
||
# # raise UserError("没有找到序列号。")
|
||
# # 假设_lot_qr_code方法已经生成了二维码并保存在字段中
|
||
# qr_code_data = self.lot_qr_code
|
||
# if not qr_code_data:
|
||
# raise UserError("没有找到二维码数据。")
|
||
# # 生成ZPL代码
|
||
# zpl_code = self.generate_zpl_code(self.lot_id.name)
|
||
# # 设置打印机的IP地址和端口号
|
||
# host = "192.168.50.110"
|
||
# port = 9100
|
||
# # 发送ZPL代码到打印机
|
||
# self.send_to_printer(host, port, zpl_code)
|
||
#
|
||
# # # 生成下载链接或直接触发下载
|
||
# # # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例
|
||
# # attachment = self.env['ir.attachment'].sudo().create({
|
||
# # 'datas': self.lot_qr_code,
|
||
# # 'type': 'binary',
|
||
# # 'description': '二维码图片',
|
||
# # 'name': self.lot_name + '.png',
|
||
# # # 'res_id': invoice.id,
|
||
# # # 'res_model': 'stock.picking',
|
||
# # 'public': True,
|
||
# # 'mimetype': 'application/x-png',
|
||
# # # 'model_name': 'stock.picking',
|
||
# # })
|
||
# # # 返回附件的下载链接
|
||
# # download_url = '/web/content/%s?download=true' % attachment.id
|
||
# # base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
|
||
# # return {
|
||
# # 'type': 'ir.actions.act_url',
|
||
# # 'url': str(base_url) + download_url,
|
||
# # 'target': 'self',
|
||
# # }
|
||
|
||
# # # 定义一个方法,用于根据序列号生成二维码
|
||
# # @api.depends('lot_id')
|
||
# def generate_lot_qr_code(self):
|
||
# # 创建一个QRCode对象
|
||
# qr = qrcode.QRCode(
|
||
# version=1, # 设置版本, 1-40,控制二维码的大小
|
||
# error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级
|
||
# box_size=10, # 设置每个格子的像素大小
|
||
# border=4, # 设置边框的格子宽度
|
||
# )
|
||
#
|
||
# # 添加数据
|
||
# qr.add_data(self.lot_id.name)
|
||
# qr.make(fit=True)
|
||
#
|
||
# # 创建二维码图像
|
||
# img = qr.make_image(fill_color="black", back_color="white")
|
||
#
|
||
# # 创建一个内存文件
|
||
# buffer = io.BytesIO()
|
||
# img.save(buffer, format="PNG") # 将图像保存到内存文件中
|
||
#
|
||
# # 获取二进制数据
|
||
# binary_data = buffer.getvalue()
|
||
#
|
||
# # 使用Base64编码这些二进制数据
|
||
# data = base64.b64encode(binary_data)
|
||
# self.lot_qr_code = data
|
||
# attachment = self.env['ir.attachment'].sudo().create({
|
||
# 'datas': data,
|
||
# 'type': 'binary',
|
||
# 'description': '二维码图片',
|
||
# 'name': self.lot_id.name + '.png',
|
||
# # 'res_id': invoice.id,
|
||
# # 'res_model': 'stock.picking',
|
||
# 'public': True,
|
||
# 'mimetype': 'application/pdf',
|
||
# # 'model_name': 'stock.picking',
|
||
# })
|
||
|
||
# def button_test(self):
|
||
# print(self.picking_id.name)
|
||
# stock_picking = self.env['stock.picking'].search([('name', '=', self.picking_id.name)], limit=1)
|
||
# print(self.picking_id.name)
|
||
# print(aa.move_line_ids.lot_id.name)
|
||
# # 获取当前的stock.picking对象
|
||
# current_picking = self.env['stock.picking'].search([('name', '=', self.picking_id.name)], limit=1)
|
||
#
|
||
# # 获取当前picking的第一个stock.move对象
|
||
# current_move = current_picking.move_ids[0] if current_picking.move_ids else False
|
||
#
|
||
# # 如果存在相关的stock.move对象
|
||
# if current_move:
|
||
# # 获取源stock.move对象
|
||
# origin_move = current_move.move_orig_ids[0] if current_move.move_orig_ids else False
|
||
#
|
||
# # 从源stock.move对象获取源stock.picking对象
|
||
# origin_picking = origin_move.picking_id if origin_move else False
|
||
# # 现在,origin_picking就是current_picking的上一步
|
||
# # 获取目标stock.move对象
|
||
# dest_move = current_move.move_dest_ids[0] if current_move.move_dest_ids else False
|
||
#
|
||
# # 从目标stock.move对象获取目标stock.picking对象
|
||
# dest_picking = dest_move.picking_id if dest_move else False
|
||
# # 现在,dest_picking就是current_picking的下一步
|
||
@api.depends('location_id')
|
||
def _compute_current_location_id(self):
|
||
for record in self:
|
||
# 使用record代替self来引用当前遍历到的记录
|
||
logging.info('record.picking_id.name: %s' % record.picking_id.name)
|
||
logging.info('record.env: %s' % record.env['stock.picking'].search([('name', '=', record.picking_id.name)]))
|
||
|
||
# 获取当前的stock.picking对象
|
||
current_picking = record.env['stock.picking'].search([('name', '=', record.picking_id.name)], limit=1)
|
||
|
||
# 获取当前picking的第一个stock.move对象
|
||
current_move = current_picking.move_ids[0] if current_picking.move_ids else False
|
||
|
||
# 如果存在相关的stock.move对象
|
||
if current_move:
|
||
# 获取源stock.move对象
|
||
origin_move = current_move.move_orig_ids[0] if current_move.move_orig_ids else False
|
||
|
||
# 从源stock.move对象获取源stock.picking对象
|
||
origin_picking = origin_move.picking_id if origin_move else False
|
||
|
||
# 如果前一个调拨单有目标货位
|
||
if origin_picking:
|
||
for i in current_picking.move_line_ids:
|
||
for j in origin_picking.move_line_ids:
|
||
if j.destination_location_id and i.lot_id == j.lot_id:
|
||
# 更新当前记录的current_location_id字段
|
||
record.current_location_id = j.destination_location_id
|
||
# # 获取目标stock.move对象
|
||
# dest_move = current_move.move_dest_ids[0] if current_move.move_dest_ids else False
|
||
#
|
||
# # 从目标stock.move对象获取目标stock.picking对象
|
||
# dest_picking = dest_move.picking_id if dest_move else False
|
||
# # 现在,dest_picking就是current_picking的下一步
|
||
|
||
# 是一张单据一张单据往下走的,所以这里的目标货位是上一张单据的当前货位,且这样去计算是可以的。
|
||
@api.depends('location_dest_id')
|
||
def _compute_location_dest_id_value(self):
|
||
for record in self:
|
||
record.location_dest_id_value = record.location_dest_id.id if record.location_dest_id else False
|
||
record.current_product_id = record.product_id.id if record.product_id else False
|
||
|
||
destination_location_id = fields.Many2one(
|
||
'sf.shelf.location', string='目标货位')
|
||
|
||
def compute_destination_location_id(self):
|
||
for record in self:
|
||
obj = self.env['sf.shelf.location'].search([('name', '=',
|
||
self.destination_location_id.name)])
|
||
if record.lot_id:
|
||
if record.product_id.tracking == 'serial':
|
||
shelf_location_obj = self.env['sf.shelf.location'].search(
|
||
[('product_sn_id', '=', record.lot_id.id)])
|
||
if shelf_location_obj:
|
||
shelf_location_obj.product_sn_id = False
|
||
if obj:
|
||
obj.product_sn_id = record.lot_id.id
|
||
else:
|
||
if obj:
|
||
obj.product_sn_id = record.lot_id.id
|
||
elif record.product_id.tracking == 'lot':
|
||
self.put_shelf_location(record)
|
||
if not obj.product_id:
|
||
obj.product_id = record.product_id.id
|
||
else:
|
||
if obj:
|
||
obj.product_id = record.product_id.id
|
||
# obj.location_status = '占用'
|
||
obj.product_num += record.qty_done
|
||
|
||
@api.onchange('destination_location_id')
|
||
def _check_destination_location_id(self):
|
||
for item in self:
|
||
if item:
|
||
i = 0
|
||
barcode = item.destination_location_id.barcode
|
||
for line in item.picking_id.move_line_ids_without_package:
|
||
if barcode and barcode == line.destination_location_id.barcode:
|
||
i += 1
|
||
if i > 1:
|
||
raise ValidationError(
|
||
'【%s】货位已经被占用,请重新选择!!!' % item.destination_location_id.barcode)
|
||
|
||
def put_shelf_location(self, vals):
|
||
"""
|
||
对货位的批量数据进行数量计算
|
||
"""
|
||
for record in vals:
|
||
if record.lot_id and record.product_id.tracking == 'lot':
|
||
if record.current_location_id:
|
||
location_lot = self.env['sf.shelf.location.lot'].sudo().search(
|
||
[('shelf_location_id', '=', record.current_location_id.id), ('lot_id', '=', record.lot_id.id)])
|
||
if location_lot:
|
||
location_lot.qty -= record.qty_done
|
||
if location_lot.qty == 0:
|
||
location_lot.unlink()
|
||
elif location_lot.qty < 0:
|
||
raise ValidationError('【%s】货位【%s】批次的【%s】产品数量不足!' % (
|
||
record.current_location_id.barcode, record.lot_id.name, record.product_id.name))
|
||
else:
|
||
raise ValidationError('【%s】货位不存在【%s】批次的【%s】产品' % (
|
||
record.current_location_id.barcode, record.lot_id.name, record.product_id.name))
|
||
if record.destination_location_id:
|
||
location_lot = self.env['sf.shelf.location.lot'].sudo().search(
|
||
[('shelf_location_id', '=', record.destination_location_id.id),
|
||
('lot_id', '=', record.lot_id.id)])
|
||
if location_lot:
|
||
location_lot.qty += record.qty_done
|
||
else:
|
||
self.env['sf.shelf.location.lot'].sudo().create({
|
||
'shelf_location_id': record.destination_location_id.id,
|
||
'lot_id': record.lot_id.id,
|
||
'qty': record.qty_done
|
||
})
|
||
if not record.destination_location_id.product_id:
|
||
record.destination_location_id.product_id = record.product_id.id
|
||
|
||
@api.model_create_multi
|
||
def create(self, vals_list):
|
||
|
||
records = super(SfStockMoveLine, self).create(vals_list)
|
||
self.put_shelf_location(records)
|
||
return records
|
||
|
||
|
||
class SfStockPicking(models.Model):
|
||
_inherit = 'stock.picking'
|
||
|
||
check_in = fields.Char(string='查询是否为入库单', compute='_check_is_in')
|
||
|
||
def batch_stock_move(self):
|
||
"""
|
||
批量调拨,非就绪状态的会被忽略,完成后有通知提示
|
||
"""
|
||
for record in self:
|
||
if record.state != 'assigned':
|
||
continue
|
||
record.action_set_quantities_to_reservation()
|
||
record.button_validate()
|
||
|
||
notification_message = '批量调拨完成!请注意,状态非就绪的单据会被忽略'
|
||
return {
|
||
'effect': {
|
||
'fadeout': 'fast',
|
||
'message': notification_message,
|
||
'img_url': '/web/image/%s/%s/image_1024' % (
|
||
self.create_uid._name, self.create_uid.id) if 0 else '/web/static/img/smile.svg',
|
||
'type': 'rainbow_man',
|
||
}
|
||
}
|
||
|
||
@api.depends('name')
|
||
def _check_is_in(self):
|
||
"""
|
||
判断是否为出库单
|
||
"""
|
||
for record in self:
|
||
if record.name:
|
||
is_check_in = record.name.split('/')
|
||
record.check_in = is_check_in[1]
|
||
|
||
def button_validate(self):
|
||
"""
|
||
重写验证方法,当验证时意味着调拨单已经完成,已经移动到了目标货位,所以需要将当前货位的状态改为空闲
|
||
"""
|
||
res = super(SfStockPicking, self).button_validate()
|
||
for line in self.move_line_ids:
|
||
if line:
|
||
if line.destination_location_id:
|
||
# 调用入库方法进行入库刀货位
|
||
line.compute_destination_location_id()
|
||
else:
|
||
# 对除刀柄之外的刀具物料进行 目标货位必填校验
|
||
if self.location_dest_id.name == '刀具房' and line.product_id.cutting_tool_material_id.name not in (
|
||
'刀柄', False):
|
||
raise ValidationError('请选择【%s】产品的目标货位!' % line.product_id.name)
|
||
if line.current_location_id:
|
||
if line.current_location_id.product_sn_id:
|
||
line.current_location_id.product_sn_id = False
|
||
# line.current_location_id.location_status = '空闲'
|
||
line.current_location_id.product_num = 0
|
||
|
||
# 对入库作业的刀柄和托盘进行Rfid绑定校验
|
||
for move in self.move_ids:
|
||
if move and move.product_id.cutting_tool_material_id.name == '刀柄' or '托盘' in (
|
||
move.product_id.fixture_material_id.name or ''):
|
||
for item in move.move_line_nosuggest_ids:
|
||
if item.rfid:
|
||
if self.env['stock.lot'].search([('rfid', '=', item.rfid)]):
|
||
raise ValidationError('该Rfid【%s】在系统中已经存在,请重新录入!' % item.rfid)
|
||
if item.location_dest_id.name == '进货':
|
||
if not item.rfid:
|
||
raise ValidationError('你需要提供%s的Rfid' % move.product_id.name)
|
||
self.env['stock.lot'].search([('name', '=', item.lot_name)]).write({'rfid': item.rfid})
|
||
return res
|
||
|
||
|
||
# def print_all_barcode(self):
|
||
# """
|
||
# 打印所有编码
|
||
# """
|
||
# print('================')
|
||
# for record in self.move_ids_without_package:
|
||
# print('record', record)
|
||
# print('record.move_line_ids', record.move_line_ids)
|
||
#
|
||
# # record.move_line_ids.print_qr_code()
|
||
#
|
||
# print('record.move_line_ids.lot_id', record.move_line_ids.lot_id)
|
||
# print('record.move_line_ids.lot_id.name', record.move_line_ids.lot_id.name)
|
||
|
||
|
||
class SfProcurementGroup(models.Model):
|
||
_inherit = 'procurement.group'
|
||
|
||
@api.model
|
||
def _search_rule(self, route_ids, packaging_id, product_id, warehouse_id, domain):
|
||
"""
|
||
修改路线多规则条件选取
|
||
"""
|
||
if warehouse_id:
|
||
domain = expression.AND(
|
||
[['|', ('warehouse_id', '=', warehouse_id.id), ('warehouse_id', '=', False)], domain])
|
||
Rule = self.env['stock.rule']
|
||
res = self.env['stock.rule']
|
||
if route_ids:
|
||
res_list = Rule.search(expression.AND([[('route_id', 'in', route_ids.ids)], domain]),
|
||
order='route_sequence, sequence')
|
||
for res1 in res_list:
|
||
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
|
||
res1.location_src_id.product_type:
|
||
res = res1
|
||
if not res:
|
||
res = Rule.search(expression.AND([[('route_id', 'in', route_ids.ids)], domain]),
|
||
order='route_sequence, sequence', limit=1)
|
||
|
||
if not res and packaging_id:
|
||
packaging_routes = packaging_id.route_ids
|
||
if packaging_routes:
|
||
res_list = Rule.search(expression.AND([[('route_id', 'in', packaging_routes.ids)], domain]),
|
||
order='route_sequence, sequence')
|
||
for res1 in res_list:
|
||
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
|
||
res1.location_src_id.product_type:
|
||
res = res1
|
||
if not res:
|
||
res = Rule.search(expression.AND([[('route_id', 'in', packaging_routes.ids)], domain]),
|
||
order='route_sequence, sequence', limit=1)
|
||
if not res:
|
||
product_routes = product_id.route_ids | product_id.categ_id.total_route_ids
|
||
if product_routes:
|
||
res_list = Rule.search(expression.AND([[('route_id', 'in', product_routes.ids)], domain]),
|
||
order='route_sequence, sequence')
|
||
for res1 in res_list:
|
||
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
|
||
res1.location_src_id.product_type:
|
||
res = res1
|
||
if not res:
|
||
res = Rule.search(expression.AND([[('route_id', 'in', product_routes.ids)], domain]),
|
||
order='route_sequence, sequence', limit=1)
|
||
if not res and warehouse_id:
|
||
warehouse_routes = warehouse_id.route_ids
|
||
if warehouse_routes:
|
||
res_list = Rule.search(expression.AND([[('route_id', 'in', warehouse_routes.ids)], domain]),
|
||
order='route_sequence, sequence')
|
||
for res1 in res_list:
|
||
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
|
||
res1.location_src_id.product_type:
|
||
res = res1
|
||
if not res:
|
||
res = Rule.search(expression.AND([[('route_id', 'in', warehouse_routes.ids)], domain]),
|
||
order='route_sequence, sequence', limit=1)
|
||
return res
|
||
|
||
|
||
# class SfPickingType(models.Model):
|
||
# _inherit = 'stock.picking.type'
|
||
#
|
||
# def _default_show_operations(self):
|
||
# return self.user_has_groups('stock.group_production_lot,'
|
||
# 'stock.group_stock_multi_locations,'
|
||
# 'stock.group_tracking_lot',
|
||
# 'sf_warehouse.group_sf_stock_user',
|
||
# 'sf_warehouse.group_sf_stock_manager')
|
||
|
||
class SfPickingType(models.Model):
|
||
_inherit = 'stock.picking.type'
|
||
|
||
def _default_show_operations(self):
|
||
return self.user_has_groups(
|
||
'stock.group_production_lot,'
|
||
'stock.group_stock_multi_locations,'
|
||
'stock.group_tracking_lot,'
|
||
'sf_warehouse.group_sf_stock_user,'
|
||
'sf_warehouse.group_sf_stock_manager'
|
||
)
|
||
|
||
|
||
class CustomStockMove(models.Model):
|
||
_name = 'stock.move'
|
||
_inherit = ['stock.move', 'printing.utils', 'barcodes.barcode_events_mixin']
|
||
|
||
def on_barcode_scanned(self, barcode):
|
||
"""
|
||
采购入库扫码绑定Rfid码
|
||
"""
|
||
for record in self:
|
||
if record:
|
||
lot = self.env['stock.lot'].sudo().search([('rfid', '=', barcode)])
|
||
if lot:
|
||
if lot.product_id.cutting_tool_material_id:
|
||
material = lot.product_id.cutting_tool_material_id.name
|
||
else:
|
||
material = lot.product_id.fixture_material_id.name
|
||
raise ValidationError(
|
||
'该Rfid【%s】已经被序列号为【%s】的【%s】物料所占用!' % (barcode, lot.name, material))
|
||
if '刀柄' in (record.product_id.cutting_tool_material_id.name or '') or '托盘' in (
|
||
record.product_id.fixture_material_id.name or ''):
|
||
for move_line_nosuggest_id in record.move_line_nosuggest_ids:
|
||
if move_line_nosuggest_id.rfid:
|
||
if move_line_nosuggest_id.rfid == barcode:
|
||
if record.product_id.cutting_tool_material_id.name:
|
||
raise ValidationError('该刀柄的Rfid已经录入,请勿重复录入!!!')
|
||
else:
|
||
raise ValidationError('该托盘的Rfid已经录入,请勿重复录入!!!')
|
||
else:
|
||
line_id = int(re.sub(r"\D", "", str(move_line_nosuggest_id.id)))
|
||
self.env['stock.move.line'].sudo().search([('id', '=', line_id)]).write({'rfid': barcode})
|
||
move_line_nosuggest_id.rfid = barcode
|
||
break
|
||
else:
|
||
raise ValidationError('该产品不需要录入Rfid!!!')
|
||
|
||
def action_assign_serial_show_details(self):
|
||
# 首先执行原有逻辑
|
||
result = super(CustomStockMove, self).action_assign_serial_show_details()
|
||
# 接着为每个 lot_name 生成二维码
|
||
move_lines = self.move_line_ids # 获取当前 stock.move 对应的所有 stock.move.line 记录
|
||
for line in move_lines:
|
||
if line.lot_name: # 确保 lot_name 存在
|
||
lot_name = line.lot_name
|
||
if line.product_id.categ_id.name == '坯料':
|
||
lot_name = lot_name.split('[', 1)[0]
|
||
qr_data = self.compute_lot_qr_code(lot_name)
|
||
# 假设 stock.move.line 模型中有一个字段叫做 lot_qr_code 用于存储二维码数据
|
||
line.lot_qr_code = qr_data
|
||
return result
|
||
|
||
def compute_lot_qr_code(self, lot_name):
|
||
qr = qrcode.QRCode(
|
||
version=1,
|
||
error_correction=qrcode.constants.ERROR_CORRECT_L,
|
||
box_size=10,
|
||
border=4,
|
||
)
|
||
qr.add_data(lot_name)
|
||
qr.make(fit=True)
|
||
img = qr.make_image(fill_color="black", back_color="white")
|
||
buffer = io.BytesIO()
|
||
img.save(buffer, format="PNG")
|
||
binary_data = buffer.getvalue()
|
||
data = base64.b64encode(binary_data).decode() # 确保返回的是字符串形式的数据
|
||
return data
|
||
|
||
def print_all_barcode(self):
|
||
"""
|
||
打印所有编码
|
||
"""
|
||
print('================')
|
||
for record in self.move_line_ids:
|
||
print('record', record)
|
||
if not record.lot_name:
|
||
continue
|
||
record.ensure_one()
|
||
# qr_code_data = record.lot_qr_code
|
||
# if not qr_code_data:
|
||
# raise UserError("没有找到二维码数据。")
|
||
lot_name = record.lot_name
|
||
# todo 待控制
|
||
if not lot_name:
|
||
raise ValidationError("请先分配序列号")
|
||
# host = "192.168.50.110" # 可以根据实际情况修改
|
||
# port = 9100 # 可以根据实际情况修改
|
||
|
||
# 获取默认打印机配置
|
||
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
|
||
if not printer_config:
|
||
raise UserError('请先配置打印机')
|
||
host = printer_config.printer_id.ip_address
|
||
port = printer_config.printer_id.port
|
||
record.print_qr_code(lot_name, host, port)
|