Files
test/quality_control/wizard/import_complex_model.py

440 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import logging
import os
import traceback
from datetime import datetime
from io import BytesIO
from openpyxl import load_workbook
import pandas as pd
import xlrd
from odoo import fields, models, api, Command, _
# from odoo.exceptions import ValidationError
from odoo.exceptions import UserError
from datetime import datetime, timedelta
from odoo.http import request
_logger = logging.getLogger(__name__)
# Transient wizard model that imports spreadsheet data; the visible methods read
# an uploaded Excel file and create records from it.
class ImportComplexModelWizard(models.TransientModel):
_name = 'quality.check.import.complex.model.wizard'
# Uploaded file content; import_data base64-decodes this value before parsing.
file_data = fields.Binary("数据文件")
# Technical model name; saadqw uses it to look up the matching excel.template.
model_name = fields.Char(string='Model Name')
# NOTE(review): not referenced by any method visible in this file - confirm usage.
field_basis = fields.Char(string='Field Basis')
# Quality check whose measure lines import_data removes before importing.
check_id = fields.Many2one(string='质检单', comodel_name='quality.check')
def get_model_column_name_labels(self, model):
    """Map each field's display label to its ``fields_get`` description.

    :param model: object exposing ``fields_get()``.
    :return: dict keyed by the ``'string'`` entry of every field description;
        when two fields share a label, the one listed later wins.
    """
    labels = {}
    for description in model.fields_get().values():
        labels[description.get('string')] = description
    return labels
def field_name_mapping(self, column, field_data):
    """Return the field-name mapping for ``column``.

    This base implementation ignores both arguments and returns an empty
    dict. The receiver parameter was previously misspelled ``selfcolumn``;
    it is the bound instance and is now named ``self``.

    :param column: column header being mapped (unused here).
    :param field_data: extra mapping context (unused here).
    :return: an empty dict.
    """
    return {}
@api.model
def page_to_field(self, field_data):
    """Return the header-label to field-name mapping used by process_first_line.

    The base implementation ignores ``field_data`` and returns an empty
    mapping, so no header is renamed.

    :param field_data: mapping context (unused here).
    :return: an empty dict.
    """
    return dict()
def count_continuous_none(self, df):
    """Split the rows of ``df`` into record intervals.

    A new interval starts at every row whose first column is not null; the
    following rows with a null first column belong to the same interval.
    Rows before the first non-null value are not part of any interval,
    unless the first column is null everywhere, in which case a single
    interval covering all rows is returned.

    Positions are row offsets (not index labels) because the consumer,
    parse_excel_data_matrix, addresses cells with ``df.iat``. The first
    column is read positionally instead of via ``row[0]``, which depends on
    pandas' deprecated integer-key fallback.

    :param df: pandas DataFrame holding the sheet data.
    :return: list of ``{'start': int, 'end': int}`` dicts, ``end`` exclusive.
    """
    if df.empty:
        return []
    first_column_filled = df.iloc[:, 0].notna().tolist()
    intervals = []
    start = 0
    seen_first = False
    for position, filled in enumerate(first_column_filled):
        if not filled:
            continue
        if seen_first:
            # A new record begins here, so close the previous interval.
            intervals.append({'start': start, 'end': position})
        seen_first = True
        start = position
    # Close the trailing interval, which runs to the end of the sheet.
    if len(df) - start >= 1:
        intervals.append({'start': start, 'end': len(df)})
    return intervals
def find_repeated_ranges(self, data):
    """Locate runs of two or more consecutive equal values in ``data``.

    Such runs mark the columns that belong to one inline-list field.

    :param data: sequence of column names.
    :return: list of ``{'start', 'end', 'column'}`` dicts, one per run.

    NOTE(review): for runs in the middle of ``data``, ``end`` is the index of
    the first element after the run; for a run reaching the end of ``data``
    it is the index of the last element. This asymmetry is kept as-is
    because other methods compensate for it - confirm before changing.
    """
    if not data:
        return []

    runs = []
    run_start = 0
    run_value = data[0]
    size = len(data)
    for position in range(1, size):
        if data[position] == run_value:
            continue
        if position - run_start > 1:  # the run that just ended had repeats
            runs.append(dict(start=run_start, end=position, column=data[position - 1]))
        run_value, run_start = data[position], position

    # Trailing run that reaches the end of the sequence.
    if size - run_start > 1:
        last = size - 1
        runs.append(dict(start=run_start, end=last, column=data[last - 1]))
    return runs
