master-sf1.0

This commit is contained in:
qihao.gong@jikimo.com
2023-05-06 16:03:00 +08:00
parent fcf0354a1e
commit 0077d97cfa
307 changed files with 20636 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
from . import tray
from . import mrp_production
from . import mrp_workcenter
from . import mrp_maintenance
from . import mrp_routing_workcenter
from . import mrp_workorder
from . import model_type
from . import stock
from . import res_user

View File

@@ -0,0 +1,59 @@
from odoo import fields, models
class ModelType(models.Model):
_name = 'sf.model.type'
_description = '模型类型'
name = fields.Char('名称')
embryo_tolerance = fields.Integer('胚料容余')
product_routing_tmpl_ids = fields.One2many('sf.product.model.type.routing.sort', 'product_model_type_id',
'成品工序模板')
embryo_routing_tmpl_ids = fields.One2many('sf.embryo.model.type.routing.sort', 'embryo_model_type_id',
'胚料工序模板')
class ProductModelTypeRoutingSort(models.Model):
_name = 'sf.product.model.type.routing.sort'
_description = '成品工序排序'
sequence = fields.Integer('Sequence')
route_workcenter_id = fields.Many2one('mrp.routing.workcenter')
is_repeat = fields.Boolean('重复', related='route_workcenter_id.is_repeat')
routing_type = fields.Selection([
('获取CNC加工程序', '获取CNC加工程序'),
('装夹', '装夹'),
('前置三元定位检测', '前置三元定位检测'),
('CNC加工', 'CNC加工'),
('后置三元质量检测', '后置三元质量检测'),
('解除装夹', '解除装夹'), ('切割', '切割')
], string="工序类型", related='route_workcenter_id.routing_type')
workcenter_ids = fields.Many2many('mrp.workcenter', required=False, related='route_workcenter_id.workcenter_ids')
product_model_type_id = fields.Many2one('sf.model.type')
_sql_constraints = [
('route_model_type_uniq', 'unique (route_workcenter_id,product_model_type_id)', '成品工序不能重复!')
]
class EmbryoModelTypeRoutingSort(models.Model):
_name = 'sf.embryo.model.type.routing.sort'
_description = '胚料工序排序'
sequence = fields.Integer('Sequence')
route_workcenter_id = fields.Many2one('mrp.routing.workcenter')
is_repeat = fields.Boolean('重复', related='route_workcenter_id.is_repeat')
routing_type = fields.Selection([
('获取CNC加工程序', '获取CNC加工程序'),
('装夹', '装夹'),
('前置三元定位检测', '前置三元定位检测'),
('CNC加工', 'CNC加工'),
('后置三元质量检测', '后置三元质量检测'),
('解除装夹', '解除装夹'), ('切割', '切割')
], string="工序类型", related='route_workcenter_id.routing_type')
workcenter_ids = fields.Many2many('mrp.workcenter', required=False, related='route_workcenter_id.workcenter_ids')
embryo_model_type_id = fields.Many2one('sf.model.type')
_sql_constraints = [
('route_model_type_uniq', 'unique (route_workcenter_id,embryo_model_type_id)', '胚料工序不能重复!')
]

View File

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import api, fields, models, _
from dateutil.relativedelta import relativedelta
class MaintenanceEquipment(models.Model):
_inherit = "maintenance.equipment"
_check_company_auto = True
expected_mtbf = fields.Integer(string='Expected MTBF', help='Expected Mean Time Between Failure')
mtbf = fields.Integer(compute='_compute_maintenance_request', string='MTBF',
help='Mean Time Between Failure, computed based on done corrective maintenances.')
mttr = fields.Integer(compute='_compute_maintenance_request', string='MTTR', help='Mean Time To Repair')
estimated_next_failure = fields.Date(compute='_compute_maintenance_request',
string='Estimated time before next failure (in days)',
help='Computed as Latest Failure Date + MTBF')
latest_failure_date = fields.Date(compute='_compute_maintenance_request', string='Latest Failure Date')
workcenter_id = fields.Many2one(
'mrp.workcenter', string='Work Center', check_company=True)
@api.depends('effective_date', 'maintenance_ids.stage_id', 'maintenance_ids.close_date',
'maintenance_ids.request_date')
def _compute_maintenance_request(self):
for equipment in self:
maintenance_requests = equipment.maintenance_ids.filtered(
lambda x: x.maintenance_type == 'corrective' and x.stage_id.done)
mttr_days = 0
for maintenance in maintenance_requests:
if maintenance.stage_id.done and maintenance.close_date:
mttr_days += (maintenance.close_date - maintenance.request_date).days
equipment.mttr = len(maintenance_requests) and (mttr_days / len(maintenance_requests)) or 0
maintenance = maintenance_requests.sorted(lambda x: x.request_date)
if len(maintenance) >= 1:
equipment.mtbf = (maintenance[-1].request_date - equipment.effective_date).days / len(maintenance)
equipment.latest_failure_date = maintenance and maintenance[-1].request_date or False
if equipment.mtbf:
equipment.estimated_next_failure = equipment.latest_failure_date + relativedelta(days=equipment.mtbf)
else:
equipment.estimated_next_failure = False
def button_mrp_workcenter(self):
self.ensure_one()
return {
'name': _('work centers'),
'view_mode': 'form',
'res_model': 'mrp.workcenter',
'view_id': self.env.ref('mrp.mrp_workcenter_view').id,
'type': 'ir.actions.act_window',
'res_id': self.workcenter_id.id,
'context': {
'default_company_id': self.company_id.id
}
}
class MaintenanceRequest(models.Model):
_inherit = "maintenance.request"
_check_company_auto = True
production_id = fields.Many2one(
'mrp.production', string='Manufacturing Order', check_company=True)
workorder_id = fields.Many2one(
'mrp.workorder', string='Work Order', check_company=True)
production_company_id = fields.Many2one(string='Production Company', related='production_id.company_id')
company_id = fields.Many2one(domain="[('id', '=?', production_company_id)]")

