Files
test/quality_control/models/quality.py
2025-05-08 15:57:16 +08:00

1091 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from math import sqrt
from dateutil.relativedelta import relativedelta
from datetime import datetime
import random
from odoo import api, models, fields, _
from odoo.api import depends
from odoo.tools import DEFAULT_SERVER_DATETIME_FORMAT, float_round
from odoo.osv.expression import OR
from odoo.exceptions import UserError
from odoo.tools import image_data_uri
from base64 import b64encode
import requests
import json
import base64
# Extension of the "quality.point" model: adds the failure message, the control
# granularity and frequency settings, lot-sampling options, and the norm and
# tolerance values used by measure-type checks.
class QualityPoint(models.Model):
_inherit = "quality.point"
# HTML message stored on the control point.
failure_message = fields.Html('Failure Message')
# Granularity at which checks are requested (see the help text).
measure_on = fields.Selection([
('operation', 'Operation'),
('product', 'Product'),
('move_line', 'Quantity')], string="Control per", default='product', required=True,
help="""Operation = One quality check is requested at the operation level.
Product = A quality check is requested per product.
Quantity = A quality check is requested for each new product quantity registered, with partial quantity checks also possible.""")
# Frequency mode read by check_execute_now().
measure_frequency_type = fields.Selection([
('all', 'All'),
('random', 'Randomly'),
('periodical', 'Periodically')], string="Control Frequency",
default='all', required=True)
# Percentage used by the 'random' mode (compared against random.random() * 100).
measure_frequency_value = fields.Float('Percentage') # TDE RENAME ?
# Number of units (days/weeks/months) used by the 'periodical' mode.
measure_frequency_unit_value = fields.Integer('Frequency Unit Value') # TDE RENAME ?
# Unit of the periodical interval.
measure_frequency_unit = fields.Selection([
('day', 'Days'),
('week', 'Weeks'),
('month', 'Months')], default="day") # TDE RENAME ?
# Lot sampling options.
is_lot_tested_fractionally = fields.Boolean(string="Lot Tested Fractionally",
help="Determines if only a fraction of the lot should be tested")
testing_percentage_within_lot = fields.Float(help="Defines the percentage within a lot that should be tested")
# Target value, tolerance bounds and unit for measure-type checks.
norm = fields.Float('Norm', digits='Quality Tests') # TDE RENAME ?
tolerance_min = fields.Float('Min Tolerance', digits='Quality Tests')
tolerance_max = fields.Float('Max Tolerance', digits='Quality Tests')
norm_unit = fields.Char('Norm Unit', default=lambda self: 'mm') # TDE RENAME ?
# Statistics over the point's checks; both are filled by the same compute method.
average = fields.Float(compute="_compute_standard_deviation_and_average")
standard_deviation = fields.Float(compute="_compute_standard_deviation_and_average")
def _compute_standard_deviation_and_average(self):
    """Compute the mean and sample standard deviation of each point's measures.

    Only checks whose ``quality_state`` is not ``'none'`` are taken into
    account. The running mean and sum of squared deviations follow
    Welford's online algorithm; the deviation uses Bessel's correction
    (division by ``n - 1``). Points whose test type is not ``'measure'``
    get 0 for both values.
    """
    for point in self:
        if point.test_type != 'measure':
            point.average = 0
            point.standard_deviation = 0
            continue

        running_mean = 0.0
        squared_dev_sum = 0.0
        count = 0
        processed = point.check_ids.filtered(lambda chk: chk.quality_state != 'none')
        for count, chk in enumerate(processed, start=1):
            offset = chk.measure - running_mean
            running_mean += offset / count
            squared_dev_sum += offset * (chk.measure - running_mean)

        if count == 0:
            point.average = 0.0
            point.standard_deviation = 0.0
            continue

        point.average = running_mean
        # A single sample has no spread; avoid dividing by zero.
        point.standard_deviation = sqrt(squared_dev_sum / (count - 1)) if count > 1 else 0.0
@api.onchange('norm')
def onchange_norm(self):
    """Seed the maximum tolerance from the norm while it is still 0."""
    if self.tolerance_max != 0.0:
        return
    self.tolerance_max = self.norm
def check_execute_now(self):
    """Tell whether a quality check must be created right now for this point.

    * ``'all'``: always.
    * ``'random'``: with a probability of ``measure_frequency_value`` percent.
    * ``'periodical'``: only when no check of this point was created within
      the configured interval (days, weeks or months).
    * any other frequency type: delegated to the parent implementation.
    """
    self.ensure_one()
    frequency = self.measure_frequency_type
    if frequency == 'all':
        return True
    if frequency == 'random':
        return random.random() < self.measure_frequency_value / 100.0
    if frequency != 'periodical':
        return super(QualityPoint, self).check_execute_now()

    # Map the selection value to the matching relativedelta keyword.
    unit_keyword = {'day': 'days', 'week': 'weeks', 'month': 'months'}.get(self.measure_frequency_unit)
    interval = False
    if unit_keyword:
        interval = relativedelta(**{unit_keyword: self.measure_frequency_unit_value})
    window_start = datetime.today() - interval
    recent_check = self.env['quality.check'].search([
        ('point_id', '=', self.id),
        ('create_date', '>=', window_start.strftime(DEFAULT_SERVER_DATETIME_FORMAT)),
    ], limit=1)
    return not recent_check
def _get_type_default_domain(self):
    """Extend the parent domain so the default test type is 'passfail'."""
    type_domain = super(QualityPoint, self)._get_type_default_domain()
    passfail_leaf = ('technical_name', '=', 'passfail')
    type_domain.append(passfail_leaf)
    return type_domain
def action_see_quality_checks(self):
    """Return the main quality-check action restricted to this control point."""
    self.ensure_one()
    action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
    action.update(
        domain=[('point_id', '=', self.id)],
        context={
            'default_company_id': self.company_id.id,
            'default_point_id': self.id,
        },
    )
    return action
def action_see_spc_control(self):
    """Return the SPC action for this point's processed checks.

    For measure-type points the graph is pre-configured as a line chart of
    the ``measure`` value grouped by name and point.
    """
    self.ensure_one()
    spc_action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_spc")
    if self.test_type == 'measure':
        spc_action['context'] = {
            'group_by': ['name', 'point_id'],
            'graph_measure': ['measure'],
            'graph_mode': 'line',
        }
    spc_action['domain'] = [('point_id', '=', self.id), ('quality_state', '!=', 'none')]
    return spc_action
