1063 lines
44 KiB
Python
1063 lines
44 KiB
Python
# -*- coding: utf-8 -*-
|
||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
from math import sqrt
|
||
from dateutil.relativedelta import relativedelta
|
||
from datetime import datetime
|
||
|
||
import random
|
||
|
||
from odoo import api, models, fields, _
|
||
from odoo.api import depends
|
||
from odoo.tools import DEFAULT_SERVER_DATETIME_FORMAT, float_round
|
||
from odoo.osv.expression import OR
|
||
from odoo.exceptions import UserError
|
||
from odoo.tools import image_data_uri
|
||
from base64 import b64encode
|
||
import requests
|
||
import json
|
||
import base64
|
||
|
||
|
||
class QualityPoint(models.Model):
    """Extension of ``quality.point`` adding measure norms/tolerances,
    statistics over the recorded measures and check-frequency rules."""
    _inherit = "quality.point"

    failure_message = fields.Html('Failure Message')
    measure_on = fields.Selection(
        [
            ('operation', 'Operation'),
            ('product', 'Product'),
            ('move_line', 'Quantity'),
        ],
        string="Control per", default='product', required=True,
        help="Operation = One quality check is requested at the operation level.\n"
             "Product = A quality check is requested per product.\n"
             "Quantity = A quality check is requested for each new product quantity registered, with partial quantity checks also possible.")
    measure_frequency_type = fields.Selection(
        [
            ('all', 'All'),
            ('random', 'Randomly'),
            ('periodical', 'Periodically'),
        ],
        string="Control Frequency", default='all', required=True)
    measure_frequency_value = fields.Float('Percentage')  # TDE RENAME ?
    measure_frequency_unit_value = fields.Integer('Frequency Unit Value')  # TDE RENAME ?
    measure_frequency_unit = fields.Selection(
        [
            ('day', 'Days'),
            ('week', 'Weeks'),
            ('month', 'Months'),
        ],
        default="day")  # TDE RENAME ?
    is_lot_tested_fractionally = fields.Boolean(
        string="Lot Tested Fractionally",
        help="Determines if only a fraction of the lot should be tested")
    testing_percentage_within_lot = fields.Float(
        help="Defines the percentage within a lot that should be tested")
    norm = fields.Float('Norm', digits='Quality Tests')  # TDE RENAME ?
    tolerance_min = fields.Float('Min Tolerance', digits='Quality Tests')
    tolerance_max = fields.Float('Max Tolerance', digits='Quality Tests')
    norm_unit = fields.Char('Norm Unit', default=lambda self: 'mm')  # TDE RENAME ?
    average = fields.Float(compute="_compute_standard_deviation_and_average")
    standard_deviation = fields.Float(compute="_compute_standard_deviation_and_average")

    def _compute_standard_deviation_and_average(self):
        """Compute the mean and the sample standard deviation of the measures.

        Only points whose ``test_type`` is ``'measure'`` are evaluated, and only
        checks whose ``quality_state`` is not ``'none'`` contribute. Mean and
        variance are accumulated in a single pass with Welford's method;
        Bessel's correction (division by ``n - 1``) is applied because the
        checks are a sample.
        """
        for point in self:
            if point.test_type != 'measure':
                point.average = 0
                point.standard_deviation = 0
                continue
            mean = 0.0
            squared_diff_sum = 0.0
            count = 0
            for check in point.check_ids.filtered(lambda c: c.quality_state != 'none'):
                count += 1
                delta = check.measure - mean
                mean += delta / count
                squared_diff_sum += delta * (check.measure - mean)

            # ``mean`` is still 0.0 when no check was processed.
            point.average = mean
            # A deviation needs at least two samples (n - 1 would be 0 otherwise).
            point.standard_deviation = sqrt(squared_diff_sum / (count - 1)) if count > 1 else 0.0

    @api.onchange('norm')
    def onchange_norm(self):
        """Initialise the max tolerance from the norm while it is still 0."""
        if self.tolerance_max == 0.0:
            self.tolerance_max = self.norm

    def check_execute_now(self):
        """Tell whether a check must be generated now for this point.

        * ``all``: always.
        * ``random``: with a probability of ``measure_frequency_value`` percent.
        * ``periodical``: only when no check of this point was created during
          the last ``measure_frequency_unit_value`` days/weeks/months.

        Any other frequency type is delegated to the parent implementation.

        :return: True when the check has to be executed
        :rtype: bool
        """
        self.ensure_one()
        frequency = self.measure_frequency_type
        if frequency == 'all':
            return True
        if frequency == 'random':
            return random.random() < self.measure_frequency_value / 100.0
        if frequency == 'periodical':
            unit_to_kwarg = {'day': 'days', 'week': 'weeks', 'month': 'months'}
            kwarg = unit_to_kwarg.get(self.measure_frequency_unit)
            # ``measure_frequency_unit`` is not required: without a unit we use
            # a zero-length period instead of failing on ``datetime - False``.
            if kwarg:
                period = relativedelta(**{kwarg: self.measure_frequency_unit_value})
            else:
                period = relativedelta()
            date_previous = datetime.today() - period
            recent_check = self.env['quality.check'].search([
                ('point_id', '=', self.id),
                ('create_date', '>=', date_previous.strftime(DEFAULT_SERVER_DATETIME_FORMAT)),
            ], limit=1)
            return not recent_check
        return super(QualityPoint, self).check_execute_now()

    def _get_type_default_domain(self):
        """Restrict the parent's default test type domain to ``passfail``."""
        domain = super(QualityPoint, self)._get_type_default_domain()
        domain.append(('technical_name', '=', 'passfail'))
        return domain

    def action_see_quality_checks(self):
        """Open the quality checks action filtered on this point."""
        self.ensure_one()
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
        action['domain'] = [('point_id', '=', self.id)]
        action['context'] = {
            'default_company_id': self.company_id.id,
            'default_point_id': self.id,
        }
        return action

    def action_see_spc_control(self):
        """Open the SPC action on the evaluated checks of this point.

        For ``measure`` points the context requests a line graph of the
        ``measure`` values grouped by name and point.
        """
        self.ensure_one()
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_spc")
        if self.test_type == 'measure':
            action['context'] = {
                'group_by': ['name', 'point_id'],
                'graph_measure': ['measure'],
                'graph_mode': 'line',
            }
        action['domain'] = [('point_id', '=', self.id), ('quality_state', '!=', 'none')]
        return action