View File

@@ -0,0 +1,237 @@
# -*- coding: utf-8 -*-
from odoo import api, fields, models, _
class resProduct(models.Model):
_inherit = 'product.template'
model_file = fields.Binary('模型文件')
class MrpProduction(models.Model):
_inherit = 'mrp.production'
_description = "制造订单"
tray_ids = fields.One2many('sf.tray', 'production_id', string="托盘")
maintenance_count = fields.Integer(compute='_compute_maintenance_count', string="Number of maintenance requests")
request_ids = fields.One2many('maintenance.request', 'production_id')
model_file = fields.Binary('模型文件', related='product_id.model_file')
@api.depends('request_ids')
def _compute_maintenance_count(self):
for production in self:
production.maintenance_count = len(production.request_ids)
def button_maintenance_req(self):
self.ensure_one()
return {
'name': _('New Maintenance Request'),
'view_mode': 'form',
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_production_id': self.id,
},
'domain': [('production_id', '=', self.id)],
}
def open_maintenance_request_mo(self):
self.ensure_one()
action = {
'name': _('Maintenance Requests'),
'view_mode': 'kanban,tree,form,pivot,graph,calendar',
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_production_id': self.id,
},
'domain': [('production_id', '=', self.id)],
}
if self.maintenance_count == 1:
production = self.env['maintenance.request'].search([('production_id', '=', self.id)])
action['view_mode'] = 'form'
action['res_id'] = production.id
return action
def action_generate_serial(self):
self.ensure_one()
self.lot_producing_id = self.env['stock.lot'].create({
'product_id': self.product_id.id,
'company_id': self.company_id.id,
'name': self.env['stock.lot']._get_next_serial(self.company_id, self.product_id) or self.env[
'ir.sequence'].next_by_code('stock.lot.serial'),
})
if self.move_finished_ids.filtered(lambda m: m.product_id == self.product_id).move_line_ids:
self.move_finished_ids.filtered(
lambda m: m.product_id == self.product_id).move_line_ids.lot_id = self.lot_producing_id
if self.product_id.tracking == 'serial':
self._set_qty_producing()
# 重载根据工序生成工单的程序如果产品BOM中没有工序时
# 根据产品对应的模板类型中工序,去生成工单;
# CNC加工工序的选取规则
# 如果自动报价有带过来预分配的机床,
# 则根据设备找到工作中心;否则采用前面描述的工作中心分配机制;
# 其他规则限制: 默认只分配给工作中心状态为非故障的工作中心;
def _create_workorder3(self):
for production in self:
if not production.bom_id or not production.product_id:
continue
workorders_values = []
product_qty = production.product_uom_id._compute_quantity(production.product_qty,
production.bom_id.product_uom_id)
exploded_boms, dummy = production.bom_id.explode(production.product_id,
product_qty / production.bom_id.product_qty,
picking_type=production.bom_id.picking_type_id)
for bom, bom_data in exploded_boms:
# If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
'parent_line'].bom_id.operation_ids != bom.operation_ids)):
continue
for operation in bom.operation_ids:
if operation._skip_operation_line(bom_data['product']):
continue
workorders_values += [{
'name': operation.name,
'production_id': production.id,
'workcenter_id': operation.workcenter_id.id,
'product_uom_id': production.product_uom_id.id,
'operation_id': operation.id,
'state': 'pending',
}]
if production.product_id.categ_id.type == '成品':
# 根据加工面板的面数及对应的工序模板生成工单
i = 0
processing_panel_len = len(production.product_id.model_processing_panel.split(','))
for k in (production.product_id.model_processing_panel.split(',')):
product_routing_workcenter = self.env['sf.product.model.type.routing.sort'].search(
[('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
order='sequence asc'
)
i += 1
for route in product_routing_workcenter:
if i == 1 and route.routing_type == '获取CNC加工程序':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str('', production, route))
if route.is_repeat == True:
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str(k, production, route))
if i == processing_panel_len and route.routing_type == '解除装夹':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str(k, production, route))
elif production.product_id.categ_id.type == '胚料':
embryo_routing_workcenter = self.env['sf.embryo.model.type.routing.sort'].search(
[('embryo_model_type_id', '=', production.product_id.embryo_model_type_id.id)],
order='sequence asc'
)
for route in embryo_routing_workcenter:
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str('', production, route))
production.workorder_ids = workorders_values
for workorder in production.workorder_ids:
workorder.duration_expected = workorder._get_duration_expected()
def create_production1_values(self, production):
production_values_str = {'origin': production.origin,
'product_id': production.product_id.id,
'product_description_variants': production.product_description_variants,
'product_qty': production.product_qty,
'product_uom_id': production.product_uom_id.id,
'location_src_id': production.location_src_id.id,
'location_dest_id': production.location_dest_id.id,
'bom_id': production.bom_id.id,
'date_deadline': production.date_deadline,
'date_planned_start': production.date_planned_start,
'date_planned_finished': production.date_planned_finished,
'procurement_group_id': False,
'propagate_cancel': production.propagate_cancel,
'orderpoint_id': production.orderpoint_id.id,
'picking_type_id': production.picking_type_id.id,
'company_id': production.company_id.id,
'move_dest_ids': production.move_dest_ids.ids,
'user_id': production.user_id.id}
return production_values_str
def _reset_work_order_sequence1(self, k):
for rec in self:
current_sequence = 1
for work in rec.workorder_ids:
work.sequence = current_sequence
current_sequence += 1
sfa = rec
for a in sfa:
print(a)
def _create_workorder1(self, k):
for production in self:
if not production.bom_id or not production.product_id:
continue
workorders_values = []
product_qty = production.product_uom_id._compute_quantity(production.product_qty,
production.bom_id.product_uom_id)
exploded_boms, dummy = production.bom_id.explode(production.product_id,
product_qty / production.bom_id.product_qty,
picking_type=production.bom_id.picking_type_id)
for bom, bom_data in exploded_boms:
# If the operations of the parent BoM and phantom BoM are the same, don't recreate work orders.
if not (bom.operation_ids and (not bom_data['parent_line'] or bom_data[
'parent_line'].bom_id.operation_ids != bom.operation_ids)):
continue
for operation in bom.operation_ids:
if operation._skip_operation_line(bom_data['product']):
continue
workorders_values += [{
'name': operation.name,
'production_id': production.id,
'workcenter_id': operation.workcenter_id.id,
'product_uom_id': production.product_uom_id.id,
'operation_id': operation.id,
'state': 'pending',
}]
# 根据加工面板的面数及对应的成品工序模板生成工单
i = 0
production.product_id.model_processing_panel = k
for k in (production.product_id.model_processing_panel.split(',')):
routingworkcenter = self.env['sf.product.model.type.routing.sort'].search(
[('product_model_type_id', '=', production.product_id.product_model_type_id.id)],
order='sequence asc'
)
i += 1
for route in routingworkcenter:
if route.routing_type == '后置三元质量检测':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str1(k, production, route)
)
if route.routing_type == 'CNC加工':
workorders_values.append(
self.env['mrp.workorder'].json_workorder_str1(k, production, route))
production.workorder_ids = workorders_values
for workorder in production.workorder_ids:
workorder.duration_expected = workorder._get_duration_expected()
def _create_workorder2(self, k):
res = self._create_workorder1(k)
self._reset_work_order_sequence1(k)
return res
def _reset_work_order_sequence(self):
for rec in self:
current_sequence = 1
for work in rec.workorder_ids:
work.sequence = current_sequence
current_sequence += 1
def _create_workorder(self):
res = self._create_workorder3()
self._reset_work_order_sequence()
return res

