Files
jikimo_sf/quality_control/models/quality.py
2025-03-13 17:55:13 +08:00

813 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from math import sqrt
from dateutil.relativedelta import relativedelta
from datetime import datetime
import random
from odoo import api, models, fields, _
from odoo.api import depends
from odoo.tools import DEFAULT_SERVER_DATETIME_FORMAT, float_round
from odoo.osv.expression import OR
from odoo.exceptions import UserError
from odoo.tools import image_data_uri
from base64 import b64encode
class QualityPoint(models.Model):
_inherit = "quality.point"
failure_message = fields.Html('Failure Message')
measure_on = fields.Selection([
('operation', 'Operation'),
('product', 'Product'),
('move_line', 'Quantity')], string="Control per", default='product', required=True,
help="""Operation = One quality check is requested at the operation level.
Product = A quality check is requested per product.
Quantity = A quality check is requested for each new product quantity registered, with partial quantity checks also possible.""")
measure_frequency_type = fields.Selection([
('all', 'All'),
('random', 'Randomly'),
('periodical', 'Periodically')], string="Control Frequency",
default='all', required=True)
measure_frequency_value = fields.Float('Percentage') # TDE RENAME ?
measure_frequency_unit_value = fields.Integer('Frequency Unit Value') # TDE RENAME ?
measure_frequency_unit = fields.Selection([
('day', 'Days'),
('week', 'Weeks'),
('month', 'Months')], default="day") # TDE RENAME ?
is_lot_tested_fractionally = fields.Boolean(string="Lot Tested Fractionally",
help="Determines if only a fraction of the lot should be tested")
testing_percentage_within_lot = fields.Float(help="Defines the percentage within a lot that should be tested")
norm = fields.Float('Norm', digits='Quality Tests') # TDE RENAME ?
tolerance_min = fields.Float('Min Tolerance', digits='Quality Tests')
tolerance_max = fields.Float('Max Tolerance', digits='Quality Tests')
norm_unit = fields.Char('Norm Unit', default=lambda self: 'mm') # TDE RENAME ?
average = fields.Float(compute="_compute_standard_deviation_and_average")
standard_deviation = fields.Float(compute="_compute_standard_deviation_and_average")
def _compute_standard_deviation_and_average(self):
# The variance and mean are computed by the Welfords method and used the Bessel's
# correction because are working on a sample.
for point in self:
if point.test_type != 'measure':
point.average = 0
point.standard_deviation = 0
continue
mean = 0.0
s = 0.0
n = 0
for check in point.check_ids.filtered(lambda x: x.quality_state != 'none'):
n += 1
delta = check.measure - mean
mean += delta / n
delta2 = check.measure - mean
s += delta * delta2
if n > 1:
point.average = mean
point.standard_deviation = sqrt(s / (n - 1))
elif n == 1:
point.average = mean
point.standard_deviation = 0.0
else:
point.average = 0.0
point.standard_deviation = 0.0
@api.onchange('norm')
def onchange_norm(self):
if self.tolerance_max == 0.0:
self.tolerance_max = self.norm
def check_execute_now(self):
self.ensure_one()
if self.measure_frequency_type == 'all':
return True
elif self.measure_frequency_type == 'random':
return (random.random() < self.measure_frequency_value / 100.0)
elif self.measure_frequency_type == 'periodical':
delta = False
if self.measure_frequency_unit == 'day':
delta = relativedelta(days=self.measure_frequency_unit_value)
elif self.measure_frequency_unit == 'week':
delta = relativedelta(weeks=self.measure_frequency_unit_value)
elif self.measure_frequency_unit == 'month':
delta = relativedelta(months=self.measure_frequency_unit_value)
date_previous = datetime.today() - delta
checks = self.env['quality.check'].search([
('point_id', '=', self.id),
('create_date', '>=', date_previous.strftime(DEFAULT_SERVER_DATETIME_FORMAT))], limit=1)
return not (bool(checks))
return super(QualityPoint, self).check_execute_now()
def _get_type_default_domain(self):
domain = super(QualityPoint, self)._get_type_default_domain()
domain.append(('technical_name', '=', 'passfail'))
return domain
def action_see_quality_checks(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
action['domain'] = [('point_id', '=', self.id)]
action['context'] = {
'default_company_id': self.company_id.id,
'default_point_id': self.id
}
return action
def action_see_spc_control(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_spc")
if self.test_type == 'measure':
action['context'] = {'group_by': ['name', 'point_id'], 'graph_measure': ['measure'], 'graph_mode': 'line'}
action['domain'] = [('point_id', '=', self.id), ('quality_state', '!=', 'none')]
return action
class QualityCheck(models.Model):
_inherit = "quality.check"
part_name = fields.Char('零件名称', related='product_id.part_name')
part_number = fields.Char('零件图号', related='product_id.part_number', readonly=False, store=True)
material_name = fields.Char('材料名称', compute='_compute_material_name')
# # 总数量值为调拨单_产品明细_数量
# total_qty = fields.Float('总数量', compute='_compute_total_qty', readonly=True)
# # 检验数
# check_qty = fields.Float('检验数', compute='_compute_check_qty', readonly=True)
# # 出厂检验报告编号
# report_number = fields.Char('出厂检验报告编号', compute='_compute_report_number', readonly=True)
# 总数量值为调拨单_产品明细_数量
total_qty = fields.Char('总数量', compute='_compute_total_qty')
column_nums = fields.Integer('测量值列数', default=1)
@api.depends('picking_id')
def _compute_total_qty(self):
for record in self:
if record.picking_id:
total_qty = 0
for move in record.picking_id.move_ids_without_package:
total_qty += move.quantity_done
record.total_qty = total_qty if total_qty > 0 else ''
else:
record.total_qty = ''
# 检验数
check_qty = fields.Char('检验数', default=lambda self: self._get_default_check_qty())
def _get_default_check_qty(self):
"""根据条件设置检验数的默认值"""
# 这里需要使用_origin来获取已存储的记录因为新记录在创建时可能还没有这些值
if self._origin:
if self._origin.measure_on == 'product' and self._origin.test_type_id.name == '出厂检验报告':
return ''
elif self._origin.measure_on == 'product':
return '1'
return ''
@api.onchange('test_type_id', 'measure_on')
def _onchange_check_qty(self):
"""当测试类型或测量对象变化时,更新检验数"""
if self.measure_on == 'product' and self.test_type_id.name == '出厂检验报告':
self.check_qty = ''
elif self.measure_on == 'product':
self.check_qty = '1'
# 出厂检验报告编号
report_number_id = fields.Many2one('documents.document', string='出厂检验报告编号', readonly=True)
# 出厂检验报告、关联文档的数据
report_content = fields.Binary(string='出厂检验报告', related='report_number_id.datas')
is_out_check = fields.Boolean(string='是否出库检验', compute='_compute_is_out_check', readonly=True)
measure_line_ids = fields.One2many('quality.check.measure.line', 'check_id', string='测量明细')
categ_type = fields.Selection(string='产品的类别', related='product_id.categ_id.type', store=True)
report_result = fields.Selection([
('OK', 'OK'),
('NG', 'NG')
], string='出厂检验报告结果', default='OK')
measure_operator = fields.Many2one('res.users', string='操机员')
quality_manager = fields.Many2one('res.users', string='质检员')
# 流水号(从1开始最大99)
serial_number = fields.Integer('流水号', default=1, readonly=True)
# 发布历史
report_history_ids = fields.One2many('quality.check.report.history', 'check_id', string='发布历史')
# 发布状态
publish_status = fields.Selection([
('draft', '草稿'),
('published', '已发布'),
('canceled', '已撤销')
], string='发布状态', default='draft')
def add_measure_line(self):
"""
新增测量值,如果测量值有5列了则提示“最多只能有5列测量值”
"""
if self.column_nums >= 5:
raise UserError(_('最多只能有5列测量值'))
else:
self.column_nums = self.column_nums + 1
def remove_measure_line(self):
"""
删除测量值
"""
if self.column_nums <= 1:
raise UserError(_('最少要有1列测量值'))
else:
self.column_nums = self.column_nums - 1
def do_preview(self):
"""
预览出厂检验报告
"""
pass
def do_publish(self):
"""发布出厂检验报告"""
self.ensure_one()
# 1. 获取报告动作
report_action = self.env.ref('sf_quality.action_report_quality_inspection')
# 2. 生成PDF报告 - 修改这里的调用方式
pdf_content, _ = report_action._render_qweb_pdf(
report_ref=report_action.report_name, # 添加report_ref参数
res_ids=self.ids
)
attachment = self.env['ir.attachment'].create({
'name': self.name,
'type': 'binary',
'datas': pdf_content,
'res_model': self._name,
'res_id': self.id,
'mimetype': 'application/pdf',
})
# 获取已发布的文档文件夹
workspace = self.env['documents.folder'].search([('parent_folder_id', '=', self.env.ref('sf_quality.documents_purchase_contracts_folder').id), ('name', '=', '已发布')], limit=1)
if self.serial_number > 99:
raise UserError(_('流水号不能大于99'))
str_serial_number = '0' + str(self.serial_number) if self.serial_number < 10 else str(self.serial_number)
str_part_number = self.part_number if self.part_number else ''
# 3. 创建文档记录
doc_vals = {
'name': f'FQC{str_part_number}{str_serial_number}',
# 'raw': pdf_content,
'attachment_id': attachment.id,
# 'mimetype': 'application/pdf',
'res_id': self.id,
'folder_id': workspace.id,
'res_model': self._name,
}
doc = self.env['documents.document'].create(doc_vals)
# 关联到当前质检记录
self.write({
'report_number_id': doc.id,
'publish_status': 'published'
})
# 记录发布历史
self.env['quality.check.report.history'].create({
'check_id': self.id,
'report_number_id': doc.id,
'action': 'publish',
'operator': self.env.user.name,
'operation_time': datetime.now(),
'document_status': 'published',
'sequence': len(self.report_history_ids) + 1
})
# 更新流水号
self.serial_number += 1
# 返回成功消息
return True
def do_cancel_publish(self):
"""
取消发布出厂检验报告(将当前质检单关联的出厂检验报告文档位置移动到废弃文件夹), 并记录发布历史
"""
self.ensure_one()
# 1. 获取已发布的文档文件夹
workspace = self.env['documents.folder'].search([('parent_folder_id', '=', self.env.ref('sf_quality.documents_purchase_contracts_folder').id), ('name', '=', '已发布')], limit=1)
# 2. 将当前质检单关联的出厂检验报告文档位置移动到废弃文件夹
self.report_number_id.write({
'folder_id': self.env.ref('sf_quality.documents_purchase_contracts_folder_canceled').id,
})
# 3. 更新发布状态
self.publish_status = 'canceled'
# 3. 记录发布历史
self.env['quality.check.report.history'].create({
'check_id': self.id,
'report_number_id': self.report_number_id.id,
'action': 'cancel_publish',
'operator': self.env.user.name,
'operation_time': datetime.now(),
'document_status': 'canceled',
'sequence': len(self.report_history_ids) + 1
})
return True
def do_re_publish(self):
"""
重新发布出厂检验报告,参考发布规则
"""
self.do_publish()
def generate_qr_code(self):
"""生成二维码URL"""
self.ensure_one()
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
return image_data_uri(
b64encode(self.env['ir.actions.report'].barcode(
'QR', base_url + '/#/index/publicPay?order_id=' + str(self.id) + '&source=%2Findex%2Fmyorder',
width=140, height=140)
)
)
@depends('product_id')
def _compute_material_name(self):
for record in self:
materials_id_name = record.product_id.materials_id.name if record.product_id.materials_id else ''
materials_type_name = record.product_id.materials_type_id.name if record.product_id.materials_type_id else ''
record.material_name = materials_id_name + ' ' + materials_type_name
@depends('test_type_id')
def _compute_is_out_check(self):
for record in self:
if record.test_type_id.name == '出厂检验报告':
record.is_out_check = True
else:
record.is_out_check = False
failure_message = fields.Html(related='point_id.failure_message', readonly=True)
measure = fields.Float('Measure', default=0.0, digits='Quality Tests', tracking=True)
measure_success = fields.Selection([
('none', 'No measure'),
('pass', 'Pass'),
('fail', 'Fail')], string="Measure Success", compute="_compute_measure_success",
readonly=True, store=True)
tolerance_min = fields.Float('Min Tolerance', related='point_id.tolerance_min', readonly=True)
tolerance_max = fields.Float('Max Tolerance', related='point_id.tolerance_max', readonly=True)
warning_message = fields.Text(compute='_compute_warning_message')
norm_unit = fields.Char(related='point_id.norm_unit', readonly=True)
qty_to_test = fields.Float(compute="_compute_qty_to_test", string="Quantity to Test",
help="Quantity of product to test within the lot")
qty_tested = fields.Float(string="Quantity Tested", help="Quantity of product tested within the lot")
measure_on = fields.Selection([
('operation', 'Operation'),
('product', 'Product'),
('move_line', 'Quantity')], string="Control per", default='product', required=True,
help="""Operation = One quality check is requested at the operation level.
Product = A quality check is requested per product.
Quantity = A quality check is requested for each new product quantity registered, with partial quantity checks also possible.""")
move_line_id = fields.Many2one('stock.move.line', 'Stock Move Line', check_company=True,
help="In case of Quality Check by Quantity, Move Line on which the Quality Check applies")
lot_name = fields.Char('Lot/Serial Number Name')
lot_line_id = fields.Many2one('stock.lot', store=True, compute='_compute_lot_line_id')
qty_line = fields.Float(compute='_compute_qty_line', string="Quantity")
uom_id = fields.Many2one(related='product_id.uom_id', string="Product Unit of Measure")
show_lot_text = fields.Boolean(compute='_compute_show_lot_text')
is_lot_tested_fractionally = fields.Boolean(related='point_id.is_lot_tested_fractionally')
testing_percentage_within_lot = fields.Float(related="point_id.testing_percentage_within_lot")
product_tracking = fields.Selection(related='product_id.tracking')
quality_check_type = fields.Selection([
('采购入库检', '采购入库检'),
('客供料入库检', '客供料入库检'),
('退货入库检', '退货入库检'),
('生产入库检', '生产入库检'),
('外协入库检', '外协入库检'),
('成品发货检', '成品发货检'),
('工序外协发货检', '工序外协发货检'),
('委外坯料发货检', '委外坯料发货检')], string='类型', compute='_compute_quality_check_type', store=True)
@api.depends('picking_id')
def _compute_quality_check_type(self):
for check in self:
if check.picking_id:
picking_type = check.picking_id.picking_type_id.sequence_code
type_mapping = {
'IN': '采购入库检',
'DL': '客供料入库检',
'RET': '退货入库检',
'SFP': '生产入库检',
'OCIN': '外协入库检',
'OUT': '成品发货检',
'OCOUT': '工序外协发货检',
'RES': '委外坯料发货检',
}
check.quality_check_type = type_mapping.get(picking_type, False)
else:
check.quality_check_type = False
@api.depends('measure_success')
def _compute_warning_message(self):
for rec in self:
if rec.measure_success == 'fail':
rec.warning_message = _('You measured %.2f %s and it should be between %.2f and %.2f %s.') % (
rec.measure, rec.norm_unit, rec.point_id.tolerance_min,
rec.point_id.tolerance_max, rec.norm_unit
)
else:
rec.warning_message = ''
@api.depends('move_line_id.qty_done')
def _compute_qty_line(self):
for qc in self:
qc.qty_line = qc.move_line_id.qty_done
@api.depends('move_line_id.lot_id')
def _compute_lot_line_id(self):
for qc in self:
qc.lot_line_id = qc.move_line_id.lot_id
if qc.lot_line_id:
qc.lot_id = qc.lot_line_id
@api.depends('measure')
def _compute_measure_success(self):
for rec in self:
if rec.point_id.test_type == 'passfail':
rec.measure_success = 'none'
else:
if rec.measure < rec.point_id.tolerance_min or rec.measure > rec.point_id.tolerance_max:
rec.measure_success = 'fail'
else:
rec.measure_success = 'pass'
# Add picture dependency
@api.depends('picture')
def _compute_result(self):
super(QualityCheck, self)._compute_result()
@api.depends('qty_line', 'testing_percentage_within_lot', 'is_lot_tested_fractionally')
def _compute_qty_to_test(self):
for qc in self:
if qc.is_lot_tested_fractionally:
qc.qty_to_test = float_round(qc.qty_line * qc.testing_percentage_within_lot / 100,
precision_rounding=self.product_id.uom_id.rounding, rounding_method="UP")
else:
qc.qty_to_test = qc.qty_line
@api.depends('lot_line_id', 'move_line_id')
def _compute_show_lot_text(self):
for qc in self:
if qc.lot_line_id or not qc.move_line_id:
qc.show_lot_text = False
else:
qc.show_lot_text = True
def _is_pass_fail_applicable(self):
if self.test_type in ['passfail', 'measure']:
return True
return super()._is_pass_fail_applicable()
def _get_check_result(self):
if self.test_type == 'picture' and self.picture:
return _('Picture Uploaded')
else:
return super(QualityCheck, self)._get_check_result()
def _check_to_unlink(self):
return True
def do_measure(self):
self.ensure_one()
if self.measure < self.point_id.tolerance_min or self.measure > self.point_id.tolerance_max:
return self.do_fail()
else:
return self.do_pass()
def correct_measure(self):
self.ensure_one()
return {
'name': _('Quality Checks'),
'type': 'ir.actions.act_window',
'res_model': 'quality.check',
'view_mode': 'form',
'view_id': self.env.ref('quality_control.quality_check_view_form_small').id,
'target': 'new',
'res_id': self.id,
'context': self.env.context,
}
def do_alert(self):
self.ensure_one()
alert = self.env['quality.alert'].create({
'check_id': self.id,
'product_id': self.product_id.id,
'product_tmpl_id': self.product_id.product_tmpl_id.id,
'lot_id': self.lot_id.id,
'user_id': self.user_id.id,
'team_id': self.team_id.id,
'company_id': self.company_id.id
})
return {
'name': _('Quality Alert'),
'type': 'ir.actions.act_window',
'res_model': 'quality.alert',
'views': [(self.env.ref('quality_control.quality_alert_view_form').id, 'form')],
'res_id': alert.id,
'context': {'default_check_id': self.id},
}
def action_see_alerts(self):
self.ensure_one()
if len(self.alert_ids) == 1:
return {
'name': _('Quality Alert'),
'type': 'ir.actions.act_window',
'res_model': 'quality.alert',
'views': [(self.env.ref('quality_control.quality_alert_view_form').id, 'form')],
'res_id': self.alert_ids.ids[0],
'context': {'default_check_id': self.id},
}
else:
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_alert_action_check")
action['domain'] = [('id', 'in', self.alert_ids.ids)]
action['context'] = dict(self._context, default_check_id=self.id)
return action
def action_open_quality_check_wizard(self, current_check_id=None):
check_ids = sorted(self.ids)
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.action_quality_check_wizard")
action['context'] = self.env.context.copy()
action['context'].update({
'default_check_ids': check_ids,
'default_current_check_id': current_check_id or check_ids[0],
})
return action
class QualityAlert(models.Model):
_inherit = "quality.alert"
title = fields.Char('Title')
part_number = fields.Char(string='零件图号', compute='_compute_part_info', store=True)
part_name = fields.Char(string='零件名称', compute='_compute_part_info', store=True)
@api.depends('product_id', 'picking_id')
def _compute_part_info(self):
for alert in self:
if alert.product_tmpl_id.categ_id.name == '成品':
alert.part_number = alert.product_id.part_number
alert.part_name = alert.product_id.part_name
elif alert.product_id.categ_id.name == '坯料':
if alert.picking_id.move_ids_without_package:
alert.part_number = alert.picking_id.move_ids_without_package[0].part_number
alert.part_name = alert.picking_id.move_ids_without_package[0].part_name
def action_see_check(self):
return {
'name': _('Quality Check'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'quality.check',
'target': 'current',
'res_id': self.check_id.id,
}
@api.depends('name', 'title')
def name_get(self):
result = []
for record in self:
name = record.name + ' - ' + record.title if record.title else record.name
result.append((record.id, name))
return result
@api.model
def name_create(self, name):
""" Create an alert with name_create should use prepend the sequence in the name """
values = {
'title': name,
}
return self.create(values).name_get()[0]
@api.model
def message_new(self, msg_dict, custom_values=None):
""" Override, used with creation by email alias. The purpose of the override is
to use the subject for title and body for description instead of the name.
"""
# We need to add the name in custom_values or it will use the subject.
custom_values['name'] = self.env['ir.sequence'].next_by_code('quality.alert') or _('New')
if msg_dict.get('subject'):
custom_values['title'] = msg_dict['subject']
if msg_dict.get('body'):
custom_values['description'] = msg_dict['body']
return super(QualityAlert, self).message_new(msg_dict, custom_values)
class ProductTemplate(models.Model):
_inherit = "product.template"
quality_control_point_qty = fields.Integer(compute='_compute_quality_check_qty',
groups='quality.group_quality_user')
quality_pass_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
quality_fail_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
@api.depends('product_variant_ids')
def _compute_quality_check_qty(self):
for product_tmpl in self:
product_tmpl.quality_fail_qty, product_tmpl.quality_pass_qty = product_tmpl.product_variant_ids._count_quality_checks()
product_tmpl.quality_control_point_qty = product_tmpl.with_context(
active_test=product_tmpl.active).product_variant_ids._count_quality_points()
def action_see_quality_control_points(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_point_action")
action['context'] = dict(self.env.context, default_product_ids=self.product_variant_ids.ids)
domain_in_products_or_categs = ['|', ('product_ids', 'in', self.product_variant_ids.ids),
('product_category_ids', 'parent_of', self.categ_id.ids)]
domain_no_products_and_categs = [('product_ids', '=', False), ('product_category_ids', '=', False)]
action['domain'] = OR([domain_in_products_or_categs, domain_no_products_and_categs])
return action
def action_see_quality_checks(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
action['context'] = dict(self.env.context, default_product_id=self.product_variant_id.id, create=False)
action['domain'] = [
'|',
('product_id', 'in', self.product_variant_ids.ids),
'&',
('measure_on', '=', 'operation'),
('picking_id.move_ids.product_tmpl_id', '=', self.id),
]
return action
class ProductProduct(models.Model):
_inherit = "product.product"
quality_control_point_qty = fields.Integer(compute='_compute_quality_check_qty',
groups='quality.group_quality_user')
quality_pass_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
quality_fail_qty = fields.Integer(compute='_compute_quality_check_qty', groups='quality.group_quality_user')
def _compute_quality_check_qty(self):
for product in self:
product.quality_fail_qty, product.quality_pass_qty = product._count_quality_checks()
product.quality_control_point_qty = product._count_quality_points()
def _count_quality_checks(self):
quality_fail_qty = 0
quality_pass_qty = 0
domain = [
'|',
('product_id', 'in', self.ids),
'&',
('measure_on', '=', 'operation'),
('picking_id.move_ids.product_id', 'in', self.ids),
('company_id', '=', self.env.company.id),
('quality_state', '!=', 'none')
]
quality_checks_by_state = self.env['quality.check']._read_group(domain, ['product_id'], ['quality_state'])
for checks_data in quality_checks_by_state:
if checks_data['quality_state'] == 'fail':
quality_fail_qty = checks_data['quality_state_count']
elif checks_data['quality_state'] == 'pass':
quality_pass_qty = checks_data['quality_state_count']
return quality_fail_qty, quality_pass_qty
def _count_quality_points(self):
""" Compute the count of all related quality points, which means quality points that have either
the product in common, a product category parent of this product's category or no product/category
set at all.
"""
query = self.env['quality.point']._where_calc([('company_id', '=', self.env.company.id)])
self.env['quality.point']._apply_ir_rules(query, 'read')
_, where_clause, where_clause_args = query.get_sql()
additional_where_clause = self._additional_quality_point_where_clause()
where_clause += additional_where_clause
parent_category_ids = [int(parent_id) for parent_id in
self.categ_id.parent_path.split('/')[:-1]] if self.categ_id else []
self.env.cr.execute("""
SELECT COUNT(*)
FROM quality_point
WHERE %s
AND (
(
-- QP has at least one linked product and one is right
EXISTS (SELECT 1 FROM product_product_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id AND rel.product_product_id = ANY(%%s))
-- Or QP has at least one linked product category and one is right
OR EXISTS (SELECT 1 FROM product_category_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id AND rel.product_category_id = ANY(%%s))
)
OR (
-- QP has no linked products
NOT EXISTS (SELECT 1 FROM product_product_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id)
-- And QP has no linked product categories
AND NOT EXISTS (SELECT 1 FROM product_category_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id)
)
)
""" % (where_clause,), where_clause_args + [self.ids, parent_category_ids]
)
return self.env.cr.fetchone()[0]
def action_see_quality_control_points(self):
self.ensure_one()
action = self.product_tmpl_id.action_see_quality_control_points()
action['context'].update(default_product_ids=self.ids)
domain_in_products_or_categs = ['|', ('product_ids', 'in', self.ids),
('product_category_ids', 'parent_of', self.categ_id.ids)]
domain_no_products_and_categs = [('product_ids', '=', False), ('product_category_ids', '=', False)]
action['domain'] = OR([domain_in_products_or_categs, domain_no_products_and_categs])
return action
def action_see_quality_checks(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
action['context'] = dict(self.env.context, default_product_id=self.id, create=False)
action['domain'] = [
'|',
('product_id', '=', self.id),
'&',
('measure_on', '=', 'operation'),
('picking_id.move_ids.product_id', '=', self.id),
]
return action
def _additional_quality_point_where_clause(self):
return ""
class QualityCheckMeasureLine(models.Model):
_name = 'quality.check.measure.line'
_description = '质检测量明细'
_order = 'sequence, id'
sequence = fields.Integer('序号')
check_id = fields.Many2one('quality.check', string='质检单', required=True, ondelete='cascade')
# 基本信息
product_name = fields.Char('产品名称', related='check_id.product_id.name', readonly=True)
drawing_no = fields.Char('图号')
measure_item = fields.Char('检测项目')
# 测量值
measure_value1 = fields.Char('测量值1')
measure_value2 = fields.Char('测量值2')
measure_value3 = fields.Char('测量值3')
measure_value4 = fields.Char('测量值4')
measure_value5 = fields.Char('测量值5')
# 展示列数
column_nums = fields.Integer('列数', related='check_id.column_nums')
# 判定结果
measure_result = fields.Selection([
('OK', '合格'),
('NG', '不合格')
], string='判定', default='OK')
remark = fields.Char('备注')
def del_measure_value(self):
self.ensure_one()
self.sudo().unlink()
# 增加出厂检验报告发布历史
class QualityCheckReportHistory(models.Model):
_name = 'quality.check.report.history'
_description = '出厂检验报告发布历史'
check_id = fields.Many2one('quality.check', string='质检单', required=True, ondelete='cascade')
report_number_id = fields.Many2one('documents.document', string='报告编号', readonly=True)
sequence = fields.Integer('序号')
# 操作(发布、撤销发布、重新发布)
action = fields.Selection([
('publish', '发布'),
('cancel_publish', '撤销发布'),
('re_publish', '重新发布')
], string='操作')
# 操作人
operator = fields.Char('操作人')
# 操作时间
operation_time = fields.Datetime('操作时间')
# 文档状态(已发布、废弃)
document_status = fields.Selection([
('published', '已发布'),
('canceled', '废弃')
], string='操作后文档状态')