# -*- coding: utf-8 -*-
import base64
import logging
import os
import traceback
from datetime import datetime, timedelta
from io import BytesIO

import pandas as pd
import xlrd
from openpyxl import load_workbook

from odoo import fields, models, api, Command, _
from odoo.exceptions import UserError
from odoo.http import request

_logger = logging.getLogger(__name__)


class ImportComplexModelWizard(models.TransientModel):
    """Wizard that imports quality-check measure lines from an Excel upload.

    ``import_data`` is the entry point for the measure-line import. The other
    methods are generic helpers that map Excel column labels to model fields
    and turn a DataFrame into ``create`` values; the ``page_to_field``,
    ``field_name_mapping``, ``process_model_record_data`` and
    ``model_subsequent_processing`` methods are no-op hooks.
    """
    _name = 'quality.check.import.complex.model.wizard'

    file_data = fields.Binary("数据文件")
    model_name = fields.Char(string='Model Name')
    field_basis = fields.Char(string='Field Basis')
    check_id = fields.Many2one(string='质检单', comodel_name='quality.check')

    # ------------------------------------------------------------------
    # Column / label helpers
    # ------------------------------------------------------------------
    def get_model_column_name_labels(self, model):
        """Return ``{field label: fields_get() description}`` for ``model``.

        Fields sharing the same label overwrite each other; the last one
        returned by ``fields_get()`` wins.
        """
        fields_info = model.fields_get()
        return {info.get('string'): info for info in fields_info.values()}

    def field_name_mapping(self, column, field_data):
        """Hook: map a sheet column to a field name. Returns an empty dict."""
        return {}

    @api.model
    def page_to_field(self, field_data):
        """Hook: return ``{sheet column label: field label}``. Empty here."""
        return {}

    def count_continuous_none(self, df):
        """Split the rows of ``df`` into record intervals.

        A row whose first cell is not NaN starts a new record; the following
        rows whose first cell is NaN belong to that record.

        :param df: pandas DataFrame
        :return: list of ``{'start': int, 'end': int}`` row positions, with
            ``end`` exclusive. Leading NaN rows before the first non-NaN row
            are not part of any interval, unless every row is NaN, in which
            case a single interval covering the whole frame is returned.
        """
        if df.empty:
            return []
        total_rows = len(df)
        intervals = []
        start = 0
        seen_first = False
        # Positions (not index labels) are used because the caller feeds the
        # result to ``df.iat``.
        for position, first_cell in enumerate(df.iloc[:, 0]):
            if pd.isna(first_cell):
                continue
            if not seen_first:
                start = position
                seen_first = True
            else:
                intervals.append({'start': start, 'end': position})
                start = position
        # Close the last open interval.
        if total_rows - start >= 1:
            intervals.append({'start': start, 'end': total_rows})
        return intervals

    def find_repeated_ranges(self, data):
        """Locate runs of equal consecutive values in ``data``.

        Runs longer than one element are reported as
        ``{'start': int, 'end': int, 'column': value}``.

        NOTE(review): ``end`` is the index *after* the run for every run that
        is followed by a different value, but the *last index of the run*
        when the run reaches the end of ``data``. Callers in this class
        compensate in different ways; the behaviour is kept as-is because
        changing it would alter their results. Confirm the intended contract.
        """
        if not data:
            return []
        repeats = []
        start_index = 0
        current_value = data[0]
        for i in range(1, len(data)):
            if data[i] == current_value:
                continue
            if i - start_index > 1:
                repeats.append({'start': start_index, 'end': i, 'column': data[i - 1]})
            current_value = data[i]
            start_index = i
        # Trailing run: only reachable with len(data) >= 2, so ``i`` is bound
        # and equals len(data) - 1 here.
        if len(data) - start_index > 1:
            repeats.append({'start': start_index, 'end': i, 'column': data[i - 1]})
        return repeats

    # ------------------------------------------------------------------
    # Measure-line import
    # ------------------------------------------------------------------
    @staticmethod
    def _cell_text(row, idx):
        """Return cell ``idx`` of ``row`` as text, or ``''`` if absent/empty.

        Unlike a plain truthiness test this keeps a numeric ``0``, which is a
        legitimate measured value.
        """
        if idx >= len(row):
            return ''
        cell = row[idx]
        if cell is None or cell == '':
            return ''
        return str(cell)

    @staticmethod
    def _clean_header_row(sheet):
        """Return the non-empty header texts of the first row of ``sheet``.

        Asterisks are stripped from text headers, and columns that lie inside
        a first-row merged range (but are not its first column) are skipped.
        """
        actual_headers = sheet.row_values(0) if sheet.nrows else []
        # xlrd merged ranges are (rlo, rhi, clo, chi) with exclusive upper
        # bounds; only ranges starting on the header row matter.
        merged_cells = [
            (clo, chi)
            for rlo, rhi, clo, chi in sheet.merged_cells
            if rlo == 0
        ]
        cleaned = []
        for i, header in enumerate(actual_headers):
            if any(clo < i < chi for clo, chi in merged_cells):
                continue
            if isinstance(header, str):
                header = header.replace('*', '').strip()
            if header:
                cleaned.append(header)
        return cleaned

    def import_data(self):
        """Import measure lines for the active quality check from the upload.

        Existing measure lines of ``check_id`` are removed first. The sheet
        header is validated against the expected template, every data row
        must carry the quality check's part number, and one
        ``quality.check.measure.line`` is created per data row.

        :return: ``ir.actions.act_window_close`` action
        :raises UserError: when no file is uploaded, or when parsing,
            validation or creation fails (message prefixed with '导入失败').
        """
        if not self.file_data:
            raise UserError(_('请先上传Excel文件'))
        if self.check_id.measure_line_ids:
            self.sudo().check_id.measure_line_ids.unlink()

        file_content = base64.b64decode(self.file_data)
        try:
            # NOTE(review): xlrd >= 2.0 reads only .xls while the template
            # offered by download_excel_template is .xlsx - confirm the
            # installed xlrd version.
            workbook = xlrd.open_workbook(file_contents=file_content)
            sheet = workbook.sheet_by_index(0)

            expected_headers = ['产品名称', '图号', '检测项目', '测量值1', '测量值2',
                                '测量值3', '测量值4', '测量值5', '判定', '备注']
            actual_headers_clean = self._clean_header_row(sheet)

            if len(actual_headers_clean) < len(expected_headers):
                raise UserError(_('表头列数不足,请使用正确的模板文件'))

            for i, header in enumerate(expected_headers):
                if i >= len(actual_headers_clean) or header != actual_headers_clean[i]:
                    actual_header = actual_headers_clean[i] if i < len(actual_headers_clean) else "缺失"
                    raise UserError(_('表头顺序不正确,第%s列应为"%s",但实际为"%s"') % (i + 1, header, actual_header))

            active_id = self.env.context.get('active_id')
            quality_check = self.env['quality.check'].browse(active_id)
            if not quality_check:
                raise UserError(_('未找到相关的质检记录'))

            # Data rows: skip the header, blank rows and rows with no
            # measure item.
            data_rows = []
            for row_index in range(1, sheet.nrows):
                row = sheet.row_values(row_index)
                if any(row) and row[2] != '':
                    data_rows.append(row)

            # Validate every row before creating anything.
            for row in data_rows:
                if row[1] != quality_check.part_number:
                    _logger.warning("Part number mismatch in uploaded row: %s", row)
                    raise UserError(_('上传内容图号错误,请修改'))

            valid_data_imported = False
            max_columns = 1
            for row in data_rows:
                # NOTE(review): the measured values are read from raw columns
                # 4..8 (result 9, remark 10), i.e. raw column 3 is skipped.
                # Presumably the template has an extra/merged column there -
                # confirm against the template file.
                measure_line_vals = {
                    'check_id': quality_check.id,
                    'sequence': len(quality_check.measure_line_ids) + 1,
                    'product_name': self._cell_text(row, 0),
                    'drawing_no': self._cell_text(row, 1),
                    'measure_item': row[2] or '',
                    'measure_value1': self._cell_text(row, 4),
                    'measure_value2': self._cell_text(row, 5),
                    'measure_value3': self._cell_text(row, 6),
                    'measure_value4': self._cell_text(row, 7),
                    'measure_value5': self._cell_text(row, 8),
                    'measure_result': 'NG' if row[9] == 'NG' else 'OK',
                    'remark': row[10] if len(row) > 10 and row[10] else '',
                }
                # Track the highest measure-value slot that holds data.
                for i in range(1, 6):
                    if measure_line_vals.get(f'measure_value{i}') and i > max_columns:
                        max_columns = i
                self.env['quality.check.measure.line'].create(measure_line_vals)
                valid_data_imported = True

            quality_check.column_nums = max_columns

            if not valid_data_imported:
                raise UserError(_('表格中没有有效数据行可导入,请检查表格内容'))

            return {
                'type': 'ir.actions.act_window_close'
            }
        except Exception as e:
            # Boundary handler: everything, including the validation errors
            # raised above, is reported to the user with a common prefix.
            _logger.error("导入Excel数据时出错: %s", str(e))
            _logger.error(traceback.format_exc())
            raise UserError(_('导入失败: %s') % str(e)) from e

    # ------------------------------------------------------------------
    # Generic DataFrame -> model helpers
    # ------------------------------------------------------------------
    def process_first_line(self, df_columns, column_labels, field_data):
        """Normalise the header row of a DataFrame.

        For each column: a label known to ``column_labels`` is kept; a
        pandas placeholder ('Unnamed...') inherits the previous column's
        name; any other label is translated through ``page_to_field`` when a
        mapping exists, otherwise kept unchanged.

        :return: list of column names, same length as ``df_columns``
        """
        columns = []
        last_column_name = None
        field_name_map = None  # resolved lazily, only if a column needs it
        for column in df_columns:
            if column_labels.get(column):
                resolved = column
            elif 'Unnamed' in str(column):
                # Placeholder column: repeat the previous name and leave
                # ``last_column_name`` untouched.
                columns.append(last_column_name)
                continue
            else:
                if field_name_map is None:
                    field_name_map = self.page_to_field(field_data)
                resolved = field_name_map.get(column) or column
            columns.append(resolved)
            last_column_name = resolved
        return columns

    def process_inline_list_column(self, columns, first_row, repeat_list):
        """Suffix inline-list columns with their sub-header, in place.

        For every position covered by ``repeat_list`` the entry in
        ``columns`` becomes ``'<column>?<first_row value>'`` unless the
        ``first_row`` value is NaN.

        NOTE(review): the ``end`` of the last entry of ``repeat_list`` is
        extended by one to compensate for ``find_repeated_ranges`` reporting
        an inclusive end for a run that reaches the end of the data. If the
        last run does not reach the end, one extra column is touched -
        confirm this is acceptable.
        """
        last_position = len(repeat_list) - 1
        for index, repeat_map in enumerate(repeat_list):
            start = repeat_map.get('start')
            end = int(repeat_map.get('end'))
            if index == last_position:
                end += 1
            for i in range(start, end):
                embedded_fields = first_row[i]
                if not pd.isna(embedded_fields):
                    columns[i] = columns[i] + '?' + embedded_fields

    def process_data_list(self, model, data_list, column_labels, sheet):
        """Create one ``model`` record per entry of ``data_list``.

        Each entry is a dict keyed by column label; inline-list columns hold
        a list of dicts keyed by ``'<column>?<sub-column>'``.

        :raises UserError: wrapping the first failure
        """
        try:
            for data in data_list:
                self.import_model_record(model, data, column_labels, sheet)
        except Exception as e:
            _logger.error('批量导入失败sheet %s', sheet)
            _logger.error('批量导入失败 : %s', traceback.format_exc())
            raise UserError(str(e)) from e

    @api.model
    def process_model_record_data(self, model, data, column_labels, sheet):
        """Hook called before a row is converted. No-op here."""
        pass

    def import_model_record(self, model, data, column_labels, sheet):
        """Convert one parsed row to field values and create the record."""
        self.process_model_record_data(model, data, column_labels, sheet)
        model_data = self.convert_column_name(data, column_labels)
        _logger.info('批量导入模型%s 数据: %s', model, model_data)
        new_model = model.create(model_data)
        self.model_subsequent_processing(new_model, data)

    @api.model
    def model_subsequent_processing(self, model, data):
        """Hook called with the created record and its source row. No-op."""
        pass

    def convert_column_name(self, data, column_labels):
        """Translate ``{label: value}`` into ``{field name: converted value}``.

        Labels missing from ``column_labels`` are dropped.
        """
        converted = {}
        for key, value in data.items():
            field_info = column_labels.get(key)
            if not field_info:
                continue
            converted[field_info.get('name')] = self.process_field_data(value, field_info)
        return converted

    def process_field_data(self, value, field_info):
        """Convert a cell value according to the target field description.

        * boolean: text is true only for '是' / '有'; other values go
          through ``bool()``.
        * many2one: id of the first related record whose ``name`` equals
          ``value``, or ``False`` when none is found.
        * one2many / many2many: a list of ``Command`` tuples (empty when
          nothing matches).
        * anything else: ``value`` unchanged.
        """
        field_type = field_info.get('type')
        if field_type == 'boolean':
            if isinstance(value, str):
                return value.strip() in ('是', '有')
            return bool(value)
        if field_type not in ('many2one', 'one2many', 'many2many'):
            return value

        relation_model = self.env[field_info.get('relation')].sudo()
        if field_type == 'many2one':
            record = relation_model.search([('name', '=', value)], limit=1)
            return record.id if record else False

        if isinstance(value, list):
            return self.process_basic_data_list(value, field_info)
        record = relation_model.search([('name', '=', value)], limit=1)
        return [Command.link(record.id)] if record else []

    def process_basic_data_list(self, value, field_info):
        """Build x2many commands from a list value.

        A list of scalars is treated as record names to link; names that
        match nothing are skipped with a warning. A list of dicts is treated
        as inline rows keyed by ``'<column>?<sub-column>'`` and yields one
        ``Command.create`` per row.
        """
        relation_model = self.env[field_info.get('relation')].sudo()
        if self.is_basic_data_list(value):
            commands = []
            for element in value:
                record = relation_model.search([('name', '=', element)], limit=1)
                if not record:
                    _logger.warning(
                        "No %s record named %r; link skipped",
                        field_info.get('relation'), element)
                    continue
                commands.append(Command.link(record.id))
            return commands

        association_column_labels = self.get_model_column_name_labels(relation_model)
        commands = []
        for association_column_data in value:
            inline_row = {}
            for column_name, column_value in association_column_data.items():
                parts = column_name.split('?')
                # Keep only '<column>?<sub-column>' keys whose sub-column is
                # a known label of the related model.
                if len(parts) == 2 and association_column_labels.get(parts[1]):
                    inline_row[parts[1]] = column_value
            commands.append(
                Command.create(self.convert_column_name(inline_row, association_column_labels)))
        return commands

    def get_remaining_ranges(self, ranges, full_list_length):
        """Return the index ranges of ``0..full_list_length-1`` not covered.

        Each entry of ``ranges`` is treated as inclusive on both ends; the
        result is a list of ``{'start': int, 'end': int}``, also inclusive.
        """
        covered_indices = set()
        for r in ranges:
            covered_indices.update(range(r['start'], r['end'] + 1))

        remaining_ranges = []
        start = None
        for i in range(full_list_length):
            if i not in covered_indices:
                if start is None:
                    start = i
            elif start is not None:
                remaining_ranges.append({'start': start, 'end': i - 1})
                start = None
        if start is not None:
            remaining_ranges.append({'start': start, 'end': full_list_length - 1})
        return remaining_ranges

    def is_basic_data_list(self, lst):
        """Return True when ``lst`` is non-empty and starts with a scalar."""
        return bool(lst) and isinstance(lst[0], (int, float, str, bool))

    def index_retrieve_data(self, df, range_map, i, data_map):
        """Collect the non-NaN cells of row ``i`` into ``data_map``.

        For ranges carrying a ``'column'`` key, the cells of the range are
        grouped into one dict appended to ``data_map[column]``. For other
        ranges each cell is stored under its column name; a second value for
        the same name turns the entry into a list.

        NOTE(review): range ends are treated as inclusive here, and an
        existing falsy value (0, '', False) is overwritten instead of being
        aggregated. Both are kept as in the original; confirm intent.
        """
        for remaining_map in range_map:
            relation_column_data_map = {}
            group_column = remaining_map.get('column')
            for j in range(remaining_map.get('start'), int(remaining_map.get('end')) + 1):
                value = df.iat[i, j]
                if pd.isna(value):
                    continue
                column_name = df.columns[j]
                if group_column:
                    relation_column_data_map[column_name] = value
                    continue
                existing = data_map.get(column_name)
                if not existing:
                    data_map[column_name] = value
                elif isinstance(existing, list):
                    existing.append(value)
                else:
                    data_map[column_name] = [existing, value]
            if relation_column_data_map:
                data_map.setdefault(group_column, []).append(relation_column_data_map)

    def parse_excel_data_matrix(self, df, repeat_list):
        """Turn ``df`` into a list of per-record dicts.

        Rows are grouped with ``count_continuous_none``; within a group the
        plain columns and the inline-list columns (``repeat_list``) are
        gathered with ``index_retrieve_data``.

        NOTE(review): the source this was recovered from had lost its
        indentation, so it is ambiguous whether the inline-list columns are
        also read for row 0. This version skips them for row 0 (the only
        reading in which the original ``if index == 0`` test has an effect).
        Confirm against the upstream file.
        """
        row_interval_list = self.count_continuous_none(df)
        remaining_ranges = self.get_remaining_ranges(repeat_list, len(df.columns))
        data_list = []
        for row_interval_map in row_interval_list:
            data_map = {}
            for index in range(row_interval_map.get('start'), int(row_interval_map.get('end'))):
                self.index_retrieve_data(df, remaining_ranges, index, data_map)
                if index != 0:
                    self.index_retrieve_data(df, repeat_list, index, data_map)
            if len(data_map) > 0:
                data_list.append(data_map)
        return data_list

    # ------------------------------------------------------------------
    # Template download
    # ------------------------------------------------------------------
    def saadqw(self):
        """Return a URL action that downloads the stored Excel template.

        The template is the first ``excel.template`` whose model matches
        ``model_name``; its content is embedded in a ``data:`` URL.

        :raises UserError: when no template (or no file content) is found
        """
        excel_template = self.env['excel.template'].sudo().search(
            [('model_id.model', '=', self.model_name)], limit=1)
        file_data = excel_template.file_data
        if not file_data:
            raise UserError(_('未找到可下载的Excel模板'))
        # Assumes ``file_data`` is a Binary field, i.e. already
        # base64-encoded, which is what a data: URL needs - TODO confirm.
        encoded = file_data.decode() if isinstance(file_data, bytes) else file_data
        return {
            'type': 'ir.actions.act_url',
            'url': f'data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{encoded}',
            'target': 'self',
            'download': excel_template.file_name,
        }

    def download_excel_template(self):
        """Return a URL action pointing at the static upload template.

        The URL is built from ``web.base.url``; an ``http://`` scheme is
        upgraded to ``https://``.
        """
        # get_param returns False when the parameter is unset.
        root = self.env['ir.config_parameter'].sudo().get_param('web.base.url') or ''
        base_url = root + '/quality_control/static/src/binary/出厂检验报告上传模版.xlsx'
        if base_url.startswith("http://"):
            # Only the scheme prefix is rewritten.
            excel_url = base_url.replace("http://", "https://", 1)
        else:
            excel_url = base_url
        return dict(
            type='ir.actions.act_url',
            target='self',
            url=excel_url,
        )