View File

@@ -0,0 +1,41 @@
from odoo import fields, models
class ResMrpRoutingWorkcenter(models.Model):
_inherit = 'mrp.routing.workcenter'
routing_type = fields.Selection([
('获取CNC加工程序', '获取CNC加工程序'),
('装夹', '装夹'),
('前置三元定位检测', '前置三元定位检测'),
('CNC加工', 'CNC加工'),
('后置三元质量检测', '后置三元质量检测'),
('解除装夹', '解除装夹'),
('切割', '切割')
], string="工序类型")
is_repeat = fields.Boolean('重复', default=False)
workcenter_id = fields.Many2one('mrp.workcenter', required=False)
workcenter_ids = fields.Many2many('mrp.workcenter', 'rel_workcenter_route', required=True)
bom_id = fields.Many2one('mrp.bom', required=False)
# 获得当前登陆者公司
def get_company_id(self):
self.company_id = self.env.user.company_id.id
company_id = fields.Many2one('res.company', compute="get_company_id", related=False)
# 工单对应的工作中心,根据工序中的工作中心去匹配,
# 如果只配置了一个工作中心,则默认采用该工作中心;
# 如果有多个工作中心,
# 则根据该工作中心的工单个数进行分配(优先分配给工单个数最少的);
def get_workcenter(self, workcenter_ids):
if workcenter_ids:
if len(workcenter_ids) == 1:
return workcenter_ids[0]
elif len(workcenter_ids) >= 2:
# workcenter_ids_str = ','.join([str(s) for s in workcenter_ids])
self.env.cr.execute("""
SELECT workcenter_id FROM mrp_workorder where workcenter_id
in %s group by workcenter_id
order by count(*),workcenter_id asc limit 1 """, [tuple(workcenter_ids)])
return self.env.cr.dictfetchall()[0].get('workcenter_id')

View File

@@ -0,0 +1,81 @@
from odoo import api, fields, models
from datetime import datetime
from collections import defaultdict
from odoo.addons.resource.models.resource import Intervals
class ResWorkcenter(models.Model):
_inherit = "mrp.workcenter"
machine_tool_id = fields.Many2one('sf.machine_tool', '机床')
users_ids = fields.Many2many("res.users", 'users_workcenter')
equipment_ids = fields.One2many(
'maintenance.equipment', 'workcenter_id', string="Maintenance Equipment",
check_company=True)
@api.onchange('machine_tool_id')
def update_machine_tool_is_binding(self):
machine_tool = self.env["sf.machine_tool"].search([('is_binding', '=', True)])
if machine_tool:
for item in machine_tool:
workcenter_machine_tool = self.env["mrp.workcenter"].search([('machine_tool_id', '=', item.id)])
if workcenter_machine_tool:
if self.machine_tool_id.id:
if workcenter_machine_tool.id != self.machine_tool_id.id:
self.machine_tool_id.is_binding = True
else:
self.machine_tool_id.is_binding = True
else:
self.machine_tool_id.is_binding = True
item.is_binding = False
else:
self.machine_tool_id.is_binding = True
def action_work_order(self):
if not self.env.context.get('desktop_list_view', False):
action = self.env["ir.actions.actions"]._for_xml_id("sf_manufacturing.mrp_workorder_action_tablet")
return action
else:
return super(ResWorkcenter, self).action_work_order()
def _get_unavailability_intervals(self, start_datetime, end_datetime):
res = super(ResWorkcenter, self)._get_unavailability_intervals(start_datetime, end_datetime)
if not self:
return res
sql = """
SELECT workcenter_id, ARRAY_AGG((schedule_date || '|' || schedule_date + INTERVAL '1h' * duration)) as date_intervals
FROM maintenance_request
LEFT JOIN maintenance_equipment
ON maintenance_request.equipment_id = maintenance_equipment.id
WHERE
schedule_date IS NOT NULL
AND duration IS NOT NULL
AND equipment_id IS NOT NULL
AND maintenance_equipment.workcenter_id IS NOT NULL
AND maintenance_equipment.workcenter_id IN %s
AND (schedule_date, schedule_date + INT ERVAL '1h' * duration) OVERLAPS (%s, %s)
GROUP BY maintenance_equipment.workcenter_id;
"""
self.env.cr.execute(sql, [tuple(self.ids), fields.Datetime.to_string(start_datetime.astimezone()),
fields.Datetime.to_string(end_datetime.astimezone())])
res_maintenance = defaultdict(list)
for wc_row in self.env.cr.dictfetchall():
res_maintenance[wc_row.get('workcenter_id')] = [
[fields.Datetime.to_datetime(i) for i in intervals.split('|')]
for intervals in wc_row.get('date_intervals')
]
for wc_id in self.ids:
intervals_previous_list = [(s.timestamp(), e.timestamp(), self.env['maintenance.request']) for s, e in
res[wc_id]]
intervals_maintenances_list = [(m[0].timestamp(), m[1].timestamp(), self.env['maintenance.request']) for m
in res_maintenance[wc_id]]
final_intervals_wc = Intervals(intervals_previous_list + intervals_maintenances_list)
res[wc_id] = [(datetime.fromtimestamp(s), datetime.fromtimestamp(e)) for s, e, _ in final_intervals_wc]
return res
class ResWorkcenterProductivity(models.Model):
_inherit = 'mrp.workcenter.productivity'
workcenter_id = fields.Many2one('mrp.workcenter', required=False)

