# -*- coding: utf-8 -*-
import re
import ast
import json
import base64
import logging
import psycopg2
from datetime import datetime, timedelta

from odoo import http
from odoo.http import request

# Connection settings for the time-series PostgreSQL database
db_config = {
    "database": "timeseries_db",
    "user": "postgres",
    "password": "postgres",
    "port": "5432",
    "host": "172.16.10.98"
}


def convert_to_seconds(time_str):
    """Convert a duration string such as '1H30M15S' or '1小时30分钟15秒' to seconds."""
    # H, M and S parts are all optional
    pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
    match = re.match(pattern, time_str)
    if not match:
        return None
    # Missing units default to 0
    hours = int(match.group(1)) if match.group(1) else 0
    minutes = int(match.group(2)) if match.group(2) else 0
    seconds = int(match.group(3)) if match.group(3) else 0
    total_seconds = hours * 3600 + minutes * 60 + seconds
    if total_seconds == 0:
        # Fall back to the Chinese unit format (小时/分钟/秒)
        pattern = r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?"
        match = re.match(pattern, time_str)
        if not match:
            return None
        hours = int(match.group(1)) if match.group(1) else 0
        minutes = int(match.group(2)) if match.group(2) else 0
        seconds = int(match.group(3)) if match.group(3) else 0
        total_seconds = hours * 3600 + minutes * 60 + seconds
    return total_seconds
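

# Illustrative calls for convert_to_seconds (results follow from the two patterns
# above; the input strings themselves are made-up examples):
#   convert_to_seconds("1H30M15S")   -> 5415
#   convert_to_seconds("2小时5分钟")  -> 7500
#   convert_to_seconds("no digits")  -> 0   (neither pattern finds any unit)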


class Sf_Dashboard_Connect(http.Controller):

    @http.route('/api/get_machine_datas/list', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def get_machine_datas_list(self, **kw):
        """
        Return machine-tool data for the dashboard display.
        :param kw:
        :return:
        """
        res = {'status': 1, 'message': 'success', 'data': []}
        logging.info('Machine data requested by the frontend, parameters: %s' % kw)
        # Current timestamp, used to compute how long each machine has been online
        current_timestamp = datetime.now().timestamp()
        print(current_timestamp)
        try:
            equipment_obj = request.env['maintenance.equipment'].sudo()
            # Machine codes requested by the frontend
            machine_list = ast.literal_eval(kw['machine_list'])
            for item in machine_list:
                machine_data = equipment_obj.search([('code', '=', item)])
                if not machine_data:
                    # Skip codes that have no matching equipment record
                    continue
                # Time elapsed since the machine first came online
                first_online_duration = current_timestamp - int(machine_data.first_online_time.timestamp())
                if machine_data.machine_power_on_time:
                    power_off_time = first_online_duration - convert_to_seconds(machine_data.machine_power_on_time)
                    power_off_rate = round((power_off_time / first_online_duration), 3)
                else:
                    power_off_time = False
                    power_off_rate = False
                res['data'].append({
                    'active': machine_data.status,
                    'id': machine_data.id,
                    'name': machine_data.name,
                    'brand': machine_data.type_id.name,
                    'code': machine_data.code,
                    'status': machine_data.status,
                    'run_status': machine_data.run_status,
                    'run_time': machine_data.run_time,
                    'system_date': machine_data.system_date,
                    'system_time': machine_data.system_time,
                    'cut_time': machine_data.cut_time,
                    'cut_status': machine_data.cut_status,
                    'program': machine_data.program,
                    'program_name': machine_data.program_name,
                    'program_status': machine_data.program_status,
                    'tool_num': machine_data.tool_num,
                    'machine_power_on_time': machine_data.machine_power_on_time,
                    'product_counts': machine_data.product_counts,
                    'mode': machine_data.mode,
                    'start_time': machine_data.start_time,
                    'end_time': machine_data.end_time,
                    'program_start_time': machine_data.program_start_time,
                    'program_end_time': machine_data.program_end_time,
                    'standby_start_time': machine_data.standby_start_time,
                    'standby_end_time': machine_data.standby_end_time,
                    'offline_start_time': machine_data.offline_start_time,
                    'offline_end_time': machine_data.offline_end_time,
                    'emg_status': machine_data.emg_status,
                    'current_program': machine_data.current_program,
                    'current_program_seq': machine_data.current_program_seq,
                    'x_abs_pos': machine_data.x_abs_pos,
                    'y_abs_pos': machine_data.y_abs_pos,
                    'z_abs_pos': machine_data.z_abs_pos,
                    'feed_speed_set': machine_data.feed_speed_set,
                    'act_feed_speed': machine_data.act_feed_speed,
                    'spindle_speed_set': machine_data.spindle_speed_set,
                    'act_spindle_speed': machine_data.act_spindle_speed,
                    'spindle_load': machine_data.spindle_load,
                    'x_axis_load': machine_data.x_axis_load,
                    'y_axis_load': machine_data.y_axis_load,
                    'z_axis_load': machine_data.z_axis_load,
                    'rapid_feed': machine_data.rapid_feed,
                    'feed_rate': machine_data.feed_rate,
                    'x_mach_coord': machine_data.x_mach_coord,
                    'y_mach_coord': machine_data.y_mach_coord,
                    'z_mach_coord': machine_data.z_mach_coord,
                    'x_rel_coord': machine_data.x_rel_coord,
                    'y_rel_coord': machine_data.y_rel_coord,
                    'z_rel_coord': machine_data.z_rel_coord,
                    'x_dis_coord': machine_data.x_dis_coord,
                    'y_dis_coord': machine_data.y_dis_coord,
                    'z_dis_coord': machine_data.z_dis_coord,
                    'alarm_time': machine_data.alarm_time,
                    'alarm_msg': machine_data.alarm_msg,
                    'clear_time': machine_data.clear_time,
                    # Derived metrics
                    # Utilization rate: run time / power-on time
                    'run_rate': machine_data.run_rate,
                    # Power-off duration: time since first online - power-on time
                    'power_off_time': power_off_time,
                    # Power-off rate: power-off duration / time since first online
                    'power_off_rate': power_off_rate,
                    'first_online_duration': first_online_duration,
                    # Downtime: power-off time - run time
                    # Downtime duration: power-off time - time since first online
                    'img': f'data:image/png;base64,{machine_data.machine_tool_picture.decode("utf-8")}',
                    'equipment_type': machine_data.category_id.name,
                })
            return json.dumps(res)
        except Exception as e:
            logging.info('Frontend machine data request failed: %s' % e)
            res['status'] = -1
            res['message'] = 'Frontend machine data request failed: %s' % e
            return json.dumps(res)

    @http.route('/api/logs/list', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def logs_list(self, **kw):
        """
        Return device log data for the dashboard display.
        :param kw:
        :return:
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        logging.info('Log data requested by the frontend, parameters: %s' % kw)
        try:
            # Connect to the time-series database
            conn = psycopg2.connect(**db_config)
            cur = conn.cursor()
            machine_list = ast.literal_eval(kw['machine_list'])
            begin_time_str = kw['begin_time'].strip('"')
            end_time_str = kw['end_time'].strip('"')
            begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
            end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
            print('begin_time: %s' % begin_time)
            for item in machine_list:
                sql = '''
                    SELECT time, device_state, program_name
                    FROM device_data
                    WHERE device_name = %s AND time >= %s AND time <= %s
                    ORDER BY time DESC;
                '''
                # Run the query with bound parameters
                cur.execute(sql, (item, begin_time, end_time))
                results = cur.fetchall()
                # Group the rows by equipment code
                if item not in res['data']:
                    res['data'][item] = []
                for result in results:
                    res['data'][item].append({
                        'time': result[0].strftime('%Y-%m-%d %H:%M:%S'),
                        'state': result[1],
                        'production_name': result[2],
                    })
            cur.close()
            conn.close()
            return json.dumps(res)  # use json.dumps rather than json.JSONEncoder().encode()
        except Exception as e:
            logging.info('Frontend log data request failed: %s' % e)
            res['status'] = -1
            res['message'] = 'Frontend log data request failed: %s' % e
            return json.dumps(res)
    # Return the list of CNC machines
    @http.route('/api/CNCList', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def CNCList(self, **kw):
        """
        Return the list of CNC machine codes.
        :param kw:
        :return:
        """
        # logging.info('CNCList:%s' % kw)
        try:
            res = {'Succeed': True}
            # cnc_list = request.env['sf.cnc.equipment'].sudo().search([])
            cnc_list_obj = request.env['maintenance.equipment'].sudo().search(
                [('function_type', '!=', False), ('active', '=', True)])
            cnc_list = list(map(lambda x: x.code, cnc_list_obj))
            print('cnc_list: %s' % cnc_list)
            res['CNCList'] = cnc_list
        except Exception as e:
            # str(e) keeps the payload JSON-serializable
            res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)}
            logging.info('CNCList error:%s' % e)
        return json.dumps(res)

    # Return the list of production lines
    @http.route('/api/LineList', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def LineList(self, **kw):
        """
        Return the list of production line names.
        :param kw:
        :return:
        """
        try:
            res = {'Succeed': True}
            line_list_obj = request.env['sf.production.line'].sudo().search([('name', 'ilike', 'CNC')])
            line_list = list(map(lambda x: x.name, line_list_obj))
            print('line_list: %s' % line_list)
            res['LineList'] = line_list
        except Exception as e:
            res = {'Succeed': False, 'ErrorCode': 202, 'Error': str(e)}
            logging.info('LineList error:%s' % e)
        return json.dumps(res)

    # Production line output statistics
    @http.route('/api/LineProduct', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def LineProduct(self, **kw):
        """
        Return work-order statistics per production line.
        :param kw:
        :return:
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        logging.info('Line output data requested by the frontend, parameters: %s' % kw)
        try:
            plan_obj = request.env['sf.production.plan'].sudo()
            line_list = ast.literal_eval(kw['line_list'])
            print('line_list: %s' % line_list)
            for line in line_list:
                plan_data = plan_obj.search([('production_line_id.name', '=', line)])
                # Total number of work orders
                plan_data_total_counts = plan_obj.search_count([('production_line_id.name', '=', line)])
                # Finished work orders
                plan_data_finish_counts = plan_obj.search_count(
                    [('production_line_id.name', '=', line), ('state', 'in', ['finished'])])
                # Planned (not yet finished) work orders
                plan_data_plan_counts = plan_obj.search_count(
                    [('production_line_id.name', '=', line), ('state', 'not in', ['finished'])])
                # Cumulative defective work orders
                plan_data_fault_counts = plan_obj.search_count(
                    [('production_line_id.name', '=', line), ('production_id.state', 'in', ['scrap', 'cancel'])])
                # Reworked work orders
                plan_data_rework_counts = plan_obj.search_count(
                    [('production_line_id.name', '=', line), ('production_id.state', 'in', ['rework'])])
                # Work-order completion rate
                finishe_rate = round(
                    (plan_data_finish_counts / plan_data_total_counts if plan_data_total_counts > 0 else 0), 3)
                # Work-order progress deviation
                plan_data_progress_deviation = plan_data_finish_counts - plan_data_plan_counts
                if plan_data:
                    data = {
                        'plan_data_total_counts': plan_data_total_counts,
                        'plan_data_finish_counts': plan_data_finish_counts,
                        'plan_data_plan_counts': plan_data_plan_counts,
                        'plan_data_fault_counts': plan_data_fault_counts,
                        'finishe_rate': finishe_rate,
                        'plan_data_progress_deviation': plan_data_progress_deviation,
                        'plan_data_rework_counts': plan_data_rework_counts
                    }
                    res['data'][line] = data
            return json.dumps(res)  # use json.dumps rather than json.JSONEncoder().encode()
        except Exception as e:
            logging.info('Frontend line output data request failed: %s' % e)
            res['status'] = -1
            res['message'] = 'Frontend line output data request failed: %s' % e
            return json.dumps(res)
    # Daily completion statistics
    @http.route('/api/DailyFinishCount', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def DailyFinishCount(self, **kw):
        """
        Return the number of work orders finished per day for each production line.
        :param kw:
        :return:
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        plan_obj = request.env['sf.production.plan'].sudo()
        line_list = ast.literal_eval(kw['line_list'])
        begin_time_str = kw['begin_time'].strip('"')
        end_time_str = kw['end_time'].strip('"')
        begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
        end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
        print('line_list: %s' % line_list)

        def get_date_list(start_date, end_date):
            date_list = []
            current_date = start_date
            while current_date <= end_date:
                date_list.append(current_date)
                current_date += timedelta(days=1)
            return date_list

        for line in line_list:
            date_list = get_date_list(begin_time, end_time)
            order_counts = []
            date_field_name = 'actual_end_time'  # replace with the actual field name in your model
            for date in date_list:
                next_day = date + timedelta(days=1)
                orders = plan_obj.search([('production_line_id.name', '=', line),
                                          ('state', 'in', ['finished']),
                                          (date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
                                          (date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
                                          ])
                rework_orders = plan_obj.search(
                    [('production_line_id.name', '=', line),
                     ('production_id.state', 'in', ['rework']),
                     (date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
                     (date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
                     ])
                not_passed_orders = plan_obj.search(
                    [('production_line_id.name', '=', line),
                     ('production_id.state', 'in', ['scrap', 'cancel']),
                     (date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
                     (date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
                     ])
                order_counts.append({
                    'date': date.strftime('%Y-%m-%d'),
                    'order_count': len(orders),
                    'rework_orders': len(rework_orders),
                    'not_passed_orders': len(not_passed_orders)
                })
            # Wrap the per-day counts in an extra layer keyed by the line name so the
            # frontend can tell production lines apart (see the response shape sketch below)
            res['data'][line] = order_counts
        return json.dumps(res)
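    # Response shape sketch for /api/DailyFinishCount (illustrative only; the line
    # name "CNC-Line-1" and the numbers below are hypothetical):
    #   {"status": 1, "message": "success",
    #    "data": {"CNC-Line-1": [{"date": "2024-01-01", "order_count": 12,
    #                             "rework_orders": 1, "not_passed_orders": 0},
    #                            ...]}}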
    # Real-time output
    @http.route('/api/RealTimeProduct', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def RealTimeProduct(self, **kw):
        """
        Return hourly real-time output per production line.
        :param kw:
        :return:
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        plan_obj = request.env['sf.production.plan'].sudo()
        line_list = ast.literal_eval(kw['line_list'])
        begin_time_str = kw['begin_time'].strip('"')
        end_time_str = kw['end_time'].strip('"')
        begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
        end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')

        def get_hourly_intervals(start_time, end_time):
            intervals = []
            current_time = start_time
            while current_time < end_time:
                next_hour = current_time + timedelta(hours=1)
                intervals.append((current_time, min(next_hour, end_time)))
                current_time = next_hour
            return intervals

        # Planned and finished quantities for the current shift
        for line in line_list:
            plan_order_nums = plan_obj.search_count(
                [('production_line_id.name', '=', line),
                 ('state', 'not in', ['draft']),
                 ('date_planned_start', '>=', begin_time),
                 ('date_planned_start', '<', end_time)
                 ])
            finish_order_nums = plan_obj.search_count(
                [('production_line_id.name', '=', line),
                 ('state', 'in', ['finished']),
                 ('date_planned_start', '>=', begin_time),
                 ('date_planned_start', '<', end_time)
                 ])
            hourly_intervals = get_hourly_intervals(begin_time, end_time)
            production_counts = []
            for start, end in hourly_intervals:
                orders = plan_obj.search([
                    ('actual_end_time', '>=', start.strftime('%Y-%m-%d %H:%M:%S')),
                    ('actual_end_time', '<', end.strftime('%Y-%m-%d %H:%M:%S')),
                    ('production_line_id.name', '=', line)
                ])
                production_counts.append({
                    'start_time': start.strftime('%Y-%m-%d %H:%M:%S'),
                    'end_time': end.strftime('%Y-%m-%d %H:%M:%S'),
                    'production_count': len(orders)
                })
            production_counts_dict = {'production_counts': production_counts,
                                      'plan_order_nums': plan_order_nums,
                                      'finish_order_nums': finish_order_nums,
                                      }
            res['data'][line] = production_counts_dict
        return json.dumps(res)

    # Work-order details
    @http.route('/api/OrderDetail', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def OrderDetail(self, **kw):
        """
        Return work-order details per production line.
        :param kw:
        :return:
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        plan_obj = request.env['sf.production.plan'].sudo()
        line_list = ast.literal_eval(kw['line_list'])
        begin_time_str = kw['begin_time'].strip('"')
        end_time_str = kw['end_time'].strip('"')
        begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
        end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
        print('line_list: %s' % line_list)
        for line in line_list:
            # Reset per line so one line's orders do not leak into another line's result
            not_done_data = []
            done_data = []
            # Unfinished orders
            not_done_orders = plan_obj.search(
                [('production_line_id.name', '=', line), ('state', 'not in', ['finished'])])
            print(not_done_orders)
            # Finished orders
            finish_orders = plan_obj.search([('production_line_id.name', '=', line), ('state', 'in', ['finished'])])
            print(finish_orders)
            # IDs of all unfinished orders
            order_ids = [order.id for order in not_done_orders]
            # IDs of all finished orders
            finish_order_ids = [order.id for order in finish_orders]
            # Sort the IDs
            sorted_order_ids = sorted(order_ids)
            finish_sorted_order_ids = sorted(finish_order_ids)
            # Map each ID to a display sequence number
            id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(sorted_order_ids)}
            finish_id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(finish_sorted_order_ids)}
            for order in not_done_orders:
                try:
                    blank_name = order.production_id.move_raw_ids[0].product_id.name
                except Exception:
                    continue
                # Example blank name: 'R-S00109-1 [碳素结构钢 Q235-118.0 * 72.0 * 21.0]'
                # Material: everything between '[' and the first '-' inside the brackets
                material_pattern = r'\[(.*?)-'
                dimensions = blank_name.split('-')[-1].split(']')[0]
                material_match = re.search(material_pattern, blank_name)
                material = material_match.group(1) if material_match else 'No match found'
                state_dict = {
                    'draft': '待排程',
                    'done': '已排程',
                    'processing': '生产中',
                    'finished': '已完成'
                }
                line_dict = {
                    'sequence': id_to_sequence[order.id],
                    'workorder_name': order.name,
                    'blank_name': blank_name,
                    'material': material,
                    'dimensions': dimensions,
                    'order_qty': order.product_qty,
                    'state': state_dict[order.state],
                }
                not_done_data.append(line_dict)
            for finish_order in finish_orders:
                try:
                    blank_name = finish_order.production_id.move_raw_ids[0].product_id.name
                except Exception:
                    continue
                material_pattern = r'\[(.*?)-'
                dimensions = blank_name.split('-')[-1].split(']')[0]
                material_match = re.search(material_pattern, blank_name)
                material = material_match.group(1) if material_match else 'No match found'
                line_dict = {
                    'sequence': finish_id_to_sequence[finish_order.id],
                    'workorder_name': finish_order.name,
                    'blank_name': blank_name,
                    'material': material,
                    'dimensions': dimensions,
                    'order_qty': finish_order.product_qty,
                    'finish_time': finish_order.actual_end_time.strftime('%Y-%m-%d %H:%M:%S'),
                }
                done_data.append(line_dict)
            # Wrap per production line, as above
            res['data'][line] = {'not_done_data': not_done_data, 'done_data': done_data}
        return json.dumps(res)
    # Query the PostgreSQL store for idle counts and alarm statistics
    @http.route('/api/IdleAlarmCount', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def idle_alarm_count(self, **kw):
        """
        Return the number of idle periods and alarm statistics per machine.
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        logging.info('Idle/alarm statistics requested by the frontend, parameters: %s' % kw)
        # Connect to the time-series database
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()
        try:
            # Machine codes requested by the frontend
            machine_list = ast.literal_eval(kw['machine_list'])
            for item in machine_list:
                # Reset the accumulators for each machine
                total_alarm_time = 0
                alarm_count_num = 0
                sql = '''
                    SELECT COUNT(*)
                    FROM (
                        SELECT DISTINCT ON (idle_start_time) idle_start_time
                        FROM device_data
                        WHERE device_name = %s AND idle_start_time IS NOT NULL
                        ORDER BY idle_start_time, time
                    ) subquery;
                '''
                sql2 = '''
                    SELECT DISTINCT ON (alarm_time) alarm_time, alarm_repair_time
                    FROM device_data
                    WHERE device_name = %s AND alarm_time IS NOT NULL
                    ORDER BY alarm_time, time;
                '''
                cur.execute(sql, (item,))
                result = cur.fetchall()
                print('result========', result)
                cur.execute(sql2, (item,))
                result2 = cur.fetchall()
                print('result2========', result2)
                # Number of distinct idle periods
                for row in result:
                    res['data'][item] = {'idle_count': row[0]}
                alarm_count = []
                for row in result2:
                    alarm_count.append(row[0])
                    total_alarm_time += abs(float(row[0]))
                if len(list(set(alarm_count))) == 1:
                    if list(set(alarm_count))[0] is None:
                        alarm_count_num = 0
                    else:
                        alarm_count_num = 1
                else:
                    alarm_count_num = len(list(set(alarm_count)))
                res['data'][item]['total_alarm_time'] = total_alarm_time / 3600
                res['data'][item]['alarm_count_num'] = alarm_count_num
            # Return the statistics
            return json.dumps(res)
        except Exception as e:
            print(f"An error occurred: {e}")
            res['status'] = -1
            res['message'] = f"An error occurred: {e}"
            return json.dumps(res)
        finally:
            cur.close()
            conn.close()

    # Query the PostgreSQL store for alarm records
    @http.route('/api/alarm/logs', type='http', auth='public',
                methods=['GET', 'POST'], csrf=False, cors="*")
    def alarm_logs(self, **kw):
        """
        Return device alarm records.
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        logging.info('Alarm records requested by the frontend, parameters: %s' % kw)
        # Connect to the time-series database
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()
        try:
            sql = '''
                SELECT DISTINCT ON (alarm_time)
                    alarm_time, alarm_message, system_date, system_time, alarm_repair_time
                FROM device_data
                WHERE alarm_time IS NOT NULL
                ORDER BY alarm_time, time;
            '''
            cur.execute(sql)
            result = cur.fetchall()
            print('result', result)
            # Convert the rows into a list of dicts
            data = []
            for row in result:
                record = {
                    'alarm_time': row[0],
                    'alarm_message': row[1],
                    'system_date': row[2],
                    'system_time': row[3],
                    'alarm_repair_time': row[4]
                }
                data.append(record)
            res['data'] = data
            return json.dumps(res, ensure_ascii=False)
        except Exception as e:
            print(f"An error occurred: {e}")
            res['status'] = -1
            res['message'] = f"An error occurred: {e}"
            return json.dumps(res)
        finally:
            cur.close()
            conn.close()

    # Equipment / work-centre OEE
    @http.route('/api/OEE', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
    def OEE(self, **kw):
        """
        Return the OEE of each work centre plus a combined OEE.
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        logging.info('OEE data requested by the frontend, parameters: %s' % kw)
        try:
            count_oee = 1
            workcenter_obj = request.env['mrp.workcenter'].sudo()
            workcenter_list = ast.literal_eval(kw['workcenter_list'])
            print('workcenter_list: %s' % workcenter_list)
            for line in workcenter_list:
                workcenter = workcenter_obj.search([('name', '=', line)])
                res['data'][line] = workcenter.oee
                count_oee *= workcenter.oee
            # Combined OEE: product of the percentage OEE values, scaled back down
            # (1 000 000 = 100 ** 3, i.e. three percentage values multiplied together)
            res['data']['综合oee'] = count_oee / 1000000
        except Exception as e:
            print(f"An error occurred: {e}")
        return json.dumps(res)
    @http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
    def OEEByTime(self, **kw):
        """
        Return average OEE over a time range, grouped by the requested time unit
        ('day' or 'hour'). When time_unit is omitted the data is grouped by day and
        missing days are filled with 0.
        """
        res = {'status': 1, 'message': 'success', 'data': {}}
        logging.info('OEE-by-time requested by the frontend, parameters: %s' % kw)
        # Parse request parameters
        workcenter_list = ast.literal_eval(kw['workcenter_list'])
        begin_time_str = kw['begin_time'].strip('"')
        end_time_str = kw['end_time'].strip('"')
        time_unit = kw.get('time_unit', 'day')  # default unit: day
        begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
        end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
        # Connect to the time-series database
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()
        # Pick the PostgreSQL to_char() format for the chosen time unit
        if time_unit == 'hour':
            time_format = 'YYYY-MM-DD HH24:00:00'
            time_delta = timedelta(hours=1)
        else:  # default: 'day'
            time_format = 'YYYY-MM-DD'
            time_delta = timedelta(days=1)
        # Query and average the OEE values
        oee_data = {}
        for workcenter in workcenter_list:
            cur.execute(f"""
                SELECT to_char(time, '{time_format}') as time_unit, AVG(oee) as avg_oee
                FROM oee_data
                WHERE workcenter_name = %s
                  AND time BETWEEN %s AND %s
                GROUP BY time_unit
                ORDER BY time_unit
            """, (workcenter, begin_time, end_time))
            results = cur.fetchall()
            # OEE averages keyed by time bucket for this work centre;
            # float() keeps the values JSON-serializable (AVG may return Decimal)
            workcenter_oee = {row[0]: float(row[1]) for row in results}
            # Fill missing days with 0 (day mode only)
            current_time = begin_time
            if time_unit != 'hour':
                while current_time <= end_time:
                    time_key = current_time.strftime('%Y-%m-%d')
                    if time_key not in workcenter_oee:
                        workcenter_oee[time_key] = 0
                    current_time += time_delta
            # Sort by time
            oee_data[workcenter] = dict(sorted(workcenter_oee.items()))
        # Close the database connection
        cur.close()
        conn.close()
        # Return the data
        res['data'] = oee_data
        return json.dumps(res)
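
# Usage sketch (assumption, not part of the controller): the routes above accept
# form-encoded parameters, so a dashboard client could call /api/OEEByTime as in
# the snippet below. The host, port, work-centre names and dates are hypothetical.
#
#   import requests
#
#   resp = requests.post(
#       'http://localhost:8069/api/OEEByTime',
#       data={
#           'workcenter_list': "['CNC-1', 'CNC-2']",
#           'begin_time': '"2024-01-01 00:00:00"',
#           'end_time': '"2024-01-07 00:00:00"',
#           'time_unit': 'day',
#       },
#   )
#   print(resp.json())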