# Extension of the "quality.check" model: adds part information, factory
# inspection report publishing, measurement detail lines and measure-related
# computed fields.
class QualityCheck(models.Model):
_inherit = "quality.check"
# Part name / drawing number, related to the product (editable and stored).
part_name = fields.Char('零件名称', related='product_id.part_name', readonly=False, store=True)
part_number = fields.Char('零件图号', related='product_id.part_number', readonly=False, store=True)
# Material name, built by _compute_material_name.
material_name = fields.Char('材料名称', compute='_compute_material_name')
# # Total quantity = quantity of the transfer's product detail line
# total_qty = fields.Float('总数量', compute='_compute_total_qty', readonly=True)
# # Inspected quantity
# check_qty = fields.Float('检验数', compute='_compute_check_qty', readonly=True)
# # Factory inspection report number
# report_number = fields.Char('出厂检验报告编号', compute='_compute_report_number', readonly=True)
# Total quantity = quantity of the transfer's product detail line
total_qty = fields.Char('总数量', compute='_compute_total_qty')
# Number of measurement-value columns currently in use (starts at 1).
column_nums = fields.Integer('测量值列数', default=1)
@api.depends('picking_id')
def _compute_total_qty(self):
    """Fill ``total_qty`` from the picking move of the checked product.

    The quantity of the last move whose product matches the check's product
    is used (truncated to an integer). The field is left empty when there is
    no picking or when the resulting quantity is not positive.
    """
    for record in self:
        if not record.picking_id:
            record.total_qty = ''
            continue
        matching_quantities = [
            int(move.product_uom_qty)
            for move in record.picking_id.move_ids_without_package
            if move.product_id == record.product_id
        ]
        quantity = matching_quantities[-1] if matching_quantities else 0
        record.total_qty = quantity if quantity > 0 else ''
# Inspected quantity; the default comes from _get_default_check_qty.
check_qty = fields.Integer('检验数', default=lambda self: self._get_default_check_qty())
def _get_default_check_qty(self):
    """Return the default inspected quantity for the ``check_qty`` field.

    ``check_qty`` is an Integer field, so the default is returned as an
    integer (the previous implementation returned the strings ``''`` and
    ``'1'``, which only worked through implicit coercion).

    The stored record (``_origin``) is inspected because a record being
    created may not carry these values yet.

    :return: 1 when the stored record is controlled per product and is not
        a factory inspection report ('出厂检验报告'); 0 otherwise, including
        when there is no stored record.
    """
    origin = self._origin
    if not origin:
        return 0
    if origin.measure_on == 'product' and origin.test_type_id.name != '出厂检验报告':
        return 1
    return 0
@api.onchange('test_type_id', 'measure_on')
def _onchange_check_qty(self):
    """Reset the inspected quantity when the test type or control level changes.

    Only per-product checks are touched: factory inspection reports
    ('出厂检验报告') start at 0, every other test type at 1.
    """
    if self.measure_on != 'product':
        return
    self.check_qty = 0 if self.test_type_id.name == '出厂检验报告' else 1
# Factory inspection report number: the document created when the report is published.
report_number_id = fields.Many2one('documents.document', string='出厂检验报告编号', readonly=True)
# Display name of the report number, built by _compute_report_number_name.
report_number_name = fields.Char('出厂检验报告编号名称', compute='_compute_report_number_name')
# Name of the previous report document; written by do_cancel_publish.
old_report_name = fields.Char('旧出厂检验报告编号', default='')
@api.depends('serial_number', 'part_number')
def _compute_report_number_name(self):
    """Build the report name as ``FQC<part number><serial number>``.

    Serial numbers below 10 are prefixed with a single '0'.
    """
    for record in self:
        serial = record.serial_number
        serial_text = str(serial)
        if serial < 10:
            serial_text = '0' + serial_text
        part_text = record.part_number or ''
        record.report_number_name = f'FQC{part_text}{serial_text}'
# Factory inspection report: binary data of the linked document.
report_content = fields.Binary(string='出厂检验报告', related='report_number_id.datas')
# True when the test type is the factory inspection report (see _compute_is_out_check).
is_out_check = fields.Boolean(string='是否出库检验', compute='_compute_is_out_check', readonly=True)
# Measurement detail lines of this check.
measure_line_ids = fields.One2many('quality.check.measure.line', 'check_id', string='测量明细')
# Type of the product's category (stored related field).
categ_type = fields.Selection(string='产品的类别', related='product_id.categ_id.type', store=True)
# Overall result of the factory inspection report.
report_result = fields.Selection([
('OK', 'OK'),
('NG', 'NG')
], string='出厂检验报告结果', default='OK')
# Machine operator (required before publishing, see _check_part_number).
measure_operator = fields.Many2one('res.users', string='操机员')
# Quality inspector, filled by _compute_quality_manager.
quality_manager = fields.Many2one('res.users', string='质检员', compute='_compute_quality_manager')
@api.depends('measure_line_ids')
def _compute_quality_manager(self):
    """Set the inspector to the current user once measurement lines exist."""
    for record in self:
        has_lines = bool(record.measure_line_ids)
        record.quality_manager = record.env.user.id if has_lines else False
# Serial number (starts at 1, maximum 99).
serial_number = fields.Integer('流水号', default=1, readonly=True)
# Publication history.
report_history_ids = fields.One2many('quality.check.report.history', 'check_id', string='发布历史')
# Publication status.
publish_status = fields.Selection([
('draft', '草稿'),
('published', '已发布'),
('canceled', '已撤销')
], string='发布状态', default='draft')
# Whether the factory inspection report has been uploaded.
is_factory_report_uploaded = fields.Boolean(string='出厂检验报告是否已上传', default=False)
def add_measure_line(self):
    """Add one measurement-value column (at most 5 columns).

    The newly exposed ``measure_value<N>`` field is reset to False on every
    measurement line before the column counter is incremented.

    :raises UserError: when 5 columns are already in use.
    """
    if self.column_nums >= 5:
        raise UserError(_('最多只能有5列测量值'))
    new_column = f'measure_value{self.column_nums + 1}'
    for detail in self.measure_line_ids:
        if hasattr(detail, new_column):
            detail[new_column] = False
    self.column_nums = self.column_nums + 1