|
||
|
||
|
||
class QualityCheck(models.Model):
    """Extension of ``quality.check`` with measure handling and the factory
    inspection report workflow (publish / cancel / upload / delete)."""
    _inherit = "quality.check"

    part_name = fields.Char('零件名称', related='product_id.part_name')
    part_number = fields.Char('零件图号', related='product_id.part_number', readonly=False, store=True)
    material_name = fields.Char('材料名称', compute='_compute_material_name')

    # Total quantity: quantity of the transfer's product line matching this check.
    total_qty = fields.Char('总数量', compute='_compute_total_qty')

    # Number of measure value columns currently in use on the measure lines.
    column_nums = fields.Integer('测量值列数', default=1)

    @api.depends('picking_id', 'product_id')
    def _compute_total_qty(self):
        """Set ``total_qty`` from the picking move of the checked product.

        The integer part of ``product_uom_qty`` of the last move of the picking
        whose product is the checked product is used; the field is left empty
        when there is no picking, no matching move or a non-positive quantity.
        """
        for record in self:
            qty = 0
            # An empty ``picking_id`` simply yields no move.
            for move in record.picking_id.move_ids_without_package:
                if move.product_id == record.product_id:
                    qty = int(move.product_uom_qty)
            # ``total_qty`` is a Char field: store text explicitly.
            record.total_qty = str(qty) if qty > 0 else ''

    # Inspected quantity.
    check_qty = fields.Integer('检验数', default=lambda self: self._get_default_check_qty())

    def _get_default_check_qty(self):
        """Return the default inspected quantity.

        The stored record (``_origin``) is inspected: the default is 1 for a
        per-product check that is not a factory inspection report, 0 otherwise
        (including when there is no stored record).

        :rtype: int
        """
        origin = self._origin
        if (origin
                and origin.measure_on == 'product'
                and origin.test_type_id.name != '出厂检验报告'):
            return 1
        return 0

    @api.onchange('test_type_id', 'measure_on')
    def _onchange_check_qty(self):
        """Refresh the inspected quantity when the test type or the control
        level changes (only per-product checks are touched)."""
        if self.measure_on != 'product':
            return
        self.check_qty = 0 if self.test_type_id.name == '出厂检验报告' else 1

    # Factory inspection report document and its generated number.
    report_number_id = fields.Many2one('documents.document', string='出厂检验报告编号', readonly=True)
    report_number_name = fields.Char('出厂检验报告编号名称', compute='_compute_report_number_name')

    old_report_name = fields.Char('旧出厂检验报告编号', default='')

    @api.depends('serial_number', 'part_number')
    def _compute_report_number_name(self):
        """Build the report number: ``FQC`` + part number + serial number
        zero-padded to two digits."""
        for record in self:
            record.report_number_name = f"FQC{record.part_number or ''}{record.serial_number:02d}"

    # Factory inspection report content: data of the linked document.
    report_content = fields.Binary(string='出厂检验报告', related='report_number_id.datas')

    is_out_check = fields.Boolean(string='是否出库检验', compute='_compute_is_out_check', readonly=True)

    measure_line_ids = fields.One2many('quality.check.measure.line', 'check_id', string='测量明细')

    categ_type = fields.Selection(string='产品的类别', related='product_id.categ_id.type', store=True)

    report_result = fields.Selection([
        ('OK', 'OK'),
        ('NG', 'NG'),
    ], string='出厂检验报告结果', default='OK')
    measure_operator = fields.Many2one('res.users', string='操机员')
    quality_manager = fields.Many2one('res.users', string='质检员')

    # Serial number (starts at 1, publishing is refused above 99).
    serial_number = fields.Integer('流水号', default=1, readonly=True)

    # Publication history.
    report_history_ids = fields.One2many('quality.check.report.history', 'check_id', string='发布历史')

    # Publication status.
    publish_status = fields.Selection([
        ('draft', '草稿'),
        ('published', '已发布'),
        ('canceled', '已撤销'),
    ], string='发布状态', default='draft')

    # Whether the factory inspection report has been uploaded.
    is_factory_report_uploaded = fields.Boolean(string='出厂检验报告是否已上传', default=False)

    def add_measure_line(self):
        """Add one measure value column (at most 5) and blank it on every line.

        :raises UserError: when 5 columns are already in use
        """
        self.ensure_one()
        if self.column_nums >= 5:
            raise UserError(_('最多只能有5列测量值'))
        new_column = self.column_nums + 1
        field_name = f'measure_value{new_column}'
        for line in self.measure_line_ids:
            if hasattr(line, field_name):
                line[field_name] = False
        self.column_nums = new_column

    def remove_measure_line(self):
        """Remove the last measure value column (at least 1 is kept) after
        blanking it on every line.

        :raises UserError: when only one column is left
        """
        self.ensure_one()
        if self.column_nums <= 1:
            raise UserError(_('最少要有1列测量值'))
        field_name = f'measure_value{self.column_nums}'
        for line in self.measure_line_ids:
            if hasattr(line, field_name):
                line[field_name] = False
        self.column_nums = self.column_nums - 1

    def do_preview(self):
        """Preview the factory inspection report (not implemented here)."""
        pass

    def do_publish(self):
        """Validate the check and open the publication confirmation wizard.

        The report is not published here; the wizard action is returned with
        the check data as default values.

        :raises UserError: when one of the pre-publication validations fails
        :return: window action for ``quality.check.publish.wizard``
        """
        self.ensure_one()
        self._check_part_number()
        self._check_measure_line()
        self._check_check_qty_and_total_qty()

        # Open the confirmation wizard instead of publishing directly.
        return {
            'name': _('发布确认'),
            'type': 'ir.actions.act_window',
            'res_model': 'quality.check.publish.wizard',
            'view_mode': 'form',
            'target': 'new',
            'context': {
                'default_check_id': self.id,
                'default_product_name': self.product_id.name,
                'default_total_qty': self.total_qty,
                'default_check_qty': self.check_qty,
                'default_measure_count': self.column_nums,
                'default_item_count': len(self.measure_line_ids),
                'default_old_report_name': self.old_report_name,
                'default_publish_status': self.publish_status,
            },
        }

    def _do_publish_implementation(self):
        """Actually publish the factory inspection report.

        Creates the document, links it to the check (marking the check as
        passed), renders the PDF into the document, logs a history entry,
        increments the serial number and finally flags the check as published.
        When a previously canceled report is re-published for a picking that is
        already done, the report is uploaded again.

        :raises UserError: when the serial number exceeds 99
        :return: True
        """
        self.ensure_one()

        if self.serial_number > 99:
            raise UserError(_('流水号不能大于99'))

        # 1. Fetch the "published" documents folder.
        workspace = self.env['documents.folder'].search([
            ('parent_folder_id', '=', self.env.ref('sf_quality.documents_purchase_contracts_folder').id),
            ('name', '=', '已发布'),
        ], limit=1)

        # 2. Create the document record first, without content.
        doc = self.env['documents.document'].create({
            'name': self.report_number_name,
            'mimetype': 'application/pdf',
            'res_id': self.id,
            'folder_id': workspace.id,
            'res_model': self._name,
        })

        # 3. Link the document to the quality check.
        self.write({
            'report_number_id': doc.id,
            'quality_state': 'pass',
        })

        # 4. Render the PDF once the document is linked to the check.
        #    The second tuple item must not be bound to ``_``: that would make
        #    ``_`` a local name and break the translation calls of this method.
        report_action = self.env.ref('sf_quality.action_report_quality_inspection')
        pdf_content, _content_type = report_action._render_qweb_pdf(
            report_ref=report_action.report_name,
            res_ids=self.ids,
        )

        # 5. Store the rendered content on the document.
        doc.write({'raw': pdf_content})

        # 6. Log the publication in the history.
        self.env['quality.check.report.history'].create({
            'check_id': self.id,
            'report_number_id': doc.id,
            'action': 'publish',
            'operator': self.env.user.name,
            'operation_time': datetime.now(),
            'document_status': 'published',
            'sequence': len(self.report_history_ids) + 1,
        })

        # 7. Update the remaining information.
        self.serial_number += 1
        self.quality_manager = self.env.user.id

        if self.publish_status == 'canceled' and self.picking_id.state == 'done':
            self.upload_factory_report()

        self.write({'publish_status': 'published'})

        return True

    def _check_part_number(self):
        """Pre-publication check: part number and machine operator are set.

        :raises UserError: when one of them is missing
        """
        if not self.part_number:
            raise UserError(_('零件图号不能为空'))
        if not self.measure_operator:
            raise UserError(_('操机员不能为空'))

    def _check_measure_line(self):
        """Pre-publication check: measure lines exist and every used cell is filled.

        :raises UserError: when there is no line, an empty measure item or an
            empty value in one of the ``column_nums`` first value columns
        """
        for record in self:
            if not record.measure_line_ids:
                raise UserError(_('请先添加测量明细'))
            value_fields = [f'measure_value{i}' for i in range(1, record.column_nums + 1)]
            for line in record.measure_line_ids:
                if not line.measure_item:
                    raise UserError(_('有检测项目值为空'))
                if not all(getattr(line, name) for name in value_fields):
                    raise UserError(_('有测量值为空'))

    def _check_check_qty_and_total_qty(self):
        """Pre-publication check of the inspected quantity against the total
        quantity and against the number of measured pieces (value columns).

        :raises UserError: when a quantity is missing or a bound is exceeded
        """
        for record in self:
            if not record.check_qty:
                raise UserError(_('请先输入检验数'))
            if not record.total_qty:
                raise UserError(_('总数量不能为空'))
            if record.check_qty > int(record.total_qty):
                raise UserError(_('检验数不可超过总数量'))
            if record.column_nums > record.check_qty:
                raise UserError(_('测量件数不可超过检验数'))

    def do_cancel_publish(self):
        """Cancel the publication of the factory inspection report.

        The linked document is moved to the canceled folder, a history entry is
        logged, the check is reset (no linked document, state ``none``, status
        ``canceled``) and, when the report had been uploaded, it is deleted
        remotely.

        :return: True
        """
        self.ensure_one()
        document = self.report_number_id

        # 1. Move the linked report document to the canceled folder.
        document.write({
            'folder_id': self.env.ref('sf_quality.documents_purchase_contracts_folder_canceled').id,
        })

        # 2. Log the cancellation in the history.
        self.env['quality.check.report.history'].create({
            'check_id': self.id,
            'report_number_id': document.id,
            'action': 'cancel_publish',
            'operator': self.env.user.name,
            'operation_time': datetime.now(),
            'document_status': 'canceled',
            'sequence': len(self.report_history_ids) + 1,
        })

        # 3. Remember the old report name and update the publication status.
        self.write({
            'old_report_name': document.name,
            'publish_status': 'canceled',
            'report_number_id': False,
            'quality_state': 'none',
        })

        if self.is_factory_report_uploaded:
            # 4. Delete the report from the machining order line.
            self.delete_factory_report()

        return True

    def do_re_publish(self):
        """Re-publish the factory inspection report; same rules as publishing."""
        return self.do_publish()

    def generate_qr_code(self):
        """Return the data URI of a 140x140 QR code image.

        The encoded URL is built from the ``web.base.url`` parameter and the id
        of this check.
        """
        self.ensure_one()
        base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
        target_url = base_url + '/#/index/publicPay?order_id=' + str(self.id) + '&source=%2Findex%2Fmyorder'
        barcode = self.env['ir.actions.report'].barcode('QR', target_url, width=140, height=140)
        return image_data_uri(b64encode(barcode))

    def get_latest_report_attachment(self, check_id):
        """Return the most recent report attachment of a quality check.

        Nothing is deleted: older attachments are kept.

        :param check_id: id of the ``quality.check`` record
        :return: the newest matching ``ir.attachment``, or False if none
        """
        latest = self.env['ir.attachment'].search([
            ('res_model', '=', 'quality.check'),
            ('res_id', '=', check_id),
            ('name', 'like', 'QC-QC'),  # adjust to the naming rule in use
        ], order='create_date DESC', limit=1)  # newest first
        return latest or False

    def get_report_url(self):
        """Return the access URL of the report, or False without a document."""
        self.ensure_one()
        if self.report_number_id:
            return f'/quality/report/{self.report_number_id.id}'
        return False

    def _post_bfm_report_api(self, endpoint, payload):
        """POST ``payload`` as JSON to the server set in ``bfm_url_new``.

        :param str endpoint: path appended to the configured base URL
        :param dict payload: JSON-serialisable request data
        :raises UserError: when the HTTP status is not 200
        :return: the decoded JSON response
        """
        base_url = self.env['ir.config_parameter'].sudo().get_param('bfm_url_new')
        response = requests.post(
            f"{base_url}{endpoint}",
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'},
            # Never block the worker forever on an unresponsive server.
            timeout=30,
        )
        if response.status_code != 200:
            raise UserError(_('请求失败,状态码: %s') % response.status_code)
        return response.json()

    def upload_factory_report(self):
        """Upload the factory inspection report of this check to the
        machining order line through the remote ``/api/report/create`` API.

        :raises UserError: when required data is missing, the request fails or
            the API reports a failure
        :return: client action displaying a success notification
        """
        self.ensure_one()
        if not self.report_content:
            raise UserError(_('当前质检单没有出厂检验报告,请先发布报告'))

        if not self.product_id.model_name:
            raise UserError(_('产品模型名称为空'))

        if not self.picking_id or not self.picking_id.origin:
            raise UserError(_('无法找到相关的调拨单或来源单据'))

        # The order reference is read from the picking's ``retrospect_ref``.
        order_ref = self.picking_id.retrospect_ref

        report_file = self.report_content
        if isinstance(report_file, bytes):
            report_file = report_file.decode('utf-8')

        try:
            result = self._post_bfm_report_api('/api/report/create', {
                "order_ref": order_ref,
                "model_name": self.product_id.model_name,
                "report_file": report_file,
            })
            if not result.get('success'):
                # The API answered with a failure message.
                raise UserError(_('上传失败: %s') % result.get('message', '未知错误'))
            self.is_factory_report_uploaded = True
        except UserError:
            # Already a user-facing message: do not wrap it a second time.
            raise
        except Exception as e:
            raise UserError(_('上传过程中发生错误: %s') % str(e)) from e

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('上传成功'),
                'message': _('出厂检验报告已成功上传到加工订单明细'),
                'type': 'success',
                'sticky': False,
            },
        }

    def delete_factory_report(self):
        """Delete the factory inspection report from the machining order
        line through the remote ``/api/report/delete`` API.

        :raises UserError: when required data is missing, the request fails or
            the API reports a failure
        :return: client action displaying a success notification
        """
        self.ensure_one()
        # The order reference is read from the picking's ``retrospect_ref``.
        order_ref = self.picking_id.retrospect_ref

        if not order_ref:
            raise UserError(_('无法找到相关的调拨单或来源单据'))

        if not self.product_id.model_name:
            raise UserError(_('产品模型名称为空'))

        try:
            result = self._post_bfm_report_api('/api/report/delete', {
                "order_ref": order_ref,
                "model_name": self.product_id.model_name,
            })
            if not result.get('success'):
                # The API answered with a failure message.
                raise UserError(_('删除失败: %s') % result.get('message', '未知错误'))
            self.is_factory_report_uploaded = False
        except UserError:
            # Already a user-facing message: do not wrap it a second time.
            raise
        except Exception as e:
            raise UserError(_('删除过程中发生错误: %s') % str(e)) from e

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('删除成功'),
                'message': _('出厂检验报告已成功删除'),
                'type': 'success',
                'sticky': False,
            },
        }

    @api.depends('product_id')
    def _compute_material_name(self):
        """Join the product's material name and material type name with a
        space; a missing part is replaced by an empty string."""
        for record in self:
            product = record.product_id
            materials_id_name = product.materials_id.name or ''
            materials_type_name = product.materials_type_id.name or ''
            record.material_name = materials_id_name + ' ' + materials_type_name

    @api.depends('test_type_id')
    def _compute_is_out_check(self):
        """Flag checks whose test type is the factory inspection report."""
        for record in self:
            record.is_out_check = record.test_type_id.name == '出厂检验报告'

    failure_message = fields.Html(related='point_id.failure_message', readonly=True)
    measure = fields.Float('Measure', default=0.0, digits='Quality Tests', tracking=True)
    measure_success = fields.Selection(
        [
            ('none', 'No measure'),
            ('pass', 'Pass'),
            ('fail', 'Fail'),
        ],
        string="Measure Success", compute="_compute_measure_success",
        readonly=True, store=True)
    tolerance_min = fields.Float('Min Tolerance', related='point_id.tolerance_min', readonly=True)
    tolerance_max = fields.Float('Max Tolerance', related='point_id.tolerance_max', readonly=True)
    warning_message = fields.Text(compute='_compute_warning_message')
    norm_unit = fields.Char(related='point_id.norm_unit', readonly=True)
    qty_to_test = fields.Float(
        compute="_compute_qty_to_test", string="Quantity to Test",
        help="Quantity of product to test within the lot")
    qty_tested = fields.Float(string="Quantity Tested", help="Quantity of product tested within the lot")
    measure_on = fields.Selection(
        [
            ('operation', 'Operation'),
            ('product', 'Product'),
            ('move_line', 'Quantity'),
        ],
        string="Control per", default='product', required=True,
        help="Operation = One quality check is requested at the operation level.\n"
             "Product = A quality check is requested per product.\n"
             "Quantity = A quality check is requested for each new product quantity registered, with partial quantity checks also possible.")
    move_line_id = fields.Many2one(
        'stock.move.line', 'Stock Move Line', check_company=True,
        help="In case of Quality Check by Quantity, Move Line on which the Quality Check applies")
    lot_name = fields.Char('Lot/Serial Number Name')
    lot_line_id = fields.Many2one('stock.lot', store=True, compute='_compute_lot_line_id')
    qty_line = fields.Float(compute='_compute_qty_line', string="Quantity")
    uom_id = fields.Many2one(related='product_id.uom_id', string="Product Unit of Measure")
    show_lot_text = fields.Boolean(compute='_compute_show_lot_text')
    is_lot_tested_fractionally = fields.Boolean(related='point_id.is_lot_tested_fractionally')
    testing_percentage_within_lot = fields.Float(related="point_id.testing_percentage_within_lot")
    product_tracking = fields.Selection(related='product_id.tracking')
    quality_check_type = fields.Selection([
        ('采购入库检', '采购入库检'),
        ('客供料入库检', '客供料入库检'),
        ('退货入库检', '退货入库检'),
        ('生产入库检', '生产入库检'),
        ('外协入库检', '外协入库检'),
        ('成品发货检', '成品发货检'),
        ('工序外协发货检', '工序外协发货检'),
        ('委外坯料发货检', '委外坯料发货检'),
    ], string='类型', compute='_compute_quality_check_type', store=True)

    @api.depends('picking_id')
    def _compute_quality_check_type(self):
        """Derive the check type from the sequence code of the picking type;
        unknown codes and checks without picking get no type."""
        type_mapping = {
            'IN': '采购入库检',
            'DL': '客供料入库检',
            'RET': '退货入库检',
            'SFP': '生产入库检',
            'OCIN': '外协入库检',
            'OUT': '成品发货检',
            'OCOUT': '工序外协发货检',
            'RES': '委外坯料发货检',
        }
        for check in self:
            # Without picking the sequence code is empty and maps to False.
            sequence_code = check.picking_id.picking_type_id.sequence_code
            check.quality_check_type = type_mapping.get(sequence_code, False)

    @api.depends('measure_success')
    def _compute_warning_message(self):
        """Explain a failed measure by recalling the tolerance interval."""
        for rec in self:
            if rec.measure_success == 'fail':
                rec.warning_message = _('You measured %.2f %s and it should be between %.2f and %.2f %s.') % (
                    rec.measure, rec.norm_unit, rec.point_id.tolerance_min,
                    rec.point_id.tolerance_max, rec.norm_unit
                )
            else:
                rec.warning_message = ''

    @api.depends('move_line_id.qty_done')
    def _compute_qty_line(self):
        """Mirror the done quantity of the linked move line."""
        for qc in self:
            qc.qty_line = qc.move_line_id.qty_done

    @api.depends('move_line_id.lot_id')
    def _compute_lot_line_id(self):
        """Mirror the lot of the linked move line and, when there is one,
        propagate it to ``lot_id``."""
        for qc in self:
            qc.lot_line_id = qc.move_line_id.lot_id
            if qc.lot_line_id:
                qc.lot_id = qc.lot_line_id

    @api.depends('measure')
    def _compute_measure_success(self):
        """Evaluate the measure against the tolerances of the point
        (``none`` for pass/fail points, which have no measure)."""
        for rec in self:
            if rec.point_id.test_type == 'passfail':
                rec.measure_success = 'none'
                continue
            out_of_tolerance = (rec.measure < rec.point_id.tolerance_min
                                or rec.measure > rec.point_id.tolerance_max)
            rec.measure_success = 'fail' if out_of_tolerance else 'pass'

    # Add picture dependency
    @api.depends('picture')
    def _compute_result(self):
        """Same computation as the parent; only the dependencies change."""
        super(QualityCheck, self)._compute_result()

    @api.depends('qty_line', 'testing_percentage_within_lot', 'is_lot_tested_fractionally')
    def _compute_qty_to_test(self):
        """Quantity to test: the whole line quantity, or the configured
        percentage of it rounded up to the UoM rounding of the product."""
        for qc in self:
            if qc.is_lot_tested_fractionally:
                # Use the rounding of *this* check's product: going through
                # ``self`` fails as soon as several checks are computed at once.
                qc.qty_to_test = float_round(
                    qc.qty_line * qc.testing_percentage_within_lot / 100,
                    precision_rounding=qc.product_id.uom_id.rounding,
                    rounding_method="UP")
            else:
                qc.qty_to_test = qc.qty_line

    @api.depends('lot_line_id', 'move_line_id')
    def _compute_show_lot_text(self):
        """Show the free-text lot only for a move line without a lot."""
        for qc in self:
            qc.show_lot_text = bool(qc.move_line_id) and not qc.lot_line_id

    def _is_pass_fail_applicable(self):
        """Pass/fail applies to ``passfail`` and ``measure`` tests, on top of
        what the parent accepts."""
        if self.test_type in ['passfail', 'measure']:
            return True
        return super()._is_pass_fail_applicable()

    def _get_check_result(self):
        """Return a fixed label for picture tests with an uploaded picture,
        the parent's result otherwise."""
        if self.test_type == 'picture' and self.picture:
            return _('Picture Uploaded')
        return super(QualityCheck, self)._get_check_result()

    def _check_to_unlink(self):
        """Always return True."""
        return True

    def do_measure(self):
        """Fail or pass the check depending on whether the measure lies within
        the tolerances of the point."""
        self.ensure_one()
        point = self.point_id
        if self.measure < point.tolerance_min or self.measure > point.tolerance_max:
            return self.do_fail()
        return self.do_pass()

    def correct_measure(self):
        """Open this check in the small form view, in a dialog."""
        self.ensure_one()
        return {
            'name': _('Quality Checks'),
            'type': 'ir.actions.act_window',
            'res_model': 'quality.check',
            'view_mode': 'form',
            'view_id': self.env.ref('quality_control.quality_check_view_form_small').id,
            'target': 'new',
            'res_id': self.id,
            'context': self.env.context,
        }

    def do_alert(self):
        """Create a quality alert from this check and open its form."""
        self.ensure_one()
        alert = self.env['quality.alert'].create({
            'check_id': self.id,
            'product_id': self.product_id.id,
            'product_tmpl_id': self.product_id.product_tmpl_id.id,
            'lot_id': self.lot_id.id,
            'user_id': self.user_id.id,
            'team_id': self.team_id.id,
            'company_id': self.company_id.id,
        })
        return {
            'name': _('Quality Alert'),
            'type': 'ir.actions.act_window',
            'res_model': 'quality.alert',
            'views': [(self.env.ref('quality_control.quality_alert_view_form').id, 'form')],
            'res_id': alert.id,
            'context': {'default_check_id': self.id},
        }

    def action_see_alerts(self):
        """Open the alerts of this check: the form when there is exactly one
        alert, the filtered alert action otherwise."""
        self.ensure_one()
        if len(self.alert_ids) == 1:
            return {
                'name': _('Quality Alert'),
                'type': 'ir.actions.act_window',
                'res_model': 'quality.alert',
                'views': [(self.env.ref('quality_control.quality_alert_view_form').id, 'form')],
                'res_id': self.alert_ids.ids[0],
                'context': {'default_check_id': self.id},
            }
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_alert_action_check")
        action['domain'] = [('id', 'in', self.alert_ids.ids)]
        action['context'] = dict(self._context, default_check_id=self.id)
        return action

    def action_open_quality_check_wizard(self, current_check_id=None):
        """Open the quality check wizard on these checks.

        :param current_check_id: id of the check to start with; defaults to
            the smallest id of the recordset
        """
        check_ids = sorted(self.ids)
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.action_quality_check_wizard")
        action['context'] = self.env.context.copy()
        action['context'].update({
            'default_check_ids': check_ids,
            'default_current_check_id': current_check_id or check_ids[0],
        })
        return action
