# -*- coding: utf-8 -*- import re import datetime import logging import base64 import qrcode import io import requests from odoo import api, fields, models, _ from odoo.osv import expression from odoo.exceptions import UserError, ValidationError class SfLocation(models.Model): _inherit = 'stock.location' # 重写字段定义 name = fields.Char('Location Name', required=True, size=20) barcode = fields.Char('Barcode', copy=False, size=15) # 仓库类别(selection:库区、库位、货位) # location_type = fields.Selection([ # ('库区', '库区'), # ('货架', '货架'), # ('货位', '货位') # ], string='存储类型') location_type = fields.Selection([ ('库区', '库区') ], string='存储类型') # 库区类型(selection:拣货区、存货区、收货区、退货区、次品区) area_type = fields.Selection([ ('拣货区', '拣货区'), ('存货区', '存货区'), ('收货区', '收货区'), ('退货区', '退货区'), ('次品区', '次品区') ], string='库区类型') # 当前位置 current_location_id = fields.Many2one('sf.shelf.location', string='当前位置') # 目的位置 destination_location_id = fields.Many2one('sf.shelf.location', string='目的位置') # 存储类型(selection:库区、货架) # storage_type = fields.Selection([ # ('库区', '库区'), # ('货架', '货架') # ], string='存储类型') # 产品类别 (关联:product.category) product_type = fields.Many2many('product.category', string='产品类别') # 货架独有字段:通道、方向、货架高度(m)、货架层数、层数容量 channel = fields.Char(string='通道') direction = fields.Selection([ ('R', 'R'), ('L', 'L') ], string='方向') shelf_height = fields.Float(string='货架高度(m)') shelf_layer = fields.Integer(string='货架层数') layer_capacity = fields.Integer(string='层数容量') # 货位独有字段:货位状态、产品(关联产品对象)、产品序列号(关联产品序列号对象) location_status = fields.Selection([ ('空闲', '空闲'), ('占用', '占用'), ('禁用', '禁用') ], string='货位状态', default='空闲') # product_id = fields.Many2one('product.template', string='产品') product_id = fields.Many2one('product.product', string='产品', compute='_compute_product_id', readonly=True) product_sn_id = fields.Many2one('stock.lot', string='产品序列号') # time_test = fields.Char(string='time') # 添加SQL约束 # _sql_constraints = [ # ('name_uniq', 'unique(name)', '位置名称必须唯一!'), # ] hide_location_type = fields.Boolean(compute='_compute_hide_what', string='隐藏仓库') hide_area = fields.Boolean(compute='_compute_hide_what', string='隐藏库区') hide_shelf = fields.Boolean(compute='_compute_hide_what', string='隐藏货架') hide_location = fields.Boolean(compute='_compute_hide_what', string='隐藏货位') # @api.model # def create(self, vals): # """ # 重写create方法,添加自定义的约束 # """ # print('create', vals) # if vals.get('location_id'): # location = self.env['stock.location'].browse(vals.get('location_id')) # if location.storage_type == '库区': # raise UserError('库区不能作为父级仓库') # return super().create(vals) # # @api.onchange('location_id') # def _onchange_location_id(self): # """ # 重写onchange方法,添加自定义的约束 # """ # if self.location_id: # if self.location_id.storage_type == '库区': # raise UserError('库区不能作为父级仓库') # @api.constrains('shelf_height') # def _check_shelf_height(self): # for record in self: # if not (0 <= record.shelf_height < 1000): # 限制字段值在0到999之间 # raise UserError('shelf_height的值必须在0到1000之间') # # @api.constrains('shelf_layer') # def _check_shelf_layer(self): # for record in self: # if not (0 < record.shelf_layer < 1000): # raise UserError('shelf_layer的值必须在0到999之间,且不能为0') # # @api.constrains('layer_capacity') # def _check_layer_capacity(self): # for record in self: # if not (0 <= record.layer_capacity < 1000): # raise UserError('layer_capacity的值必须在0到999之间,且不能为0') @api.depends('product_sn_id') def _compute_product_id(self): """ 根据产品序列号,获取产品 """ for record in self: if record.product_sn_id: record.product_id = record.product_sn_id.product_id # record.location_status = '占用' else: record.product_id = False # record.location_status = '空闲' @api.depends('location_type') def _compute_hide_what(self): """ 根据仓库类别,隐藏不需要的字段 :return: """ for record in self: record.hide_location_type = False record.hide_area = False record.hide_shelf = False record.hide_location = False if record.location_type and record.location_type == '仓库': record.hide_location_type = True elif record.location_type and record.location_type == '库区': record.hide_area = True elif record.location_type and record.location_type == '货架': record.hide_shelf = True elif record.location_type and record.location_type == '货位': record.hide_location = True else: pass # # 添加Python约束 # @api.constrains('name', 'barcode') # def _check_len(self): # for rec in self: # if len(rec.name) > 20: # raise ValidationError("Location Name length must be less equal than 20!") # if len(rec.barcode) > 15: # raise ValidationError("Barcode length must be less equal than 15!") # @api.model # def default_get(self, fields): # print('fields:', fields) # res = super(SfLocation, self).default_get(fields) # print('res:', res) # if 'barcode' in fields and 'barcode' not in res: # # 这里是你生成barcode的代码 # pass # # res['barcode'] = self.generate_barcode() # 假设你有一个方法generate_barcode来生成barcode # return res # @api.model # def create(self, vals): # """ # 重写create方法,当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量 # """ # res = super(SfLocation, self).create(vals) # if res.location_type == '货架': # for i in range(res.shelf_layer): # for j in range(res.layer_capacity): # self.create({ # 'name': res.name + '-' + str(i+1) + '-' + str(j+1), # 'location_id': res.id, # 'location_type': '货位', # 'barcode': self.generate_barcode(res, i, j), # 'location_status': '空闲' # }) # return res # 生成货位 def create_location(self): """ 当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量 """ pass # if self.location_type == '货架': # for i in range(self.shelf_layer): # for j in range(self.layer_capacity): # self.create({ # 'name': self.name + '-' + str(i + 1) + '层' + '-' + str(j + 1) + '位置', # 'location_id': self.id, # 'location_type': '货位', # 'barcode': self.generate_barcode(i, j), # 'location_status': '空闲' # }) def generate_barcode(self, i, j): """ 生成货位条码 """ pass # # 这里是你生成barcode的代码 # # area_type_barcode = self.location_id.barcode # area_type_barcode = self.barcode # i_str = str(i + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0 # j_str = str(j + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0 # return area_type_barcode + self.channel + self.direction + '-' + self.barcode + '-' + i_str + '-' + j_str class SfShelf(models.Model): _name = 'sf.shelf' _inherit = ['printing.utils'] _description = '货架' _order = 'create_date desc' name = fields.Char('货架名称', required=True, size=20) active = fields.Boolean("有效", default=True) barcode = fields.Char('编码', copy=False, size=15, required=True) # 货位 location_ids = fields.One2many('sf.shelf.location', 'shelf_id', string='货位') check_state = fields.Selection([ ('enable', '启用'), ('close', '关闭') ], string='审核状态', default='close') def action_check(self): self.check_state = 'enable' # 绑定库区 shelf_location_id = fields.Many2one('stock.location', string='所属库区') # 货架独有字段:通道、方向、货架高度(m)、货架层数、层数容量 channel = fields.Char(string='通道', required=True, size=10) direction = fields.Selection([ ('R', 'R'), ('L', 'L') ], string='方向', required=True) shelf_height = fields.Float(string='货架高度(m)') shelf_layer = fields.Integer(string='货架层数') layer_capacity = fields.Integer(string='层数容量') shelf_rotative_Boolean = fields.Boolean('循环货架', default=False) # 是否有货位 is_there_area = fields.Boolean(string='是否有货位', compute='_compute_is_there_area', default=False, store=True) @api.depends('location_ids') def _compute_is_there_area(self): for record in self: record.is_there_area = bool(record.location_ids) @api.onchange('shelf_location_id') def _onchange_shelf_location_id(self): """ 根据货架的所属库区修改货位的所属库区 """ if self.name: all_location = self.env['sf.shelf.location'].search([('name', 'ilike', self.name)]) for record in self: for location in all_location: location.location_id = record.shelf_location_id.id def create_location(self): """ 当仓库类型为货架时,自动生成其下面的货位,数量为货架层数*层数容量 """ area_obj = self.env['sf.shelf.location'] for i in range(self.shelf_layer): for j in range(self.layer_capacity): location_name = self.name + '-' + str(i + 1) + '层' + '-' + str(j + 1) + '位置' # 检查是否已经有同名的位置存在 existing_location = area_obj.search([('name', '=', location_name)]) if not existing_location: area_obj.create({ 'name': location_name, 'location_id': self.shelf_location_id.id, 'barcode': self.generate_barcode(i, j), 'location_status': '空闲', 'shelf_id': self.id }) def generate_barcode(self, i, j): """ 生成货位条码 """ # 这里是你生成barcode的代码 # area_type_barcode = self.location_id.barcode area_type_barcode = self.barcode i_str = str(i + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0 j_str = str(j + 1).zfill(3) # 确保是两位数,如果不足两位,左侧补0 return area_type_barcode + self.channel + self.direction + '-' + self.barcode + '-' + i_str + '-' + j_str def print_all_location_barcode(self): """ 打印所有货位编码 """ print('=======打印货架所有货位编码=========') for record in self.location_ids: print('record', record) if not record.barcode: continue record.ensure_one() # qr_code_data = record.lot_qr_code # if not qr_code_data: # raise UserError("没有找到二维码数据。") barcode = record.barcode # todo 待控制 if not barcode: raise ValidationError("请先分配序列号") # host = "192.168.50.110" # 可以根据实际情况修改 # port = 9100 # 可以根据实际情况修改 # 获取默认打印机配置 printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1) if not printer_config: raise UserError('请先配置打印机') host = printer_config.printer_id.ip_address port = printer_config.printer_id.port self.print_qr_code(barcode, host, port) class ShelfLocation(models.Model): _name = 'sf.shelf.location' _inherit = ['printing.utils'] _description = '货位' # _rec_name = 'barcode' _order = 'id asc, create_date asc' # current_location_id = fields.Many2one('sf.shelf.location', string='当前位置') # # 目的位置 # destination_location_id = fields.Many2one('sf.shelf.location', string='目的位置') current_move_ids = fields.One2many('stock.move.line', 'current_location_id', '当前位置调拨单') destination_move_ids = fields.One2many('stock.move.line', 'destination_location_id', '目标位置调拨单') storage_time = fields.Datetime('入库时间', compute='_compute_location_status') production_id = fields.Many2one('mrp.production', string='制造订单') active = fields.Boolean("有效", default=True) @api.depends('location_status') def _compute_location_status(self): for record in self: if record.location_status == '占用': record.storage_time = datetime.datetime.now() if record.location_status == '空闲': record.storage_time = False if record.location_status == '禁用': record.storage_time = False name = fields.Char('货位名称', required=True, size=20) barcode = fields.Char('货位编码', copy=False, size=50) rotative_Boolean = fields.Boolean('循环货位', related='shelf_id.shelf_rotative_Boolean', store=True) qr_code = fields.Binary(string='二维码', compute='_compute_location_qr_code', store=True) # 货架 shelf_id = fields.Many2one('sf.shelf', string='货架') check_state = fields.Selection([ ('enable', '启用'), ('close', '关闭') ], string='审核状态', default='close') def action_check(self): self.check_state = 'enable' @api.depends('barcode') def _compute_location_qr_code(self): for record in self: if record.barcode: # 创建一个QRCode对象 qr = qrcode.QRCode( version=1, # 设置版本, 1-40,控制二维码的大小 error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级 box_size=10, # 设置每个格子的像素大小 border=4, # 设置边框的格子宽度 ) # 添加数据 qr.add_data(record.barcode) qr.make(fit=True) # 创建二维码图像 img = qr.make_image(fill_color="black", back_color="white") # 创建一个内存文件 buffer = io.BytesIO() img.save(buffer, format="PNG") # 将图像保存到内存文件中 # 获取二进制数据 binary_data = buffer.getvalue() # 使用Base64编码这些二进制数据 data = base64.b64encode(binary_data) self.qr_code = data else: record.qr_code = False def print_single_location_qr_code(self): self.ensure_one() qr_code_data = self.qr_code if not qr_code_data: raise UserError("没有找到二维码数据。") barcode = self.barcode # host = "192.168.50.110" # 可以根据实际情况修改 # port = 9100 # 可以根据实际情况修改 # 获取默认打印机配置 printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1) if not printer_config: raise UserError('请先配置打印机') host = printer_config.printer_id.ip_address port = printer_config.printer_id.port self.print_qr_code(barcode, host, port) # # 获取当前wizard的视图ID或其他标识信息 # view_id = self.env.context.get('view_id') # # 构造返回wizard页面的action字典 # action = { # 'type': 'ir.actions.act_window', # 'name': '返回 Wizard', # 'res_model': 'sf.shelf', # 替换为你的wizard模型名称 # 'view_mode': 'form', # 'view_id': view_id, # 如果需要基于特定的视图返回 # 'target': 'new', # 如果需要在新的窗口或标签页打开 # 'res_id': self.shelf_id, # 如果你想要返回当前记录的视图 # } # return action # # 仓库类别(selection:库区、库位、货位) # location_type = fields.Selection([ # ('货架', '货架'), # ('货位', '货位') # ], string='存储类型') # 绑定库区 # shelf_location_id = fields.Many2one('stock.location', string='所属库区', domain=[('location_type', '=', '库区')]) location_id = fields.Many2one('stock.location', string='所属库区') # 产品类别 (关联:product.category) # product_type = fields.Many2many('product.category', string='产品类别') # picking_product_type = fields.Many2many('stock.picking', string='调拨产品类别', related='location_dest_id.product_type') # 货位独有字段:货位状态、产品(关联产品对象)、产品序列号(关联产品序列号对象) location_status = fields.Selection([ ('空闲', '空闲'), ('占用', '占用'), ('禁用', '禁用') ], string='货位状态', default='空闲', compute='_compute_product_num', store=True) # product_id = fields.Many2one('product.template', string='产品') product_id = fields.Many2one('product.product', string='产品', compute='_compute_product_id', store=True) product_sn_id = fields.Many2one('stock.lot', string='产品序列号') product_sn_ids = fields.One2many('sf.shelf.location.lot', 'shelf_location_id', string='产品批次号') # 产品数量 product_num = fields.Integer('总数量', compute='_compute_number', store=True) @api.depends('product_num') def _compute_product_num(self): for record in self: if record.product_num > 0: record.location_status = '占用' elif record.product_num == 0: record.location_status = '空闲' @api.depends('product_sn_ids.qty') def _compute_number(self): for item in self: if item.product_sn_ids: qty = 0 for product_sn_id in item.product_sn_ids: qty += product_sn_id.qty item.product_num = qty # 修改货位状态为禁用 def action_location_status_disable(self): self.location_status = '禁用' # 修改货位状态为空闲 def action_location_status_enable(self): self.location_status = '空闲' @api.depends('product_sn_id', 'product_sn_ids') def _compute_product_id(self): """ 根据产品序列号,获取产品 """ for record in self: if record.product_sn_id: try: record.sudo().product_id = record.product_sn_id.product_id # record.sudo().location_status = '占用' record.sudo().product_num = 1 except Exception as e: print('eeeeeee占用', e) elif record.product_sn_ids: return True else: try: record.sudo().product_id = False # record.sudo().location_status = '空闲' record.sudo().product_num = 0 except Exception as e: print('eeeeeee空闲', e) # 调取获取货位信息接口 def get_sf_shelf_location_info(self, device_id='Cabinet-AL'): config = self.env['res.config.settings'].get_values() headers = {'Authorization': config['center_control_Authorization']} crea_url = config['center_control_url'] + "/AutoDeviceApi/GetLocationInfos" params = {'DeviceId': device_id} r = requests.get(crea_url, params=params, headers=headers) ret = r.json() print(ret) if ret['Succeed'] == True: return ret['Datas'] else: raise UserError("该库位无产品") @api.model_create_multi def create(self, vals_list): # 编码重复校验 barcode_list = [] for val in vals_list: location = self.search([('barcode', '=', val['barcode'])]) if location: barcode_list.append(val['name']) if barcode_list: raise UserError("货位编码【%s】存在重复" % barcode_list) records = super(ShelfLocation, self).create(vals_list) return records class SfShelfLocationLot(models.Model): _name = 'sf.shelf.location.lot' _description = '批次数量' name = fields.Char('名称', related='lot_id.name') shelf_location_id = fields.Many2one('sf.shelf.location', string="货位") lot_id = fields.Many2one('stock.lot', string='批次号') qty = fields.Integer('数量') qty_num = fields.Integer('变更数量') @api.onchange('qty_num') def _onchange_qty_num(self): for item in self: if item.qty_num > item.qty: raise ValidationError('变更数量不能比库存数量大!!!') class SfStockMoveLine(models.Model): _name = 'stock.move.line' _inherit = ['stock.move.line', 'printing.utils'] stock_lot_name = fields.Char('序列号名称', related='lot_id.name') current_location_id = fields.Many2one( 'sf.shelf.location', string='当前货位', compute='_compute_current_location_id', store=True, readonly=False, domain="[('product_id', '=', product_id),'|',('product_sn_id.name', '=', stock_lot_name),('product_sn_ids.lot_id.name','=',stock_lot_name)]") # location_dest_id = fields.Many2one('stock.location', string='目标库位') location_dest_id_product_type = fields.Many2many(related='location_dest_id.product_type') location_dest_id_value = fields.Integer(compute='_compute_location_dest_id_value', store=True) # lot_qr_code = fields.Binary(string='二维码', compute='_compute_lot_qr_code', store=True) lot_qr_code = fields.Binary(string='二维码', compute='_compute_lot_qr_code', store=True) current_product_id = fields.Integer(compute='_compute_location_dest_id_value', store=True) there_is_no_sn = fields.Boolean('是否有序列号', default=False) rfid = fields.Char('Rfid') rfid_barcode = fields.Char('Rfid', compute='_compute_rfid') @api.depends('lot_id') def _compute_rfid(self): for item in self: item.rfid_barcode = item.lot_id.rfid def action_revert_inventory(self): # 检查用户是否有执行操作的权限 if not self.env.user.has_group('sf_warehouse.group_sf_stock_user'): raise UserError(_('抱歉,只有库管人员可以执行此动作')) # 如果用户有权限,调用父类方法 return super().action_revert_inventory() @api.depends('lot_name') def _compute_lot_qr_code(self): for record in self: if record.lot_id: # record.lot_qr_code = record.lot_id.lot_qr_code # 创建一个QRCode对象 qr = qrcode.QRCode( version=1, # 设置版本, 1-40,控制二维码的大小 error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级 box_size=10, # 设置每个格子的像素大小 border=4, # 设置边框的格子宽度 ) # 添加数据 qr.add_data(record.lot_id.name) qr.make(fit=True) # 创建二维码图像 img = qr.make_image(fill_color="black", back_color="white") # 创建一个内存文件 buffer = io.BytesIO() img.save(buffer, format="PNG") # 将图像保存到内存文件中 # 获取二进制数据 binary_data = buffer.getvalue() # 使用Base64编码这些二进制数据 data = base64.b64encode(binary_data) self.lot_qr_code = data else: record.lot_qr_code = False def print_single_method(self): self.ensure_one() qr_code_data = self.lot_qr_code if not qr_code_data: raise UserError("没有找到二维码数据。") lot_name = self.lot_name # 增加"当为坯料时,只打印序列号的前面部分" if self.lot_name: # 确保 lot_name 存在 if self.product_id.categ_id.name == '坯料': lot_name = lot_name.split('[', 1)[0] # host = "192.168.50.110" # 可以根据实际情况修改 # port = 9100 # 可以根据实际情况修改 # 获取默认打印机配置 printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1) if not printer_config: raise UserError('请先配置打印机') host = printer_config.printer_id.ip_address port = printer_config.printer_id.port self.print_qr_code(lot_name, host, port) # 返回当前wizard页面 # base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') # return { # 'type': 'ir.actions.act_url', # 'url': str(base_url) + download_url, # 'target': 'self', # } # 获取当前wizard的视图ID或其他标识信息 view_id = self.env.context.get('view_id') # 构造返回wizard页面的action字典 action = { 'type': 'ir.actions.act_window', 'name': '返回 Wizard', 'res_model': 'stock.move', # 替换为你的wizard模型名称 'view_mode': 'form', 'view_id': view_id, # 如果需要基于特定的视图返回 'target': 'new', # 如果需要在新的窗口或标签页打开 'res_id': self.id, # 如果你想要返回当前记录的视图 } return action # def generate_zpl_code(self, code): # # 初始化ZPL代码字符串 # zpl_code = "^XA\n" # zpl_code += "^CW1,E:SIMSUN.TTF^FS\n" # zpl_code += "^CI28\n" # # # 设置二维码位置 # zpl_code += "^FO50,50\n" # 调整二维码位置,使其与资产编号在同一行 # zpl_code += f"^BQN,2,6^FDLM,B0093{code}^FS\n" # # # 设置资产编号文本位置 # zpl_code += "^FO300,60\n" # 资产编号文本的位置,与二维码在同一行 # zpl_code += "^A1N,45,45^FD编码名称: ^FS\n" # # # 设置{code}文本位置 # # 假设{code}文本需要位于资产编号和二维码下方,中间位置 # # 设置{code}文本位置并启用自动换行 # zpl_code += "^FO300,120\n" # {code}文本的起始位置 # zpl_code += "^FB500,4,0,L,0\n" # 定义一个宽度为500点的文本框,最多4行,左对齐 # zpl_code += f"^A1N,40,40^FD{code}^FS\n" # # # 在{code}文本框周围绘制线框 # # 假设线框的外部尺寸为宽度500点,高度200点 # # zpl_code += "^FO300,110^GB500,200,2^FS\n" # 绘制线框,边框粗细为2点 # # zpl_code += "^PQ1,0,1,Y\n" # zpl_code += "^XZ\n" # return zpl_code # # def send_to_printer(self, host, port, zpl_code): # # 将ZPL代码转换为字节串 # print('zpl_code', zpl_code) # zpl_bytes = zpl_code.encode('utf-8') # print(zpl_bytes) # # # 创建socket对象 # mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # try: # mysocket.connect((host, port)) # 连接到打印机 # mysocket.send(zpl_bytes) # 发送ZPL代码 # print("ZPL code sent to printer successfully.") # except Exception as e: # print(f"Error with the connection: {e}") # finally: # mysocket.close() # 关闭连接 # # def print_qr_code(self): # self.ensure_one() # 确保这个方法只为一个记录调用 # # if not self.lot_id: # # raise UserError("没有找到序列号。") # # 假设_lot_qr_code方法已经生成了二维码并保存在字段中 # qr_code_data = self.lot_qr_code # if not qr_code_data: # raise UserError("没有找到二维码数据。") # # 生成ZPL代码 # zpl_code = self.generate_zpl_code(self.lot_id.name) # # 设置打印机的IP地址和端口号 # host = "192.168.50.110" # port = 9100 # # 发送ZPL代码到打印机 # self.send_to_printer(host, port, zpl_code) # # # # 生成下载链接或直接触发下载 # # # 此处的实现依赖于你的具体需求,以下是触发下载的一种示例 # # attachment = self.env['ir.attachment'].sudo().create({ # # 'datas': self.lot_qr_code, # # 'type': 'binary', # # 'description': '二维码图片', # # 'name': self.lot_name + '.png', # # # 'res_id': invoice.id, # # # 'res_model': 'stock.picking', # # 'public': True, # # 'mimetype': 'application/x-png', # # # 'model_name': 'stock.picking', # # }) # # # 返回附件的下载链接 # # download_url = '/web/content/%s?download=true' % attachment.id # # base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') # # return { # # 'type': 'ir.actions.act_url', # # 'url': str(base_url) + download_url, # # 'target': 'self', # # } # # # 定义一个方法,用于根据序列号生成二维码 # # @api.depends('lot_id') # def generate_lot_qr_code(self): # # 创建一个QRCode对象 # qr = qrcode.QRCode( # version=1, # 设置版本, 1-40,控制二维码的大小 # error_correction=qrcode.constants.ERROR_CORRECT_L, # 设置错误校正等级 # box_size=10, # 设置每个格子的像素大小 # border=4, # 设置边框的格子宽度 # ) # # # 添加数据 # qr.add_data(self.lot_id.name) # qr.make(fit=True) # # # 创建二维码图像 # img = qr.make_image(fill_color="black", back_color="white") # # # 创建一个内存文件 # buffer = io.BytesIO() # img.save(buffer, format="PNG") # 将图像保存到内存文件中 # # # 获取二进制数据 # binary_data = buffer.getvalue() # # # 使用Base64编码这些二进制数据 # data = base64.b64encode(binary_data) # self.lot_qr_code = data # attachment = self.env['ir.attachment'].sudo().create({ # 'datas': data, # 'type': 'binary', # 'description': '二维码图片', # 'name': self.lot_id.name + '.png', # # 'res_id': invoice.id, # # 'res_model': 'stock.picking', # 'public': True, # 'mimetype': 'application/pdf', # # 'model_name': 'stock.picking', # }) # def button_test(self): # print(self.picking_id.name) # stock_picking = self.env['stock.picking'].search([('name', '=', self.picking_id.name)], limit=1) # print(self.picking_id.name) # print(aa.move_line_ids.lot_id.name) # # 获取当前的stock.picking对象 # current_picking = self.env['stock.picking'].search([('name', '=', self.picking_id.name)], limit=1) # # # 获取当前picking的第一个stock.move对象 # current_move = current_picking.move_ids[0] if current_picking.move_ids else False # # # 如果存在相关的stock.move对象 # if current_move: # # 获取源stock.move对象 # origin_move = current_move.move_orig_ids[0] if current_move.move_orig_ids else False # # # 从源stock.move对象获取源stock.picking对象 # origin_picking = origin_move.picking_id if origin_move else False # # 现在,origin_picking就是current_picking的上一步 # # 获取目标stock.move对象 # dest_move = current_move.move_dest_ids[0] if current_move.move_dest_ids else False # # # 从目标stock.move对象获取目标stock.picking对象 # dest_picking = dest_move.picking_id if dest_move else False # # 现在,dest_picking就是current_picking的下一步 @api.depends('location_id') def _compute_current_location_id(self): for record in self: # 使用record代替self来引用当前遍历到的记录 logging.info('record.picking_id.name: %s' % record.picking_id.name) logging.info('record.env: %s' % record.env['stock.picking'].search([('name', '=', record.picking_id.name)])) # 获取当前的stock.picking对象 current_picking = record.env['stock.picking'].search([('name', '=', record.picking_id.name)], limit=1) # 获取当前picking的第一个stock.move对象 current_move = current_picking.move_ids[0] if current_picking.move_ids else False # 如果存在相关的stock.move对象 if current_move: # 获取源stock.move对象 origin_move = current_move.move_orig_ids[0] if current_move.move_orig_ids else False # 从源stock.move对象获取源stock.picking对象 origin_picking = origin_move.picking_id if origin_move else False # 如果前一个调拨单有目标货位 if origin_picking: for i in current_picking.move_line_ids: for j in origin_picking.move_line_ids: if j.destination_location_id and i.lot_id == j.lot_id: # 更新当前记录的current_location_id字段 record.current_location_id = j.destination_location_id # # 获取目标stock.move对象 # dest_move = current_move.move_dest_ids[0] if current_move.move_dest_ids else False # # # 从目标stock.move对象获取目标stock.picking对象 # dest_picking = dest_move.picking_id if dest_move else False # # 现在,dest_picking就是current_picking的下一步 # 是一张单据一张单据往下走的,所以这里的目标货位是上一张单据的当前货位,且这样去计算是可以的。 @api.depends('location_dest_id') def _compute_location_dest_id_value(self): for record in self: record.location_dest_id_value = record.location_dest_id.id if record.location_dest_id else False record.current_product_id = record.product_id.id if record.product_id else False destination_location_id = fields.Many2one( 'sf.shelf.location', string='目标货位') def compute_destination_location_id(self): for record in self: obj = self.env['sf.shelf.location'].search([('name', '=', record.destination_location_id.name)]) if obj and obj.product_id and obj.product_id != record.product_id: raise ValidationError('目标货位【%s】已被【%s】产品占用!' % (obj.code, obj.product_id)) if record.lot_id: if record.product_id.tracking == 'serial': shelf_location_obj = self.env['sf.shelf.location'].search( [('product_sn_id', '=', record.lot_id.id)]) if shelf_location_obj: shelf_location_obj.product_sn_id = False if obj: obj.product_sn_id = record.lot_id.id else: if obj: obj.product_sn_id = record.lot_id.id elif record.product_id.tracking == 'lot': record.put_shelf_location(record) if not obj.product_id: obj.product_id = record.product_id.id else: if obj: obj.product_id = record.product_id.id # obj.location_status = '占用' obj.product_num += record.qty_done @api.onchange('destination_location_id') def _check_destination_location_id(self): for item in self: if item: barcode = item.destination_location_id.barcode for line in item.picking_id.move_line_ids_without_package: if line.destination_location_id: if (barcode and barcode == line.destination_location_id.barcode and item.product_id != line.product_id): raise ValidationError( '【%s】货位已经被占用,请重新选择!!!' % item.destination_location_id.barcode) def put_shelf_location(self, vals): """ 对货位的批量数据进行数量计算 """ for record in vals: if record.lot_id and record.product_id.tracking == 'lot': if record.current_location_id: location_lot = self.env['sf.shelf.location.lot'].sudo().search( [('shelf_location_id', '=', record.current_location_id.id), ('lot_id', '=', record.lot_id.id)]) if location_lot: location_lot.qty -= record.qty_done if location_lot.qty == 0: location_lot.unlink() elif location_lot.qty < 0: raise ValidationError('【%s】货位【%s】批次的【%s】产品数量不足!' % ( record.current_location_id.barcode, record.lot_id.name, record.product_id.name)) else: raise ValidationError('【%s】货位不存在【%s】批次的【%s】产品' % ( record.current_location_id.barcode, record.lot_id.name, record.product_id.name)) if record.destination_location_id: location_lot = self.env['sf.shelf.location.lot'].sudo().search( [('shelf_location_id', '=', record.destination_location_id.id), ('lot_id', '=', record.lot_id.id)]) if location_lot: location_lot.qty += record.qty_done else: self.env['sf.shelf.location.lot'].sudo().create({ 'shelf_location_id': record.destination_location_id.id, 'lot_id': record.lot_id.id, 'qty': record.qty_done }) if not record.destination_location_id.product_id: record.destination_location_id.product_id = record.product_id.id class SfStockPicking(models.Model): _inherit = 'stock.picking' check_in = fields.Char(string='查询是否为入库单', compute='_check_is_in') product_uom_qty_sp = fields.Float('需求数量', compute='_compute_product_uom_qty_sp', store=True) @api.depends('move_ids_without_package', 'move_ids_without_package.product_uom_qty') def _compute_product_uom_qty_sp(self): for sp in self: if sp.move_ids_without_package: sp.product_uom_qty_sp = 0 for move_id in sp.move_ids_without_package: sp.product_uom_qty_sp += move_id.product_uom_qty else: sp.product_uom_qty_sp = 0 def batch_stock_move(self): """ 批量调拨,非就绪状态的会被忽略,完成后有通知提示 """ for record in self: if record.state != 'assigned': continue for move in record.move_ids: move.action_show_details() record.action_set_quantities_to_reservation() record.button_validate() notification_message = '批量调拨完成!请注意,状态非就绪的单据会被忽略' return { 'effect': { 'fadeout': 'fast', 'message': notification_message, 'img_url': '/web/image/%s/%s/image_1024' % ( self.create_uid._name, self.create_uid.id) if 0 else '/web/static/img/smile.svg', 'type': 'rainbow_man', } } @api.depends('name') def _check_is_in(self): """ 判断是否为出库单 """ for record in self: if record.name: is_check_in = record.name.split('/') record.check_in = is_check_in[1] def button_validate(self): """ 重写验证方法,当验证时意味着调拨单已经完成,已经移动到了目标货位,所以需要将当前货位的状态改为空闲 """ res = super(SfStockPicking, self).button_validate() for line in self.move_line_ids: if line: if line.destination_location_id: # 调用入库方法进行入库刀货位 line.compute_destination_location_id() else: # 对除刀柄之外的刀具物料入库到刀具房进行 目标货位必填校验 if self.location_dest_id.name == '刀具房' and line.product_id.cutting_tool_material_id.name not in ( '刀柄', False): raise ValidationError('请选择【%s】产品的目标货位!' % line.product_id.name) if line.current_location_id: # 对货位的批次产品进行出货 line.put_shelf_location(line) if line.current_location_id: # 按序列号管理的产品 if line.current_location_id.product_sn_id: line.current_location_id.product_sn_id = False # line.current_location_id.location_status = '空闲' line.current_location_id.product_num = 0 line.current_location_id.product_id = False else: # 对除刀柄之外的刀具物料从刀具房出库进行 当前货位必填校验 if self.location_id.name == '刀具房' and line.product_id.cutting_tool_material_id.name not in ( '刀柄', False): raise ValidationError('请选择【%s】产品的当前货位!' % line.product_id.name) # 对入库作业的刀柄和托盘进行Rfid绑定校验 for move in self.move_ids: if move and move.product_id.cutting_tool_material_id.name == '刀柄' or '托盘' in ( move.product_id.fixture_material_id.name or ''): for item in move.move_line_nosuggest_ids: if item.rfid: if self.env['stock.lot'].search([('rfid', '=', item.rfid)]): raise ValidationError('该Rfid【%s】在系统中已经存在,请重新录入!' % item.rfid) if item.location_dest_id.name == '进货': if not item.rfid: raise ValidationError('你需要提供%s的Rfid' % move.product_id.name) self.env['stock.lot'].search([('name', '=', item.lot_name)]).write({'rfid': item.rfid}) return res # def print_all_barcode(self): # """ # 打印所有编码 # """ # print('================') # for record in self.move_ids_without_package: # print('record', record) # print('record.move_line_ids', record.move_line_ids) # # # record.move_line_ids.print_qr_code() # # print('record.move_line_ids.lot_id', record.move_line_ids.lot_id) # print('record.move_line_ids.lot_id.name', record.move_line_ids.lot_id.name) class SfProcurementGroup(models.Model): _inherit = 'procurement.group' @api.model def _search_rule(self, route_ids, packaging_id, product_id, warehouse_id, domain): """ 修改路线多规则条件选取 """ if warehouse_id: domain = expression.AND( [['|', ('warehouse_id', '=', warehouse_id.id), ('warehouse_id', '=', False)], domain]) Rule = self.env['stock.rule'] res = self.env['stock.rule'] if route_ids: res_list = Rule.search(expression.AND([[('route_id', 'in', route_ids.ids)], domain]), order='route_sequence, sequence') for res1 in res_list: if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \ res1.location_src_id.product_type: res = res1 if not res: res = Rule.search(expression.AND([[('route_id', 'in', route_ids.ids)], domain]), order='route_sequence, sequence', limit=1) if not res and packaging_id: packaging_routes = packaging_id.route_ids if packaging_routes: res_list = Rule.search(expression.AND([[('route_id', 'in', packaging_routes.ids)], domain]), order='route_sequence, sequence') for res1 in res_list: if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \ res1.location_src_id.product_type: res = res1 if not res: res = Rule.search(expression.AND([[('route_id', 'in', packaging_routes.ids)], domain]), order='route_sequence, sequence', limit=1) if not res: product_routes = product_id.route_ids | product_id.categ_id.total_route_ids if product_routes: res_list = Rule.search(expression.AND([[('route_id', 'in', product_routes.ids)], domain]), order='route_sequence, sequence') for res1 in res_list: if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \ res1.location_src_id.product_type: res = res1 if not res: res = Rule.search(expression.AND([[('route_id', 'in', product_routes.ids)], domain]), order='route_sequence, sequence', limit=1) if not res and warehouse_id: warehouse_routes = warehouse_id.route_ids if warehouse_routes: res_list = Rule.search(expression.AND([[('route_id', 'in', warehouse_routes.ids)], domain]), order='route_sequence, sequence') for res1 in res_list: if product_id.categ_id in res1.location_dest_id.product_type or product_id.categ_id in \ res1.location_src_id.product_type: res = res1 if not res: res = Rule.search(expression.AND([[('route_id', 'in', warehouse_routes.ids)], domain]), order='route_sequence, sequence', limit=1) return res # class SfPickingType(models.Model): # _inherit = 'stock.picking.type' # # def _default_show_operations(self): # return self.user_has_groups('stock.group_production_lot,' # 'stock.group_stock_multi_locations,' # 'stock.group_tracking_lot', # 'sf_warehouse.group_sf_stock_user', # 'sf_warehouse.group_sf_stock_manager') class SfPickingType(models.Model): _inherit = 'stock.picking.type' code = fields.Selection([('incoming', 'Receipt'), ('outgoing', 'Delivery'), ('internal', '厂内出入库')], 'Type of Operation', required=True) def _default_show_operations(self): return self.user_has_groups( 'stock.group_production_lot,' 'stock.group_stock_multi_locations,' 'stock.group_tracking_lot,' 'sf_warehouse.group_sf_stock_user,' 'sf_warehouse.group_sf_stock_manager' ) def _get_action(self, action_xmlid): action = super(SfPickingType, self)._get_action(action_xmlid) if not self.env.user.has_group('base.group_system'): action['context']['create'] = False if self.sequence_code in ['DL', 'INT', 'PC']: action['context']['search_default_retrospect'] = 1 return action class CustomStockMove(models.Model): _name = 'stock.move' _inherit = ['stock.move', 'printing.utils', 'barcodes.barcode_events_mixin'] def on_barcode_scanned(self, barcode): """ 采购入库扫码绑定Rfid码 """ for record in self: logging.info('Rfid:%s' % barcode) if record: lot = self.env['stock.lot'].sudo().search([('rfid', '=', barcode)]) if lot: if lot.product_id.cutting_tool_material_id: material = lot.product_id.cutting_tool_material_id.name else: material = lot.product_id.fixture_material_id.name raise ValidationError( '该Rfid【%s】已经被序列号为【%s】的【%s】物料所占用!' % (barcode, lot.name, material)) if '刀柄' in (record.product_id.cutting_tool_material_id.name or '') or '托盘' in ( record.product_id.fixture_material_id.name or ''): logging.info('开始录入Rfid:%s' % record.move_line_nosuggest_ids) for move_line_nosuggest_id in record.move_line_nosuggest_ids: logging.info('录入的记录%s , Rfid:%s' % (move_line_nosuggest_id, move_line_nosuggest_id.rfid)) if move_line_nosuggest_id.rfid: if move_line_nosuggest_id.rfid == barcode: if record.product_id.cutting_tool_material_id.name: raise ValidationError('该刀柄的Rfid已经录入,请勿重复录入!!!') else: raise ValidationError('该托盘的Rfid已经录入,请勿重复录入!!!') else: line_id = int(re.sub(r"\D", "", str(move_line_nosuggest_id.id))) res = self.env['stock.move.line'].sudo().search([('id', '=', line_id)]).write( {'rfid': barcode}) logging.info('Rfid是否录入:%s' % res) move_line_nosuggest_id.rfid = barcode break else: raise ValidationError('该产品不需要录入Rfid!!!') def action_assign_serial_show_details(self): # 首先执行原有逻辑 result = super(CustomStockMove, self).action_assign_serial_show_details() # 接着为每个 lot_name 生成二维码 move_lines = self.move_line_ids # 获取当前 stock.move 对应的所有 stock.move.line 记录 for line in move_lines: if line.lot_name: # 确保 lot_name 存在 lot_name = line.lot_name if line.product_id.categ_id.name == '坯料': lot_name = lot_name.split('[', 1)[0] qr_data = self.compute_lot_qr_code(lot_name) # 假设 stock.move.line 模型中有一个字段叫做 lot_qr_code 用于存储二维码数据 line.lot_qr_code = qr_data return result def compute_lot_qr_code(self, lot_name): qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(lot_name) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = io.BytesIO() img.save(buffer, format="PNG") binary_data = buffer.getvalue() data = base64.b64encode(binary_data).decode() # 确保返回的是字符串形式的数据 return data def print_all_barcode(self): """ 打印所有编码 """ print('================') for record in self.move_line_ids: print('record', record) if not record.lot_name: continue record.ensure_one() # qr_code_data = record.lot_qr_code # if not qr_code_data: # raise UserError("没有找到二维码数据。") lot_name = record.lot_name # todo 待控制 if not lot_name: raise ValidationError("请先分配序列号") # 增加"当为坯料时,只打印序列号的前面部分" if record.lot_name: # 确保 lot_name 存在 if record.product_id.categ_id.name == '坯料': lot_name = lot_name.split('[', 1)[0] # host = "192.168.50.110" # 可以根据实际情况修改 # port = 9100 # 可以根据实际情况修改 # 获取默认打印机配置 printer_config = self.env['printer.configuration'].sudo().search([('model', '=', self._name)], limit=1) if not printer_config: raise UserError('请先配置打印机') host = printer_config.printer_id.ip_address port = printer_config.printer_id.port record.print_qr_code(lot_name, host, port)