def remove_measure_line(self):
    """Remove the last measurement-value column (at least 1 column remains).

    The ``measure_value<N>`` field of the removed column is cleared on every
    measurement line before the column counter is decremented.

    :raises UserError: when only one column is left.
    """
    if self.column_nums <= 1:
        raise UserError(_('最少要有1列测量值'))
    last_column = f'measure_value{self.column_nums}'
    for detail in self.measure_line_ids:
        if hasattr(detail, last_column):
            detail[last_column] = False
    self.column_nums = self.column_nums - 1
def upload_measure_line(self):
    """Open the import wizard used to upload measurement values.

    :raises UserError: when a record lacks its part name or part number.
    :return: the import wizard action, pre-filled for measurement lines of
        this check.
    """
    if any(not (record.part_name and record.part_number) for record in self):
        raise UserError(_('零件名称和零件图号均不能为空'))
    # Validation passed: hand back the wizard action.
    wizard_action = self.env.ref('quality_control.import_complex_model_wizard').read()[0]
    wizard_action['context'] = {
        'default_model_name': 'quality.check.measure.line',
        'default_check_id': self.id,
    }
    return wizard_action
def do_preview(self):
    """Preview the factory inspection report (placeholder, does nothing)."""
    return None
def do_publish(self):
    """Validate the check and open the publish-confirmation wizard.

    The report is not published here; the wizard is opened instead of
    publishing directly.

    :return: window action opening ``quality.check.publish.wizard``.
    """
    self.ensure_one()
    validators = (
        self._check_part_number,
        self._check_measure_line,
        self._check_check_qty_and_total_qty,
    )
    for validate in validators:
        validate()

    wizard_defaults = {
        'default_check_id': self.id,
        'default_product_name': self.product_id.name,
        'default_total_qty': self.total_qty,
        'default_check_qty': self.check_qty,
        'default_measure_count': self.column_nums,
        'default_item_count': len(self.measure_line_ids),
        'default_old_report_name': self.old_report_name,
        'default_publish_status': self.publish_status,
    }
    return {
        'name': _('发布确认'),
        'type': 'ir.actions.act_window',
        'res_model': 'quality.check.publish.wizard',
        'view_mode': 'form',
        'target': 'new',
        'context': wizard_defaults,
    }
def _do_publish_implementation(self):
    """Publish the factory inspection report of this quality check.

    Steps: create an empty document in the '已发布' folder, link it to the
    check (which is also marked as passed), render the inspection PDF into
    the document, log a history entry, increment the serial number,
    re-upload the report when a cancelled report is republished on a done
    picking, and finally flag the check as published.

    :return: True
    :raises UserError: when the serial number exceeds 99.
    """
    self.ensure_one()
    # Validate before touching the database. The translation function `_`
    # must stay global in this method: the original unpacked the render
    # result into a local `_`, which turned this raise into an
    # UnboundLocalError.
    if self.serial_number > 99:
        raise UserError(_('流水号不能大于99'))

    # 1. Find the folder holding published documents.
    workspace = self.env['documents.folder'].search(
        [('parent_folder_id', '=', self.env.ref('sf_quality.documents_purchase_contracts_folder').id),
         ('name', '=', '已发布')], limit=1)

    # 2. Create the (still empty) document record first.
    doc = self.env['documents.document'].create({
        'name': self.report_number_name,
        'mimetype': 'application/pdf',
        'res_id': self.id,
        'folder_id': workspace.id,
        'res_model': self._name,
    })

    # 3. Link the document to the quality check.
    self.write({
        'report_number_id': doc.id,
        'quality_state': 'pass'
    })

    # 4. Render the PDF now, so the QR code can embed the document id.
    report_action = self.env.ref('sf_quality.action_report_quality_inspection')
    render_result = report_action._render_qweb_pdf(
        report_ref=report_action.report_name,
        res_ids=self.ids
    )
    pdf_content = render_result[0]

    # 5. Store the rendered content on the document.
    doc.write({
        'raw': pdf_content
    })

    # 6. Record the publication in the history.
    self.env['quality.check.report.history'].create({
        'check_id': self.id,
        'report_number_id': doc.id,
        'action': 'publish',
        'operator': self.env.user.name,
        'operation_time': datetime.now(),
        'document_status': 'published',
        'sequence': len(self.report_history_ids) + 1
    })

    # 7. Update the remaining information.
    self.serial_number += 1
    if self.publish_status == 'canceled' and self.picking_id.state == 'done':
        self.upload_factory_report()
    self.write({
        'publish_status': 'published',
    })
    return True
# Pre-publish validation of the part number and the machine operator.
def _check_part_number(self):
    """Ensure the part number and the machine operator are both set.

    :raises UserError: for the first missing value (part number first).
    """
    if not self.part_number:
        raise UserError(_('零件图号不能为空'))
    elif not self.measure_operator:
        raise UserError(_('操机员不能为空'))
# Pre-publish validation: every used cell of the measurement lines is filled.
def _check_measure_line(self):
    """Ensure measurement lines exist and that all used cells are filled.

    For each line, the inspection item and the ``measure_value1`` ..
    ``measure_value<column_nums>`` fields must be non-empty.

    :raises UserError: when there is no line, or an item or value is empty.
    """
    for record in self:
        details = record.measure_line_ids
        if not details:
            raise UserError(_('请先添加测量明细'))
        used_columns = [f'measure_value{idx}' for idx in range(1, record.column_nums + 1)]
        for detail in details:
            if not detail.measure_item:
                raise UserError(_('有检测项目值为空'))
            if not all(getattr(detail, column) for column in used_columns):
                raise UserError(_('有测量值为空'))
# Pre-publish validation of the inspected quantity against the total quantity
# and against the number of measured pieces (i.e. measurement columns).
def _check_check_qty_and_total_qty(self):
    """Ensure the quantities of each record are consistent.

    The inspected quantity and the total quantity must be set, the inspected
    quantity must not exceed the total quantity, and the number of
    measurement columns must not exceed the inspected quantity.

    :raises UserError: for the first rule that is violated.
    """
    for record in self:
        if not record.check_qty:
            raise UserError(_('请先输入检验数'))
        elif not record.total_qty:
            raise UserError(_('总数量不能为空'))
        elif record.check_qty > int(record.total_qty):
            raise UserError(_('检验数不可超过总数量'))
        elif record.column_nums > record.check_qty:
            raise UserError(_('测量件数不可超过检验数'))