View File

@@ -0,0 +1,541 @@
import os
import json
import math
import requests
import logging
import base64
# import subprocess
from datetime import datetime
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.exceptions import ValidationError
from odoo.addons.sf_base.commons.common import Common
from odoo.exceptions import UserError
from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController
class ResMrpWorkOrder(models.Model):
_inherit = 'mrp.workorder'
_order = 'sequence'
workcenter_id = fields.Many2one('mrp.workcenter', required=False)
users_ids = fields.Many2many("res.users", 'users_workorder', related="workcenter_id.users_ids")
processing_panel = fields.Char('加工面')
sequence = fields.Integer(string='工序')
routing_type = fields.Selection([
('获取CNC加工程序', '获取CNC加工程序'),
('装夹', '装夹'),
('前置三元定位检测', '前置三元定位检测'),
('CNC加工', 'CNC加工'),
('后置三元质量检测', '后置三元质量检测'),
('解除装夹', '解除装夹'),
('切割', '切割')
], string="工序类型")
results = fields.Char('检测结果')
@api.onchange('users_ids')
def get_user_permissions(self):
uid = self.env.uid
for workorder in self:
if workorder.users_ids:
list_user_id = []
for item in workorder.users_ids:
list_user_id.append(item.id)
if uid in list_user_id:
workorder.user_permissions = True
else:
workorder.user_permissions = False
else:
workorder.user_permissions = False
user_permissions = fields.Boolean('用户权限', compute='get_user_permissions')
programming_no = fields.Char('编程单号')
work_state = fields.Char('业务状态')
programming_state = fields.Char('编程状态')
cnc_worksheet = fields.Binary(
'工作指令', readonly=True)
material_center_point = fields.Char(string='胚料中心点')
X1_axis = fields.Float(default=0)
Y1_axis = fields.Float(default=0)
Z1_axis = fields.Float(default=0)
X2_axis = fields.Float(default=0)
Y2_axis = fields.Float(default=0)
Z2_axis = fields.Float(default=0)
X3_axis = fields.Float(default=0)
Y3_axis = fields.Float(default=0)
Z3_axis = fields.Float(default=0)
X4_axis = fields.Float(default=0)
Y4_axis = fields.Float(default=0)
Z4_axis = fields.Float(default=0)
X5_axis = fields.Float(default=0)
Y5_axis = fields.Float(default=0)
Z5_axis = fields.Float(default=0)
X6_axis = fields.Float(default=0)
Y6_axis = fields.Float(default=0)
Z6_axis = fields.Float(default=0)
X7_axis = fields.Float(default=0)
Y7_axis = fields.Float(default=0)
Z7_axis = fields.Float(default=0)
X8_axis = fields.Float(default=0)
Y8_axis = fields.Float(default=0)
Z8_axis = fields.Float(default=0)
X9_axis = fields.Float(default=0)
Y9_axis = fields.Float(default=0)
Z9_axis = fields.Float(default=0)
X10_axis = fields.Float(default=0)
Y10_axis = fields.Float(default=0)
Z10_axis = fields.Float(default=0)
X_deviation_angle = fields.Integer(string="X轴偏差度", default=0)
test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], default='合格',
string="检测结果")
cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工")
tray_code = fields.Char(string="托盘")
# 计算配料中心点和与x轴倾斜度方法
def getcenter(self):
try:
x1 = self.X1_axis
x2 = self.X2_axis
x3 = self.X3_axis
x4 = self.X4_axis
x5 = self.X5_axis
x6 = self.X6_axis
x7 = self.X7_axis
x8 = self.X8_axis
y1 = self.Y1_axis
y2 = self.Y2_axis
y3 = self.Y3_axis
y4 = self.Y4_axis
y5 = self.Y5_axis
y6 = self.Y6_axis
y7 = self.Y7_axis
y8 = self.Y8_axis
z1 = self.Z9_axis
x0 = ((x3 - x4) * (x2 * y1 - x1 * y2) - (x1 - x2) * (x4 * y3 - x3 * y4)) / (
(x3 - x4) * (y1 - y2) - (x1 - x2) * (y3 - y4))
y0 = ((y3 - y4) * (y2 * x1 - y1 * x2) - (y1 - y2) * (y4 * x3 - y3 * x4)) / (
(y3 - y4) * (x1 - x2) - (y1 - y2) * (x3 - x4))
x1 = ((x7 - x8) * (x6 * y5 - x5 * y6) - (x5 - x6) * (x8 * y7 - x7 * y8)) / (
(x7 - x8) * (y5 - y6) - (x5 - x6) * (y7 - y8));
y1 = ((y7 - y8) * (y6 * x5 - y5 * x6) - (y5 - y6) * (y8 * x7 - y7 * x8)) / (
(y7 - y8) * (x5 - x6) - (y5 - y6) * (x7 - x8))
x = (x0 + x1) / 2
y = (y0 + y1) / 2
z = z1 / 2
jd = math.atan2((x5 - x6), (y5 - y6))
jdz = jd * 180 / math.pi
print("(%.2f,%.2f)" % (x, y))
self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z))
self.X_deviation_angle = jdz
# 将补偿值写入CNC加工工单
workorder = self.env['mrp.workorder'].browse(self.ids)
work = workorder.production_id.workorder_ids
work.compensation_value_x = eval(self.material_center_point)[0]
work.compensation_value_y = eval(self.material_center_point)[1]
except:
raise UserError("参数计算有误")
def json_workorder_str(self, k, production, route):
workorders_values_str = [0, '', {
'product_uom_id': production.product_uom_id.id,
'qty_producing': 0,
'operation_id': False,
'name': route.route_workcenter_id.name,
'processing_panel': k,
'routing_type': route.routing_type,
'work_state': '' if not route.routing_type == '获取CNC加工程序' else '待发起',
'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids),
'date_planned_start': False,
'date_planned_finished': False,
'duration_expected': 60,
'duration': 0
}]
return workorders_values_str
# 工作中心看板按钮
def button_maintenance_req(self):
self.ensure_one()
return {
'name': _('New Maintenance Request'),
'view_mode': 'form',
'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')],
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_workorder_id': self.id,
'default_production_id': self.production_id.id,
'discard_on_footer_button': True,
},
'target': 'new',
'domain': [('workorder_id', '=', self.id)]
}
tray_id = fields.Many2one('sf.tray', string="托盘信息", tracking=True)
# 扫码绑定托盘方法
def gettray(self):
if self.tray_code != False:
values = self.env['sf.tray'].search([("code", "=", self.tray_code)])
if values:
if values.state == "占用":
raise UserError('该托盘已占用')
if values.state == "报损":
raise UserError('该托盘已损坏')
else:
values.update({
'workorder_id': self,
'production_id': self.production_id,
'state': '占用',
})
self.work_state = "已绑定"
orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)])
for a in orders:
a.tray_id = values
else:
raise UserError('该托盘编码已失效')
else:
raise UserError('托盘码不能为空')
def gettray_auto(self, barcode):
if barcode != False:
values = self.env['sf.tray'].search([("code", "=", barcode)])
if values:
if values.state == "占用":
raise UserError('该托盘已占用')
if values.state == "报损":
raise UserError('该托盘已损坏')
else:
values.update({
'workorder_id': self,
'production_id': self.production_id,
'state': '占用',
})
self.work_state = "已绑定"
orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)])
for a in orders:
a.tray_id = values
return values
else:
raise UserError('该托盘编码已失效')
else:
raise UserError('托盘码不能为空')
# 解除托盘绑定
def unbindtray(self):
tray = self.env['sf.tray'].search([("production_id", "=", self.production_id.id)])
if tray:
tray.unclamp()
self.tray_id = False
# return {
# 'name': _('New Maintenance Request'),
# 'view_mode': 'form',
# 'res_model': 'maintenance.request',
# 'res_id':self.id,
# 'type': 'ir.actions.act_window',
# 'context': {
# 'default_company_id': self.company_id.id,
# 'default_production_id': self.id,
# },
# 'domain': [('production_id', '=', self.id)],
# 'target':'new'
# }
def recreateManufacturingOrWorkerOrder(self):
"""
重新生成制造订单或者重新生成工单
"""
if self.test_results == '报废':
values = self.env['mrp.production'].create_production1_values(self.production_id)
productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
self.production_id.company_id).create(
values)
self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
productions._create_workorder()
productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \
(
p.move_dest_ids.procure_method != 'make_to_order' and not p.move_raw_ids and not p.workorder_ids)).action_confirm()
for production in productions:
origin_production = production.move_dest_ids and production.move_dest_ids[
0].raw_material_production_id or False
orderpoint = production.orderpoint_id
if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
production.message_post(
body=_('This production order has been created from Replenishment Report.'),
message_type='comment',
subtype_xmlid='mail.mt_note')
elif orderpoint:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': orderpoint},
subtype_id=self.env.ref('mail.mt_note').id)
elif origin_production:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': origin_production},
subtype_id=self.env.ref('mail.mt_note').id)
if self.test_results == '返工':
productions = self.production_id
self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
productions._create_workorder2(self.processing_panel)
else:
self.results = '合格'
# cnc程序获取
def fetchCNC(self):
try:
cnc = self.env['mrp.workorder'].search(
[('routing_type', '=', 'CNC加工'), ('production_id', '=', self.production_id.id)], limit=1)
logging.info('fetchCNC-cnc:%s' % cnc)
# if cnc.product_id.upload_model_file:
# logging.info('fetchCNC-upload_model_file:%s' % cnc.product_id.upload_model_file)
# attachments = cnc.product_id.upload_model_file[0]
# logging.info('fetchCNC-attachment1:%s' % attachments)
# logging.info('fetchCNC-attachment1:%s' % cnc.product_id.upload_model_file[0])
# logging.info('fetchCNC-attachment2:%s' % cnc.product_id.upload_model_file[0].datas)
# logging.info('fetchCNC-attachment:%s' % attachments.datas)
# base64_data = base64.b64encode(attachments.datas)
# logging.info('fetchCNC-attachment1:%s' % attachments)
# base64_datas = base64_data.decode('utf-8')
# model_code = hashlib.sha1(base64_datas.encode('utf-8')).hexdigest()
# logging.info('fetchCNC-model_code:%s' % model_code)
logging.info('fetchCNC-model_code1:%s' % cnc.product_id.model_code)
res = {'model_code': '' if not cnc.product_id.model_code else cnc.product_id.model_code,
'production_no': self.production_id.name,
'machine_tool_code': cnc.workcenter_id.machine_tool_id.code,
'material_code': cnc.env['sf.production.materials'].search(
[('id', '=', cnc.product_id.materials_id.id)]).materials_no,
'material_type_code': cnc.env['sf.materials.model'].search(
[('id', '=', cnc.product_id.materials_type_id.id)]).materials_no,
'machining_processing_panel': cnc.product_id.model_processing_panel,
'machining_precision': cnc.product_id.model_machining_precision,
'embryo_long': cnc.product_id.bom_ids.bom_line_ids.product_id.length,
'embryo_height': cnc.product_id.bom_ids.bom_line_ids.product_id.height,
'embryo_width': cnc.product_id.bom_ids.bom_line_ids.product_id.width,
'order_no': cnc.production_id.origin,
'user': self.env.user.name,
'model_file': '' if not cnc.product_id.model_file else base64.b64encode(
cnc.product_id.model_file).decode('utf-8')
}
logging.info('res:%s' % res)
configsettings = self.env['res.config.settings'].get_values()
config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
url = '/api/intelligent_programming/create'
config_url = configsettings['sf_url'] + url
# res_str = json.dumps(res)
ret = requests.post(config_url, json={}, data=res, headers=config_header)
ret = ret.json()
logging.info('fetchCNC-ret:%s' % ret)
if ret['status'] == 1:
self.write(
{'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'})
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('fetchCNC error:%s' % e)
raise UserError("cnc程序获取编程单失败,请联系管理员")
def json_workorder_str1(self, k, production, route):
workorders_values_str = [0, '', {
'product_uom_id': production.product_uom_id.id,
'qty_producing': 0,
'operation_id': False,
'name': route.route_workcenter_id.name,
'processing_panel': k,
'routing_type': route.routing_type,
'work_state': '' if not route.routing_type == '获取CNC加工程序' else '待发起',
'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids),
'date_planned_start': False,
'date_planned_finished': False,
'duration_expected': 60,
'duration': 0
}]
return workorders_values_str
# 重写工单开始按钮方法
def button_start(self):
if self.state == 'waiting' or self.state == 'ready' or self.state == 'progress':
self.ensure_one()
if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)):
return True
# As button_start is automatically called in the new view
if self.state in ('done', 'cancel'):
return True
if self.product_tracking == 'serial':
self.qty_producing = 1.0
else:
self.qty_producing = self.qty_remaining
self.env['mrp.workcenter.productivity'].create(
self._prepare_timeline_vals(self.duration, datetime.now())
)
if self.production_id.state != 'progress':
self.production_id.write({
'date_start': datetime.now(),
})
if self.state == 'progress':
return True
start_date = datetime.now()
vals = {
'state': 'progress',
'date_start': start_date,
}
if not self.leave_id:
leave = self.env['resource.calendar.leaves'].create({
'name': self.display_name,
'calendar_id': self.workcenter_id.resource_calendar_id.id,
'date_from': start_date,
'date_to': start_date + relativedelta(minutes=self.duration_expected),
'resource_id': self.workcenter_id.resource_id.id,
'time_type': 'other'
})
vals['leave_id'] = leave.id
return self.write(vals)
else:
if self.date_planned_start > start_date:
vals['date_planned_start'] = start_date
if self.date_planned_finished and self.date_planned_finished < start_date:
vals['date_planned_finished'] = start_date
return self.write(vals)
else:
raise UserError(_('请先完成上一步工单'))
class CNCprocessing(models.Model):
_name = 'sf.cnc.processing'
_description = "CNC加工"
_rec_name = 'program_name'
cnc_id = fields.Many2one('ir.attachment')
sequence_number = fields.Char('序号')
program_name = fields.Char('程序名')
cutting_tool_name = fields.Char('刀具名称')
cutting_tool_no = fields.Char('刀号')
processing_type = fields.Char('加工类型')
margin_x_y = fields.Char('余量_X/Y')
margin_z = fields.Char('余量_Z')
depth_of_processing_z = fields.Char('加工深度(Z)')
cutting_tool_extension_length = fields.Char('刀具伸出长度')
cutting_tool_handle_type = fields.Char('刀柄型号')
estimated_processing_time = fields.Char('预计加工时间')
remark = fields.Text('备注')
workorder_id = fields.Many2one('mrp.workorder', string="工单")
button_state = fields.Boolean(string='是否已经下发')
# mrs下发编程单创建CNC加工
def cnc_processing_create(self, cnc_workorder, ret):
logging.info('ret:%s' % ret)
for obj in ret['programming_list']:
workorder = self.env['mrp.workorder'].search([('production_id.name', '=', ret['production_order_no']),
('processing_panel', '=', obj['processing_panel']),
('routing_type', '=', 'CNC加工')])
cnc_processing = self.env['sf.cnc.processing'].create({
'workorder_id': workorder.id,
'sequence_number': obj['sequence_number'],
'program_name': obj['program_name'],
'cutting_tool_name': obj['cutting_tool_name'],
'cutting_tool_no': obj['cutting_tool_no'],
'processing_type': obj['processing_type'],
'margin_x_y': obj['margin_x_y'],
'margin_z': obj['margin_z'],
'depth_of_processing_z': obj['depth_of_processing_z'],
'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
'estimated_processing_time': obj['estimated_processing_time'],
'remark': obj['remark']
})
self.get_cnc_processing_file(ret['folder_name'], cnc_processing, workorder.processing_panel)
cnc_workorder.state = 'done'
cnc_workorder.work_state = '已编程'
cnc_workorder.programming_state = '已编程'
cnc_workorder.time_ids.date_end = datetime.now()
cnc_workorder.button_finish()
def get_cnc_processing_file(self, folder_name, cnc_processing, processing_panel):
logging.info('folder_name:%s' % folder_name)
serverdir = os.path.join('/tmp', folder_name, 'return', processing_panel)
logging.info('serverdir:%s' % serverdir)
for root, dirs, files in os.walk(serverdir):
for f in files:
logging.info('f:%s' % f)
if os.path.splitext(f)[1] == ".pdf":
full_path = os.path.join(serverdir, root, f)
logging.info('pdf:%s' % full_path)
if full_path != False:
if not cnc_processing.workorder_id.cnc_worksheet:
cnc_processing.workorder_id.cnc_worksheet = base64.b64encode(
open(full_path, 'rb').read())
else:
if cnc_processing.program_name == f.split('.')[0]:
cnc_file_path = os.path.join(serverdir, root, f)
logging.info('cnc_file_path:%s' % cnc_file_path)
self.write_file(cnc_file_path, cnc_processing)
# 创建附件(nc文件)
def attachment_create(self, name, data):
attachment = self.env['ir.attachment'].create({
'datas': base64.b64encode(data),
'type': 'binary',
'public': True,
'description': '程序文件',
'name': name
})
return attachment
# 将FTP的nc文件下载到临时目录
def download_file_tmp(self, production_no, processing_panel):
remotepath = os.path.join('/', production_no, 'return', processing_panel)
serverdir = os.path.join('/tmp', production_no, 'return', processing_panel)
ftp_resconfig = self.env['res.config.settings'].get_values()
ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'],
ftp_resconfig['ftp_password'])
download_state = ftp.download_file_tree(remotepath, serverdir)
return download_state
# 将nc文件存到attach的datas里
def write_file(self, nc_file_path, cnc):
if os.path.exists(nc_file_path):
with open(nc_file_path, 'rb') as file:
data_bytes = file.read()
attachment = self.attachment_create(cnc.program_name + '.NC', data_bytes)
cnc.write({'cnc_id': attachment.id})
file.close()
else:
return False
class SfWorkOrderBarcodes(models.Model):
"""
智能工厂工单处扫码绑定托盘
"""
_name = "mrp.workorder"
_inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"]
def on_barcode_scanned(self, barcode):
workorder = self.env['mrp.workorder'].browse(self.ids)
logging.info(111111111111111111111111111111111)
if "*" not in barcode:
logging.info(222222222222222222222222222222222222222)
if self.routing_type == '装夹':
tray_code = self.env['sf.tray'].search([('code', '=', barcode)])
self.tray_code = tray_code.code
self.tray_id = workorder.gettray_auto(barcode)
elif self.routing_type == '前置三元定位检测':
print('我是前置三元检测')
logging.info('我是前置三元检测')
elif self.routing_type == 'CNC加工':
if barcode == 'UP-ALL':
print("我是一键合并下发")
logging.info('我是一键合并下发')
self.up_merge_all()
else:
print('CNC加工')