|
||
|
||
|
||
class QualityAlert(models.Model):
    """Extension of ``quality.alert`` adding a title and the part information
    of the related product or picking."""
    _inherit = "quality.alert"

    title = fields.Char('Title')
    part_number = fields.Char(string='零件图号', compute='_compute_part_info', store=True)
    part_name = fields.Char(string='零件名称', compute='_compute_part_info', store=True)

    @api.depends('product_id', 'picking_id')
    def _compute_part_info(self):
        """Fill the part number and name.

        * category named ``成品``: values come from the product;
        * category named ``坯料``: values come from the first move of the
          picking, when there is one;
        * anything else: both fields are emptied.
        """
        for alert in self:
            # Every record must get a value, whatever branch applies below.
            part_number = False
            part_name = False
            if alert.product_tmpl_id.categ_id.name == '成品':
                part_number = alert.product_id.part_number
                part_name = alert.product_id.part_name
            elif alert.product_id.categ_id.name == '坯料':
                moves = alert.picking_id.move_ids_without_package
                if moves:
                    part_number = moves[0].part_number
                    part_name = moves[0].part_name
            alert.part_number = part_number
            alert.part_name = part_name

    def action_see_check(self):
        """Open the form of the quality check linked to this alert."""
        return {
            'name': _('Quality Check'),
            'type': 'ir.actions.act_window',
            'view_mode': 'form',
            'res_model': 'quality.check',
            'target': 'current',
            'res_id': self.check_id.id,
        }

    @api.depends('name', 'title')
    def name_get(self):
        """Display ``name - title`` when a title is set, ``name`` otherwise."""
        result = []
        for record in self:
            if record.title:
                display_name = record.name + ' - ' + record.title
            else:
                display_name = record.name
            result.append((record.id, display_name))
        return result

    @api.model
    def name_create(self, name):
        """Create an alert using ``name`` as its title, so that the sequence
        stays in the alert name."""
        return self.create({'title': name}).name_get()[0]

    @api.model
    def message_new(self, msg_dict, custom_values=None):
        """ Override, used with creation by email alias. The purpose of the override is
        to use the subject for title and body for description instead of the name.
        """
        # ``custom_values`` is optional: start from an empty dict when omitted
        # instead of failing on the item assignment below.
        if custom_values is None:
            custom_values = {}
        # We need to add the name in custom_values or it will use the subject.
        custom_values['name'] = self.env['ir.sequence'].next_by_code('quality.alert') or _('New')
        if msg_dict.get('subject'):
            custom_values['title'] = msg_dict['subject']
        if msg_dict.get('body'):
            custom_values['description'] = msg_dict['body']
        return super(QualityAlert, self).message_new(msg_dict, custom_values)