def do_cancel_publish(self):
    """Cancel the publication of the factory inspection report.

    The linked report document is moved to the "canceled" folder, a history
    entry is logged, and the check goes back to an unpublished state while
    remembering the name of the cancelled report. When the report had been
    uploaded to the order line, it is deleted there as well.

    The lookup of the '已发布' folder performed by the previous version was
    removed: its result was never used. The two consecutive ``write`` calls
    were merged into one.

    :return: True
    """
    self.ensure_one()
    report_document = self.report_number_id

    # 1. Move the linked report document to the discarded folder.
    report_document.write({
        'folder_id': self.env.ref('sf_quality.documents_purchase_contracts_folder_canceled').id,
    })

    # 2. Record the cancellation in the history.
    self.env['quality.check.report.history'].create({
        'check_id': self.id,
        'report_number_id': report_document.id,
        'action': 'cancel_publish',
        'operator': self.env.user.name,
        'operation_time': datetime.now(),
        'document_status': 'canceled',
        'sequence': len(self.report_history_ids) + 1
    })

    # 3. Remember the old report name and reset the publication state.
    self.write({
        'old_report_name': report_document.name,
        'publish_status': 'canceled',
        'report_number_id': False,
        'quality_state': 'none'
    })

    # 4. Delete the report from the order line when it had been uploaded.
    if self.is_factory_report_uploaded:
        self.delete_factory_report()
    return True
def do_re_publish(self):
    """Publish the factory inspection report again, using the publish rules."""
    publish_action = self.do_publish()
    return publish_action
def generate_qr_code(self):
    """Return a data URI of a 140x140 QR code pointing at this record.

    The encoded URL is built from the ``web.base.url`` system parameter and
    this record's id.
    """
    self.ensure_one()
    base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
    target_url = base_url + f'/#/index/publicPay?order_id={self.id}&source=%2Findex%2Fmyorder'
    qr_image = self.env['ir.actions.report'].barcode('QR', target_url, width=140, height=140)
    return image_data_uri(b64encode(qr_image))
def get_latest_report_attachment(self, check_id):
    """Return the most recent report attachment of a quality check.

    Attachments of ``quality.check`` record ``check_id`` whose name matches
    'QC-QC' are searched, newest first. Older attachments are kept: the
    cleanup that used to delete them is disabled.

    :param check_id: id of the quality check.
    :return: the newest attachment, or False when there is none.
    """
    report_domain = [
        ('res_model', '=', 'quality.check'),
        ('res_id', '=', check_id),
        ('name', 'like', 'QC-QC'),  # adjust to the naming rule in use
    ]
    found = self.env['ir.attachment'].search(report_domain, order='create_date DESC')
    if not found:
        return False
    return found[0] or False
def get_report_url(self):
    """Return the URL under which the published report can be viewed.

    The URL is built from the ``web.base.url`` system parameter. The
    leftover debug ``print`` of the previous version was removed.

    :return: ``<base>/quality/report/<document id>`` when a report document
        is linked, ``<base>/quality/report/not_published`` otherwise.
    """
    self.ensure_one()
    base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
    if self.report_number_id:
        return f"{base_url}/quality/report/{self.report_number_id.id}"
    return f"{base_url}/quality/report/not_published"
def upload_factory_report(self):
    """Upload this check's factory inspection report to the order line.

    The report content, the product model name and the picking's
    ``retrospect_ref`` are POSTed as JSON to
    ``<bfm_url_new>/api/report/create``. On success the check is flagged as
    uploaded.

    Changes over the previous version: the HTTP call has a timeout so a
    stalled remote service cannot block the worker forever, and the
    ``UserError`` raised for an API/HTTP failure is no longer re-wrapped by
    the generic handler (which doubled the message).

    :return: client action displaying a success notification.
    :raises UserError: when prerequisites are missing, the request fails or
        the remote service reports a failure.
    """
    self.ensure_one()
    if not self.report_content:
        raise UserError(_('当前质检单没有出厂检验报告,请先发布报告'))
    if not self.product_id.model_name:
        raise UserError(_('产品模型名称为空'))
    if not self.picking_id or not self.picking_id.origin:
        raise UserError(_('无法找到相关的调拨单或来源单据'))

    # Order reference sent to the remote service.
    # NOTE(review): the guard above validates `origin` while the value sent
    # is `retrospect_ref` - confirm this is intended.
    order_ref = self.picking_id.retrospect_ref
    try:
        # Build the request payload.
        report_file = self.report_content
        if isinstance(report_file, bytes):
            report_file = report_file.decode('utf-8')
        payload = {
            "order_ref": order_ref,
            "model_name": self.product_id.model_name,
            "report_file": report_file,
        }
        json_data = json.dumps(payload)

        # Resolve the remote endpoint.
        base_url = self.env['ir.config_parameter'].sudo().get_param('bfm_url_new')
        api_url = f"{base_url}/api/report/create"
        headers = {
            'Content-Type': 'application/json',
        }

        # requests never times out by default; bound the call (seconds).
        response = requests.post(api_url, data=json_data, headers=headers, timeout=60)

        if response.status_code != 200:
            # The HTTP request itself failed.
            raise UserError(_('请求失败,状态码: %s') % response.status_code)
        result = response.json()
        if not result.get('success'):
            # The API answered with a failure message.
            raise UserError(_('上传失败: %s') % result.get('message', '未知错误'))

        # Upload succeeded: flag the check and notify the user.
        self.is_factory_report_uploaded = True
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('上传成功'),
                'message': _('出厂检验报告已成功上传到加工订单明细'),
                'type': 'success',
                'sticky': False,
            }
        }
    except UserError:
        # Already a user-facing message; do not wrap it a second time.
        raise
    except Exception as e:
        raise UserError(_('上传过程中发生错误: %s') % str(e)) from e
