+
diff --git a/quality_control/wizard/__init__.py b/quality_control/wizard/__init__.py
index 2b9d69c4..49415fd2 100644
--- a/quality_control/wizard/__init__.py
+++ b/quality_control/wizard/__init__.py
@@ -2,3 +2,4 @@
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from . import quality_check_wizard
+from . import import_complex_model
diff --git a/quality_control/wizard/import_complex_model.py b/quality_control/wizard/import_complex_model.py
new file mode 100644
index 00000000..e53e7bcd
--- /dev/null
+++ b/quality_control/wizard/import_complex_model.py
@@ -0,0 +1,418 @@
+# -*- coding: utf-8 -*-
+import base64
+import logging
+import os
+import traceback
+from datetime import datetime
+from io import BytesIO
+from openpyxl import load_workbook
+import pandas as pd
+import xlrd
+
+from odoo import fields, models, api, Command, _
+# from odoo.exceptions import ValidationError
+from odoo.exceptions import UserError
+from datetime import datetime, timedelta
+
+from odoo.http import request
+
+_logger = logging.getLogger(__name__)
+
+
+class ImportComplexModelWizard(models.TransientModel):
+ _name = 'quality.check.import.complex.model.wizard'
+ file_data = fields.Binary("数据文件")
+ model_name = fields.Char(string='Model Name')
+ field_basis = fields.Char(string='Field Basis')
+
+ def get_model_column_name_labels(self, model):
+ fields_info = model.fields_get()
+ # 提取字段名称和展示名称
+ field_labels = {field_info.get('string'): field_info for field_name, field_info in fields_info.items()}
+
+ return field_labels
+
+ def field_name_mapping(selfcolumn, column, field_data):
+ return {}
+
+ @api.model
+ def page_to_field(self, field_data):
+ return {}
+
+ def count_continuous_none(self, df):
+ # 用于存储间断的连续空值的区间
+ none_intervals = []
+ # if pd.isna(val):
+ # 遍历数据并查找连续的 None
+ start = 0
+ num = 0
+ for index, row in df.iterrows():
+ if pd.isna(row[0]):
+ continue
+ else:
+ # 记录区间
+ if num == 0:
+ start = index
+ num = 1
+ else:
+ end = index
+ none_intervals.append({'start': start, 'end': index})
+ start = end
+ # start = None # 重置
+
+ # 检查最后一个区间
+ if len(df) - start >= 1:
+ none_intervals.append({'start': start, 'end': len(df)})
+
+ return none_intervals
+
+ def find_repeated_ranges(self, data):
+ # 判断内联列表范围,有column列的为内联列表字段
+ if not data:
+ return []
+
+ repeats = []
+ start_index = 0
+ current_value = data[0]
+
+ for i in range(1, len(data)):
+ if data[i] == current_value:
+ continue
+ else:
+ if i - start_index > 1: # 有重复
+ repeats.append({'start': start_index, 'end': i, 'column': data[i - 1]})
+ # repeats.append((current_value, start_index, i - 1))
+ current_value = data[i]
+ start_index = i
+
+ # 检查最后一段
+ if len(data) - start_index > 1:
+ repeats.append({'start': start_index, 'end': i, 'column': data[i - 1]})
+ # repeats.append((current_value, start_index, len(data) - 1))
+
+ return repeats
+
+ def import_data(self):
+ """导入Excel数据"""
+ if not self.file_data:
+ raise UserError(_('请先上传Excel文件'))
+
+ # 解码文件数据
+ file_content = base64.b64decode(self.file_data)
+
+ try:
+ # 使用xlrd读取Excel文件
+ workbook = xlrd.open_workbook(file_contents=file_content)
+ sheet = workbook.sheet_by_index(0)
+
+ # 检查表头是否符合预期(忽略星号)
+ expected_headers = ['产品名称', '图号', '检测项目', '测量值1', '测量值2', '测量值3', '测量值4', '测量值5', '判定', '备注']
+ actual_headers = sheet.row_values(0)
+
+ # 处理合并单元格的情况
+ # 获取所有合并单元格
+ merged_cells = []
+ for crange in sheet.merged_cells:
+ rlo, rhi, clo, chi = crange
+ if rlo == 0: # 只关注第一行(表头)的合并单元格
+ merged_cells.append((clo, chi))
+
+ # 清理表头(移除星号和处理空值)
+ actual_headers_clean = []
+ for i, header in enumerate(actual_headers):
+ # 检查当前列是否在合并单元格范围内但不是合并单元格的起始列
+ is_merged_not_first = any(clo < i < chi for clo, chi in merged_cells)
+
+ if is_merged_not_first:
+ # 如果是合并单元格的非起始列,跳过
+ continue
+
+ # 处理表头文本
+ if isinstance(header, str):
+ header = header.replace('*', '').strip()
+
+ if header: # 只添加非空表头
+ actual_headers_clean.append(header)
+
+ # 检查表头数量
+ if len(actual_headers_clean) < len(expected_headers):
+ raise UserError(_('表头列数不足,请使用正确的模板文件'))
+
+ # 检查表头顺序(忽略星号)
+ for i, header in enumerate(expected_headers):
+ if i >= len(actual_headers_clean) or header != actual_headers_clean[i]:
+ actual_header = actual_headers_clean[i] if i < len(actual_headers_clean) else "缺失"
+ raise UserError(_('表头顺序不正确,第%s列应为"%s",但实际为"%s"') %
+ (i + 1, header, actual_header))
+
+ # 获取当前活动的quality.check记录
+ active_id = self.env.context.get('active_id')
+ quality_check = self.env['quality.check'].browse(active_id)
+
+ if not quality_check:
+ raise UserError(_('未找到相关的质检记录'))
+
+ # 记录是否有有效数据被导入
+ valid_data_imported = False
+
+ # 从第二行开始读取数据(跳过表头)
+ for row_index in range(1, sheet.nrows):
+ row = sheet.row_values(row_index)
+
+ # 检查行是否有数据
+ if not any(row):
+ continue
+
+ # 创建quality.check.measure.line记录
+ measure_line_vals = {
+ 'check_id': quality_check.id,
+ 'sequence': len(quality_check.measure_line_ids) + 1,
+ 'product_name': str(row[0]) if row[0] else '', # 产品名称列
+ 'drawing_no': str(row[1]) if row[1] else '', # 图号列
+ 'measure_item': row[2] or '', # 检测项目列
+ 'measure_value1': str(row[4]) if row[4] else '', # 测量值1
+ 'measure_value2': str(row[5]) if row[5] else '', # 测量值2
+ 'measure_value3': str(row[6]) if len(row) > 6 and row[6] else '', # 测量值3
+ 'measure_value4': str(row[7]) if len(row) > 7 and row[7] else '', # 测量值4
+ 'measure_value5': str(row[8]) if len(row) > 8 and row[8] else '', # 测量值5
+ 'measure_result': 'NG' if row[9] == 'NG' else 'OK', # 判定列
+ 'remark': row[10] if len(row) > 10 and row[10] else '', # 备注列
+ }
+
+ self.env['quality.check.measure.line'].create(measure_line_vals)
+ valid_data_imported = True
+
+ # 检查是否有有效数据被导入
+ if not valid_data_imported:
+ raise UserError(_('表格中没有有效数据行可导入,请检查表格内容'))
+
+ # 返回关闭弹窗的动作
+ return {
+ 'type': 'ir.actions.act_window_close'
+ }
+
+ except Exception as e:
+ _logger.error("导入Excel数据时出错: %s", str(e))
+ _logger.error(traceback.format_exc())
+ raise UserError(_('导入失败: %s') % str(e))
+
+ def process_first_line(self, df_columns, column_labels, field_data):
+ columns = []
+ last_column_name = None
+ for index, column in enumerate(df_columns):
+ if not column_labels.get(column):
+ if 'Unnamed' in column:
+ columns.append(last_column_name)
+ else:
+ field_name_map = self.page_to_field(field_data)
+ field_name = field_name_map.get(column)
+ if field_name:
+ columns.append(field_name)
+ last_column_name = field_name
+ else:
+ columns.append(column)
+ last_column_name = column
+ else:
+ columns.append(column)
+ last_column_name = column
+ return columns
+
+ def process_inline_list_column(self, columns, first_row, repeat_list):
+ for index, repeat_map in enumerate(repeat_list):
+ start = repeat_map.get('start')
+ end = int(repeat_map.get('end'))
+ if len(repeat_list) - 1 == index:
+ end = end + 1
+ for i in range(start, end):
+ field_name = columns[i]
+ embedded_fields = first_row[i]
+ if pd.isna(embedded_fields):
+ columns[i] = field_name
+ else:
+ columns[i] = field_name + '?' + embedded_fields
+
+ def process_data_list(self, model, data_list, column_labels, sheet):
+ try:
+ for index, data in enumerate(data_list):
+ # 转换每行数据到模型data = {dict: 11} {'刀具物料': '刀片', '刀尖特征': '刀尖倒角', '刀片形状': '五边形', '刀片物料参数': [{'刀片物料参数?内接圆直径IC/D(mm)': 23, '刀片物料参数?刀尖圆弧半径RE(mm)': 2, '刀片物料参数?刀片牙型': '石油管螺纹刀片', '刀片物料参数?切削刃长(mm)': 3, '刀片物料参数?厚度(mm)': 12, '刀片物料参数?后角(mm)': 4, '刀片物料参数?安装孔直径D1(mm)': 2, '刀片物料参数?有无断屑槽': '无', '刀片物料参数?物…视图
+ self.import_model_record(model, data, column_labels, sheet)
+ except Exception as e:
+ traceback_error = traceback.format_exc()
+ logging.error('批量导入失败sheet %s' % sheet)
+ logging.error('批量导入失败 : %s' % traceback_error)
+ raise UserError(e)
+
+ @api.model
+ def process_model_record_data(self, model, data, column_labels, sheet):
+ pass
+
+ def import_model_record(self, model, data, column_labels, sheet):
+ self.process_model_record_data(model, data, column_labels, sheet)
+ model_data = self.convert_column_name(data, column_labels)
+ logging.info('批量导入模型{} 数据: {}'.format(model, model_data))
+ new_model = model.create(model_data)
+ self.model_subsequent_processing(new_model, data)
+
+ @api.model
+ def model_subsequent_processing(self, model, data):
+ pass
+
+ def convert_column_name(self, data, column_labels):
+ tmp_map = {}
+ for key, value in data.items():
+ if not column_labels.get(key):
+ continue
+ if key == "执行标准":
+ print('fqwioiqwfo ', value, column_labels)
+ field_info = column_labels.get(key)
+ tmp_map[field_info.get('name')] = self.process_field_data(value, field_info)
+ return tmp_map
+
+ def process_field_data(self, value, field_info):
+ relation_field_types = ['many2one', 'one2many', 'many2many']
+ field_type = field_info.get('type')
+ if field_type not in relation_field_types:
+ return value
+ if field_type == 'Boolean':
+ if value == '是' or value == '有':
+ return True
+ else:
+ return False
+ if field_type == 'many2one':
+ relation_info = self.env[field_info.get('relation')].sudo().search([('name', '=', value)], limit=1)
+ if relation_info:
+ return int(relation_info)
+
+ if isinstance(value, list):
+ return self.process_basic_data_list(value, field_info)
+ else:
+ relation_info = self.env[field_info.get('relation')].sudo().search([('name', '=', value)], limit=1)
+ if relation_info:
+ return [Command.link(int(relation_info))]
+
+ def process_basic_data_list(self, value, field_info):
+ if self.is_basic_data_list(value):
+ return [
+ Command.link(
+ int(self.env[field_info.get('relation')].sudo().search([('name', '=', element)], limit=1)))
+ for element in value
+ ]
+ else:
+ association_column_labels = self.get_model_column_name_labels(
+ self.env[field_info.get('relation')].sudo())
+ data_list = [
+ {column_name.split('?')[1]: column_value
+ for column_name, column_value in association_column_data.items()
+ if
+ len(column_name.split('?')) == 2 and association_column_labels.get(column_name.split('?')[1])}
+ for association_column_data in value
+ ]
+ data_list = [self.convert_column_name(element, association_column_labels) for element in data_list]
+ return [
+ Command.create(
+ column_map
+ ) for column_map in data_list
+ ]
+
+ def get_remaining_ranges(self, ranges, full_list_length):
+ # 创建一个集合用于存储被覆盖的索引
+ covered_indices = set()
+
+ # 遍历范围集合,标记覆盖的索引
+ for r in ranges:
+ start = r['start']
+ end = r['end']
+ # 将该范围内的索引加入集合
+ covered_indices.update(range(start, end + 1))
+
+ # 计算未覆盖的范围
+ remaining_ranges = []
+ start = None
+
+ for i in range(full_list_length):
+ if i not in covered_indices:
+ if start is None:
+ start = i # 开始新的未覆盖范围
+ else:
+ if start is not None:
+ # 记录当前未覆盖范围
+ remaining_ranges.append({'start': start, 'end': i - 1})
+ start = None # 重置
+
+ # 处理最后一个范围
+ if start is not None:
+ remaining_ranges.append({'start': start, 'end': full_list_length - 1})
+
+ return remaining_ranges
+
+ def is_basic_data_list(self, lst):
+ basic_types = (int, float, str, bool)
+ lst_len = len(lst)
+ if lst_len < 1:
+ return False
+ if isinstance(lst[0], basic_types):
+ return True
+ return False
+
+ def index_retrieve_data(self, df, range_map, i, data_map):
+ for remaining_map in range_map:
+ relation_column_data_map = {}
+ for j in range(remaining_map.get('start'), int(remaining_map.get('end')) + 1):
+ value = df.iat[i, j]
+ if pd.isna(value):
+ continue
+ if remaining_map.get('column'):
+ relation_column_data_map.update({df.columns[j]: value})
+ else:
+ if not data_map.get(df.columns[j]):
+ data_map.update({df.columns[j]: value})
+ elif isinstance(data_map.get(df.columns[j]), list):
+ data_map.get(df.columns[j]).append(value)
+ else:
+ lst = [data_map.get(df.columns[j]), value]
+ data_map.update({df.columns[j]: lst})
+ if relation_column_data_map and len(relation_column_data_map) > 0:
+ data_map.setdefault(remaining_map.get('column'), []).append(relation_column_data_map)
+
+ def parse_excel_data_matrix(self, df, repeat_list):
+ row_interval_list = self.count_continuous_none(df)
+ remaining_ranges = self.get_remaining_ranges(repeat_list, len(df.columns))
+ data_list = []
+ for row_interval_map in row_interval_list:
+ data_map = {}
+ for index in range(row_interval_map.get('start'), int(row_interval_map.get('end'))):
+ if index == 0:
+ self.index_retrieve_data(df, remaining_ranges, index, data_map)
+ else:
+ self.index_retrieve_data(df, remaining_ranges, index, data_map)
+ self.index_retrieve_data(df, repeat_list, index, data_map)
+ if len(data_map) > 0:
+ data_list.append(data_map)
+ return data_list
+
+ def saadqw(self):
+
+ excel_template = self.env['excel.template'].sudo().search([('model_id.model', '=', self.model_name)], limit=1)
+ file_content = base64.b64decode(excel_template.file_data)
+ return {
+ 'type': 'ir.actions.act_url',
+ 'url': 'data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{file_content}',
+ 'target': 'self',
+ 'download': excel_template.file_name,
+ }
+ # return request.make_response(
+ # file_content,
+ # headers=[
+ # ('Content-Disposition', f'attachment; filename="{excel_template.file_name}"'),
+ # ('Content-Type', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'),
+ # ]
+ # )
+
+ def download_excel_template(self):
+ excel_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') + '/quality_control/static/src/binary/出厂检验报告上传模版.xlsx'
+ value = dict(
+ type='ir.actions.act_url',
+ target='self',
+ url=excel_url,
+ )
+ return value
diff --git a/quality_control/wizard/import_complex_model.xml b/quality_control/wizard/import_complex_model.xml
new file mode 100644
index 00000000..db7ea5d9
--- /dev/null
+++ b/quality_control/wizard/import_complex_model.xml
@@ -0,0 +1,33 @@
+
+
+
+ 请导入数据文件
+ quality.check.import.complex.model.wizard
+
+
+
+
+
+ 导入模型数据
+ ir.actions.act_window
+
+ quality.check.import.complex.model.wizard
+ form
+
+ new
+
+
+
+
+
+
+
\ No newline at end of file