def import_data(self):
    """Import measure lines from the uploaded Excel file.

    Steps: remove the existing measure lines of ``check_id``, read the first
    sheet with xlrd, validate the header row against the expected template
    (ignoring ``*`` markers and the non-leading cells of merged headers),
    create one ``quality.check.measure.line`` per data row for the
    ``active_id`` quality check, and store the highest filled measurement
    column in ``column_nums``.

    Changes from the previous implementation:
      * cells are treated as empty only when blank, so a value of ``0`` is
        imported instead of being silently replaced by ``''``;
      * ``UserError`` raised by the validations is propagated unchanged
        instead of being logged as an unexpected failure and re-wrapped;
      * the line sequence is a local counter rather than a re-read of the
        one2many on every row.

    :return: an ``ir.actions.act_window_close`` action.
    :raises UserError: when no file is uploaded, the header is wrong, the
        quality check cannot be found, no data row is importable, or any
        unexpected error occurs while reading the file.
    """
    if not self.file_data:
        raise UserError(_('请先上传Excel文件'))
    if self.check_id.measure_line_ids:
        self.sudo().check_id.measure_line_ids.unlink()

    def cell_text(row, position):
        """Return the cell as text; only missing or blank cells become ''."""
        if position >= len(row):
            return ''
        cell = row[position]
        if cell is None or cell == '':
            return ''
        return str(cell)

    # Decode the uploaded file content.
    file_content = base64.b64decode(self.file_data)
    try:
        # Read the workbook with xlrd and use its first sheet.
        workbook = xlrd.open_workbook(file_contents=file_content)
        sheet = workbook.sheet_by_index(0)

        expected_headers = ['产品名称', '图号', '检测项目', '测量值1', '测量值2', '测量值3', '测量值4', '测量值5', '判定', '备注']
        actual_headers = sheet.row_values(0)

        # Column spans (clo, chi) of the merged cells that start in the header row.
        merged_cells = [
            (clo, chi)
            for rlo, rhi, clo, chi in sheet.merged_cells
            if rlo == 0
        ]

        # Clean the header: drop the non-leading cells of merged headers,
        # strip the asterisk markers and skip empty headers.
        actual_headers_clean = []
        for i, header in enumerate(actual_headers):
            if any(clo < i < chi for clo, chi in merged_cells):
                continue
            if isinstance(header, str):
                header = header.replace('*', '').strip()
            if header:
                actual_headers_clean.append(header)

        if len(actual_headers_clean) < len(expected_headers):
            raise UserError(_('表头列数不足,请使用正确的模板文件'))

        # The length check above guarantees every expected index exists.
        for i, header in enumerate(expected_headers):
            actual_header = actual_headers_clean[i]
            if header != actual_header:
                raise UserError(_('表头顺序不正确,第%s列应为"%s",但实际为"%s"') %
                                (i + 1, header, actual_header))

        # Quality check the wizard was opened from.
        active_id = self.env.context.get('active_id')
        quality_check = self.env['quality.check'].browse(active_id)
        if not quality_check:
            raise UserError(_('未找到相关的质检记录'))

        valid_data_imported = False
        max_columns = 1
        next_sequence = len(quality_check.measure_line_ids) + 1
        measure_line_model = self.env['quality.check.measure.line']

        # Data starts on the second row (the first one is the header).
        for row_index in range(1, sheet.nrows):
            row = sheet.row_values(row_index)
            if not any(row):
                continue
            # Rows without a measure item are not importable.
            if row[2] == '':
                continue

            # Measurement values 1..5 are read from sheet columns 4..8.
            measure_values = {
                f'measure_value{n}': cell_text(row, n + 3) for n in range(1, 6)
            }
            measure_line_vals = {
                'check_id': quality_check.id,
                'sequence': next_sequence,
                'product_name': cell_text(row, 0),
                'drawing_no': cell_text(row, 1),
                'measure_item': row[2] or '',
                **measure_values,
                'measure_result': 'NG' if row[9] == 'NG' else 'OK',
                'remark': row[10] if len(row) > 10 and row[10] else '',
            }

            filled = [n for n in range(1, 6) if measure_values[f'measure_value{n}']]
            if filled:
                max_columns = max(max_columns, filled[-1])

            measure_line_model.create(measure_line_vals)
            next_sequence += 1
            valid_data_imported = True

        quality_check.column_nums = max_columns

        if not valid_data_imported:
            raise UserError(_('表格中没有有效数据行可导入,请检查表格内容'))

        # Close the wizard dialog.
        return {
            'type': 'ir.actions.act_window_close'
        }
    except UserError:
        # Validation messages are already user-facing; pass them through.
        raise
    except Exception as e:
        _logger.exception("导入Excel数据时出错: %s", str(e))
        raise UserError(_('导入失败: %s') % str(e)) from e
