# -*- coding: utf-8 -*- import re import ast import json import base64 import logging import psycopg2 from datetime import datetime, timedelta from odoo import http, fields from odoo.http import request # 数据库连接配置 db_config = { "database": "timeseries_db", "user": "postgres", "password": "postgres", "port": "5432", "host": "172.16.10.131" } def convert_to_seconds(time_str): # 修改正则表达式,使 H、M、S 部分可选 if time_str is None: return 0 pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?" match = re.match(pattern, time_str) if match: # 提取各时间单位,如果某个单位缺失则默认设为0 hours = int(match.group(1)) if match.group(1) else 0 minutes = int(match.group(2)) if match.group(2) else 0 seconds = int(match.group(3)) if match.group(3) else 0 # 计算总秒数 total_seconds = hours * 3600 + minutes * 60 + seconds if total_seconds == 0: # return None pattern = r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?" match = re.match(pattern, time_str) if match: # 提取各时间单位,如果某个单位缺失则默认设为0 hours = int(match.group(1)) if match.group(1) else 0 minutes = int(match.group(2)) if match.group(2) else 0 seconds = int(match.group(3)) if match.group(3) else 0 # 计算总秒数 total_seconds = hours * 3600 + minutes * 60 + seconds return total_seconds else: return None return total_seconds class Sf_Dashboard_Connect(http.Controller): @http.route('/api/get_machine_datas/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def get_machine_datas_list(self, **kw): """ 拿到机床数据返回给大屏展示 :param kw: :return: """ res = {'status': 1, 'message': '成功', 'data': []} logging.info('前端请求机床数据的参数为:%s' % kw) # 获取当前时间的时间戳 current_timestamp = datetime.now().timestamp() # print(current_timestamp) # tem_list = [ # "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3", # "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5", # "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7", # "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2", # "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10", # "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12", # "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14" # ] try: equipment_obj = request.env['maintenance.equipment'].sudo() # 获取请求的机床数据 machine_list = ast.literal_eval(kw['machine_list']) for item in machine_list: machine_data = equipment_obj.search([('code', '=', item)]) # 机床上线时间段 first_online_duration = current_timestamp - int(machine_data.first_online_time.timestamp()) power_off_time = None power_off_rate = None if machine_data.machine_power_on_time: power_off_time = first_online_duration - convert_to_seconds(machine_data.machine_power_on_time) power_off_rate = round((power_off_time / first_online_duration), 3) else: power_off_time = False power_off_rate = False if machine_data: res['data'].append({ 'active': machine_data.status, 'id': machine_data.id, 'name': machine_data.name, 'brand': machine_data.type_id.name, 'code': machine_data.code, 'status': machine_data.status, 'run_status': machine_data.run_status, 'run_time': machine_data.run_time, 'system_date': machine_data.system_date, 'system_time': machine_data.system_time, 'cut_time': machine_data.cut_time, 'cut_status': machine_data.cut_status, 'program': machine_data.program, 'program_name': machine_data.program_name, 'program_status': machine_data.program_status, 'tool_num': machine_data.tool_num, 'machine_power_on_time': machine_data.machine_power_on_time, 'product_counts': machine_data.product_counts, 'mode': machine_data.mode, 'start_time': machine_data.start_time, 'end_time': machine_data.end_time, 'program_start_time': machine_data.program_start_time, 'program_end_time': machine_data.program_end_time, 'standby_start_time': machine_data.standby_start_time, 'standby_end_time': machine_data.standby_end_time, 'offline_start_time': machine_data.offline_start_time, 'offline_end_time': machine_data.offline_end_time, 'emg_status': machine_data.emg_status, 'current_program': machine_data.current_program, 'current_program_seq': machine_data.current_program_seq, 'x_abs_pos': machine_data.x_abs_pos, 'y_abs_pos': machine_data.y_abs_pos, 'z_abs_pos': machine_data.z_abs_pos, 'feed_speed_set': machine_data.feed_speed_set, 'act_feed_speed': machine_data.act_feed_speed, 'spindle_speed_set': machine_data.spindle_speed_set, 'act_spindle_speed': machine_data.act_spindle_speed, 'spindle_load': machine_data.spindle_load, 'x_axis_load': machine_data.x_axis_load, 'y_axis_load': machine_data.y_axis_load, 'z_axis_load': machine_data.z_axis_load, 'rapid_feed': machine_data.rapid_feed, 'feed_rate': machine_data.feed_rate, 'x_mach_coord': machine_data.x_mach_coord, 'y_mach_coord': machine_data.y_mach_coord, 'z_mach_coord': machine_data.z_mach_coord, 'x_rel_coord': machine_data.x_rel_coord, 'y_rel_coord': machine_data.y_rel_coord, 'z_rel_coord': machine_data.z_rel_coord, 'x_dis_coord': machine_data.x_dis_coord, 'y_dis_coord': machine_data.y_dis_coord, 'z_dis_coord': machine_data.z_dis_coord, 'alarm_time': machine_data.alarm_time, 'alarm_msg': machine_data.alarm_msg, 'clear_time': machine_data.clear_time, # 计算出来的数据 # 开动率:运行时间/通电时间 'run_rate': machine_data.run_rate, # 关机时长:初次上线时间 - 通电时间 'power_off_time': power_off_time, # 关机率:关机时长/初次上线时间 # 'power_off_rate': power_off_rate, 'first_online_duration': first_online_duration, # 停机时间:关机时间 - 运行时间 # 停机时长:关机时间 - 初次上线时间 # 'img': f'data:image/png;base64,{machine_data.machine_tool_picture.decode("utf-8")}', 'img': f'https://xt.sf.jikimo.com/equipment/get_image/{machine_data.id}', 'equipment_type': machine_data.category_id.name, }) return json.dumps(res) except Exception as e: logging.info('前端请求机床数据失败,原因:%s' % e) res['status'] = -1 res['message'] = '前端请求机床数据失败,原因:%s' % e return json.JSONEncoder().encode(res) @http.route('/api/logs/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def logs_list(self, **kw): """ 拿到日志数据返回给大屏展示 :param kw: :return: """ res = {'status': 1, 'message': '成功', 'data': {}} logging.info('前端请求日志数据的参数为:%s' % kw) # 连接数据库 conn = psycopg2.connect(**db_config) cur = conn.cursor() try: machine_list = ast.literal_eval(kw['machine_list']) begin_time_str = kw['begin_time'].strip('"') end_time_str = kw['end_time'].strip('"') begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') # print('begin_time: %s' % begin_time) for item in machine_list: sql = ''' SELECT time, device_state, program_name FROM device_data WHERE device_name = %s AND time >= %s AND time <= %s ORDER BY time DESC; ''' # 执行SQL命令,使用参数绑定 cur.execute(sql, (item, begin_time, end_time)) results = cur.fetchall() # 将数据按照 equipment_code 进行分组 if item not in res['data']: res['data'][item] = [] for result in results: res['data'][item].append({ 'time': result[0].strftime('%Y-%m-%d %H:%M:%S'), 'state': result[1], 'production_name': result[2], }) return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode() except Exception as e: logging.info('前端请求日志数据失败,原因:%s' % e) res['status'] = -1 res['message'] = '前端请求日志数据失败,原因:%s' % e return json.dumps(res) finally: if cur: cur.close() if conn: conn.close() @http.route('/api/logs/page_data', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def logs_page_data(self, **kw): """ 拿到日志数据返回给大屏展示(支持时间戳分页) :param kw: :return: """ res = {'status': 1, 'message': '成功', 'data': {}} logging.info('前端请求日志数据的参数为:%s' % kw) # 连接数据库 conn = psycopg2.connect(**db_config) cur = conn.cursor() try: # 获取并解析传递的参数 machine_list = ast.literal_eval(kw.get('machine_list', '[]')) begin_time_str = kw.get('begin_time', '').strip('"') end_time_str = kw.get('end_time', '').strip('"') page = int(kw.get('page', 1)) # 默认页码为1 page_size = int(kw.get('page_size', 80)) # 默认每页条数为10 begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') # 计算分页的 offset offset = (page - 1) * page_size # 先查询符合条件的总记录数 total_records = 0 for item in machine_list: count_sql = ''' SELECT COUNT(*) FROM device_data WHERE device_name = %s AND time >= %s AND time <= %s; ''' # 执行总记录数查询 cur.execute(count_sql, (item, begin_time, end_time)) record_count = cur.fetchone()[0] # 获取总记录数 total_records += record_count # 计算总页数 if total_records > 0: total_pages = (total_records + page_size - 1) // page_size # 向上取整 else: total_pages = 0 # 将总页数和总记录数返回到响应中 res['total_records'] = total_records res['total_pages'] = total_pages for item in machine_list: sql = ''' SELECT time, device_state, program_name FROM device_data WHERE device_name = %s AND time >= %s AND time <= %s ORDER BY time DESC LIMIT %s OFFSET %s; ''' # 执行SQL命令,使用参数绑定 cur.execute(sql, (item, begin_time, end_time, page_size, offset)) results = cur.fetchall() # 将数据按照 equipment_code 进行分组 if item not in res['data']: res['data'][item] = [] for result in results: res['data'][item].append({ 'time': result[0].strftime('%Y-%m-%d %H:%M:%S'), 'state': result[1], 'production_name': result[2], }) return json.dumps(res) # 返回分页数据 except Exception as e: logging.info('前端请求日志数据失败,原因:%s' % e) res['status'] = -1 res['message'] = '前端请求日志数据失败,原因:%s' % e return json.dumps(res) finally: if cur: cur.close() if conn: conn.close() # 返回CNC机床列表 @http.route('/api/CNCList', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def CNCList(self, **kw): """ 获取CNC机床列表 :param kw: :return: """ # logging.info('CNCList:%s' % kw) try: res = {'Succeed': True} # cnc_list = request.env['sf.cnc.equipment'].sudo().search([]) # cnc_list = ["XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3", # "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5", # "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7", # "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2", # "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10", # "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12", # "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"] cnc_list_obj = request.env['maintenance.equipment'].sudo().search( [('function_type', '!=', False), ('active', '=', True)]) cnc_list = list(map(lambda x: x.code, cnc_list_obj)) print('cnc_list: %s' % cnc_list) res['CNCList'] = cnc_list except Exception as e: res = {'Succeed': False, 'ErrorCode': 202, 'Error': e} logging.info('CNCList error:%s' % e) return json.JSONEncoder().encode(res) # 返回产线列表 @http.route('/api/LineList', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def LineList(self, **kw): """ 获取产线列表 :param kw: :return: """ try: res = {'Succeed': True} line_list_obj = request.env['sf.production.line'].sudo().search([('name', 'ilike', 'CNC')]) line_list = list(map(lambda x: x.name, line_list_obj)) # print('line_list: %s' % line_list) res['LineList'] = line_list except Exception as e: res = {'Succeed': False, 'ErrorCode': 202, 'Error': e} logging.info('LineList error:%s' % e) return json.JSONEncoder().encode(res) # 获取产线产量相关 @http.route('/api/LineProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def LineProduct(self, **kw): """ 获取产线产量相关 :param kw: :return: """ res = {'status': 1, 'message': '成功', 'data': {}} logging.info('前端请求产线产量数据的参数为:%s' % kw) try: plan_obj = request.env['sf.production.plan'].sudo() production_obj = request.env['mrp.production'].sudo() line_list = ast.literal_eval(kw['line_list']) # print('line_list: %s' % line_list) for line in line_list: # 工单计划量 plan_data_total_counts = production_obj.search_count( [('production_line_id.name', '=', line), ('state', 'not in', ['cancel']), ('active', '=', True)]) # 工单完成量 plan_data_finish_counts = plan_obj.search_count( [('production_line_id.name', '=', line), ('state', 'in', ['finished'])]) # # 工单计划量 # plan_data_plan_counts = plan_obj.search_count( # [('production_line_id.name', '=', line), ('state', 'not in', ['finished'])]) # 查找符合条件的生产计划记录 plan_data = plan_obj.search([ ('production_line_id.name', '=', line), ]) # 过滤出那些检测结果状态为 '返工' 或 '报废' 的记录 faulty_plans = plan_data.filtered(lambda p: any( result.test_results in ['返工', '报废'] for result in p.production_id.detection_result_ids )) # 查找制造订单取消与归档的数量 cancel_order_count = production_obj.search_count( [('production_line_id.name', '=', line), ('state', 'in', ['cancel']), ('active', '=', False)]) # 计算符合条件的记录数量 # plan_data_fault_counts = len(faulty_plans) + cancel_order_count plan_data_fault_counts = len(faulty_plans) # 工单返工数量 plan_data_rework_counts = plan_obj.search_count( [('production_line_id.name', '=', line), ('production_id.state', 'in', ['rework'])]) # 工单完成率 finishe_rate = round( (plan_data_finish_counts / plan_data_total_counts if plan_data_total_counts > 0 else 0), 3) # 工单进度偏差 plan_data_progress_deviation = plan_data_total_counts - plan_data_finish_counts - plan_data_fault_counts # 完成记录 plan_data_finish_orders = plan_obj.search( [('production_line_id.name', '=', line), ('state', 'in', ['finished'])]) # # 检测量 # detection_nums = 0 # for i in plan_data_finish_orders: # print('i: %s' % i) # if i.production_id.detection_result_ids: # detection_nums += 1 # # print('detection_nums: %s' % detection_nums) # 检测量 detection_data = len( plan_data_finish_orders.mapped('production_id.detection_result_ids').filtered(lambda r: r)) # 检测合格量 pass_nums = plan_data_finish_orders.filtered(lambda p: any( result.test_results in ['合格'] for result in p.production_id.detection_result_ids )) # 质量合格率 pass_rate = 1 if pass_nums: pass_rate = round( # (len(pass_nums) / detection_data if len(plan_data_finish_orders) > 0 else 0), 3) (len(pass_nums) / len(plan_data_finish_orders) if len(plan_data_finish_orders) > 0 else 0), 3) # 返工率 rework_rate = round( (plan_data_rework_counts / plan_data_finish_counts if plan_data_finish_counts > 0 else 0), 3) # 交付准时率 delay_num = 0 for plan in plan_data_finish_orders: sale_obj = request.env['sale.order'].sudo().search([('name', '=', plan.origin)]) if sale_obj: if sale_obj.deadline_of_delivery and plan.actual_end_time: # 将 deadline_of_delivery 转换为字符串 date_as_string = sale_obj.deadline_of_delivery.strftime('%Y-%m-%d') # 然后使用 strptime 将字符串转换为 datetime 对象 date_as_datetime = datetime.strptime(date_as_string, '%Y-%m-%d') # 将 actual_end_time 转换为 datetime 对象 datetime_value = fields.Datetime.from_string(plan.actual_end_time) # 给 datetime_value 加1天 new_datetime_value = datetime_value + timedelta(days=1) # 比较 new_datetime_value 和 date_as_datetime 的大小 if new_datetime_value.date() > date_as_datetime.date(): delay_num += 1 delay_rate = round((delay_num / plan_data_finish_counts if plan_data_finish_counts > 0 else 0), 3) on_time_rate = 1 - delay_rate if plan_data: data = { 'plan_data_total_counts': plan_data_total_counts, 'plan_data_finish_counts': plan_data_finish_counts, 'plan_data_plan_counts': plan_data_total_counts, 'plan_data_fault_counts': plan_data_fault_counts, 'nopass_orders_counts': detection_data - len(pass_nums), 'finishe_rate': finishe_rate, 'plan_data_progress_deviation': plan_data_progress_deviation, 'plan_data_rework_counts': plan_data_rework_counts, 'on_time_rate': on_time_rate, # 'detection_data': detection_data, 'detection_data': plan_data_finish_counts, 'pass_rate': (plan_data_finish_counts - plan_data_fault_counts) / plan_data_finish_counts } res['data'][line] = data return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode() except Exception as e: logging.info('前端请求产线产量数据失败,原因:%s' % e) res['status'] = -1 res['message'] = '前端请求产线产量数据失败,原因:%s' % e return json.dumps(res) # 日完成量统计 @http.route('/api/DailyFinishCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def DailyFinishCount(self, **kw): """ 获取日完成量统计 :param kw: :return: """ res = {'status': 1, 'message': '成功', 'data': {}} plan_obj = request.env['sf.production.plan'].sudo() line_list = ast.literal_eval(kw['line_list']) begin_time_str = kw['begin_time'].strip('"') end_time_str = kw['end_time'].strip('"') begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') # print('line_list: %s' % line_list) print('kw', kw) time_unit = kw.get('time_unit', 'day').strip('"') # 默认单位为天 print('time_unit: %s' % time_unit) # 日期或小时循环生成器,根据time_unit决定是按天还是按小时 def get_time_intervals(start_time, end_time, time_unit): intervals = [] current_time = start_time if time_unit == 'hour': delta = timedelta(hours=1) else: delta = timedelta(days=1) while current_time < end_time: next_time = current_time + delta # 确保最后一个时间段不会超出end_time if next_time > end_time: next_time = end_time intervals.append((current_time, next_time)) current_time = next_time return intervals def get_date_list(start_date, end_date): date_list = [] current_date = start_date while current_date <= end_date: date_list.append(current_date) current_date += timedelta(days=1) return date_list for line in line_list: date_field_name = 'actual_end_time' # 替换为你模型中的实际字段名 order_counts = [] if time_unit == 'hour': time_intervals = get_time_intervals(begin_time, end_time, time_unit) print('============================= %s' % time_intervals) time_count_dict = {} for time_interval in time_intervals: start_time, end_time = time_interval orders = plan_obj.search([ ('production_line_id.name', '=', line), ('state', 'in', ['finished']), (date_field_name, '>=', start_time.strftime('%Y-%m-%d %H:%M:%S')), (date_field_name, '<=', end_time.strftime('%Y-%m-%d %H:%M:%S')) # 包括结束时间 ]) # 使用小时和分钟作为键,确保每个小时的数据有独立的键 key = start_time.strftime('%H:%M:%S') # 只取小时:分钟:秒作为键 time_count_dict[key] = len(orders) # order_counts.append() res['data'][line] = { 'finish_order_nums': time_count_dict, 'plan_order_nums': 28 } return json.dumps(res) date_list = get_date_list(begin_time, end_time) for date in date_list: next_day = date + timedelta(days=1) orders = plan_obj.search([('production_line_id.name', '=', line), ('state', 'in', ['finished']), (date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')), (date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00')) ]) rework_orders = plan_obj.search( [('production_line_id.name', '=', line), ('production_id.state', 'in', ['rework']), (date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')), (date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00')) ]) not_passed_orders = plan_obj.search( [('production_line_id.name', '=', line), ('production_id.state', 'in', ['scrap', 'cancel']), (date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')), (date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00')) ]) order_counts.append({ 'date': date.strftime('%Y-%m-%d'), 'order_count': len(orders), 'rework_orders': len(rework_orders), 'not_passed_orders': len(not_passed_orders) }) # 外面包一层,没什么是包一层不能解决的,包一层就能区分了,类似于包一层div # 外面包一层的好处是,可以把多个数据结构打包在一起,方便前端处理 # date_list_dict = {line: order_counts} res['data'][line] = order_counts return json.dumps(res) # 实时产量 @http.route('/api/RealTimeProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def RealTimeProduct(self, **kw): """ 获取实时产量 :param kw: :return: """ res = {'status': 1, 'message': '成功', 'data': {}} plan_obj = request.env['sf.production.plan'].sudo() line_list = ast.literal_eval(kw['line_list']) begin_time_str = kw['begin_time'].strip('"') end_time_str = kw['end_time'].strip('"') begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') def get_hourly_intervals(start_time, end_time): intervals = [] current_time = start_time while current_time < end_time: next_hour = current_time + timedelta(hours=1) intervals.append((current_time, min(next_hour, end_time))) current_time = next_hour return intervals # 当班计划量 for line in line_list: plan_order_nums = plan_obj.search_count( [('production_line_id.name', '=', line), ('state', 'not in', ['draft']), ('date_planned_start', '>=', begin_time), ('date_planned_start', '<', end_time) ]) finish_order_nums = plan_obj.search_count( [('production_line_id.name', '=', line), ('state', 'in', ['finished']), ('date_planned_start', '>=', begin_time), ('date_planned_start', '<', end_time) ]) hourly_intervals = get_hourly_intervals(begin_time, end_time) production_counts = [] for start, end in hourly_intervals: orders = plan_obj.search([ ('actual_end_time', '>=', start.strftime('%Y-%m-%d %H:%M:%S')), ('actual_end_time', '<', end.strftime('%Y-%m-%d %H:%M:%S')), ('production_line_id.name', '=', line) ]) production_counts.append({ 'start_time': start.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': end.strftime('%Y-%m-%d %H:%M:%S'), 'production_count': len(orders) }) production_counts_dict = {'production_counts': production_counts, 'plan_order_nums': plan_order_nums, 'finish_order_nums': finish_order_nums, } res['data'][line] = production_counts_dict # res['data'].append({line: production_counts}) return json.dumps(res) # 工单明细 @http.route('/api/OrderDetail', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def OrderDetail(self, **kw): """ 获取工单明细 :param kw: :return: """ # res = {'status': 1, 'message': '成功', 'not_done_data': [], 'done_data': []} res = {'status': 1, 'message': '成功', 'data': {}} plan_obj = request.env['sf.production.plan'].sudo() line_list = ast.literal_eval(kw['line_list']) begin_time_str = kw['begin_time'].strip('"') end_time_str = kw['end_time'].strip('"') begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') # print('line_list: %s' % line_list) not_done_data = [] done_data = [] final_data = {} for line in line_list: # 未完成订单 not_done_orders = plan_obj.search( [('production_line_id.name', '=', line), ('state', 'not in', ['finished']), ('production_id.state', 'not in', ['cancel', 'done']), ('active', '=', True) ]) # print(not_done_orders) # 完成订单 # 获取当前时间,并计算24小时前的时间 current_time = datetime.now() time_24_hours_ago = current_time - timedelta(hours=24) finish_orders = plan_obj.search([ ('production_line_id.name', '=', line), ('state', 'in', ['finished']), ('production_id.state', 'not in', ['cancel']), ('active', '=', True), ('actual_end_time', '>=', time_24_hours_ago) ]) # print(finish_orders) # 获取所有未完成订单的ID列表 order_ids = [order.id for order in not_done_orders] # 获取所有已完成订单的ID列表 finish_order_ids = [order.id for order in finish_orders] # 对ID进行排序 sorted_order_ids = sorted(order_ids) finish_sorted_order_ids = sorted(finish_order_ids) # 创建ID与序号的对应关系 id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(sorted_order_ids)} finish_id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(finish_sorted_order_ids)} # # 输出结果或进一步处理 # for order_id, sequence in id_to_sequence.items(): # print(f"Order ID: {order_id} - Sequence: {sequence}") for order in not_done_orders: blank_name = '' try: blank_name = order.production_id.move_raw_ids[0].product_id.name except: continue # blank_name = 'R-S00109-1 [碳素结构钢 Q235-118.0 * 72.0 * 21.0]' # 正则表达式 material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止 dimensions = blank_name.split('-')[-1].split(']')[0] # 匹配材料名称 material_match = re.search(material_pattern, blank_name) material = material_match.group(1) if material_match else 'No match found' state_dict = { 'draft': '待排程', 'done': '已排程', 'processing': '生产中', 'finished': '已完成' } line_dict = { 'sequence': id_to_sequence[order.id], 'workorder_name': order.name, 'blank_name': blank_name, 'material': material, 'dimensions': dimensions, 'order_qty': order.product_qty, 'state': state_dict[order.state], } not_done_data.append(line_dict) for finish_order in finish_orders: if not finish_order.actual_end_time: continue blank_name = '' try: blank_name = finish_order.production_id.move_raw_ids[0].product_id.name except: continue material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止 dimensions = blank_name.split('-')[-1].split(']')[0] # 匹配材料名称 material_match = re.search(material_pattern, blank_name) material = material_match.group(1) if material_match else 'No match found' line_dict = { 'sequence': finish_id_to_sequence[finish_order.id], 'workorder_name': finish_order.name, 'blank_name': blank_name, 'material': material, 'dimensions': dimensions, 'order_qty': finish_order.product_qty, 'finish_time': finish_order.actual_end_time.strftime( '%Y-%m-%d %H:%M:%S') if finish_order.actual_end_time else ' ' } done_data.append(line_dict) # 开始包一层 res['data'][line] = {'not_done_data': not_done_data, 'done_data': done_data} return json.dumps(res) # 查询pg库来获得待机次数 @http.route('/api/IdleAlarmCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def idle_alarm_count(self, **kw): """ 查询设备的待机次数 """ res = {'status': 1, 'message': '成功', 'data': {}} logging.info('前端请求机床数据的参数为:%s' % kw) # 连接数据库 conn = psycopg2.connect(**db_config) cur = conn.cursor() try: # 获取请求的机床数据 machine_list = ast.literal_eval(kw['machine_list']) total_alarm_time = 0 alarm_count_num = 0 for item in machine_list: sql = ''' SELECT COUNT(*) FROM ( SELECT DISTINCT ON (idle_start_time) idle_start_time FROM device_data WHERE device_name = %s AND idle_start_time IS NOT NULL ORDER BY idle_start_time, time ) subquery; ''' sql2 = ''' SELECT DISTINCT ON (alarm_start_time) alarm_time, alarm_start_time FROM device_data WHERE device_name = %s AND alarm_start_time IS NOT NULL ORDER BY alarm_start_time, time; ''' # 执行SQL命令 cur.execute(sql, (item,)) result = cur.fetchall() # print('result========', result) cur.execute(sql2, (item,)) result2 = cur.fetchall() # print('result2========', result2) # for row in result: res['data'][item] = {'idle_count': row[0]} alarm_count = [] for row in result2: alarm_count.append(row[1]) if row[0]: total_alarm_time += abs(float(row[0])) else: total_alarm_time += 0.0 if len(list(set(alarm_count))) == 1: if list(set(alarm_count))[0] is None: alarm_count_num = 0 else: alarm_count_num = 1 else: alarm_count_num = len(list(set(alarm_count))) res['data'][item]['total_alarm_time'] = total_alarm_time / 3600 res['data'][item]['alarm_count_num'] = alarm_count_num # 返回统计结果 return json.dumps(res) except Exception as e: print(f"An error occurred: {e}") return json.dumps(res) finally: cur.close() conn.close() # 查询pg库来获得异常情况 @http.route('/api/alarm/logs', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def alarm_logs(self, **kw): """ 查询设备的异常情况 """ res = {'status': 1, 'message': '成功', 'data': []} logging.info('前端请求机床数据的参数为:%s' % kw) try: maintenance_logs_obj = request.env['sf.maintenance.logs'].sudo() # # 获取请求的机床数据 # machine_list = ast.literal_eval(kw['machine_list']) # for item in machine_list: # machine_data = equipment_obj.search([('code', '=', item)]) for log in maintenance_logs_obj.search([]): res['data'].append({ 'name': log.name, 'alarm_time': log.alarm_time.strftime('%Y-%m-%d %H:%M:%S'), 'fault_alarm_info': log.fault_alarm_info if log.fault_alarm_info else ' ', 'fault_process': log.fault_process if log.fault_process else ' ', }) except Exception as e: logging.error(f"An error occurred: {e}") return json.dumps(res) # 设备oee @http.route('/api/OEE', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def OEE(self, **kw): """ 获取产线等oee """ res = {'status': 1, 'message': '成功', 'data': {}} logging.info('前端请求oee数据的参数为:%s' % kw) try: count_oee = 1 workcenter_obj = request.env['mrp.workcenter'].sudo() workcenter_list = ast.literal_eval(kw['workcenter_list']) print('workcenter_list: %s' % workcenter_list) for line in workcenter_list: res['data'][line] = workcenter_obj.search([('name', '=', line)]).oee count_oee *= workcenter_obj.search([('name', '=', line)]).oee res['data']['综合oee'] = count_oee / 1000000 except Exception as e: print(f"An error occurred: {e}") return json.dumps(res) # # 查询某段时间的设备oee # @http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") # def OEEByTime(self, **kw): # """ # 获取某段时间的oee # """ # res = {'status': 1, 'message': '成功', 'data': {}} # logging.info('前端请求获取某段时间的oee的参数为:%s' % kw) # workcenter_list = ast.literal_eval(kw['workcenter_list']) # begin_time_str = kw['begin_time'].strip('"') # end_time_str = kw['end_time'].strip('"') # begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') # end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') # print('workcenter_list: %s' % workcenter_list) # # 连接数据库 # conn = psycopg2.connect(**db_config) # cur = conn.cursor() # # 查询并计算OEE平均值 # oee_data = {} # for workcenter in workcenter_list: # cur.execute(""" # SELECT AVG(oee) as avg_oee # FROM oee_data # WHERE workcenter_name = %s # AND time BETWEEN %s AND %s # """, (workcenter, begin_time, end_time)) # # result = cur.fetchone() # avg_oee = result[0] if result else 0.0 # oee_data[workcenter] = avg_oee # # # 返回数据 # res['data'] = oee_data # return json.dumps(res) @http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def OEEByTime(self, **kw): """ 获取某段时间的OEE,根据用户指定的时间单位(day或hour)返回对应的平均值。 如果不传time_unit,则默认按天返回,并补全没有数据的时间段,填充0值。 """ res = {'status': 1, 'message': '成功', 'data': {}} logging.info('前端请求获取某段时间的OEE的参数为:%s' % kw) # 获取并解析参数 workcenter_list = ast.literal_eval(kw['workcenter_list']) begin_time_str = kw['begin_time'].strip('"') end_time_str = kw['end_time'].strip('"') time_unit = kw.get('time_unit', 'day') # 默认单位为天 begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S') end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S') # 连接数据库 conn = psycopg2.connect(**db_config) cur = conn.cursor() # 根据时间单位选择不同的时间格式 if time_unit == 'hour': time_format = 'YYYY-MM-DD HH24:00:00' time_delta = timedelta(hours=1) else: # 默认为'day' time_format = 'YYYY-MM-DD' time_delta = timedelta(days=1) # 查询并计算OEE平均值 oee_data = {} for workcenter in workcenter_list: cur.execute(f""" SELECT to_char(time, '{time_format}') as time_unit, AVG(oee) as avg_oee FROM oee_data WHERE workcenter_name = %s AND time BETWEEN %s AND %s GROUP BY time_unit ORDER BY time_unit """, (workcenter, begin_time, end_time)) results = cur.fetchall() # 初始化当前产线的OEE数据字典 workcenter_oee = {row[0]: row[1] for row in results} # 补全缺失的时间段 current_time = begin_time if time_unit != 'hour': while current_time <= end_time: time_key = current_time.strftime('%Y-%m-%d') if time_key not in workcenter_oee: workcenter_oee[time_key] = 0 current_time += time_delta # 按时间排序 oee_data[workcenter] = dict(sorted(workcenter_oee.items())) # 关闭数据库连接 cur.close() conn.close() # 返回数据 res['data'] = oee_data return json.dumps(res) @http.route(['/equipment/get_image/'], type='http', auth="public", website=True) def get_image(self, record_id, **kwargs): # 获取模型中的记录 record = request.env['maintenance.equipment'].sudo().browse(record_id) # 获取图片字段的数据 image_data_base64 = record.machine_tool_picture if image_data_base64: # 将Base64解码为二进制数据 image_data_binary = base64.b64decode(image_data_base64) # 返回图片数据,并设置正确的Content-Type return request.make_response(image_data_binary, headers=[('Content-Type', 'image/png')]) else: # 如果没有图片数据,返回404 return request.not_found() # 设备运行率 @http.route('/api/RunningTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def RunningTime(self, **kw): """ 获取设备运行时长 """ res = {'status': 1, 'message': '成功', 'data': {}} # 连接数据库 conn = psycopg2.connect(**db_config) # 获取请求的机床数据 machine_list = ast.literal_eval(kw['machine_list']) def fetch_result_as_dict(cursor): """辅助函数:将查询结果转为字典""" columns = [desc[0] for desc in cursor.description] return dict(zip(columns, cursor.fetchone())) if cursor.rowcount != 0 else None # 初始化当天、当月和有记录以来的总时长 day_total_running_time = 0 day_total_process_time = 0 day_work_rate = 0 month_total_running_time = 0 month_total_process_time = 0 month_work_rate = 0 all_time_total_running_time = 0 all_time_total_process_time = 0 all_time_work_rate = 0 for item in machine_list: # 获取当天第一条记录(排除device_state等于‘离线’的记录) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND time::date = CURRENT_DATE AND device_state != '离线' ORDER BY time ASC LIMIT 1; """, (item,)) first_today = fetch_result_as_dict(cur) # print("当天第一条记录(非离线):", first_today) # 获取当天最新一条记录(排除device_state等于‘离线’的记录) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND time::date = CURRENT_DATE AND device_state != '离线' ORDER BY time DESC LIMIT 1; """, (item,)) last_today = fetch_result_as_dict(cur) # print("当天最新一条记录(非离线):", last_today) # 计算当天运行时长 if first_today and last_today: running_time = convert_to_seconds(last_today['run_time']) - convert_to_seconds(first_today['run_time']) process_time = convert_to_seconds(last_today['process_time']) - convert_to_seconds( first_today['process_time']) day_total_running_time += abs(running_time) day_total_process_time += abs(process_time) # 获取当月第一条记录(排除device_state等于‘离线’的记录) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND EXTRACT(YEAR FROM time) = EXTRACT(YEAR FROM CURRENT_DATE) AND EXTRACT(MONTH FROM time) = EXTRACT(MONTH FROM CURRENT_DATE) AND device_state != '离线' ORDER BY time ASC LIMIT 1; """, (item,)) first_month = fetch_result_as_dict(cur) # print("当月第一条记录(非离线):", first_month) # 获取当月最新一条记录(排除device_state等于‘离线’的记录) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND EXTRACT(YEAR FROM time) = EXTRACT(YEAR FROM CURRENT_DATE) AND EXTRACT(MONTH FROM time) = EXTRACT(MONTH FROM CURRENT_DATE) AND device_state != '离线' ORDER BY time DESC LIMIT 1; """, (item,)) last_month = fetch_result_as_dict(cur) # print("当月最新一条记录(非离线):", last_month) # 计算当月运行时长 if first_month and last_month: month_running_time = convert_to_seconds(last_month['run_time']) - convert_to_seconds( first_month['run_time']) month_process_time = convert_to_seconds(last_month['process_time']) - convert_to_seconds( first_month['process_time']) month_total_running_time += abs(month_running_time) month_total_process_time += abs(month_process_time) # 获取有记录以来的第一条记录(排除device_state等于‘离线’的记录) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND device_state != '离线' ORDER BY time ASC LIMIT 1; """, (item,)) first_all_time = fetch_result_as_dict(cur) # print("有记录以来的第一条记录(非离线):", first_all_time) # 获取有记录以来的最新一条记录(排除device_state等于‘离线’的记录) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND device_state != '离线' ORDER BY time DESC LIMIT 1; """, (item,)) last_all_time = fetch_result_as_dict(cur) # print("有记录以来的最新一条记录(非离线):", last_all_time) # 计算有记录以来的运行时长 if first_all_time and last_all_time: all_time_running_time = convert_to_seconds(last_all_time['run_time']) - convert_to_seconds( first_all_time['run_time']) all_time_process_time = convert_to_seconds(last_all_time['process_time']) - convert_to_seconds( first_all_time['process_time']) all_time_total_running_time += abs(all_time_running_time) all_time_total_process_time += abs(all_time_process_time) # 计算当天工作效率 if day_total_running_time > day_total_process_time: day_work_rate = day_total_process_time / day_total_running_time if day_total_running_time != 0 else 0 else: day_work_rate = day_total_running_time / day_total_process_time if day_total_process_time != 0 else 0 print("当天工作效率: %s" % day_work_rate) # 计算当月工作效率 if month_total_running_time > month_total_process_time: month_work_rate = month_total_process_time / month_total_running_time if month_total_running_time != 0 else 0 else: month_work_rate = month_total_running_time / month_total_process_time if month_total_process_time != 0 else 0 print("当月工作效率: %s" % month_work_rate) # 计算有记录以来的工作效率 if all_time_total_running_time > all_time_total_process_time: all_time_work_rate = all_time_total_process_time / all_time_total_running_time if all_time_total_running_time != 0 else 0 else: all_time_work_rate = all_time_total_running_time / all_time_total_process_time if all_time_total_process_time != 0 else 0 print("有记录以来的工作效率: %s" % all_time_work_rate) conn.close() # 返回数据 res['data']['day_work_rate'] = day_work_rate res['data']['month_work_rate'] = month_work_rate res['data']['all_time_work_rate'] = all_time_work_rate return json.dumps(res) # 设备运行时长 @http.route('/api/RunningTimeDetail', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*") def RunningTimeDetail(self, **kw): """ 获取 """ res = {'status': 1, 'message': '成功', 'data': {}} # 连接数据库 conn = psycopg2.connect(**db_config) # 获取请求的机床数据 machine_list = ast.literal_eval(kw['machine_list']) time_threshold = datetime.now() - timedelta(days=1) alarm_last_24_time = 0.0 def fetch_result_as_dict(cursor): """辅助函数:将查询结果转为字典""" columns = [desc[0] for desc in cursor.description] return dict(zip(columns, cursor.fetchone())) if cursor.rowcount != 0 else None # 获取当前时间的时间戳 current_timestamp = datetime.now().timestamp() for item in machine_list: euipment_obj = request.env['maintenance.equipment'].sudo().search([('code', '=', item)]) # 机床上线时间段 first_online_duration = current_timestamp - euipment_obj.first_online_time.timestamp() with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND device_state != '离线' AND process_time IS NOT NULL ORDER BY time DESC LIMIT 1; """, (item,)) last_all_time = fetch_result_as_dict(cur) with conn.cursor() as cur: cur.execute(""" SELECT * FROM device_data WHERE device_name = %s AND device_state != '离线' AND time >= %s AND process_time IS NOT NULL ORDER BY time ASC LIMIT 1; """, (item, time_threshold)) last_24_time = fetch_result_as_dict(cur) with conn.cursor() as cur: cur.execute(""" SELECT COUNT(*) FROM ( SELECT DISTINCT ON (idle_start_time) idle_start_time FROM device_data WHERE device_name = %s AND idle_start_time IS NOT NULL AND time >= %s ORDER BY idle_start_time, time ) subquery; """, (item, time_threshold)) idle_count = cur.fetchone()[0] alarm_last_24_nums = [] with conn.cursor() as cur: cur.execute(""" SELECT DISTINCT ON (alarm_start_time) alarm_time, alarm_start_time FROM device_data WHERE device_name = %s AND alarm_start_time IS NOT NULL AND time >= %s; """, (item, time_threshold)) results = cur.fetchall() for result in results: alarm_last_24_nums.append(result[1]) if result[0]: if float(result[0]) >= 1000: continue alarm_last_24_time += float(result[0]) else: alarm_last_24_time += 0.0 # 返回数据 res['data'][item] = { 'wait_time': last_all_time['run_time'] if last_all_time['run_time'] is not None else 0, 'cut_time': last_all_time['process_time'] if last_all_time['process_time'] is not None else 0, 'cut_24_time': last_24_time['process_time'] if last_24_time else 0, 'power_on_time': last_all_time['power_on_time'] if last_all_time['power_on_time'] is not None else 0, 'power_on_24_time': last_24_time['power_on_time'] if last_24_time else 0, 'alarm_last_24_time': alarm_last_24_time, 'alarm_last_24_nums': len(list(set(alarm_last_24_nums))), 'idle_count': idle_count, 'first_online_time': first_online_duration, } conn.close() return json.dumps(res)