View File

@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from odoo import SUPERUSER_ID, _, api, fields, models, registry
class Users(models.Model):
_inherit = 'res.users'
workcenter_ids = fields.Many2many("mrp.workcenter", 'users_workcenter')

View File

@@ -0,0 +1,229 @@
# -*- coding: utf-8 -*-
import logging
from collections import defaultdict, namedtuple
from odoo.addons.stock.models.stock_rule import ProcurementException
from re import findall as regex_findall
from re import split as regex_split
from odoo import SUPERUSER_ID, _, api, fields, models, registry
from odoo.tools import float_compare, float_is_zero, html_escape
class StockRule(models.Model):
_inherit = 'stock.rule'
@api.model
def _run_pull(self, procurements):
moves_values_by_company = defaultdict(list)
mtso_products_by_locations = defaultdict(list)
# To handle the `mts_else_mto` procure method, we do a preliminary loop to
# isolate the products we would need to read the forecasted quantity,
# in order to to batch the read. We also make a sanitary check on the
# `location_src_id` field.
# list1 = []
# for item in procurements:
# num = int(item[0].product_qty)
# if num > 1:
# for no in range(1, num+1):
#
# Procurement = namedtuple('Procurement', ['product_id', 'product_qty',
# 'product_uom', 'location_id', 'name', 'origin',
# 'company_id',
# 'values'])
# s = Procurement(product_id=item[0].product_id,product_qty=1.0,product_uom=item[0].product_uom,
# location_id=item[0].location_id,
# name=item[0].name,
# origin=item[0].origin,
# company_id=item[0].company_id,
# values=item[0].values,
# )
# item1 = list(item)
# item1[0]=s
#
# list1.append(tuple(item1))
# else:
# list1.append(item)
for procurement, rule in procurements:
if not rule.location_src_id:
msg = _('No source location defined on stock rule: %s!') % (rule.name,)
raise ProcurementException([(procurement, msg)])
if rule.procure_method == 'mts_else_mto':
mtso_products_by_locations[rule.location_src_id].append(procurement.product_id.id)
# Get the forecasted quantity for the `mts_else_mto` procurement.
forecasted_qties_by_loc = {}
for location, product_ids in mtso_products_by_locations.items():
products = self.env['product.product'].browse(product_ids).with_context(location=location.id)
forecasted_qties_by_loc[location] = {product.id: product.free_qty for product in products}
# Prepare the move values, adapt the `procure_method` if needed.
procurements = sorted(procurements, key=lambda proc: float_compare(proc[0].product_qty, 0.0,
precision_rounding=proc[
0].product_uom.rounding) > 0)
list2 = []
for item in procurements:
num = int(item[0].product_qty)
product = self.env['product.product'].search(
[("id", '=', item[0].product_id.id)])
product_tmpl = self.env['product.template'].search(
["&", ("id", '=', product.product_tmpl_id.id), ('single_manufacturing', "!=", False)])
if product_tmpl:
if num > 1:
for no in range(1, num + 1):
Procurement = namedtuple('Procurement', ['product_id', 'product_qty',
'product_uom', 'location_id', 'name', 'origin',
'company_id',
'values'])
s = Procurement(product_id=item[0].product_id, product_qty=1.0, product_uom=item[0].product_uom,
location_id=item[0].location_id,
name=item[0].name,
origin=item[0].origin,
company_id=item[0].company_id,
values=item[0].values,
)
item1 = list(item)
item1[0] = s
list2.append(tuple(item1))
else:
list2.append(item)
else:
list2.append(item)
for procurement, rule in list2:
procure_method = rule.procure_method
if rule.procure_method == 'mts_else_mto':
qty_needed = procurement.product_uom._compute_quantity(procurement.product_qty,
procurement.product_id.uom_id)
if float_compare(qty_needed, 0, precision_rounding=procurement.product_id.uom_id.rounding) <= 0:
procure_method = 'make_to_order'
for move in procurement.values.get('group_id', self.env['procurement.group']).stock_move_ids:
if move.rule_id == rule and float_compare(move.product_uom_qty, 0,
precision_rounding=move.product_uom.rounding) > 0:
procure_method = move.procure_method
break
forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
elif float_compare(qty_needed, forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id],
precision_rounding=procurement.product_id.uom_id.rounding) > 0:
procure_method = 'make_to_order'
else:
forecasted_qties_by_loc[rule.location_src_id][procurement.product_id.id] -= qty_needed
procure_method = 'make_to_stock'
move_values = rule._get_stock_move_values(*procurement)
move_values['procure_method'] = procure_method
moves_values_by_company[procurement.company_id.id].append(move_values)
for company_id, moves_values in moves_values_by_company.items():
# create the move as SUPERUSER because the current user may not have the rights to do it (mto product launched by a sale for example)
moves = self.env['stock.move'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(moves_values)
# Since action_confirm launch following procurement_group we should activate it.
moves._action_confirm()
return True
@api.model
def _run_manufacture(self, procurements):
productions_values_by_company = defaultdict(list)
errors = []
for procurement, rule in procurements:
if float_compare(procurement.product_qty, 0, precision_rounding=procurement.product_uom.rounding) <= 0:
# If procurement contains negative quantity, don't create a MO that would be for a negative value.
continue
bom = rule._get_matching_bom(procurement.product_id, procurement.company_id, procurement.values)
productions_values_by_company[procurement.company_id.id].append(rule._prepare_mo_vals(*procurement, bom))
if errors:
raise ProcurementException(errors)
for company_id, productions_values in productions_values_by_company.items():
# create the MO as SUPERUSER because the current user may not have the rights to do it (mto product launched by a sale for example)
'''创建制造订单'''
productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(company_id).create(
productions_values)
self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
'''
创建工单
'''
productions._create_workorder()
productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \
(
p.move_dest_ids.procure_method != 'make_to_order' and not p.move_raw_ids and not p.workorder_ids)).action_confirm()
for production in productions:
'''
创建制造订单时生成序列号
'''
production.action_generate_serial()
origin_production = production.move_dest_ids and production.move_dest_ids[
0].raw_material_production_id or False
orderpoint = production.orderpoint_id
if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
production.message_post(
body=_('This production order has been created from Replenishment Report.'),
message_type='comment',
subtype_xmlid='mail.mt_note')
elif orderpoint:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': orderpoint},
subtype_id=self.env.ref('mail.mt_note').id)
elif origin_production:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': origin_production},
subtype_id=self.env.ref('mail.mt_note').id)
return True
class ProductionLot(models.Model):
_inherit = 'stock.lot'
@api.model
def generate_lot_names1(self, display_name, first_lot, count):
"""Generate `lot_names` from a string."""
if first_lot.__contains__(display_name):
first_lot = first_lot[(len(display_name) + 1):]
# We look if the first lot contains at least one digit.
caught_initial_number = regex_findall(r"\d+", first_lot)
if not caught_initial_number:
return self.generate_lot_names1(display_name, first_lot + "0", count)
# We base the series on the last number found in the base lot.
initial_number = caught_initial_number[-1]
padding = len(initial_number)
# We split the lot name to get the prefix and suffix.
splitted = regex_split(initial_number, first_lot)
# initial_number could appear several times, e.g. BAV023B00001S00001
prefix = initial_number.join(splitted[:-1])
suffix = splitted[-1]
initial_number = int(initial_number)
lot_names = []
for i in range(0, count):
lot_names.append('%s-%s%s%s' % (
display_name,
prefix,
str(initial_number + i).zfill(padding),
suffix
))
return lot_names
@api.model
def _get_next_serial(self, company, product):
"""Return the next serial number to be attributed to the product."""
if product.tracking == "serial":
last_serial = self.env['stock.lot'].search(
[('company_id', '=', company.id), ('product_id', '=', product.id)],
limit=1, order='id DESC')
if last_serial:
return self.env['stock.lot'].generate_lot_names1(product.name, last_serial.name, 2)[
1]
return "%s-%03d" % (product.name, 1)