def process_first_line(self, df_columns, column_labels, field_data):
    """Resolve the sheet's header row into one column name per column.

    Rules, applied per header in order:
      * a header known to ``column_labels`` is kept;
      * otherwise a header containing ``'Unnamed'`` repeats the previously
        resolved name;
      * otherwise the header is translated through ``page_to_field`` when a
        mapping exists, and kept unchanged when it does not.

    :param df_columns: iterable of header strings.
    :param column_labels: mapping of known labels to field descriptions.
    :param field_data: context forwarded to ``page_to_field``.
    :return: list of resolved column names, same length as ``df_columns``.
    """
    resolved = []
    previous = None
    for header in df_columns:
        if column_labels.get(header):
            name = header
        elif 'Unnamed' in header:
            resolved.append(previous)
            continue
        else:
            name = self.page_to_field(field_data).get(header) or header
        resolved.append(name)
        previous = name
    return resolved
def process_inline_list_column(self, columns, first_row, repeat_list):
    """Suffix inline-list column names with their sub-field label, in place.

    For every position covered by ``repeat_list``, a non-null entry of
    ``first_row`` turns ``columns[pos]`` into ``'<column>?<sub-field>'``;
    null entries leave the name untouched. The last range in
    ``repeat_list`` is extended by one position.

    :param columns: mutable list of column names (modified in place).
    :param first_row: per-column sub-field labels, indexable by position.
    :param repeat_list: list of dicts with ``'start'`` and ``'end'`` keys.
    :return: None.
    """
    last_position = len(repeat_list) - 1
    for position, span in enumerate(repeat_list):
        first = span.get('start')
        stop = int(span.get('end'))
        if position == last_position:
            stop += 1
        for col in range(first, stop):
            base_name = columns[col]
            sub_field = first_row[col]
            if pd.isna(sub_field):
                continue
            columns[col] = base_name + '?' + sub_field
def process_data_list(self, model, data_list, column_labels, sheet):
    """Import every parsed row dict of one sheet into ``model``.

    Each element of ``data_list`` is passed to ``import_model_record``. Any
    failure is logged with its traceback through the module logger and
    re-raised as a ``UserError`` carrying the original message, with the
    original exception kept as the cause.

    :param model: target model on which records are created.
    :param data_list: iterable of row dicts keyed by column label.
    :param column_labels: mapping of labels to field descriptions.
    :param sheet: sheet identifier, used for logging only in this method.
    :raises UserError: when importing any row fails.
    """
    try:
        for data in data_list:
            self.import_model_record(model, data, column_labels, sheet)
    except Exception as e:
        # logger.exception appends the active traceback to the message.
        _logger.exception('批量导入失败sheet %s', sheet)
        raise UserError(str(e)) from e
@api.model
def process_model_record_data(self, model, data, column_labels, sheet):
    """Hook called by import_model_record before the row is converted.

    The base implementation does nothing.

    :return: None.
    """
    return None
def import_model_record(self, model, data, column_labels, sheet):
    """Create one record on ``model`` from a parsed row.

    Runs the pre-processing hook, converts the label-keyed row into field
    values, logs them, creates the record and finally runs the
    post-processing hook with the new record and the original row.

    :param model: target model exposing ``create``.
    :param data: row dict keyed by column label.
    :param column_labels: mapping of labels to field descriptions.
    :param sheet: sheet identifier forwarded to the pre-processing hook.
    :return: None.
    """
    self.process_model_record_data(model, data, column_labels, sheet)
    values = self.convert_column_name(data, column_labels)
    message = '批量导入模型{} 数据: {}'.format(model, values)
    logging.info(message)
    created = model.create(values)
    self.model_subsequent_processing(created, data)
@api.model
def model_subsequent_processing(self, model, data):
    """Hook called by import_model_record after the record is created.

    The base implementation does nothing.

    :param model: the value returned by ``create`` in import_model_record.
    :param data: the original row dict.
    :return: None.
    """
    return None
def convert_column_name(self, data, column_labels):
    """Translate a label-keyed row into a field-name-keyed values dict.

    Keys of ``data`` without a (truthy) entry in ``column_labels`` are
    dropped. Each remaining value is converted by ``process_field_data`` and
    stored under the ``'name'`` entry of its field description.

    A leftover debug ``print`` for one specific column was removed.

    :param data: row dict keyed by column label.
    :param column_labels: mapping of labels to field descriptions.
    :return: dict keyed by field name.
    """
    converted = {}
    for label, value in data.items():
        field_info = column_labels.get(label)
        if not field_info:
            continue
        converted[field_info.get('name')] = self.process_field_data(value, field_info)
    return converted