def delete_factory_report(self):
    """Delete this check's factory inspection report from the order line.

    The product model name and the picking's ``retrospect_ref`` are POSTed
    as JSON to ``<bfm_url_new>/api/report/delete``. On success the uploaded
    flag of the check is cleared.

    Changes over the previous version: the method now requires a single
    record (it reads singleton fields), the HTTP call has a timeout, and
    the ``UserError`` raised for an API/HTTP failure is no longer re-wrapped
    by the generic handler (which doubled the message).

    :return: client action displaying a success notification.
    :raises UserError: when prerequisites are missing, the request fails or
        the remote service reports a failure.
    """
    self.ensure_one()
    # Order reference sent to the remote service.
    order_ref = self.picking_id.retrospect_ref
    if not order_ref:
        raise UserError(_('无法找到相关的调拨单或来源单据'))
    if not self.product_id.model_name:
        raise UserError(_('产品模型名称为空'))
    try:
        # Build the request payload.
        payload = {
            "order_ref": order_ref,
            "model_name": self.product_id.model_name
        }
        json_data = json.dumps(payload)

        # Resolve the remote endpoint.
        base_url = self.env['ir.config_parameter'].sudo().get_param('bfm_url_new')
        api_url = f"{base_url}/api/report/delete"
        headers = {
            'Content-Type': 'application/json',
        }

        # requests never times out by default; bound the call (seconds).
        response = requests.post(api_url, data=json_data, headers=headers, timeout=60)

        if response.status_code != 200:
            # The HTTP request itself failed.
            raise UserError(_('请求失败,状态码: %s') % response.status_code)
        result = response.json()
        if not result.get('success'):
            # The API answered with a failure message.
            raise UserError(_('删除失败: %s') % result.get('message', '未知错误'))

        # Deletion succeeded: clear the flag and notify the user.
        self.is_factory_report_uploaded = False
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('删除成功'),
                'message': _('出厂检验报告已成功删除'),
                'type': 'success',
                'sticky': False,
            }
        }
    except UserError:
        # Already a user-facing message; do not wrap it a second time.
        raise
    except Exception as e:
        raise UserError(_('删除过程中发生错误: %s') % str(e)) from e
@depends('product_id')
def _compute_material_name(self):
    """Join the product's material name and material type name with a space.

    A missing material or material type contributes an empty string.
    """
    for record in self:
        product = record.product_id
        material = product.materials_id.name if product.materials_id else ''
        material_type = product.materials_type_id.name if product.materials_type_id else ''
        record.material_name = ' '.join((material, material_type))
@depends('test_type_id')
def _compute_is_out_check(self):
    """Flag checks whose test type is the factory inspection report."""
    for record in self:
        record.is_out_check = bool(record.test_type_id.name == '出厂检验报告')
# Failure message taken from the control point (read-only).
failure_message = fields.Html(related='point_id.failure_message', readonly=True)
# Measured value and its stored pass/fail evaluation (see _compute_measure_success).
measure = fields.Float('Measure', default=0.0, digits='Quality Tests', tracking=True)
measure_success = fields.Selection([
('none', 'No measure'),
('pass', 'Pass'),
('fail', 'Fail')], string="Measure Success", compute="_compute_measure_success",
readonly=True, store=True)
# Tolerance bounds and unit, read from the control point.
tolerance_min = fields.Float('Min Tolerance', related='point_id.tolerance_min', readonly=True)
tolerance_max = fields.Float('Max Tolerance', related='point_id.tolerance_max', readonly=True)
# Text filled by _compute_warning_message when the measure fails.
warning_message = fields.Text(compute='_compute_warning_message')
norm_unit = fields.Char(related='point_id.norm_unit', readonly=True)
# Quantities tested within a lot.
qty_to_test = fields.Float(compute="_compute_qty_to_test", string="Quantity to Test",
help="Quantity of product to test within the lot")
qty_tested = fields.Float(string="Quantity Tested", help="Quantity of product tested within the lot")
# Granularity at which the check is requested (see the help text).
measure_on = fields.Selection([
('operation', 'Operation'),
('product', 'Product'),
('move_line', 'Quantity')], string="Control per", default='product', required=True,
help="""Operation = One quality check is requested at the operation level.
Product = A quality check is requested per product.
Quantity = A quality check is requested for each new product quantity registered, with partial quantity checks also possible.""")
# Move line and lot information for checks made per quantity.
move_line_id = fields.Many2one('stock.move.line', 'Stock Move Line', check_company=True,
help="In case of Quality Check by Quantity, Move Line on which the Quality Check applies")
lot_name = fields.Char('Lot/Serial Number Name')
lot_line_id = fields.Many2one('stock.lot', store=True, compute='_compute_lot_line_id')
qty_line = fields.Float(compute='_compute_qty_line', string="Quantity")
uom_id = fields.Many2one(related='product_id.uom_id', string="Product Unit of Measure")
show_lot_text = fields.Boolean(compute='_compute_show_lot_text')
# Lot sampling options, read from the control point.
is_lot_tested_fractionally = fields.Boolean(related='point_id.is_lot_tested_fractionally')
testing_percentage_within_lot = fields.Float(related="point_id.testing_percentage_within_lot")
product_tracking = fields.Selection(related='product_id.tracking')
# Inspection category, derived from the picking type's sequence code
# (see _compute_quality_check_type); the selection keys are the labels themselves.
quality_check_type = fields.Selection([
('采购入库检', '采购入库检'),
('客供料入库检', '客供料入库检'),
('退货入库检', '退货入库检'),
('生产入库检', '生产入库检'),
('外协入库检', '外协入库检'),
('成品发货检', '成品发货检'),
('工序外协发货检', '工序外协发货检'),
('委外坯料发货检', '委外坯料发货检')], string='类型', compute='_compute_quality_check_type', store=True)
@api.depends('picking_id')
def _compute_quality_check_type(self):
    """Derive the inspection category from the picking type's sequence code.

    Checks without a picking, or whose picking type code is not mapped, get
    False.
    """
    category_by_code = {
        'IN': '采购入库检',
        'DL': '客供料入库检',
        'RET': '退货入库检',
        'SFP': '生产入库检',
        'OCIN': '外协入库检',
        'OUT': '成品发货检',
        'OCOUT': '工序外协发货检',
        'RES': '委外坯料发货检',
    }
    for check in self:
        picking = check.picking_id
        if not picking:
            check.quality_check_type = False
            continue
        sequence_code = picking.picking_type_id.sequence_code
        check.quality_check_type = category_by_code.get(sequence_code, False)