|
||
|
||
|
||
class ProductTemplate(models.Model):
    """Quality counters and shortcuts on product templates."""
    _inherit = "product.template"

    quality_control_point_qty = fields.Integer(
        compute='_compute_quality_check_qty',
        groups='quality.group_quality_user',
    )
    quality_pass_qty = fields.Integer(
        compute='_compute_quality_check_qty',
        groups='quality.group_quality_user',
    )
    quality_fail_qty = fields.Integer(
        compute='_compute_quality_check_qty',
        groups='quality.group_quality_user',
    )

    @api.depends('product_variant_ids')
    def _compute_quality_check_qty(self):
        """Aggregate the failed/passed check counts and the control point
        count over the variants of each template."""
        for template in self:
            fail_count, pass_count = template.product_variant_ids._count_quality_checks()
            template.quality_fail_qty = fail_count
            template.quality_pass_qty = pass_count
            # The variants are read again with ``active_test`` following the
            # template's own ``active`` flag.
            contextual_template = template.with_context(active_test=template.active)
            template.quality_control_point_qty = contextual_template.product_variant_ids._count_quality_points()

    def action_see_quality_control_points(self):
        """Open the control points applying to this template: those set on
        one of its variants or on a parent of its category, plus those with
        neither product nor category."""
        self.ensure_one()
        variant_ids = self.product_variant_ids.ids
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_point_action")
        action['context'] = dict(self.env.context, default_product_ids=variant_ids)

        matching_domain = [
            '|',
            ('product_ids', 'in', variant_ids),
            ('product_category_ids', 'parent_of', self.categ_id.ids),
        ]
        unrestricted_domain = [
            ('product_ids', '=', False),
            ('product_category_ids', '=', False),
        ]
        action['domain'] = OR([matching_domain, unrestricted_domain])
        return action

    def action_see_quality_checks(self):
        """Open the quality checks of the variants, plus the operation-level
        checks of pickings moving this template."""
        self.ensure_one()
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
        action['context'] = dict(
            self.env.context,
            default_product_id=self.product_variant_id.id,
            create=False,
        )
        on_variants = ('product_id', 'in', self.product_variant_ids.ids)
        at_operation_level = ('measure_on', '=', 'operation')
        moves_this_template = ('picking_id.move_ids.product_tmpl_id', '=', self.id)
        action['domain'] = ['|', on_variants, '&', at_operation_level, moves_this_template]
        return action