def process_field_data(self, value, field_info):
    """Convert one cell value into the value expected by the target field.

    * Non-relational fields: the value is returned unchanged.
    * many2one: the id of the first comodel record whose ``name`` equals
      ``value``, or ``None`` when there is none.
    * one2many / many2many: a list value is delegated to
      ``process_basic_data_list``; a single value becomes a one-element
      list with a link command to the matching record, or ``None`` when no
      record matches.

    Previously a many2one without a match fell through into the x2many
    branch, which repeated the same search and could return x2many commands
    for a many2one field; it now returns ``None`` directly.

    :param value: raw cell value, or a list for x2many columns.
    :param field_info: field description with ``'type'`` and ``'relation'``.
    :return: converted value as described above.
    """
    field_type = field_info.get('type')
    if field_type not in ('many2one', 'one2many', 'many2many'):
        # NOTE(review): the original contained a boolean conversion placed
        # after this return (checking for type 'Boolean'), so it could never
        # run; boolean columns are therefore passed through unchanged, as
        # before. Its comparison literals were unreadable in the reviewed
        # source - restore deliberately if the conversion is wanted.
        return value

    if field_type != 'many2one' and isinstance(value, list):
        return self.process_basic_data_list(value, field_info)

    record = self.env[field_info.get('relation')].sudo().search([('name', '=', value)], limit=1)
    if not record:
        return None
    if field_type == 'many2one':
        return record.id
    return [Command.link(record.id)]
def process_basic_data_list(self, value, field_info):
    """Build x2many commands from a list cell value.

    * When ``value`` is a list of basic values (see ``is_basic_data_list``),
      each element is looked up by ``name`` on the comodel and linked.
      Elements without a matching record are skipped; previously they
      produced a link command to id 0.
    * Otherwise each element is expected to be a dict keyed by
      ``'<column>?<sub-field label>'``. Keys with exactly one ``?`` whose
      sub-field label is known to the comodel are kept, converted with
      ``convert_column_name`` and turned into a create command.

    :param value: list of basic values or list of dicts.
    :param field_info: field description providing ``'relation'``.
    :return: list of x2many commands.
    """
    comodel = self.env[field_info.get('relation')].sudo()

    if self.is_basic_data_list(value):
        commands = []
        for element in value:
            record = comodel.search([('name', '=', element)], limit=1)
            if record:
                commands.append(Command.link(record.id))
        return commands

    association_column_labels = self.get_model_column_name_labels(comodel)
    commands = []
    for association_column_data in value:
        sub_values = {}
        for column_name, column_value in association_column_data.items():
            parts = column_name.split('?')
            if len(parts) == 2 and association_column_labels.get(parts[1]):
                sub_values[parts[1]] = column_value
        commands.append(
            Command.create(self.convert_column_name(sub_values, association_column_labels)))
    return commands
def get_remaining_ranges(self, ranges, full_list_length):
    """Return the index spans of ``0..full_list_length-1`` not covered by ``ranges``.

    :param ranges: iterable of dicts with inclusive ``'start'``/``'end'``.
    :param full_list_length: number of positions to examine.
    :return: list of ``{'start', 'end'}`` dicts (both inclusive), ascending.
    """
    # Every index that belongs to one of the given ranges.
    taken = set()
    for span in ranges:
        taken.update(range(span['start'], span['end'] + 1))

    gaps = []
    gap_start = None
    for position in range(full_list_length):
        is_free = position not in taken
        if is_free and gap_start is None:
            gap_start = position  # a new uncovered span begins
        elif not is_free and gap_start is not None:
            gaps.append({'start': gap_start, 'end': position - 1})
            gap_start = None

    # An uncovered span may run to the very end.
    if gap_start is not None:
        gaps.append({'start': gap_start, 'end': full_list_length - 1})
    return gaps
def is_basic_data_list(self, lst):
    """Tell whether ``lst`` is a non-empty list of basic values.

    Only the first element is inspected; it must be an ``int``, ``float``,
    ``str`` or ``bool``.

    :param lst: sequence to inspect.
    :return: ``True`` or ``False``.
    """
    return len(lst) >= 1 and isinstance(lst[0], (int, float, str, bool))