@api.depends('measure_success')
def _compute_warning_message(self):
    """Explain a failed measure; the message is empty in every other case."""
    for rec in self:
        if rec.measure_success != 'fail':
            rec.warning_message = ''
            continue
        point = rec.point_id
        details = (
            rec.measure, rec.norm_unit,
            point.tolerance_min, point.tolerance_max, rec.norm_unit,
        )
        rec.warning_message = _('You measured %.2f %s and it should be between %.2f and %.2f %s.') % details
@api.depends('move_line_id.qty_done')
def _compute_qty_line(self):
    """Mirror the done quantity of the linked stock move line."""
    for check in self:
        move_line = check.move_line_id
        check.qty_line = move_line.qty_done
@api.depends('move_line_id.lot_id')
def _compute_lot_line_id(self):
    """Copy the move line's lot; when one is set, also store it as ``lot_id``."""
    for check in self:
        check.lot_line_id = check.move_line_id.lot_id
        if not check.lot_line_id:
            continue
        check.lot_id = check.lot_line_id
@api.depends('measure')
def _compute_measure_success(self):
    """Evaluate the measure against the control point's tolerance range.

    Pass/fail points always get 'none'; otherwise the result is 'fail' when
    the measure lies outside [tolerance_min, tolerance_max], else 'pass'.
    """
    for rec in self:
        point = rec.point_id
        if point.test_type == 'passfail':
            rec.measure_success = 'none'
            continue
        out_of_range = rec.measure < point.tolerance_min or rec.measure > point.tolerance_max
        rec.measure_success = 'fail' if out_of_range else 'pass'
# Override whose only purpose is to add the 'picture' dependency.
@api.depends('picture')
def _compute_result(self):
    """Delegate to the parent computation; only the dependency is extended."""
    parent = super(QualityCheck, self)
    parent._compute_result()
@api.depends('qty_line', 'testing_percentage_within_lot', 'is_lot_tested_fractionally')
def _compute_qty_to_test(self):
    """Compute the quantity to test within the lot.

    With fractional lot testing, the configured percentage of the line
    quantity is rounded up to the product UoM rounding (0.01 when the
    product has no UoM). Otherwise the whole line quantity is tested.
    """
    for check in self:
        if not check.is_lot_tested_fractionally:
            check.qty_to_test = check.qty_line
            continue
        uom = check.product_id.uom_id
        rounding = uom.rounding if uom else 0.01
        sampled_qty = check.qty_line * check.testing_percentage_within_lot / 100
        check.qty_to_test = float_round(sampled_qty, precision_rounding=rounding, rounding_method="UP")
@api.depends('lot_line_id', 'move_line_id')
def _compute_show_lot_text(self):
    """Show the free-text lot only when a move line exists without a lot."""
    for check in self:
        hide_text = bool(check.lot_line_id or not check.move_line_id)
        check.show_lot_text = not hide_text
def _is_pass_fail_applicable(self):
    """Pass/fail applies to 'passfail' and 'measure' tests; else ask the parent."""
    applicable_types = ['passfail', 'measure']
    if self.test_type not in applicable_types:
        return super()._is_pass_fail_applicable()
    return True
def _get_check_result(self):
    """Return 'Picture Uploaded' for picture tests that have a picture.

    Every other case is answered by the parent implementation.
    """
    if self.test_type != 'picture' or not self.picture:
        return super(QualityCheck, self)._get_check_result()
    return _('Picture Uploaded')
def _check_to_unlink(self):
    """Always return True."""
    allowed = True
    return allowed
def do_measure(self):
    """Fail the check when the measure is out of tolerance, pass it otherwise."""
    self.ensure_one()
    point = self.point_id
    out_of_range = self.measure < point.tolerance_min or self.measure > point.tolerance_max
    outcome = self.do_fail if out_of_range else self.do_pass
    return outcome()
def correct_measure(self):
    """Reopen this check in the small form view, in a dialog, to fix its measure."""
    self.ensure_one()
    small_form = self.env.ref('quality_control.quality_check_view_form_small')
    action = {
        'name': _('Quality Checks'),
        'type': 'ir.actions.act_window',
        'res_model': 'quality.check',
        'view_mode': 'form',
        'view_id': small_form.id,
        'target': 'new',
        'res_id': self.id,
        'context': self.env.context,
    }
    return action
def do_alert(self):
    """Create a quality alert from this check and open it in form view."""
    self.ensure_one()
    alert_values = {
        'check_id': self.id,
        'product_id': self.product_id.id,
        'product_tmpl_id': self.product_id.product_tmpl_id.id,
        'lot_id': self.lot_id.id,
        'user_id': self.user_id.id,
        'team_id': self.team_id.id,
        'company_id': self.company_id.id
    }
    new_alert = self.env['quality.alert'].create(alert_values)
    alert_form = self.env.ref('quality_control.quality_alert_view_form')
    return {
        'name': _('Quality Alert'),
        'type': 'ir.actions.act_window',
        'res_model': 'quality.alert',
        'views': [(alert_form.id, 'form')],
        'res_id': new_alert.id,
        'context': {'default_check_id': self.id},
    }
def action_see_alerts(self):
    """Open the alerts of this check.

    A single alert is opened directly in form view; otherwise the alert
    action is returned, filtered on this check's alerts.
    """
    self.ensure_one()
    alerts = self.alert_ids
    if len(alerts) != 1:
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_alert_action_check")
        action['domain'] = [('id', 'in', alerts.ids)]
        action['context'] = dict(self._context, default_check_id=self.id)
        return action

    alert_form = self.env.ref('quality_control.quality_alert_view_form')
    return {
        'name': _('Quality Alert'),
        'type': 'ir.actions.act_window',
        'res_model': 'quality.alert',
        'views': [(alert_form.id, 'form')],
        'res_id': alerts.ids[0],
        'context': {'default_check_id': self.id},
    }
def action_open_quality_check_wizard(self, current_check_id=None):
    """Open the quality check wizard for these checks.

    :param current_check_id: id of the check to show first; defaults to the
        smallest id of the recordset.
    :return: the wizard action, whose context carries the sorted check ids
        and the current check id.
    """
    ordered_ids = sorted(self.ids)
    wizard_action = self.env["ir.actions.actions"]._for_xml_id("quality_control.action_quality_check_wizard")
    wizard_context = self.env.context.copy()
    wizard_context['default_check_ids'] = ordered_ids
    wizard_context['default_current_check_id'] = current_check_id or ordered_ids[0]
    wizard_action['context'] = wizard_context
    return wizard_action
