import json
import requests
import logging
from odoo import fields, models, api
from odoo.exceptions import ValidationError
from datetime import datetime
from odoo.addons.sf_base.commons.common import Common
from odoo.tools import float_round, float_compare


class QualityCheck(models.Model):
    """Extension of ``quality.check`` tying quality checks to work orders.

    Adds work-order related display fields, the judgement fields
    (``test_results`` / ``reason`` / ``detailed_reason``), tested-quantity
    bookkeeping, and the preview/publish entry points of the inspection
    report.
    """
    _inherit = "quality.check"

    quality_state = fields.Selection([
        ('waiting', '等待'),
        ('none', '待处理'),
        ('pass', '通过的'),
        ('fail', '失败的'),
        ('cancel', '已取消'),
    ], string='状态', tracking=True, store=True, default='none', copy=False,
        compute='_compute_quality_state')

    # Values mirrored from the linked work order.
    individuation_page_list = fields.Char('个性化记录', related='workorder_id.individuation_page_list')
    work_state = fields.Selection(related='workorder_id.state', string='工单状态')
    processing_panel = fields.Char(related='workorder_id.processing_panel', string='加工面')
    production_line_id = fields.Many2one(related='workorder_id.production_line_id', string='生产线')
    equipment_id = fields.Many2one(related='workorder_id.equipment_id', string='加工设备')
    model_file = fields.Binary(related='workorder_id.glb_file', string='加工模型')
    glb_url = fields.Char(related='workorder_id.glb_url', string='加工模型')
    detection_report = fields.Binary(related='workorder_id.detection_report', readonly=True,
                                     string='检测报告')

    # Judgement entered by the inspector.
    test_results = fields.Selection([("合格", "合格"), ("返工", "返工")], string="检测结果",
                                    default='合格')
    reason = fields.Selection(
        [("programming", "编程"), ("cutter", "刀具"), ("clamping", "装夹"),
         ("operate computer", "操机"), ("technology", "工艺"),
         ("customer redrawing", "客户改图")], string="原因")
    detailed_reason = fields.Text('详细原因')

    # Documents mirrored from the product.
    machining_drawings = fields.Binary('2D加工图纸', related='product_id.machining_drawings')
    quality_standard = fields.Binary('质检标准', related='product_id.quality_standard')

    operation_id = fields.Many2one('mrp.routing.workcenter', '作业', store=True,
                                   compute='_compute_operation_id')
    is_inspect = fields.Boolean('需送检', related='point_id.is_inspect')
    lot_name = fields.Char('批次/序列号 名称', compute='_compute_lot_name', store=True)

    @api.depends('move_line_id', 'move_line_id.lot_name')
    def _compute_lot_name(self):
        """Copy the lot name from the linked move line, when there is one."""
        # NOTE(review): records without a move line are not assigned here, so
        # they keep whatever value they already hold.
        for qc in self:
            if qc.move_line_id:
                qc.lot_name = qc.move_line_id.lot_name

    @api.depends('point_id.operation_id')
    def _compute_operation_id(self):
        """Copy the operation from the quality point, when it defines one."""
        # NOTE(review): records whose point has no operation are not assigned
        # here, so they keep whatever value they already hold.
        for qc in self:
            if qc.point_id.operation_id:
                qc.operation_id = qc.point_id.operation_id.id

    @api.depends('point_id.is_inspect')
    def _compute_quality_state(self):
        """Switch the state between 'none' and 'waiting' based on ``is_inspect``.

        Only those two states are ever touched; any other state is left as is.
        """
        for qc in self:
            needs_inspection = qc.point_id.is_inspect
            if (needs_inspection and qc.quality_state == 'none'
                    and qc.workorder_id.state != 'to be detected'):
                qc.quality_state = 'waiting'
            elif not needs_inspection and qc.quality_state == 'waiting':
                qc.quality_state = 'none'

    @api.onchange('test_results')
    def _onchange_test_results(self):
        """Clear the rework reason fields when the result is set to '合格'."""
        if self.test_results == '合格':
            self.reason = False
            self.detailed_reason = False

    def do_pass(self):
        """Pass the check and, if a work order is linked, finish it.

        Raises:
            ValidationError: if the work order is still 'pending'/'waiting',
                or if ``test_results`` holds a non-passing value.
        """
        self.ensure_one()
        super().do_pass()
        workorder = self.workorder_id
        if not workorder:
            return
        if workorder.state in ('pending', 'waiting'):
            raise ValidationError('工单未就绪!')
        # 1) Sync the result of the judgement tab to the work order.
        # NOTE(review): '报废' is not among the test_results options declared
        # in this class; presumably it is added elsewhere.
        if self.test_results in ('返工', '报废'):
            raise ValidationError('请重新选择【判定结果】-【检测结果】')
        if workorder.state not in ['done']:
            workorder.write({'test_results': '合格'})
            # 2) Move the linked work order to its finished state.
            workorder.button_finish()

    def do_fail(self):
        """Fail the check and, if a work order is linked, record the result.

        For work orders whose routing type is not 'CNC加工' and whose
        ``individuation_page_list`` is set and does not contain 'PTD', a
        detection result line is added to the production order. The work
        order then receives the judgement values and is finished, unless it
        is already done.

        Raises:
            ValidationError: if the work order is still 'pending'/'waiting',
                if ``test_results`` is empty, or if it is '合格'.
        """
        self.ensure_one()
        super().do_fail()
        workorder = self.workorder_id
        if not workorder:
            return
        if workorder.state in ('pending', 'waiting'):
            raise ValidationError('工单未就绪!')
        # 1) Sync the result of the judgement tab to the work order.
        if not self.test_results:
            raise ValidationError('请填写【判定结果】里的信息')
        if self.test_results == '合格':
            raise ValidationError('请重新选择【判定结果】-【检测结果】')
        if (workorder.routing_type != 'CNC加工'
                and workorder.individuation_page_list
                and 'PTD' not in workorder.individuation_page_list):
            workorder.production_id.write({
                'detection_result_ids': [(0, 0, {
                    'rework_reason': self.reason,
                    'detailed_reason': self.detailed_reason,
                    'processing_panel': workorder.processing_panel,
                    'routing_type': workorder.routing_type,
                    'handle_result': '待处理',
                    'test_results': self.test_results,
                    'test_report': workorder.detection_report,
                })],
                'is_scrap': self.test_results == '报废',
            })
        if workorder.state not in ['done']:
            workorder.write({
                'test_results': self.test_results,
                'reason': self.reason,
                'detailed_reason': self.detailed_reason,
            })
            # 2) Move the linked work order to its finished state.
            workorder.button_finish()

    # ========== Part special-acceptance interface ==========
    def _register_quality_check(self):
        """Send the part special-acceptance request to the central control system.

        Returns:
            str: a success message when the response reports ``Succeed``.

        Raises:
            ValidationError: when the response does not report success
                (including a response without a ``Succeed`` key).
        """
        config = self.env['res.config.settings'].get_values()
        headers = {'Authorization': config['center_control_Authorization']}
        crea_url = config['center_control_url'] + "/AutoDeviceApi/PartSpecProc"
        origin = self.picking_id.origin
        production_id = self.env['mrp.production'].sudo().search([('name', '=', origin)])
        workorders = production_id.workorder_ids
        rfid = (workorders[-1].rfid_code or '') if workorders else ''
        val = [rfid]
        # TODO: needs to be changed.
        # NOTE(review): this hard-coded value overrides the RFID computed
        # above, so every request carries the same payload. Kept as is because
        # the original author marked it as pending; confirm before removing.
        val = ['0037818516']
        logging.info('获取到的工单信息%s', val)
        # Sent through the api.request.log model's log_request helper rather
        # than a direct requests.post call.
        r = self.env['api.request.log'].log_request(
            'get',
            crea_url,
            name='零件特采',
            responser='中控系统',
            json=val,
            headers=headers,
        )
        ret = r.json()
        logging.info('_register_quality_check:%s', ret)
        # .get(): a response without the key is reported as a failed call
        # instead of surfacing as a raw KeyError.
        if ret.get('Succeed'):
            return "零件特采发送成功"
        raise ValidationError("零件特采发送失败")

    @api.model_create_multi
    def create(self, vals_list):
        """Create checks, defaulting ``measure_on`` from the quality point."""
        for val in vals_list:
            if 'point_id' in val and 'measure_on' not in val:
                # No control mode given: take it from the quality point.
                point = self.env['quality.point'].browse(val['point_id'])
                val['measure_on'] = point.measure_on
        return super().create(vals_list)

    @api.depends('total_qty', 'testing_percentage_within_lot', 'is_lot_tested_fractionally')
    def _compute_workorder_qty_to_test(self):
        """Compute the quantity to test.

        With fractional lot testing this is ``total_qty`` times the testing
        percentage, rounded up to the product UoM rounding (0.01 when the
        product has no UoM); otherwise it is ``total_qty`` itself.
        """
        for qc in self:
            if qc.is_lot_tested_fractionally:
                uom = qc.product_id.uom_id
                rounding = uom.rounding if uom else 0.01
                qc.workorder_qty_to_test = float_round(
                    float(qc.total_qty) * qc.testing_percentage_within_lot / 100,
                    precision_rounding=rounding,
                    rounding_method="UP",
                )
            else:
                qc.workorder_qty_to_test = qc.total_qty

    @api.depends('picking_id', 'workorder_id')
    def _compute_total_qty(self):
        """Use the production quantity for work-order checks without a picking."""
        super()._compute_total_qty()
        for qc in self:
            if not qc.picking_id and qc.workorder_id:
                qc.total_qty = qc.workorder_id.production_id.product_qty

    @api.depends('workorder_qty_to_test')
    def _compute_workorder_qty_tested(self):
        """Set the tested quantity equal to the quantity to test."""
        for qc in self:
            qc.workorder_qty_tested = qc.workorder_qty_to_test

    workorder_qty_to_test = fields.Float('应检', compute='_compute_workorder_qty_to_test', store=True)
    workorder_qty_tested = fields.Float('已检', compute='_compute_workorder_qty_tested', store=True)
    workorder_qty_test_failed = fields.Float('不合格数')

    @api.onchange('total_qty', 'workorder_qty_test_failed', 'workorder_qty_to_test',
                  'workorder_qty_tested')
    def _onchage_qty(self):
        """Clamp the quantities so that failed <= tested <= to-test <= total.

        The first violated bound is clamped and a warning is returned for it;
        the remaining bounds are not evaluated in that call.
        """
        for record in self:
            if (record.total_qty and record.workorder_qty_to_test
                    and record.workorder_qty_to_test > float(record.total_qty)):
                record.workorder_qty_to_test = float(record.total_qty)
                return {
                    'warning': {
                        'title': '警告',
                        'message': '待检数量不能超过总数量'
                    }
                }
            if (record.workorder_qty_to_test and record.workorder_qty_tested
                    and record.workorder_qty_tested > record.workorder_qty_to_test):
                record.workorder_qty_tested = record.workorder_qty_to_test
                return {
                    'warning': {
                        'title': '警告',
                        'message': '已检数量不能超过待检数量'
                    }
                }
            if (record.workorder_qty_tested and record.workorder_qty_test_failed
                    and record.workorder_qty_test_failed > record.workorder_qty_tested):
                record.workorder_qty_test_failed = record.workorder_qty_tested
                return {
                    'warning': {
                        'title': '警告',
                        'message': '不合格数量不能超过已检数量'
                    }
                }

    total_qty_readonly = fields.Boolean(compute='_compute_total_qty_readonly', store=True)

    # Stored computed field: declare what it reads so it is recomputed when
    # those values change, not only when the record is created.
    @api.depends('measure_on', 'workorder_id', 'point_id.test_type_id')
    def _compute_total_qty_readonly(self):
        """Decide whether ``total_qty`` is read-only.

        It is read-only unless the check uses the factory-inspection test
        type, or is measured per move line and linked to a work order.
        """
        report_test_type_id = self.env.ref('sf_quality.test_type_factory_inspection').id
        for record in self:
            # `not record.workorder_id`: a Many2one read yields a (possibly
            # empty) recordset, never the `False` singleton, so an `is False`
            # identity test can never be true.
            qty_locked = record.measure_on != 'move_line' or not record.workorder_id
            is_report_type = record.point_id.test_type_id.id == report_test_type_id
            record.total_qty_readonly = qty_locked and not is_report_type

    def _qty_mismatch_confirm_action(self, picking_qty, next_method):
        """Build the confirm-dialog client action shown on a quantity mismatch."""
        return {
            'type': 'ir.actions.client',
            'tag': 'jikimo_confirm_dialog',
            'params': {
                'active_id': self.id,
                'message': f"拣货调拨单号{self.picking_id.name}需求数量为{picking_qty},当前质量检查单产品数量为{self.total_qty},数量不一致,是否确认继续?",
                'next_model': self._name,
                'next_method': next_method,
                'context': self.env.context,
            }
        }

    def preview_doc(self):
        """Preview the factory inspection report.

        While the check is still 'waiting'/'none' and its total quantity does
        not match the picking demand, a confirm dialog is returned instead;
        confirming it continues with ``preview_doc_confirm``.
        """
        self.ensure_one()
        picking_qty = sum(self.picking_id.move_ids.mapped('product_uom_qty'))
        if not self._check_total_qty(picking_qty) and self.quality_state in ['waiting', 'none']:
            return self._qty_mismatch_confirm_action(picking_qty, 'preview_doc_confirm')
        return self.env.ref("sf_quality.action_report_quality_inspection_preview").read()[0]

    def preview_doc_confirm(self):
        """Return the report preview action bound to this check."""
        self.ensure_one()
        action = self.env.ref("sf_quality.action_report_quality_inspection_preview").read()[0]
        action['context'] = {
            'active_id': self.id,
            'active_ids': [self.id]
        }
        return action

    def _check_total_qty(self, compare_qty):
        """Return True when the check's total quantity equals ``compare_qty``.

        The comparison uses the rounding of the picking product's UoM, falling
        back to 0.01 when that is unavailable.
        """
        self.ensure_one()
        rounding = self.picking_id.product_id.uom_id.rounding or 0.01
        # The rounding must be passed by keyword: the third positional
        # parameter of float_compare is precision_digits.
        return float_compare(
            float(self.total_qty or 0.0), compare_qty, precision_rounding=rounding
        ) == 0

    def preview_do_publish(self):
        """Validate the check, then publish it or ask for confirmation.

        When the total quantity does not match the picking demand a confirm
        dialog is returned; confirming it continues with
        ``preview_do_publish_confirm``.
        """
        self.ensure_one()
        self._check_part_number()
        self._check_measure_line()
        self._check_check_qty_and_total_qty()
        picking_qty = sum(self.picking_id.move_ids.mapped('product_uom_qty'))
        if not self._check_total_qty(picking_qty):
            return self._qty_mismatch_confirm_action(picking_qty, 'preview_do_publish_confirm')
        return self.do_publish()

    def preview_do_publish_confirm(self):
        """Open the publish-confirmation wizard pre-filled from this check."""
        self.ensure_one()
        return {
            'name': '发布确认',
            'type': 'ir.actions.act_window',
            'res_model': 'quality.check.publish.wizard',
            'view_mode': 'form',
            'views': [[False, 'form']],
            'target': 'new',
            'context': {
                'default_check_id': self.id,
                'default_product_name': self.product_id.name,
                'default_total_qty': self.total_qty,
                'default_check_qty': self.check_qty,
                'default_measure_count': self.column_nums,
                'default_item_count': len(self.measure_line_ids),
                'default_old_report_name': self.old_report_name,
                'default_publish_status': self.publish_status,
                'is_web_request': True
            }
        }