Files
test/sf_warehouse/models/model.py
2024-05-30 17:26:28 +08:00

1101 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import re
import datetime
import logging
import base64
import qrcode
import io
import requests
from odoo import api, fields, models, _
from odoo.osv import expression
from odoo.exceptions import UserError, ValidationError
class SfLocation(models.Model):
_inherit = 'stock.location'
# 重写字段定义
name = fields.Char('Location Name', required=True, size=20)
barcode = fields.Char('Barcode', copy=False, size=15)
# 仓库类别selection库区、库位、货位
# location_type = fields.Selection([
# ('库区', '库区'),
# ('货架', '货架'),
# ('货位', '货位')
# ], string='存储类型')
location_type = fields.Selection([
('库区', '库区')
], string='存储类型')
# 库区类型selection拣货区、存货区、收货区、退货区、次品区
area_type = fields.Selection([
('拣货区', '拣货区'),
('存货区', '存货区'),
('收货区', '收货区'),
('退货区', '退货区'),
('次品区', '次品区')
], string='库区类型')
# 当前位置
current_location_id = fields.Many2one('sf.shelf.location', string='当前位置')
# 目的位置
destination_location_id = fields.Many2one('sf.shelf.location', string='目的位置')
# 存储类型selection库区、货架
# storage_type = fields.Selection([
# ('库区', '库区'),
# ('货架', '货架')
# ], string='存储类型')
# 产品类别 关联product.category
product_type = fields.Many2many('product.category', string='产品类别')
# 货架独有字段通道、方向、货架高度m、货架层数、层数容量
channel = fields.Char(string='通道')
direction = fields.Selection([
('R', 'R'),
('L', 'L')
], string='方向')
shelf_height = fields.Float(string='货架高度(m)')
shelf_layer = fields.Integer(string='货架层数')
layer_capacity = fields.Integer(string='层数容量')
# 货位独有字段:货位状态、产品(关联产品对象)、产品序列号(关联产品序列号对象)
location_status = fields.Selection([
('空闲', '空闲'),
('占用', '占用'),
('禁用', '禁用')
], string='货位状态', default='空闲')
# product_id = fields.Many2one('product.template', string='产品')
product_id = fields.Many2one('product.product', string='产品', compute='_compute_product_id', readonly=True)
product_sn_id = fields.Many2one('stock.lot', string='产品序列号')
# time_test = fields.Char(string='time')
# 添加SQL约束
# _sql_constraints = [
# ('name_uniq', 'unique(name)', '位置名称必须唯一!'),
# ]
hide_location_type = fields.Boolean(compute='_compute_hide_what', string='隐藏仓库')
hide_area = fields.Boolean(compute='_compute_hide_what', string='隐藏库区')
hide_shelf = fields.Boolean(compute='_compute_hide_what', string='隐藏货架')
hide_location = fields.Boolean(compute='_compute_hide_what', string='隐藏货位')
# @api.model
# def create(self, vals):
# """
# 重写create方法添加自定义的约束
# """
# print('create', vals)
# if vals.get('location_id'):
# location = self.env['stock.location'].browse(vals.get('location_id'))
# if location.storage_type == '库区':
# raise UserError('库区不能作为父级仓库')
# return super().create(vals)
#
# @api.onchange('location_id')
# def _onchange_location_id(self):
# """
# 重写onchange方法添加自定义的约束
# """
# if self.location_id:
# if self.location_id.storage_type == '库区':
# raise UserError('库区不能作为父级仓库')
# @api.constrains('shelf_height')
# def _check_shelf_height(self):
# for record in self:
# if not (0 <= record.shelf_height < 1000): # 限制字段值在0到999之间
# raise UserError('shelf_height的值必须在0到1000之间')
#
# @api.constrains('shelf_layer')
# def _check_shelf_layer(self):
# for record in self:
# if not (0 < record.shelf_layer < 1000):
# raise UserError('shelf_layer的值必须在0到999之间,且不能为0')
#
# @api.constrains('layer_capacity')
# def _check_layer_capacity(self):
# for record in self:
# if not (0 <= record.layer_capacity < 1000):
# raise UserError('layer_capacity的值必须在0到999之间,且不能为0')
@api.depends('product_sn_id')
def _compute_product_id(self):
"""
根据产品序列号,获取产品
"""
for record in self:
if record.product_sn_id:
record.product_id = record.product_sn_id.product_id
# record.location_status = '占用'
else:
record.product_id = False
# record.location_status = '空闲'
@api.depends('location_type')
def _compute_hide_what(self):
"""
根据仓库类别,隐藏不需要的字段
:return:
"""
for record in self:
record.hide_location_type = False
record.hide_area = False
record.hide_shelf = False
record.hide_location = False
if record.location_type and record.location_type == '仓库':
record.hide_location_type = True
elif record.location_type and record.location_type == '库区':
record.hide_area = True
elif record.location_type and record.location_type == '货架':
record.hide_shelf = True
elif record.location_type and record.location_type == '货位':
record.hide_location = True
else:
pass
# # 添加Python约束
# @api.constrains('name', 'barcode')
# def _check_len(self):
# for rec in self:
# if len(rec.name) > 20:
# raise ValidationError("Location Name length must be less equal than 20!")
# if len(rec.barcode) > 15:
# raise ValidationError("Barcode length must be less equal than 15!")
# @api.model
# def default_get(self, fields):
# print('fields:', fields)
# res = super(SfLocation, self).default_get(fields)
# print('res:', res)
# if 'barcode' in fields and 'barcode' not in res:
# # 这里是你生成barcode的代码
# pass
# # res['barcode'] = self.generate_barcode() # 假设你有一个方法generate_barcode来生成barcode
# return res
# @api.model
# def create(self, vals):
# """
# 重写create方法当仓库类型为货架时自动生成其下面的货位数量为货架层数*层数容量
# """
# res = super(SfLocation, self).create(vals)
# if res.location_type == '货架':
# for i in range(res.shelf_layer):
# for j in range(res.layer_capacity):
# self.create({
# 'name': res.name + '-' + str(i+1) + '-' + str(j+1),
# 'location_id': res.id,
# 'location_type': '货位',
# 'barcode': self.generate_barcode(res, i, j),
# 'location_status': '空闲'
# })
# return res
# 生成货位
def create_location(self):
"""
当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量
"""
pass
# if self.location_type == '货架':
# for i in range(self.shelf_layer):
# for j in range(self.layer_capacity):
# self.create({
# 'name': self.name + '-' + str(i + 1) + '层' + '-' + str(j + 1) + '位置',
# 'location_id': self.id,
# 'location_type': '货位',
# 'barcode': self.generate_barcode(i, j),
# 'location_status': '空闲'
# })
def generate_barcode(self, i, j):
"""
生成货位条码
"""
pass
# # 这里是你生成barcode的代码
# # area_type_barcode = self.location_id.barcode
# area_type_barcode = self.barcode
# i_str = str(i + 1).zfill(3) # 确保是两位数如果不足两位左侧补0
# j_str = str(j + 1).zfill(3) # 确保是两位数如果不足两位左侧补0
# return area_type_barcode + self.channel + self.direction + '-' + self.barcode + '-' + i_str + '-' + j_str
class SfShelf(models.Model):
_name = 'sf.shelf'
_inherit = ['printing.utils']
_description = '货架'
_order = 'create_date desc'
name = fields.Char('货架名称', required=True, size=20)
active = fields.Boolean("有效", default=True)
barcode = fields.Char('编码', copy=False, size=15, required=True)
# 货位
location_ids = fields.One2many('sf.shelf.location', 'shelf_id', string='货位')
check_state = fields.Selection([
('enable', '启用'),
('close', '关闭')
], string='审核状态', default='close')
def action_check(self):
self.check_state = 'enable'
# 绑定库区
shelf_location_id = fields.Many2one('stock.location', string='所属库区')
# 货架独有字段通道、方向、货架高度m、货架层数、层数容量
channel = fields.Char(string='通道', required=True, size=10)
direction = fields.Selection([
('R', 'R'),
('L', 'L')
], string='方向', required=True)
shelf_height = fields.Float(string='货架高度(m)')
shelf_layer = fields.Integer(string='货架层数')
layer_capacity = fields.Integer(string='层数容量')
# 是否有货位
is_there_area = fields.Boolean(string='是否有货位', compute='_compute_is_there_area', default=False, store=True)
@api.depends('location_ids')
def _compute_is_there_area(self):
for record in self:
record.is_there_area = bool(record.location_ids)
@api.onchange('shelf_location_id')
def _onchange_shelf_location_id(self):
"""
根据货架的所属库区修改货位的所属库区
"""
if self.name:
all_location = self.env['sf.shelf.location'].search([('name', 'ilike', self.name)])
for record in self:
for location in all_location:
location.location_id = record.shelf_location_id.id
def create_location(self):
"""
当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量
"""
area_obj = self.env['sf.shelf.location']
for i in range(self.shelf_layer):
for j in range(self.layer_capacity):
location_name = self.name + '-' + str(i + 1) + '' + '-' + str(j + 1) + '位置'
# 检查是否已经有同名的位置存在
existing_location = area_obj.search([('name', '=', location_name)])
if not existing_location:
area_obj.create({
'name': location_name,
'location_id': self.shelf_location_id.id,
'barcode': self.generate_barcode(i, j),
'location_status': '空闲',
'shelf_id': self.id
})
def generate_barcode(self, i, j):
"""
生成货位条码
"""
# 这里是你生成barcode的代码
# area_type_barcode = self.location_id.barcode
area_type_barcode = self.barcode
i_str = str(i + 1).zfill(3) # 确保是两位数如果不足两位左侧补0
j_str = str(j + 1).zfill(3) # 确保是两位数如果不足两位左侧补0
return area_type_barcode + self.channel + self.direction + '-' + self.barcode + '-' + i_str + '-' + j_str
def print_all_location_barcode(self):
"""
打印所有货位编码
"""
print('=======打印货架所有货位编码=========')
for record in self.location_ids:
print('record', record)
if not record.barcode:
continue
record.ensure_one()
# qr_code_data = record.lot_qr_code
# if not qr_code_data:
# raise UserError("没有找到二维码数据。")
barcode = record.barcode
# todo 待控制
if not barcode:
raise ValidationError("请先分配序列号")
# host = "192.168.50.110" # 可以根据实际情况修改
# port = 9100 # 可以根据实际情况修改
# 获取默认打印机配置
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
if not printer_config:
raise UserError('请先配置打印机')
host = printer_config.printer_id.ip_address
port = printer_config.printer_id.port
self.print_qr_code(barcode, host, port)
class ShelfLocation(models.Model):
_name = 'sf.shelf.location'
_inherit = ['printing.utils']
_description = '货位'
_rec_name = 'barcode'
_order = 'id asc, create_date asc'
# current_location_id = fields.Many2one('sf.shelf.location', string='当前位置')
# # 目的位置
# destination_location_id = fields.Many2one('sf.shelf.location', string='目的位置')
current_move_ids = fields.One2many('stock.move.line', 'current_location_id', '当前位置调拨单')
destination_move_ids = fields.One2many('stock.move.line', 'destination_location_id', '目标位置调拨单')
storage_time = fields.Datetime('入库时间', compute='_compute_location_status')
production_id = fields.Many2one('mrp.production', string='制造订单')
active = fields.Boolean("有效", default=True)
@api.depends('location_status')
def _compute_location_status(self):
for record in self:
if record.location_status == '占用':
record.storage_time = datetime.datetime.now()
if record.location_status == '空闲':
record.storage_time = False
if record.location_status == '禁用':
record.storage_time = False
name = fields.Char('货位名称', required=True, size=20)
barcode = fields.Char('货位编码', copy=False, size=50)
qr_code = fields.Binary(string='二维码', compute='_compute_location_qr_code', store=True)
# 货架
shelf_id = fields.Many2one('sf.shelf', string='货架')
check_state = fields.Selection([
('enable', '启用'),
('close', '关闭')
], string='审核状态', default='close')
def action_check(self):
self.check_state = 'enable'
@api.depends('barcode')
def _compute_location_qr_code(self):
for record in self:
if record.barcode:
# 创建一个QRCode对象
qr = qrcode.QRCode(
version=1, # 设置版本, 1-40控制二维码的大小
error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级
box_size=10, # 设置每个格子的像素大小
border=4, # 设置边框的格子宽度
)
# 添加数据
qr.add_data(record.barcode)
qr.make(fit=True)
# 创建二维码图像
img = qr.make_image(fill_color="black", back_color="white")
# 创建一个内存文件
buffer = io.BytesIO()
img.save(buffer, format="PNG") # 将图像保存到内存文件中
# 获取二进制数据
binary_data = buffer.getvalue()
# 使用Base64编码这些二进制数据
data = base64.b64encode(binary_data)
self.qr_code = data
else:
record.qr_code = False
def print_single_location_qr_code(self):
self.ensure_one()
qr_code_data = self.qr_code
if not qr_code_data:
raise UserError("没有找到二维码数据。")
barcode = self.barcode
# host = "192.168.50.110" # 可以根据实际情况修改
# port = 9100 # 可以根据实际情况修改
# 获取默认打印机配置
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
if not printer_config:
raise UserError('请先配置打印机')
host = printer_config.printer_id.ip_address
port = printer_config.printer_id.port
self.print_qr_code(barcode, host, port)
# 获取当前wizard的视图ID或其他标识信息
view_id = self.env.context.get('view_id')
# 构造返回wizard页面的action字典
action = {
'type': 'ir.actions.act_window',
'name': '返回 Wizard',
'res_model': 'sf.shelf', # 替换为你的wizard模型名称
'view_mode': 'form',
'view_id': view_id, # 如果需要基于特定的视图返回
'target': 'new', # 如果需要在新的窗口或标签页打开
'res_id': self.shelf_id, # 如果你想要返回当前记录的视图
}
return action
# # 仓库类别selection库区、库位、货位
# location_type = fields.Selection([
# ('货架', '货架'),
# ('货位', '货位')
# ], string='存储类型')
# 绑定库区
# shelf_location_id = fields.Many2one('stock.location', string='所属库区', domain=[('location_type', '=', '库区')])
location_id = fields.Many2one('stock.location', string='所属库区')
# 产品类别 关联product.category
# product_type = fields.Many2many('product.category', string='产品类别')
# picking_product_type = fields.Many2many('stock.picking', string='调拨产品类别', related='location_dest_id.product_type')
# 货位独有字段:货位状态、产品(关联产品对象)、产品序列号(关联产品序列号对象)
location_status = fields.Selection([
('空闲', '空闲'),
('占用', '占用'),
('禁用', '禁用')
], string='货位状态', default='空闲', compute='_compute_product_num', store=True)
# product_id = fields.Many2one('product.template', string='产品')
product_id = fields.Many2one('product.product', string='产品', compute='_compute_product_id', store=True)
product_sn_id = fields.Many2one('stock.lot', string='产品序列号')
product_sn_ids = fields.Many2many('stock.lot', 'shelf_location_stock_lot', string='产品批次号')
# 产品数量
product_num = fields.Integer('总数量')
@api.depends('product_num')
def _compute_product_num(self):
for record in self:
if record.product_num > 0:
record.location_status = '占用'
elif record.product_num == 0:
record.location_status = '空闲'
# 修改货位状态为禁用
def action_location_status_disable(self):
self.location_status = '禁用'
# 修改货位状态为空闲
def action_location_status_enable(self):
self.location_status = '空闲'
@api.depends('product_sn_id')
def _compute_product_id(self):
"""
根据产品序列号,获取产品
"""
for record in self:
if record.product_sn_id:
try:
record.sudo().product_id = record.product_sn_id.product_id
# record.sudo().location_status = '占用'
record.sudo().product_num = 1
except Exception as e:
print('eeeeeee占用', e)
else:
try:
record.sudo().product_id = False
# record.sudo().location_status = '空闲'
record.sudo().product_num = 0
except Exception as e:
print('eeeeeee空闲', e)
# 调取获取货位信息接口
def get_sf_shelf_location_info(self):
config = self.env['res.config.settings'].get_values()
headers = {'Authorization': config['center_control_Authorization']}
crea_url = config['center_control_url'] + "/AutoDeviceApi/GetLocationInfos"
params = {'DeviceId': 'Cabinet-AL'}
r = requests.get(crea_url, params=params, headers=headers)
ret = r.json()
print(ret)
if ret['Succeed'] == True:
return ret['Datas']
else:
raise UserError("该库位无产品")
@api.model_create_multi
def create(self, vals_list):
# 编码重复校验
barcode_list = []
for val in vals_list:
location = self.search([('barcode', '=', val['barcode'])])
if location:
barcode_list.append(val['name'])
if barcode_list:
raise UserError("货位编码【%s】存在重复" % barcode_list)
records = super(ShelfLocation, self).create(vals_list)
return records
class Sf_stock_move_line(models.Model):
_name = 'stock.move.line'
_inherit = ['stock.move.line', 'printing.utils']
current_location_id = fields.Many2one(
'sf.shelf.location', string='当前货位', compute='_compute_current_location_id', store=True)
# location_dest_id = fields.Many2one('stock.location', string='目标库位')
location_dest_id_product_type = fields.Many2many(related='location_dest_id.product_type')
location_dest_id_value = fields.Integer(compute='_compute_location_dest_id_value', store=True)
# lot_qr_code = fields.Binary(string='二维码', compute='_compute_lot_qr_code', store=True)
lot_qr_code = fields.Binary(string='二维码', compute='_compute_lot_qr_code', store=True)
current_product_id = fields.Integer(compute='_compute_location_dest_id_value', store=True)
there_is_no_sn = fields.Boolean('是否有序列号', default=False)
rfid = fields.Char('Rfid')
rfid_barcode = fields.Char('Rfid', compute='_compute_rfid')
@api.depends('lot_id')
def _compute_rfid(self):
for item in self:
item.rfid_barcode = item.lot_id.rfid
def action_revert_inventory(self):
# 检查用户是否有执行操作的权限
if not self.env.user.has_group('sf_warehouse.group_sf_stock_user'):
raise UserError(_('抱歉,只有库管人员可以执行此动作'))
# 如果用户有权限,调用父类方法
return super().action_revert_inventory()
@api.depends('lot_name')
def _compute_lot_qr_code(self):
for record in self:
if record.lot_id:
# record.lot_qr_code = record.lot_id.lot_qr_code
# 创建一个QRCode对象
qr = qrcode.QRCode(
version=1, # 设置版本, 1-40控制二维码的大小
error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级
box_size=10, # 设置每个格子的像素大小
border=4, # 设置边框的格子宽度
)
# 添加数据
qr.add_data(record.lot_id.name)
qr.make(fit=True)
# 创建二维码图像
img = qr.make_image(fill_color="black", back_color="white")
# 创建一个内存文件
buffer = io.BytesIO()
img.save(buffer, format="PNG") # 将图像保存到内存文件中
# 获取二进制数据
binary_data = buffer.getvalue()
# 使用Base64编码这些二进制数据
data = base64.b64encode(binary_data)
self.lot_qr_code = data
else:
record.lot_qr_code = False
def print_single_method(self):
self.ensure_one()
qr_code_data = self.lot_qr_code
if not qr_code_data:
raise UserError("没有找到二维码数据。")
lot_name = self.lot_name
# host = "192.168.50.110" # 可以根据实际情况修改
# port = 9100 # 可以根据实际情况修改
# 获取默认打印机配置
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
if not printer_config:
raise UserError('请先配置打印机')
host = printer_config.printer_id.ip_address
port = printer_config.printer_id.port
self.print_qr_code(lot_name, host, port)
# 返回当前wizard页面
# base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
# return {
# 'type': 'ir.actions.act_url',
# 'url': str(base_url) + download_url,
# 'target': 'self',
# }
# 获取当前wizard的视图ID或其他标识信息
view_id = self.env.context.get('view_id')
# 构造返回wizard页面的action字典
action = {
'type': 'ir.actions.act_window',
'name': '返回 Wizard',
'res_model': 'stock.move', # 替换为你的wizard模型名称
'view_mode': 'form',
'view_id': view_id, # 如果需要基于特定的视图返回
'target': 'new', # 如果需要在新的窗口或标签页打开
'res_id': self.id, # 如果你想要返回当前记录的视图
}
return action
# def generate_zpl_code(self, code):
# # 初始化ZPL代码字符串
# zpl_code = "^XA\n"
# zpl_code += "^CW1,E:SIMSUN.TTF^FS\n"
# zpl_code += "^CI28\n"
#
# # 设置二维码位置
# zpl_code += "^FO50,50\n" # 调整二维码位置,使其与资产编号在同一行
# zpl_code += f"^BQN,2,6^FDLM,B0093{code}^FS\n"
#
# # 设置资产编号文本位置
# zpl_code += "^FO300,60\n" # 资产编号文本的位置,与二维码在同一行
# zpl_code += "^A1N,45,45^FD编码名称: ^FS\n"
#
# # 设置{code}文本位置
# # 假设{code}文本需要位于资产编号和二维码下方,中间位置
# # 设置{code}文本位置并启用自动换行
# zpl_code += "^FO300,120\n" # {code}文本的起始位置
# zpl_code += "^FB500,4,0,L,0\n" # 定义一个宽度为500点的文本框最多4行左对齐
# zpl_code += f"^A1N,40,40^FD{code}^FS\n"
#
# # 在{code}文本框周围绘制线框
# # 假设线框的外部尺寸为宽度500点高度200点
# # zpl_code += "^FO300,110^GB500,200,2^FS\n" # 绘制线框边框粗细为2点
#
# zpl_code += "^PQ1,0,1,Y\n"
# zpl_code += "^XZ\n"
# return zpl_code
#
# def send_to_printer(self, host, port, zpl_code):
# # 将ZPL代码转换为字节串
# print('zpl_code', zpl_code)
# zpl_bytes = zpl_code.encode('utf-8')
# print(zpl_bytes)
#
# # 创建socket对象
# mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# try:
# mysocket.connect((host, port)) # 连接到打印机
# mysocket.send(zpl_bytes) # 发送ZPL代码
# print("ZPL code sent to printer successfully.")
# except Exception as e:
# print(f"Error with the connection: {e}")
# finally:
# mysocket.close() # 关闭连接
#
# def print_qr_code(self):
# self.ensure_one() # 确保这个方法只为一个记录调用
# # if not self.lot_id:
# # raise UserError("没有找到序列号。")
# # 假设_lot_qr_code方法已经生成了二维码并保存在字段中
# qr_code_data = self.lot_qr_code
# if not qr_code_data:
# raise UserError("没有找到二维码数据。")
# # 生成ZPL代码
# zpl_code = self.generate_zpl_code(self.lot_id.name)
# # 设置打印机的IP地址和端口号
# host = "192.168.50.110"
# port = 9100
# # 发送ZPL代码到打印机
# self.send_to_printer(host, port, zpl_code)
#
# # # 生成下载链接或直接触发下载
# # # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例
# # attachment = self.env['ir.attachment'].sudo().create({
# # 'datas': self.lot_qr_code,
# # 'type': 'binary',
# # 'description': '二维码图片',
# # 'name': self.lot_name + '.png',
# # # 'res_id': invoice.id,
# # # 'res_model': 'stock.picking',
# # 'public': True,
# # 'mimetype': 'application/x-png',
# # # 'model_name': 'stock.picking',
# # })
# # # 返回附件的下载链接
# # download_url = '/web/content/%s?download=true' % attachment.id
# # base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
# # return {
# # 'type': 'ir.actions.act_url',
# # 'url': str(base_url) + download_url,
# # 'target': 'self',
# # }
# # # 定义一个方法,用于根据序列号生成二维码
# # @api.depends('lot_id')
# def generate_lot_qr_code(self):
# # 创建一个QRCode对象
# qr = qrcode.QRCode(
# version=1, # 设置版本, 1-40控制二维码的大小
# error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级
# box_size=10, # 设置每个格子的像素大小
# border=4, # 设置边框的格子宽度
# )
#
# # 添加数据
# qr.add_data(self.lot_id.name)
# qr.make(fit=True)
#
# # 创建二维码图像
# img = qr.make_image(fill_color="black", back_color="white")
#
# # 创建一个内存文件
# buffer = io.BytesIO()
# img.save(buffer, format="PNG") # 将图像保存到内存文件中
#
# # 获取二进制数据
# binary_data = buffer.getvalue()
#
# # 使用Base64编码这些二进制数据
# data = base64.b64encode(binary_data)
# self.lot_qr_code = data
# attachment = self.env['ir.attachment'].sudo().create({
# 'datas': data,
# 'type': 'binary',
# 'description': '二维码图片',
# 'name': self.lot_id.name + '.png',
# # 'res_id': invoice.id,
# # 'res_model': 'stock.picking',
# 'public': True,
# 'mimetype': 'application/pdf',
# # 'model_name': 'stock.picking',
# })
# def button_test(self):
# print(self.picking_id.name)
# stock_picking = self.env['stock.picking'].search([('name', '=', self.picking_id.name)], limit=1)
# print(self.picking_id.name)
# print(aa.move_line_ids.lot_id.name)
# # 获取当前的stock.picking对象
# current_picking = self.env['stock.picking'].search([('name', '=', self.picking_id.name)], limit=1)
#
# # 获取当前picking的第一个stock.move对象
# current_move = current_picking.move_ids[0] if current_picking.move_ids else False
#
# # 如果存在相关的stock.move对象
# if current_move:
# # 获取源stock.move对象
# origin_move = current_move.move_orig_ids[0] if current_move.move_orig_ids else False
#
# # 从源stock.move对象获取源stock.picking对象
# origin_picking = origin_move.picking_id if origin_move else False
# # 现在origin_picking就是current_picking的上一步
# # 获取目标stock.move对象
# dest_move = current_move.move_dest_ids[0] if current_move.move_dest_ids else False
#
# # 从目标stock.move对象获取目标stock.picking对象
# dest_picking = dest_move.picking_id if dest_move else False
# # 现在dest_picking就是current_picking的下一步
@api.depends('location_id')
def _compute_current_location_id(self):
for record in self:
# 使用record代替self来引用当前遍历到的记录
logging.info('record.picking_id.name: %s' % record.picking_id.name)
logging.info('record.env: %s' % record.env['stock.picking'].search([('name', '=', record.picking_id.name)]))
# 获取当前的stock.picking对象
current_picking = record.env['stock.picking'].search([('name', '=', record.picking_id.name)], limit=1)
# 获取当前picking的第一个stock.move对象
current_move = current_picking.move_ids[0] if current_picking.move_ids else False
# 如果存在相关的stock.move对象
if current_move:
# 获取源stock.move对象
origin_move = current_move.move_orig_ids[0] if current_move.move_orig_ids else False
# 从源stock.move对象获取源stock.picking对象
origin_picking = origin_move.picking_id if origin_move else False
# 如果前一个调拨单有目标货位
if origin_picking:
for i in current_picking.move_line_ids:
for j in origin_picking.move_line_ids:
if j.destination_location_id and i.lot_id == j.lot_id:
# 更新当前记录的current_location_id字段
record.current_location_id = j.destination_location_id
# # 获取目标stock.move对象
# dest_move = current_move.move_dest_ids[0] if current_move.move_dest_ids else False
#
# # 从目标stock.move对象获取目标stock.picking对象
# dest_picking = dest_move.picking_id if dest_move else False
# # 现在dest_picking就是current_picking的下一步
# 是一张单据一张单据往下走的,所以这里的目标货位是上一张单据的当前货位,且这样去计算是可以的。
@api.depends('location_dest_id')
def _compute_location_dest_id_value(self):
for record in self:
record.location_dest_id_value = record.location_dest_id.id if record.location_dest_id else False
record.current_product_id = record.product_id.id if record.product_id else False
destination_location_id = fields.Many2one(
'sf.shelf.location', string='目标货位')
def compute_destination_location_id(self):
for record in self:
obj = self.env['sf.shelf.location'].search([('name', '=',
self.destination_location_id.name)])
if record.lot_id:
if record.product_id.tracking == 'serial':
shelf_location_obj = self.env['sf.shelf.location'].search(
[('product_sn_id', '=', record.lot_id.id)])
if shelf_location_obj:
shelf_location_obj.product_sn_id = False
if obj:
obj.product_sn_id = record.lot_id.id
else:
if obj:
obj.product_sn_id = record.lot_id.id
elif record.product_id.tracking == 'lot':
obj.product_sn_ids |= record.lot_id
if not obj.product_id:
obj.product_id = record.product_id.id
else:
if obj:
obj.product_id = record.product_id.id
# obj.location_status = '占用'
obj.product_num += record.qty_done
@api.onchange('destination_location_id')
def _check_destination_location_id(self):
for item in self:
if item:
i = 0
barcode = item.destination_location_id.barcode
for line in item.picking_id.move_line_ids_without_package:
if barcode and barcode == line.destination_location_id.barcode:
i += 1
if i > 1:
raise ValidationError(
'%s】货位已经被占用,请重新选择!!!' % item.destination_location_id.barcode)
class SfStockPicking(models.Model):
_inherit = 'stock.picking'
check_in = fields.Char(string='查询是否为入库单', compute='_check_is_in')
@api.depends('name')
def _check_is_in(self):
"""
判断是否为出库单
"""
if self.name:
is_check_in = self.name.split('/')
self.check_in = is_check_in[1]
def button_validate(self):
"""
重写验证方法,当验证时意味着调拨单已经完成,已经移动到了目标货位,所以需要将当前货位的状态改为空闲
"""
res = super(SfStockPicking, self).button_validate()
for line in self.move_line_ids:
if line:
if line.destination_location_id:
# 调用入库方法进行入库刀货位
line.compute_destination_location_id()
else:
# 对除刀柄之外的刀具物料进行 目标货位必填校验
if self.location_dest_id.name == '刀具房' and line.product_id.cutting_tool_material_id.name not in (
'刀柄', False):
raise ValidationError('请选择【%s】产品的目标货位!' % line.product_id.name)
if line.current_location_id:
if line.current_location_id.product_sn_id:
line.current_location_id.product_sn_id = False
# line.current_location_id.location_status = '空闲'
line.current_location_id.product_num = 0
# 对入库作业的刀柄和托盘进行Rfid绑定校验
for move in self.move_ids:
if move and move.product_id.cutting_tool_material_id.name == '刀柄' or '托盘' in (
move.product_id.fixture_material_id.name or ''):
for item in move.move_line_nosuggest_ids:
if item.location_dest_id.name == '进货':
if not item.rfid:
raise ValidationError('你需要提供%s的Rfid' % move.product_id.name)
self.env['stock.lot'].search([('name', '=', item.lot_name)]).write({'rfid': item.rfid})
return res
# def print_all_barcode(self):
# """
# 打印所有编码
# """
# print('================')
# for record in self.move_ids_without_package:
# print('record', record)
# print('record.move_line_ids', record.move_line_ids)
#
# # record.move_line_ids.print_qr_code()
#
# print('record.move_line_ids.lot_id', record.move_line_ids.lot_id)
# print('record.move_line_ids.lot_id.name', record.move_line_ids.lot_id.name)
class SfProcurementGroup(models.Model):
_inherit = 'procurement.group'
@api.model
def _search_rule(self, route_ids, packaging_id, product_id, warehouse_id, domain):
"""
修改路线多规则条件选取
"""
if warehouse_id:
domain = expression.AND(
[['|', ('warehouse_id', '=', warehouse_id.id), ('warehouse_id', '=', False)], domain])
Rule = self.env['stock.rule']
res = self.env['stock.rule']
if route_ids:
res_list = Rule.search(expression.AND([[('route_id', 'in', route_ids.ids)], domain]),
order='route_sequence, sequence')
for res1 in res_list:
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
res1.location_src_id.product_type:
res = res1
if not res:
res = Rule.search(expression.AND([[('route_id', 'in', route_ids.ids)], domain]),
order='route_sequence, sequence', limit=1)
if not res and packaging_id:
packaging_routes = packaging_id.route_ids
if packaging_routes:
res_list = Rule.search(expression.AND([[('route_id', 'in', packaging_routes.ids)], domain]),
order='route_sequence, sequence')
for res1 in res_list:
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
res1.location_src_id.product_type:
res = res1
if not res:
res = Rule.search(expression.AND([[('route_id', 'in', packaging_routes.ids)], domain]),
order='route_sequence, sequence', limit=1)
if not res:
product_routes = product_id.route_ids | product_id.categ_id.total_route_ids
if product_routes:
res_list = Rule.search(expression.AND([[('route_id', 'in', product_routes.ids)], domain]),
order='route_sequence, sequence')
for res1 in res_list:
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
res1.location_src_id.product_type:
res = res1
if not res:
res = Rule.search(expression.AND([[('route_id', 'in', product_routes.ids)], domain]),
order='route_sequence, sequence', limit=1)
if not res and warehouse_id:
warehouse_routes = warehouse_id.route_ids
if warehouse_routes:
res_list = Rule.search(expression.AND([[('route_id', 'in', warehouse_routes.ids)], domain]),
order='route_sequence, sequence')
for res1 in res_list:
if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \
res1.location_src_id.product_type:
res = res1
if not res:
res = Rule.search(expression.AND([[('route_id', 'in', warehouse_routes.ids)], domain]),
order='route_sequence, sequence', limit=1)
return res
# class SfPickingType(models.Model):
# _inherit = 'stock.picking.type'
#
# def _default_show_operations(self):
# return self.user_has_groups('stock.group_production_lot,'
# 'stock.group_stock_multi_locations,'
# 'stock.group_tracking_lot',
# 'sf_warehouse.group_sf_stock_user',
# 'sf_warehouse.group_sf_stock_manager')
class SfPickingType(models.Model):
_inherit = 'stock.picking.type'
def _default_show_operations(self):
return self.user_has_groups(
'stock.group_production_lot,'
'stock.group_stock_multi_locations,'
'stock.group_tracking_lot,'
'sf_warehouse.group_sf_stock_user,'
'sf_warehouse.group_sf_stock_manager'
)
class CustomStockMove(models.Model):
_name = 'stock.move'
_inherit = ['stock.move', 'printing.utils', 'barcodes.barcode_events_mixin']
def on_barcode_scanned(self, barcode):
"""
采购入库扫码绑定Rfid码
"""
for record in self:
if record:
lot = self.env['stock.lot'].sudo().search([('rfid', '=', barcode)])
if lot:
if lot.product_id.cutting_tool_material_id:
material = lot.product_id.cutting_tool_material_id.name
else:
material = lot.product_id.fixture_material_id.name
raise ValidationError(
'该Rfid【%s】已经被序列号为【%s】的【%s】物料所占用!' % (barcode, lot.name, material))
if '刀柄' in (record.product_id.cutting_tool_material_id.name or '') or '托盘' in (
record.product_id.fixture_material_id.name or ''):
for move_line_nosuggest_id in record.move_line_nosuggest_ids:
if move_line_nosuggest_id.rfid:
if move_line_nosuggest_id.rfid == barcode:
if record.product_id.cutting_tool_material_id.name:
raise ValidationError('该刀柄的Rfid已经录入请勿重复录入')
else:
raise ValidationError('该托盘的Rfid已经录入请勿重复录入')
else:
line_id = int(re.sub(r"\D", "", str(move_line_nosuggest_id.id)))
self.env['stock.move.line'].sudo().search([('id', '=', line_id)]).write({'rfid': barcode})
move_line_nosuggest_id.rfid = barcode
break
else:
raise ValidationError('该产品不需要录入Rfid')
def action_assign_serial_show_details(self):
# 首先执行原有逻辑
result = super(CustomStockMove, self).action_assign_serial_show_details()
# 接着为每个 lot_name 生成二维码
move_lines = self.move_line_ids # 获取当前 stock.move 对应的所有 stock.move.line 记录
for line in move_lines:
if line.lot_name: # 确保 lot_name 存在
qr_data = self.compute_lot_qr_code(line.lot_name)
# 假设 stock.move.line 模型中有一个字段叫做 lot_qr_code 用于存储二维码数据
line.lot_qr_code = qr_data
return result
def compute_lot_qr_code(self, lot_name):
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(lot_name)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = io.BytesIO()
img.save(buffer, format="PNG")
binary_data = buffer.getvalue()
data = base64.b64encode(binary_data).decode() # 确保返回的是字符串形式的数据
return data
def print_all_barcode(self):
"""
打印所有编码
"""
print('================')
for record in self.move_line_ids:
print('record', record)
if not record.lot_name:
continue
record.ensure_one()
# qr_code_data = record.lot_qr_code
# if not qr_code_data:
# raise UserError("没有找到二维码数据。")
lot_name = record.lot_name
# todo 待控制
if not lot_name:
raise ValidationError("请先分配序列号")
# host = "192.168.50.110" # 可以根据实际情况修改
# port = 9100 # 可以根据实际情况修改
# 获取默认打印机配置
printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1)
if not printer_config:
raise UserError('请先配置打印机')
host = printer_config.printer_id.ip_address
port = printer_config.printer_id.port
record.print_qr_code(lot_name, host, port)