Files
test/sf_machine_connect/controllers/controllers.py
2024-08-23 11:44:05 +08:00

766 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import re
import ast
import json
import base64
import logging
import psycopg2
from datetime import datetime, timedelta
from odoo import http
from odoo.http import request
# 数据库连接配置
db_config = {
"database": "timeseries_db",
"user": "postgres",
"password": "postgres",
"port": "5432",
"host": "172.16.10.98"
}
def convert_to_seconds(time_str):
# 修改正则表达式,使 H、M、S 部分可选
pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
if total_seconds == 0:
# return None
pattern = r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
else:
return None
return total_seconds
class Sf_Dashboard_Connect(http.Controller):
@http.route('/api/get_machine_datas/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def get_machine_datas_list(self, **kw):
"""
拿到机床数据返回给大屏展示
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': []}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 获取当前时间的时间戳
current_timestamp = datetime.now().timestamp()
print(current_timestamp)
# tem_list = [
# "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"
# ]
try:
equipment_obj = request.env['maintenance.equipment'].sudo()
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
for item in machine_list:
machine_data = equipment_obj.search([('code', '=', item)])
# 机床上线时间段
first_online_duration = current_timestamp - int(machine_data.first_online_time.timestamp())
power_off_time = None
power_off_rate = None
if machine_data.machine_power_on_time:
power_off_time = first_online_duration - convert_to_seconds(machine_data.machine_power_on_time)
power_off_rate = round((power_off_time / first_online_duration), 3)
else:
power_off_time = False
power_off_rate = False
if machine_data:
res['data'].append({
'active': machine_data.status,
'id': machine_data.id,
'name': machine_data.name,
'brand': machine_data.type_id.name,
'code': machine_data.code,
'status': machine_data.status,
'run_status': machine_data.run_status,
'run_time': machine_data.run_time,
'system_date': machine_data.system_date,
'system_time': machine_data.system_time,
'cut_time': machine_data.cut_time,
'cut_status': machine_data.cut_status,
'program': machine_data.program,
'program_name': machine_data.program_name,
'program_status': machine_data.program_status,
'tool_num': machine_data.tool_num,
'machine_power_on_time': machine_data.machine_power_on_time,
'product_counts': machine_data.product_counts,
'mode': machine_data.mode,
'start_time': machine_data.start_time,
'end_time': machine_data.end_time,
'program_start_time': machine_data.program_start_time,
'program_end_time': machine_data.program_end_time,
'standby_start_time': machine_data.standby_start_time,
'standby_end_time': machine_data.standby_end_time,
'offline_start_time': machine_data.offline_start_time,
'offline_end_time': machine_data.offline_end_time,
'emg_status': machine_data.emg_status,
'current_program': machine_data.current_program,
'current_program_seq': machine_data.current_program_seq,
'x_abs_pos': machine_data.x_abs_pos,
'y_abs_pos': machine_data.y_abs_pos,
'z_abs_pos': machine_data.z_abs_pos,
'feed_speed_set': machine_data.feed_speed_set,
'act_feed_speed': machine_data.act_feed_speed,
'spindle_speed_set': machine_data.spindle_speed_set,
'act_spindle_speed': machine_data.act_spindle_speed,
'spindle_load': machine_data.spindle_load,
'x_axis_load': machine_data.x_axis_load,
'y_axis_load': machine_data.y_axis_load,
'z_axis_load': machine_data.z_axis_load,
'rapid_feed': machine_data.rapid_feed,
'feed_rate': machine_data.feed_rate,
'x_mach_coord': machine_data.x_mach_coord,
'y_mach_coord': machine_data.y_mach_coord,
'z_mach_coord': machine_data.z_mach_coord,
'x_rel_coord': machine_data.x_rel_coord,
'y_rel_coord': machine_data.y_rel_coord,
'z_rel_coord': machine_data.z_rel_coord,
'x_dis_coord': machine_data.x_dis_coord,
'y_dis_coord': machine_data.y_dis_coord,
'z_dis_coord': machine_data.z_dis_coord,
'alarm_time': machine_data.alarm_time,
'alarm_msg': machine_data.alarm_msg,
'clear_time': machine_data.clear_time,
# 计算出来的数据
# 开动率:运行时间/通电时间
'run_rate': machine_data.run_rate,
# 关机时长:初次上线时间 - 通电时间
'power_off_time': power_off_time,
# 关机率:关机时长/初次上线时间
'power_off_rate': power_off_rate,
'first_online_duration': first_online_duration,
# 停机时间:关机时间 - 运行时间
# 停机时长:关机时间 - 初次上线时间
'img': f'data:image/png;base64,{machine_data.machine_tool_picture.decode("utf-8")}',
})
return json.dumps(res)
except Exception as e:
logging.info('前端请求机床数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求机床数据失败,原因:%s' % e
return json.JSONEncoder().encode(res)
# @http.route('/api/logs/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
# cors="*")
# def logs_list(self, **kw):
# """
# 拿到日志数据返回给大屏展示
# :param kw:
# :return:
# """
# res = {'status': 1, 'message': '成功', 'data': []}
# logging.info('前端请求日志数据的参数为:%s' % kw)
#
# try:
# # 获取请求的日志数据
# logs_obj = request.env['maintenance.equipment.oee.log.detail'].sudo()
# # 获取请求的机床数据
# machine_list = ast.literal_eval(kw['machine_list'])
# begin_time_str = kw['begin_time'].strip('"')
# end_time_str = kw['end_time'].strip('"')
#
# begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
# end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
#
# print('begin_time: %s' % begin_time)
# for item in machine_list:
# log_datas = logs_obj.search(
# [('equipment_code', '=', item), ('time', '>=', begin_time), ('time', '<=', end_time)])
# print('log_datas: %s' % log_datas)
# for log_data in log_datas:
# res['data'].append({
# 'equipment_code': log_data.equipment_code,
# 'time': log_data.time.strftime('%Y-%m-%d %H:%M:%S'),
# 'state': log_data.state
#
# })
#
# return json.JSONEncoder().encode(res)
#
# except Exception as e:
# logging.info('前端请求日志数据失败,原因:%s' % e)
# res['status'] = -1
# res['message'] = '前端请求日志数据失败,原因:%s' % e
# return json.JSONEncoder().encode(res)
@http.route('/api/logs/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def logs_list(self, **kw):
"""
拿到日志数据返回给大屏展示
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求日志数据的参数为:%s' % kw)
try:
# 获取请求的日志数据
logs_obj = request.env['maintenance.equipment.oee.log.detail'].sudo()
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
print('begin_time: %s' % begin_time)
for item in machine_list:
log_datas = logs_obj.search(
[('equipment_code', '=', item), ('time', '>=', begin_time), ('time', '<=', end_time)])
print('log_datas: %s' % log_datas)
# 将数据按照 equipment_code 进行分组
if item not in res['data']:
res['data'][item] = []
for log_data in log_datas:
res['data'][item].append({
'time': log_data.time.strftime('%Y-%m-%d %H:%M:%S'),
'state': log_data.state,
'production_name': log_data.production_name,
})
return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode()
except Exception as e:
logging.info('前端请求日志数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求日志数据失败,原因:%s' % e
return json.dumps(res)
# 返回CNC机床列表
@http.route('/api/CNCList', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def CNCList(self, **kw):
"""
获取CNC机床列表
:param kw:
:return:
"""
# logging.info('CNCList:%s' % kw)
try:
res = {'Succeed': True}
# cnc_list = request.env['sf.cnc.equipment'].sudo().search([])
# cnc_list = ["XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"]
cnc_list_obj = request.env['maintenance.equipment'].sudo().search(
[('function_type', '!=', False), ('active', '=', True)])
cnc_list = list(map(lambda x: x.code, cnc_list_obj))
print('cnc_list: %s' % cnc_list)
res['CNCList'] = cnc_list
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': e}
logging.info('CNCList error:%s' % e)
return json.JSONEncoder().encode(res)
# 返回产线列表
@http.route('/api/LineList', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def LineList(self, **kw):
"""
获取产线列表
:param kw:
:return:
"""
try:
res = {'Succeed': True}
line_list_obj = request.env['sf.production.line'].sudo().search([('name', 'ilike', 'CNC')])
line_list = list(map(lambda x: x.name, line_list_obj))
print('line_list: %s' % line_list)
res['LineList'] = line_list
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': e}
logging.info('LineList error:%s' % e)
return json.JSONEncoder().encode(res)
# 获取产线产量相关
@http.route('/api/LineProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def LineProduct(self, **kw):
"""
获取产线产量相关
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求产线产量数据的参数为:%s' % kw)
try:
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
print('line_list: %s' % line_list)
for line in line_list:
plan_data = plan_obj.search([('production_line_id.name', '=', line)])
# 工单总量
plan_data_total_counts = plan_obj.search_count([('production_line_id.name', '=', line)])
# 工单完成量
plan_data_finish_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'not in', ['draft'])])
# 工单计划量
plan_data_plan_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'not in', ['finished'])])
# 工单不良累计
plan_data_fault_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('production_id.state', 'in', ['scrap', 'cancel'])])
# 工单返工数量
plan_data_rework_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('production_id.state', 'in', ['rework'])])
# 工单完成率
finishe_rate = round(
(plan_data_finish_counts / plan_data_total_counts if plan_data_total_counts > 0 else 0), 3)
# 工单进度偏差
plan_data_progress_deviation = plan_data_finish_counts - plan_data_plan_counts
if plan_data:
data = {
'plan_data_total_counts': plan_data_total_counts,
'plan_data_finish_counts': plan_data_finish_counts,
'plan_data_plan_counts': plan_data_plan_counts,
'plan_data_fault_counts': plan_data_fault_counts,
'finishe_rate': finishe_rate,
'plan_data_progress_deviation': plan_data_progress_deviation,
'plan_data_rework_counts': plan_data_rework_counts
}
res['data'][line] = data
return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode()
except Exception as e:
logging.info('前端请求产线产量数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求产线产量数据失败,原因:%s' % e
return json.dumps(res)
# 日完成量统计
class DailyFinishCount(http.Controller):
@http.route('/api/DailyFinishCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def DailyFinishCount(self, **kw):
"""
获取日完成量统计
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
print('line_list: %s' % line_list)
def get_date_list(start_date, end_date):
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date)
current_date += timedelta(days=1)
return date_list
for line in line_list:
date_list = get_date_list(begin_time, end_time)
order_counts = []
date_field_name = 'actual_end_time' # 替换为你模型中的实际字段名
for date in date_list:
next_day = date + timedelta(days=1)
orders = plan_obj.search([('production_line_id.name', '=', line), ('state', 'not in', ['draft']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
rework_orders = plan_obj.search(
[('production_line_id.name', '=', line), ('state', 'in', ['rework']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
not_passed_orders = plan_obj.search(
[('production_line_id.name', '=', line), ('state', 'in', ['scrap', 'cancel']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
order_counts.append({
'date': date.strftime('%Y-%m-%d'),
'order_count': len(orders),
'rework_orders': len(rework_orders),
'not_passed_orders': len(not_passed_orders)
})
# 外面包一层没什么是包一层不能解决的包一层就能区分了类似于包一层div
# 外面包一层的好处是,可以把多个数据结构打包在一起,方便前端处理
# date_list_dict = {line: order_counts}
res['data'][line] = order_counts
return json.dumps(res)
# 实时产量
@http.route('/api/RealTimeProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def RealTimeProduct(self, **kw):
"""
获取实时产量
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
def get_hourly_intervals(start_time, end_time):
intervals = []
current_time = start_time
while current_time < end_time:
next_hour = current_time + timedelta(hours=1)
intervals.append((current_time, min(next_hour, end_time)))
current_time = next_hour
return intervals
# 当班计划量
for line in line_list:
plan_order_nums = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'not in', ['draft']),
('date_planned_start', '>=', begin_time),
('date_planned_start', '<', end_time)
])
finish_order_nums = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'in', ['finished']),
('date_planned_start', '>=', begin_time),
('date_planned_start', '<', end_time)
])
hourly_intervals = get_hourly_intervals(begin_time, end_time)
production_counts = []
for start, end in hourly_intervals:
orders = plan_obj.search([
('actual_end_time', '>=', start.strftime('%Y-%m-%d %H:%M:%S')),
('actual_end_time', '<', end.strftime('%Y-%m-%d %H:%M:%S')),
('production_line_id.name', '=', line)
])
production_counts.append({
'start_time': start.strftime('%Y-%m-%d %H:%M:%S'),
'end_time': end.strftime('%Y-%m-%d %H:%M:%S'),
'production_count': len(orders)
})
production_counts_dict = {'production_counts': production_counts,
'plan_order_nums': plan_order_nums,
'finish_order_nums': finish_order_nums,
}
res['data'][line] = production_counts_dict
# res['data'].append({line: production_counts})
return json.dumps(res)
# 工单明细
@http.route('/api/OrderDetail', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OrderDetail(self, **kw):
"""
获取工单明细
:param kw:
:return:
"""
# res = {'status': 1, 'message': '成功', 'not_done_data': [], 'done_data': []}
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
print('line_list: %s' % line_list)
not_done_data = []
done_data = []
final_data = {}
for line in line_list:
# 未完成订单
not_done_orders = plan_obj.search(
[('production_line_id.name', '=', line), ('state', 'not in', ['finished'])])
print(not_done_orders)
# 完成订单
finish_orders = plan_obj.search([('production_line_id.name', '=', line), ('state', 'in', ['finished'])])
print(finish_orders)
# 获取所有未完成订单的ID列表
order_ids = [order.id for order in not_done_orders]
# 获取所有已完成订单的ID列表
finish_order_ids = [order.id for order in finish_orders]
# 对ID进行排序
sorted_order_ids = sorted(order_ids)
finish_sorted_order_ids = sorted(finish_order_ids)
# 创建ID与序号的对应关系
id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(sorted_order_ids)}
finish_id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(finish_sorted_order_ids)}
# # 输出结果或进一步处理
# for order_id, sequence in id_to_sequence.items():
# print(f"Order ID: {order_id} - Sequence: {sequence}")
for order in not_done_orders:
blank_name = ''
try:
blank_name = order.production_id.move_raw_ids[0].product_id.name
except:
continue
# blank_name = 'R-S00109-1 [碳素结构钢 Q235-118.0 * 72.0 * 21.0]'
# 正则表达式
material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止
dimensions = blank_name.split('-')[-1].split(']')[0]
# 匹配材料名称
material_match = re.search(material_pattern, blank_name)
material = material_match.group(1) if material_match else 'No match found'
state_dict = {
'draft': '待排程',
'done': '已排程',
'processing': '生产中',
'finished': '已完成'
}
line_dict = {
'sequence': id_to_sequence[order.id],
'workorder_name': order.name,
'blank_name': blank_name,
'material': material,
'dimensions': dimensions,
'order_qty': order.product_qty,
'state': state_dict[order.state],
}
not_done_data.append(line_dict)
for finish_order in finish_orders:
blank_name = ''
try:
blank_name = finish_order.production_id.move_raw_ids[0].product_id.name
except:
continue
material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止
dimensions = blank_name.split('-')[-1].split(']')[0]
# 匹配材料名称
material_match = re.search(material_pattern, blank_name)
material = material_match.group(1) if material_match else 'No match found'
line_dict = {
'sequence': finish_id_to_sequence[finish_order.id],
'workorder_name': finish_order.name,
'blank_name': blank_name,
'material': material,
'dimensions': dimensions,
'order_qty': finish_order.product_qty,
'finish_time': finish_order.actual_end_time.strftime('%Y-%m-%d %H:%M:%S'),
}
done_data.append(line_dict)
# 开始包一层
res['data'][line] = {'not_done_data': not_done_data, 'done_data': done_data}
return json.dumps(res)
# 查询pg库来获得待机次数
@http.route('/api/IdleAlarmCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def idle_count(self, **kw):
"""
查询设备的待机次数
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
idle_times = []
idle_dict = {}
for item in machine_list:
sql = '''
SELECT idle_start_time,alarm_time,alarm_repair_time FROM device_data WHERE device_name = %s;
'''
# 执行SQL命令
cur.execute(sql, (item,))
result = cur.fetchall()
# # print('result', result)
#
# # 将查询结果添加到idle_times列表中
# idle_times = [row[0] for row in result if row[0] is not None]
#
# # 对结果去重
# unique_idle_times = set(idle_times)
# # print('unique_idle_times', unique_idle_times)
#
# # 统计去重后的数量
# idle_count = len(unique_idle_times)
# # idle_dict[item] = idle_count
#
# res['data'][item] = idle_count
total_alarm_time = 0
alarm_count = 0
alarm_time_list = []
idle_times = []
alarm_times = []
for row in result:
idle_start_time = row[0]
alarm_time = row[1]
alarm_repair_time = row[2]
alarm_time_list.append(alarm_time) # 将时长累加,以秒为单位
idle_times.append(idle_start_time)
# if alarm_repair_time is not None:
# alarm_times.append(alarm_repair_time)
alarm_times.append(alarm_repair_time)
# 对结果去重
unique_total_alarm_time = set(alarm_time_list)
unique_idle_times = set(idle_times)
unique_alarm_times = set(alarm_times)
# 统计去重后的数量
idle_count = len(unique_idle_times)
for alarm_time in unique_total_alarm_time:
if alarm_time is not None:
total_alarm_time += abs(float(alarm_time))
alarm_count = len(unique_alarm_times) if unique_alarm_times else 0
alarm_count = alarm_count if total_alarm_time else 0
# 存储待机次数和总待机时长(单位:秒)
res['data'][item] = {
'idle_count': idle_count,
'total_alarm_time': total_alarm_time / 3600, # 以秒为单位
'alarm_count': alarm_count
}
# 返回统计结果
return json.dumps(res)
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
finally:
cur.close()
conn.close()
# 查询pg库来获得异常情况
@http.route('/api/alarm/logs', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def idle_count(self, **kw):
"""
查询设备的异常情况
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
# 获取请求的机床数据
# machine_list = ast.literal_eval(kw['machine_list'])
# idle_times = []
# idle_dict = {}
# for item in machine_list:
sql = '''
SELECT DISTINCT ON (alarm_time) alarm_time, alarm_message, system_date, system_time, alarm_repair_time
FROM device_data
WHERE alarm_time IS NOT NULL
ORDER BY alarm_time, time;
'''
# 执行SQL命令
cur.execute(sql)
result = cur.fetchall()
print('result', result)
# 将查询结果转换为字典列表
data = []
for row in result:
record = {
'alarm_time': row[0],
'alarm_message': row[1],
'system_date': row[2],
'system_time': row[3],
'alarm_repair_time': row[4]
}
data.append(record)
# 将数据填充到返回结果中
res['data'] = data
# 返回统计结果
return json.dumps(res, ensure_ascii=False)
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
finally:
cur.close()
conn.close()