|
||
|
||
|
||
class ProductProduct(models.Model):
    """Extend product variants with quality counters and quality actions."""
    _inherit = "product.product"

    # The three counters below share one compute method and are only
    # readable by members of `quality.group_quality_user`.
    quality_control_point_qty = fields.Integer(
        compute='_compute_quality_check_qty',
        groups='quality.group_quality_user')
    quality_pass_qty = fields.Integer(
        compute='_compute_quality_check_qty',
        groups='quality.group_quality_user')
    quality_fail_qty = fields.Integer(
        compute='_compute_quality_check_qty',
        groups='quality.group_quality_user')

    def _compute_quality_check_qty(self):
        """Fill the fail/pass/control-point counters of each product."""
        for variant in self:
            failed, passed = variant._count_quality_checks()
            variant.quality_fail_qty = failed
            variant.quality_pass_qty = passed
            variant.quality_control_point_qty = variant._count_quality_points()

    def _count_quality_checks(self):
        """Count failed and passed quality checks related to these products.

        A check is related when it targets one of the products, or when it is
        measured on the operation and its picking has a move for one of the
        products. Only checks of the current company whose state is not
        ``'none'`` are considered.

        :return: tuple ``(quality_fail_qty, quality_pass_qty)``
        """
        product_ids = self.ids
        on_products = ('product_id', 'in', product_ids)
        on_operation = ('measure_on', '=', 'operation')
        on_picking_moves = ('picking_id.move_ids.product_id', 'in', product_ids)
        domain = [
            '|', on_products, '&', on_operation, on_picking_moves,
            ('company_id', '=', self.env.company.id),
            ('quality_state', '!=', 'none'),
        ]
        grouped_checks = self.env['quality.check']._read_group(domain, ['product_id'], ['quality_state'])

        # Only the 'fail' and 'pass' groups are reported; others are ignored.
        tally = {'fail': 0, 'pass': 0}
        for group in grouped_checks:
            state = group['quality_state']
            if state in tally:
                tally[state] = group['quality_state_count']
        return tally['fail'], tally['pass']

    def _count_quality_points(self):
        """ Compute the count of all related quality points, which means quality points that have either
        the product in common, a product category parent of this product's category or no product/category
        set at all.
        """
        QualityPoint = self.env['quality.point']
        query = QualityPoint._where_calc([('company_id', '=', self.env.company.id)])
        QualityPoint._apply_ir_rules(query, 'read')
        _, where_clause, where_clause_args = query.get_sql()
        # Extension hook: subclasses may append extra SQL conditions.
        where_clause += self._additional_quality_point_where_clause()

        # `parent_path` is split on '/' and its last piece dropped; the
        # remaining pieces are converted to integer category ids.
        if self.categ_id:
            parent_category_ids = [
                int(category_id)
                for category_id in self.categ_id.parent_path.split('/')[:-1]
            ]
        else:
            parent_category_ids = []

        sql = """
            SELECT COUNT(*)
                FROM quality_point
                WHERE %s
                AND (
                    (
                        -- QP has at least one linked product and one is right
                        EXISTS (SELECT 1 FROM product_product_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id AND rel.product_product_id = ANY(%%s))
                        -- Or QP has at least one linked product category and one is right
                        OR EXISTS (SELECT 1 FROM product_category_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id AND rel.product_category_id = ANY(%%s))
                    )
                    OR (
                        -- QP has no linked products
                        NOT EXISTS (SELECT 1 FROM product_product_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id)
                        -- And QP has no linked product categories
                        AND NOT EXISTS (SELECT 1 FROM product_category_quality_point_rel rel WHERE rel.quality_point_id = quality_point.id)
                    )
                )
        """ % (where_clause,)
        self.env.cr.execute(sql, where_clause_args + [self.ids, parent_category_ids])
        return self.env.cr.fetchone()[0]

    def action_see_quality_control_points(self):
        """Return the template's quality point action narrowed to this variant.

        The action comes from the product template; its default products and
        its domain are then replaced with values based on this variant only.
        """
        self.ensure_one()
        action = self.product_tmpl_id.action_see_quality_control_points()
        action['context']['default_product_ids'] = self.ids

        linked_points = [
            '|',
            ('product_ids', 'in', self.ids),
            ('product_category_ids', 'parent_of', self.categ_id.ids),
        ]
        unrestricted_points = [
            ('product_ids', '=', False),
            ('product_category_ids', '=', False),
        ]
        action['domain'] = OR([linked_points, unrestricted_points])
        return action

    def action_see_quality_checks(self):
        """Return the main quality check action filtered for this variant.

        Checks are kept when they target this product, or when they are
        measured on the operation and their picking has a move for it.
        """
        self.ensure_one()
        action = self.env["ir.actions.actions"]._for_xml_id("quality_control.quality_check_action_main")
        action['context'] = dict(self.env.context, default_product_id=self.id, create=False)
        on_product = ('product_id', '=', self.id)
        on_operation = ('measure_on', '=', 'operation')
        on_picking_moves = ('picking_id.move_ids.product_id', '=', self.id)
        action['domain'] = ['|', on_product, '&', on_operation, on_picking_moves]
        return action

    def _additional_quality_point_where_clause(self):
        """Return extra SQL appended to the quality point WHERE clause (none here)."""
        return ""