def index_retrieve_data(self, df, range_map, i, data_map):
    """Collect the non-null cells of row ``i`` into ``data_map``, in place.

    For each span in ``range_map`` (``'start'``..``'end'``, end inclusive):
      * when the span has a truthy ``'column'``, its cells are gathered into
        one dict keyed by column label, and that dict (if not empty) is
        appended to the list stored under ``data_map[span['column']]``;
      * otherwise each cell is stored under its column label; a label that
        already holds a truthy value is turned into / extended as a list.

    :param df: pandas DataFrame with the sheet data.
    :param range_map: list of span dicts.
    :param i: row position.
    :param data_map: dict being filled for the current record.
    :return: None.
    """
    for span in range_map:
        group_key = span.get('column')
        grouped = {}
        first = span.get('start')
        last = int(span.get('end'))
        for col in range(first, last + 1):
            cell = df.iat[i, col]
            if pd.isna(cell):
                continue
            label = df.columns[col]
            if group_key:
                grouped[label] = cell
                continue
            existing = data_map.get(label)
            if not existing:
                data_map[label] = cell
            elif isinstance(existing, list):
                existing.append(cell)
            else:
                data_map[label] = [existing, cell]
        if grouped:
            data_map.setdefault(group_key, []).append(grouped)
# Turn the sheet matrix `df` into a list of per-record dicts.
# Row intervals come from count_continuous_none; the column spans not covered
# by `repeat_list` come from get_remaining_ranges. index_retrieve_data fills
# `data_map` in place for each row of an interval.
# Returns the list of non-empty dicts that were collected.
def parse_excel_data_matrix(self, df, repeat_list):
row_interval_list = self.count_continuous_none(df)
remaining_ranges = self.get_remaining_ranges(repeat_list, len(df.columns))
data_list = []
for row_interval_map in row_interval_list:
# One dict is accumulated per row interval.
data_map = {}
# 'end' is used as an exclusive bound here.
for index in range(row_interval_map.get('start'), int(row_interval_map.get('end'))):
# NOTE(review): both branches below make the same call, and the indentation
# is lost in this view, so it is unclear whether the repeat_list call that
# follows belongs to the else branch or runs for every row - confirm
# against the original file before restructuring.
if index == 0:
self.index_retrieve_data(df, remaining_ranges, index, data_map)
else:
self.index_retrieve_data(df, remaining_ranges, index, data_map)
self.index_retrieve_data(df, repeat_list, index, data_map)
# Only dicts that received at least one value are kept.
if len(data_map) > 0:
data_list.append(data_map)
return data_list
def saadqw(self):
    """Return a URL action that downloads the Excel template of ``model_name``.

    The template is the first ``excel.template`` record whose model matches
    ``self.model_name``. Its ``file_data`` is embedded in a ``data:`` URL.

    Fixes: the URL was a plain string containing the literal text
    ``{file_content}`` instead of the file payload, and the payload being
    prepared was the decoded bytes rather than base64 text. A missing
    template now raises a ``UserError`` instead of failing while decoding.

    :return: an ``ir.actions.act_url`` action dict.
    :raises UserError: when no template with file content is found.
    """
    excel_template = self.env['excel.template'].sudo().search(
        [('model_id.model', '=', self.model_name)], limit=1)
    if not excel_template or not excel_template.file_data:
        raise UserError(_('未找到对应的Excel模板'))

    # file_data is treated as base64 (the previous code base64-decoded it);
    # a data URL needs that base64 payload as text.
    encoded = excel_template.file_data
    if isinstance(encoded, bytes):
        encoded = encoded.decode('ascii')
    return {
        'type': 'ir.actions.act_url',
        'url': f'data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{encoded}',
        'target': 'self',
        'download': excel_template.file_name,
    }
# return request.make_response(
# file_content,
# headers=[
# ('Content-Disposition', f'attachment; filename="{excel_template.file_name}"'),
# ('Content-Type', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'),
# ]
# )
def download_excel_template(self):
    """Return a URL action pointing at the static inspection-report template.

    The URL is built from the ``web.base.url`` system parameter plus the
    static file path; a URL starting with ``http://`` is rewritten to
    ``https://``.

    :return: an ``ir.actions.act_url`` action dict targeting the same window.
    """
    parameters = self.env['ir.config_parameter'].sudo()
    template_url = parameters.get_param('web.base.url') + '/quality_control/static/src/binary/出厂检验报告上传模版.xlsx'
    # Only rewrite the scheme when the configured URL uses plain http.
    if template_url.startswith("http://"):
        template_url = template_url.replace("http://", "https://")
    return {
        'type': 'ir.actions.act_url',
        'target': 'self',
        'url': template_url,
    }