# Extension of the "quality.alert" model: adds a title and the part
# information derived from the product or the picking.
class QualityAlert(models.Model):
_inherit = "quality.alert"
title = fields.Char('Title')
# Part drawing number / part name, both filled by _compute_part_info (stored).
part_number = fields.Char(string='零件图号', compute='_compute_part_info', store=True)
part_name = fields.Char(string='零件名称', compute='_compute_part_info', store=True)
@api.depends('product_id', 'picking_id')
def _compute_part_info(self):
    """Fill the part number and part name of each alert.

    * Product template category named '成品': values come from the product.
    * Product category named '坯料': values come from the first move of the
      picking, when the picking has moves.
    * Any other case: both fields are set to False.

    A compute method has to assign its fields on every record; the previous
    version left them unassigned in the last case (and for '坯料' without
    moves).
    """
    for alert in self:
        part_number = False
        part_name = False
        if alert.product_tmpl_id.categ_id.name == '成品':
            part_number = alert.product_id.part_number
            part_name = alert.product_id.part_name
        elif alert.product_id.categ_id.name == '坯料':
            moves = alert.picking_id.move_ids_without_package
            if moves:
                part_number = moves[0].part_number
                part_name = moves[0].part_name
        alert.part_number = part_number
        alert.part_name = part_name
def action_see_check(self):
    """Open the quality check linked to this alert in form view."""
    linked_check = self.check_id
    action = dict(
        name=_('Quality Check'),
        type='ir.actions.act_window',
        view_mode='form',
        res_model='quality.check',
        target='current',
        res_id=linked_check.id,
    )
    return action
@api.depends('name', 'title')
def name_get(self):
    """Display alerts as '<name> - <title>', or just the name without title."""
    return [
        (record.id, record.name + ' - ' + record.title if record.title else record.name)
        for record in self
    ]
@api.model
def name_create(self, name):
    """Create an alert using ``name`` as its title.

    The given text is stored in ``title`` and the ``name_get`` entry of the
    new record is returned.
    """
    new_alert = self.create({'title': name})
    return new_alert.name_get()[0]
@api.model
def message_new(self, msg_dict, custom_values=None):
    """ Override, used with creation by email alias. The purpose of the override is
    to use the subject for title and body for description instead of the name.

    :param msg_dict: parsed incoming message; its ``subject`` and ``body``
        entries are used when present.
    :param custom_values: optional extra values for the new record. The
        argument may be omitted and the caller's dict is left untouched
        (the previous version crashed on ``None`` and mutated the dict).
    :return: result of the parent ``message_new``.
    """
    values = dict(custom_values or {})
    # We need to add the name in custom_values or it will use the subject.
    values['name'] = self.env['ir.sequence'].next_by_code('quality.alert') or _('New')
    if msg_dict.get('subject'):
        values['title'] = msg_dict['subject']
    if msg_dict.get('body'):
        values['description'] = msg_dict['body']
    return super(QualityAlert, self).message_new(msg_dict, values)
# Extension of "product.template": quality counters restricted to the
# quality user group.
class ProductTemplate(models.Model):
_inherit = "product.template"
# All three counters are filled by _compute_quality_check_qty.
quality_control_point_qty = fields.Integer(compute='_compute_quality_check_qty',
groups='quality.group_quality_user')
quality_pass_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
quality_fail_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
@api.depends('product_variant_ids')
def _compute_quality_check_qty(self):
    """Compute the failed/passed check counts and the control point count.

    The counts are taken from the template's variants; the variants used
    for the control point count are read with ``active_test`` set to the
    template's own ``active`` flag.
    """
    for template in self:
        failed, passed = template.product_variant_ids._count_quality_checks()
        template.quality_fail_qty = failed
        template.quality_pass_qty = passed
        variants = template.with_context(active_test=template.active).product_variant_ids
        template.quality_control_point_qty = variants._count_quality_points()
def action_see_quality_control_points(self):
    """Open the control points applying to this template.

    The domain keeps points linked to one of the variants or to a parent of
    the template's category, plus points with neither products nor
    categories set.
    """
    self.ensure_one()
    action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_point_action")
    action['context'] = dict(self.env.context, default_product_ids=self.product_variant_ids.ids)
    linked_points = [
        '|',
        ('product_ids', 'in', self.product_variant_ids.ids),
        ('product_category_ids', 'parent_of', self.categ_id.ids),
    ]
    unrestricted_points = [
        ('product_ids', '=', False),
        ('product_category_ids', '=', False),
    ]
    action['domain'] = OR([linked_points, unrestricted_points])
    return action
def action_see_quality_checks(self):
    """Open the quality checks related to this template.

    The domain keeps checks on one of the variants, plus operation-level
    checks whose picking has a move of this template. Creation is disabled
    through the context.
    """
    self.ensure_one()
    checks_action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
    checks_action['context'] = dict(
        self.env.context,
        default_product_id=self.product_variant_id.id,
        create=False,
    )
    on_variant = ('product_id', 'in', self.product_variant_ids.ids)
    operation_level = ('measure_on', '=', 'operation')
    picking_has_template = ('picking_id.move_ids.product_tmpl_id', '=', self.id)
    checks_action['domain'] = ['|', on_variant, '&', operation_level, picking_has_template]
    return checks_action
# Extension of "product.product": quality counters restricted to the
# quality user group.
class ProductProduct(models.Model):
_inherit = "product.product"
# All three counters are filled by _compute_quality_check_qty.
quality_control_point_qty = fields.Integer(compute='_compute_quality_check_qty',
groups='quality.group_quality_user')
quality_pass_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
quality_fail_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
def _compute_quality_check_qty(self):
    """Compute the failed/passed check counts and the control point count."""
    for product in self:
        failed, passed = product._count_quality_checks()
        product.quality_fail_qty = failed
        product.quality_pass_qty = passed
        product.quality_control_point_qty = product._count_quality_points()