|
||
|
||
|
||
class QualityCheckMeasureLine(models.Model):
    """One measurement line attached to a quality check."""
    _name = 'quality.check.measure.line'
    _description = '质检测量明细'
    _order = 'sequence, id'

    sequence = fields.Integer('序号')

    # Parent quality check; lines are removed together with it (cascade).
    check_id = fields.Many2one(
        'quality.check',
        string='质检单',
        required=True,
        ondelete='cascade',
    )

    # Basic information
    product_name = fields.Char(
        '产品名称',
        related='check_id.product_id.name',
        readonly=True,
    )
    drawing_no = fields.Char('图号')
    measure_item = fields.Char('检测项目')

    # Measured values
    measure_value1 = fields.Char('测量值1')
    measure_value2 = fields.Char('测量值2')
    measure_value3 = fields.Char('测量值3')
    measure_value4 = fields.Char('测量值4')
    measure_value5 = fields.Char('测量值5')

    # # Number of columns to display
    # column_nums = fields.Integer('列数', related='check_id.column_nums')

    # Verdict for the line
    measure_result = fields.Selection(
        [
            ('OK', '合格'),
            ('NG', '不合格'),
        ],
        string='判定',
        default='OK',
    )

    remark = fields.Char('备注')

    def del_measure_value(self):
        """Delete this single measurement line with superuser rights."""
        self.ensure_one()
        line_as_superuser = self.sudo()
        line_as_superuser.unlink()
|
||
|
||
|
||
# 增加出厂检验报告发布历史
|
||
class QualityCheckReportHistory(models.Model):
|
||
_name = 'quality.check.report.history'
|
||
_description = '出厂检验报告发布历史'
|
||
|
||
check_id = fields.Many2one('quality.check', string='质检单', required=True, ondelete='cascade')
|
||
report_number_id = fields.Many2one('documents.document', string='报告编号', readonly=True)
|
||
|
||
sequence = fields.Integer('序号')
|
||
# 操作(发布、撤销发布、重新发布)
|
||
action = fields.Selection([
|
||
('publish', '发布'),
|
||
('cancel_publish', '撤销发布'),
|
||
('re_publish', '重新发布')
|
||
], string='操作')
|
||
# 操作人
|
||
operator = fields.Char('操作人')
|
||
# 操作时间
|
||
operation_time = fields.Datetime('操作时间')
|
||
# 文档状态(已发布、废弃)
|
||
document_status = fields.Selection([
|
||
('published', '已发布'),
|
||
('canceled', '废弃')
|
||
], string='操作后文档状态')
|