View File

@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Part of SmartGo. See LICENSE file for full copyright and licensing details.
import base64
from io import BytesIO
from odoo import api, fields, models, SUPERUSER_ID, _
from pystrich.code128 import Code128Encoder
class Tray(models.Model):
_inherit = 'sf.tray'
_description = '托盘'
qr_image = fields.Binary(string="托盘二维码", compute='compute_qr_image')
production_id = fields.Many2one('mrp.production', string='制造订单',
related='workorder_id.production_id'
)
workorder_id = fields.Many2one('mrp.workorder', string="工单"
)
@api.onchange('production_id')
def updateTrayState(self):
if self.workorder_id != False and self.create_date != False:
self.state = '占用'
else:
self.state = '空闲'
def unclamp(self):
self.workorder_id = False
self.production_id = False
self.state = '空闲'
@api.depends('code')
def compute_qr_image(self):
for item in self:
if not item.code:
item.qr_image = False
continue
# 根据code动态生成二维码图片
# qr = qrcode.QRCode(
# version=1,
# error_correction=qrcode.constants.ERROR_CORRECT_L,
# box_size=10,
# border=4,
# )
# qr.add_data(item.code)
# qr.make(fit=True)
# img = qr.make_image()
# 生成条形码文件
# bar = barcode.get("ean13", "123456789102", writer=ImageWriter())
# a = bar.get_fullcode()
# b = bar.save('occ')
# 生成条形码图片
partner_encoder = Code128Encoder(item.code)
# 转换bytes流
temp = BytesIO()
partner_encoder.save(temp)
# img.save(temp, format='PNG')
qr_image = base64.b64encode(temp.getvalue())
item.qr_image = qr_image