def _count_quality_checks(self):
    """Count the processed quality checks of these products by state.

    Checks of the current company are considered when they are on one of
    the products, or are operation-level checks whose picking moves one of
    the products; checks in state 'none' are ignored.

    :return: tuple ``(failed count, passed count)``.
    """
    domain = [
        '|',
        ('product_id', 'in', self.ids),
        '&',
        ('measure_on', '=', 'operation'),
        ('picking_id.move_ids.product_id', 'in', self.ids),
        ('company_id', '=', self.env.company.id),
        ('quality_state', '!=', 'none')
    ]
    grouped = self.env['quality.check']._read_group(domain, ['product_id'], ['quality_state'])
    counts = {'fail': 0, 'pass': 0}
    for group in grouped:
        state = group['quality_state']
        if state in counts:
            counts[state] = group['quality_state_count']
    return counts['fail'], counts['pass']
def _count_quality_points(self):
""" Compute the count of all related quality points, which means quality points that have either
the product in common, a product category parent of this product's category or no product/category
set at all.

The count is read with a raw SQL query on ``quality_point``; the ORM is only
used to produce the WHERE clause for the current company.
"""
# Base query restricted to the current company; _apply_ir_rules presumably
# adds the record rules for 'read' access - confirm against the ORM.
query = self.env['quality.point']._where_calc([('company_id', '=', self.env.company.id)])
self.env['quality.point']._apply_ir_rules(query, 'read')
_, where_clause, where_clause_args = query.get_sql()
# Hook returning extra SQL appended verbatim to the WHERE clause.
additional_where_clause = self._additional_quality_point_where_clause()
where_clause += additional_where_clause
# Ids found in the category's parent_path, split on '/'; the last element of
# the split is dropped before conversion. Empty list without a category.
parent_category_ids = [int(parent_id) for parent_id in
self.categ_id.parent_path.split('/')[:-1]] if self.categ_id else []
# The WHERE clause is interpolated into the SQL text with %, which is why the
# two parameter placeholders are escaped as %%s; the product ids and category
# ids are passed as query parameters after the WHERE clause arguments.
self.env.cr.execute("""
SELECT COUNT(*)
FROM quality_point
WHERE %s
AND (
(
-- QP has at least one linked product and one is right
EXISTS (SELECT 1 FROM product_product_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id AND rel.product_product_id = ANY(%%s))
-- Or QP has at least one linked product category and one is right
OR EXISTS (SELECT 1 FROM product_category_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id AND rel.product_category_id = ANY(%%s))
)
OR (
-- QP has no linked products
NOT EXISTS (SELECT 1 FROM product_product_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id)
-- And QP has no linked product categories
AND NOT EXISTS (SELECT 1 FROM product_category_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id)
)
)
""" % (where_clause,), where_clause_args + [self.ids, parent_category_ids]
)
# Single row, single column: the count.
return self.env.cr.fetchone()[0]
def action_see_quality_control_points(self):
    """Open the control points applying to this product.

    Starts from the template's action, then overrides the default products
    and the domain so they target this product only (points linked to the
    product or a parent of its category, plus points without any product or
    category).
    """
    self.ensure_one()
    action = self.product_tmpl_id.action_see_quality_control_points()
    action['context'].update(default_product_ids=self.ids)
    linked_points = [
        '|',
        ('product_ids', 'in', self.ids),
        ('product_category_ids', 'parent_of', self.categ_id.ids),
    ]
    unrestricted_points = [
        ('product_ids', '=', False),
        ('product_category_ids', '=', False),
    ]
    action['domain'] = OR([linked_points, unrestricted_points])
    return action
def action_see_quality_checks(self):
    """Open the quality checks related to this product.

    The domain keeps checks on the product, plus operation-level checks
    whose picking moves the product. Creation is disabled through the
    context.
    """
    self.ensure_one()
    checks_action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
    checks_action['context'] = dict(self.env.context, default_product_id=self.id, create=False)
    on_product = ('product_id', '=', self.id)
    operation_level = ('measure_on', '=', 'operation')
    picking_has_product = ('picking_id.move_ids.product_id', '=', self.id)
    checks_action['domain'] = ['|', on_product, '&', operation_level, picking_has_product]
    return checks_action
def _additional_quality_point_where_clause(self):
    """Return extra SQL for the control point count; empty by default."""
    extra_sql = ""
    return extra_sql
# Measurement detail line of a quality check: one inspected item with up to
# five measured values and a verdict.
class QualityCheckMeasureLine(models.Model):
_name = 'quality.check.measure.line'
_description = '质检测量明细'
_order = 'sequence, id'
# Sequence number.
sequence = fields.Integer('序号')
# Owning quality check; lines are deleted together with it (ondelete='cascade').
check_id = fields.Many2one('quality.check', string='质检单', required=True, ondelete='cascade')
# Basic information
product_name = fields.Char('产品名称', related='check_id.product_id.name', readonly=True)
drawing_no = fields.Char('图号')
measure_item = fields.Char('检测项目')
# Measured values
measure_value1 = fields.Char('测量值1')
measure_value2 = fields.Char('测量值2')
measure_value3 = fields.Char('测量值3')
measure_value4 = fields.Char('测量值4')
measure_value5 = fields.Char('测量值5')
# # Number of displayed columns
# column_nums = fields.Integer('列数', related='check_id.column_nums')
# Verdict
measure_result = fields.Selection([
('OK', '合格'),
('NG', '不合格')
], string='判定', default='OK')
# Remark
remark = fields.Char('备注')
def del_measure_value(self):
    """Delete this single measurement line through a ``sudo()`` recordset."""
    self.ensure_one()
    privileged_line = self.sudo()
    privileged_line.unlink()
# Publication history of factory inspection reports: one record per
# publish / cancel operation logged by quality.check.
class QualityCheckReportHistory(models.Model):
_name = 'quality.check.report.history'
_description = '出厂检验报告发布历史'
# Owning quality check; entries are deleted together with it (ondelete='cascade').
check_id = fields.Many2one('quality.check', string='质检单', required=True, ondelete='cascade')
# Report document concerned by the operation.
report_number_id = fields.Many2one('documents.document', string='报告编号', readonly=True)
# Sequence number.
sequence = fields.Integer('序号')
# Operation (publish, cancel publication, republish)
action = fields.Selection([
('publish', '发布'),
('cancel_publish', '撤销发布'),
('re_publish', '重新发布')
], string='操作')
# Operator
operator = fields.Char('操作人')
# Operation time
operation_time = fields.Datetime('操作时间')
# Document status after the operation (published, discarded)
document_status = fields.Selection([
('published', '已发布'),
('canceled', '废弃')
], string='操作后文档状态')