Files
test/sf_machine_connect/controllers/controllers.py
2024-08-27 10:19:53 +08:00

834 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import re
import ast
import json
import base64
import logging
import psycopg2
from datetime import datetime, timedelta
from odoo import http
from odoo.http import request
# 数据库连接配置
db_config = {
"database": "timeseries_db",
"user": "postgres",
"password": "postgres",
"port": "5432",
"host": "172.16.10.98"
}
def convert_to_seconds(time_str):
# 修改正则表达式,使 H、M、S 部分可选
pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
if total_seconds == 0:
# return None
pattern = r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
else:
return None
return total_seconds
class Sf_Dashboard_Connect(http.Controller):
@http.route('/api/get_machine_datas/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def get_machine_datas_list(self, **kw):
"""
拿到机床数据返回给大屏展示
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': []}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 获取当前时间的时间戳
current_timestamp = datetime.now().timestamp()
print(current_timestamp)
# tem_list = [
# "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"
# ]
try:
equipment_obj = request.env['maintenance.equipment'].sudo()
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
for item in machine_list:
machine_data = equipment_obj.search([('code', '=', item)])
# 机床上线时间段
first_online_duration = current_timestamp - int(machine_data.first_online_time.timestamp())
power_off_time = None
power_off_rate = None
if machine_data.machine_power_on_time:
power_off_time = first_online_duration - convert_to_seconds(machine_data.machine_power_on_time)
power_off_rate = round((power_off_time / first_online_duration), 3)
else:
power_off_time = False
power_off_rate = False
if machine_data:
res['data'].append({
'active': machine_data.status,
'id': machine_data.id,
'name': machine_data.name,
'brand': machine_data.type_id.name,
'code': machine_data.code,
'status': machine_data.status,
'run_status': machine_data.run_status,
'run_time': machine_data.run_time,
'system_date': machine_data.system_date,
'system_time': machine_data.system_time,
'cut_time': machine_data.cut_time,
'cut_status': machine_data.cut_status,
'program': machine_data.program,
'program_name': machine_data.program_name,
'program_status': machine_data.program_status,
'tool_num': machine_data.tool_num,
'machine_power_on_time': machine_data.machine_power_on_time,
'product_counts': machine_data.product_counts,
'mode': machine_data.mode,
'start_time': machine_data.start_time,
'end_time': machine_data.end_time,
'program_start_time': machine_data.program_start_time,
'program_end_time': machine_data.program_end_time,
'standby_start_time': machine_data.standby_start_time,
'standby_end_time': machine_data.standby_end_time,
'offline_start_time': machine_data.offline_start_time,
'offline_end_time': machine_data.offline_end_time,
'emg_status': machine_data.emg_status,
'current_program': machine_data.current_program,
'current_program_seq': machine_data.current_program_seq,
'x_abs_pos': machine_data.x_abs_pos,
'y_abs_pos': machine_data.y_abs_pos,
'z_abs_pos': machine_data.z_abs_pos,
'feed_speed_set': machine_data.feed_speed_set,
'act_feed_speed': machine_data.act_feed_speed,
'spindle_speed_set': machine_data.spindle_speed_set,
'act_spindle_speed': machine_data.act_spindle_speed,
'spindle_load': machine_data.spindle_load,
'x_axis_load': machine_data.x_axis_load,
'y_axis_load': machine_data.y_axis_load,
'z_axis_load': machine_data.z_axis_load,
'rapid_feed': machine_data.rapid_feed,
'feed_rate': machine_data.feed_rate,
'x_mach_coord': machine_data.x_mach_coord,
'y_mach_coord': machine_data.y_mach_coord,
'z_mach_coord': machine_data.z_mach_coord,
'x_rel_coord': machine_data.x_rel_coord,
'y_rel_coord': machine_data.y_rel_coord,
'z_rel_coord': machine_data.z_rel_coord,
'x_dis_coord': machine_data.x_dis_coord,
'y_dis_coord': machine_data.y_dis_coord,
'z_dis_coord': machine_data.z_dis_coord,
'alarm_time': machine_data.alarm_time,
'alarm_msg': machine_data.alarm_msg,
'clear_time': machine_data.clear_time,
# 计算出来的数据
# 开动率:运行时间/通电时间
'run_rate': machine_data.run_rate,
# 关机时长:初次上线时间 - 通电时间
'power_off_time': power_off_time,
# 关机率:关机时长/初次上线时间
'power_off_rate': power_off_rate,
'first_online_duration': first_online_duration,
# 停机时间:关机时间 - 运行时间
# 停机时长:关机时间 - 初次上线时间
'img': f'data:image/png;base64,{machine_data.machine_tool_picture.decode("utf-8")}',
'equipment_type': machine_data.category_id.name,
})
return json.dumps(res)
except Exception as e:
logging.info('前端请求机床数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求机床数据失败,原因:%s' % e
return json.JSONEncoder().encode(res)
@http.route('/api/logs/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def logs_list(self, **kw):
"""
拿到日志数据返回给大屏展示
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求日志数据的参数为:%s' % kw)
try:
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
machine_list = ast.literal_eval(kw['machine_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
print('begin_time: %s' % begin_time)
for item in machine_list:
sql = '''
SELECT time, device_state, program_name
FROM device_data
WHERE device_name = %s AND time >= %s AND time <= %s
ORDER BY time ASC;
'''
# 执行SQL命令使用参数绑定
cur.execute(sql, (item, begin_time, end_time))
results = cur.fetchall()
# 将数据按照 equipment_code 进行分组
if item not in res['data']:
res['data'][item] = []
for result in results:
res['data'][item].append({
'time': result[0].strftime('%Y-%m-%d %H:%M:%S'),
'state': result[1],
'production_name': result[2],
})
return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode()
except Exception as e:
logging.info('前端请求日志数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求日志数据失败,原因:%s' % e
return json.dumps(res)
# 返回CNC机床列表
@http.route('/api/CNCList', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def CNCList(self, **kw):
"""
获取CNC机床列表
:param kw:
:return:
"""
# logging.info('CNCList:%s' % kw)
try:
res = {'Succeed': True}
# cnc_list = request.env['sf.cnc.equipment'].sudo().search([])
# cnc_list = ["XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"]
cnc_list_obj = request.env['maintenance.equipment'].sudo().search(
[('function_type', '!=', False), ('active', '=', True)])
cnc_list = list(map(lambda x: x.code, cnc_list_obj))
print('cnc_list: %s' % cnc_list)
res['CNCList'] = cnc_list
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': e}
logging.info('CNCList error:%s' % e)
return json.JSONEncoder().encode(res)
# 返回产线列表
@http.route('/api/LineList', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def LineList(self, **kw):
"""
获取产线列表
:param kw:
:return:
"""
try:
res = {'Succeed': True}
line_list_obj = request.env['sf.production.line'].sudo().search([('name', 'ilike', 'CNC')])
line_list = list(map(lambda x: x.name, line_list_obj))
print('line_list: %s' % line_list)
res['LineList'] = line_list
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': e}
logging.info('LineList error:%s' % e)
return json.JSONEncoder().encode(res)
# 获取产线产量相关
@http.route('/api/LineProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def LineProduct(self, **kw):
"""
获取产线产量相关
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求产线产量数据的参数为:%s' % kw)
try:
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
print('line_list: %s' % line_list)
for line in line_list:
plan_data = plan_obj.search([('production_line_id.name', '=', line)])
# 工单总量
plan_data_total_counts = plan_obj.search_count([('production_line_id.name', '=', line)])
# 工单完成量
plan_data_finish_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'in', ['finished'])])
# 工单计划量
plan_data_plan_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'not in', ['finished'])])
# 工单不良累计
plan_data_fault_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('production_id.state', 'in', ['scrap', 'cancel'])])
# 工单返工数量
plan_data_rework_counts = plan_obj.search_count(
[('production_line_id.name', '=', line), ('production_id.state', 'in', ['rework'])])
# 工单完成率
finishe_rate = round(
(plan_data_finish_counts / plan_data_total_counts if plan_data_total_counts > 0 else 0), 3)
# 工单进度偏差
plan_data_progress_deviation = plan_data_finish_counts - plan_data_plan_counts
if plan_data:
data = {
'plan_data_total_counts': plan_data_total_counts,
'plan_data_finish_counts': plan_data_finish_counts,
'plan_data_plan_counts': plan_data_plan_counts,
'plan_data_fault_counts': plan_data_fault_counts,
'finishe_rate': finishe_rate,
'plan_data_progress_deviation': plan_data_progress_deviation,
'plan_data_rework_counts': plan_data_rework_counts
}
res['data'][line] = data
return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode()
except Exception as e:
logging.info('前端请求产线产量数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求产线产量数据失败,原因:%s' % e
return json.dumps(res)
# 日完成量统计
class DailyFinishCount(http.Controller):
@http.route('/api/DailyFinishCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def DailyFinishCount(self, **kw):
"""
获取日完成量统计
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
print('line_list: %s' % line_list)
def get_date_list(start_date, end_date):
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date)
current_date += timedelta(days=1)
return date_list
for line in line_list:
date_list = get_date_list(begin_time, end_time)
order_counts = []
date_field_name = 'actual_end_time' # 替换为你模型中的实际字段名
for date in date_list:
next_day = date + timedelta(days=1)
orders = plan_obj.search([('production_line_id.name', '=', line), ('state', 'not in', ['draft']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
rework_orders = plan_obj.search(
[('production_line_id.name', '=', line), ('state', 'in', ['rework']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
not_passed_orders = plan_obj.search(
[('production_line_id.name', '=', line), ('state', 'in', ['scrap', 'cancel']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
order_counts.append({
'date': date.strftime('%Y-%m-%d'),
'order_count': len(orders),
'rework_orders': len(rework_orders),
'not_passed_orders': len(not_passed_orders)
})
# 外面包一层没什么是包一层不能解决的包一层就能区分了类似于包一层div
# 外面包一层的好处是,可以把多个数据结构打包在一起,方便前端处理
# date_list_dict = {line: order_counts}
res['data'][line] = order_counts
return json.dumps(res)
# 实时产量
@http.route('/api/RealTimeProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def RealTimeProduct(self, **kw):
"""
获取实时产量
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
def get_hourly_intervals(start_time, end_time):
intervals = []
current_time = start_time
while current_time < end_time:
next_hour = current_time + timedelta(hours=1)
intervals.append((current_time, min(next_hour, end_time)))
current_time = next_hour
return intervals
# 当班计划量
for line in line_list:
plan_order_nums = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'not in', ['draft']),
('date_planned_start', '>=', begin_time),
('date_planned_start', '<', end_time)
])
finish_order_nums = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'in', ['finished']),
('date_planned_start', '>=', begin_time),
('date_planned_start', '<', end_time)
])
hourly_intervals = get_hourly_intervals(begin_time, end_time)
production_counts = []
for start, end in hourly_intervals:
orders = plan_obj.search([
('actual_end_time', '>=', start.strftime('%Y-%m-%d %H:%M:%S')),
('actual_end_time', '<', end.strftime('%Y-%m-%d %H:%M:%S')),
('production_line_id.name', '=', line)
])
production_counts.append({
'start_time': start.strftime('%Y-%m-%d %H:%M:%S'),
'end_time': end.strftime('%Y-%m-%d %H:%M:%S'),
'production_count': len(orders)
})
production_counts_dict = {'production_counts': production_counts,
'plan_order_nums': plan_order_nums,
'finish_order_nums': finish_order_nums,
}
res['data'][line] = production_counts_dict
# res['data'].append({line: production_counts})
return json.dumps(res)
# 工单明细
@http.route('/api/OrderDetail', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OrderDetail(self, **kw):
"""
获取工单明细
:param kw:
:return:
"""
# res = {'status': 1, 'message': '成功', 'not_done_data': [], 'done_data': []}
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
print('line_list: %s' % line_list)
not_done_data = []
done_data = []
final_data = {}
for line in line_list:
# 未完成订单
not_done_orders = plan_obj.search(
[('production_line_id.name', '=', line), ('state', 'not in', ['finished'])])
print(not_done_orders)
# 完成订单
finish_orders = plan_obj.search([('production_line_id.name', '=', line), ('state', 'in', ['finished'])])
print(finish_orders)
# 获取所有未完成订单的ID列表
order_ids = [order.id for order in not_done_orders]
# 获取所有已完成订单的ID列表
finish_order_ids = [order.id for order in finish_orders]
# 对ID进行排序
sorted_order_ids = sorted(order_ids)
finish_sorted_order_ids = sorted(finish_order_ids)
# 创建ID与序号的对应关系
id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(sorted_order_ids)}
finish_id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(finish_sorted_order_ids)}
# # 输出结果或进一步处理
# for order_id, sequence in id_to_sequence.items():
# print(f"Order ID: {order_id} - Sequence: {sequence}")
for order in not_done_orders:
blank_name = ''
try:
blank_name = order.production_id.move_raw_ids[0].product_id.name
except:
continue
# blank_name = 'R-S00109-1 [碳素结构钢 Q235-118.0 * 72.0 * 21.0]'
# 正则表达式
material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止
dimensions = blank_name.split('-')[-1].split(']')[0]
# 匹配材料名称
material_match = re.search(material_pattern, blank_name)
material = material_match.group(1) if material_match else 'No match found'
state_dict = {
'draft': '待排程',
'done': '已排程',
'processing': '生产中',
'finished': '已完成'
}
line_dict = {
'sequence': id_to_sequence[order.id],
'workorder_name': order.name,
'blank_name': blank_name,
'material': material,
'dimensions': dimensions,
'order_qty': order.product_qty,
'state': state_dict[order.state],
}
not_done_data.append(line_dict)
for finish_order in finish_orders:
blank_name = ''
try:
blank_name = finish_order.production_id.move_raw_ids[0].product_id.name
except:
continue
material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止
dimensions = blank_name.split('-')[-1].split(']')[0]
# 匹配材料名称
material_match = re.search(material_pattern, blank_name)
material = material_match.group(1) if material_match else 'No match found'
line_dict = {
'sequence': finish_id_to_sequence[finish_order.id],
'workorder_name': finish_order.name,
'blank_name': blank_name,
'material': material,
'dimensions': dimensions,
'order_qty': finish_order.product_qty,
'finish_time': finish_order.actual_end_time.strftime('%Y-%m-%d %H:%M:%S'),
}
done_data.append(line_dict)
# 开始包一层
res['data'][line] = {'not_done_data': not_done_data, 'done_data': done_data}
return json.dumps(res)
# 查询pg库来获得待机次数
@http.route('/api/IdleAlarmCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def idle_alarm_count(self, **kw):
"""
查询设备的待机次数
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
total_alarm_time = 0
alarm_count_num = 0
for item in machine_list:
sql = '''
SELECT COUNT(*)
FROM (
SELECT DISTINCT ON (idle_start_time) idle_start_time
FROM device_data
WHERE device_name = %s AND idle_start_time IS NOT NULL
ORDER BY idle_start_time, time
) subquery;
'''
sql2 = '''
SELECT DISTINCT ON (alarm_time) alarm_time, alarm_repair_time
FROM device_data
WHERE device_name = %s AND alarm_time IS NOT NULL
ORDER BY alarm_time, time;
'''
# 执行SQL命令
cur.execute(sql, (item,))
result = cur.fetchall()
print('result========', result)
cur.execute(sql2, (item,))
result2 = cur.fetchall()
print('result2========', result2)
#
for row in result:
res['data'][item] = {'idle_count': row[0]}
alarm_count = []
for row in result2:
alarm_count.append(row[0])
total_alarm_time += abs(float(row[0]))
if len(list(set(alarm_count))) == 1:
if list(set(alarm_count))[0] is None:
alarm_count_num = 0
else:
alarm_count_num = 1
else:
alarm_count_num = len(list(set(alarm_count)))
res['data'][item]['total_alarm_time'] = total_alarm_time / 3600
res['data'][item]['alarm_count_num'] = alarm_count_num
# 返回统计结果
return json.dumps(res)
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
finally:
cur.close()
conn.close()
# 查询pg库来获得异常情况
@http.route('/api/alarm/logs', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def alarm_logs(self, **kw):
"""
查询设备的异常情况
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
# 获取请求的机床数据
# machine_list = ast.literal_eval(kw['machine_list'])
# idle_times = []
# idle_dict = {}
# for item in machine_list:
sql = '''
SELECT DISTINCT ON (alarm_time) alarm_time, alarm_message, system_date, system_time, alarm_repair_time
FROM device_data
WHERE alarm_time IS NOT NULL
ORDER BY alarm_time, time;
'''
# 执行SQL命令
cur.execute(sql)
result = cur.fetchall()
print('result', result)
# 将查询结果转换为字典列表
data = []
for row in result:
record = {
'alarm_time': row[0],
'alarm_message': row[1],
'system_date': row[2],
'system_time': row[3],
'alarm_repair_time': row[4]
}
data.append(record)
# 将数据填充到返回结果中
res['data'] = data
# 返回统计结果
return json.dumps(res, ensure_ascii=False)
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
finally:
cur.close()
conn.close()
# 设备oee
@http.route('/api/OEE', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OEE(self, **kw):
"""
获取产线等oee
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求oee数据的参数为:%s' % kw)
try:
count_oee = 1
workcenter_obj = request.env['mrp.workcenter'].sudo()
workcenter_list = ast.literal_eval(kw['workcenter_list'])
print('workcenter_list: %s' % workcenter_list)
for line in workcenter_list:
res['data'][line] = workcenter_obj.search([('name', '=', line)]).oee
count_oee *= workcenter_obj.search([('name', '=', line)]).oee
res['data']['综合oee'] = count_oee / 1000000
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
# # 查询某段时间的设备oee
# @http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
# def OEEByTime(self, **kw):
# """
# 获取某段时间的oee
# """
# res = {'status': 1, 'message': '成功', 'data': {}}
# logging.info('前端请求获取某段时间的oee的参数为:%s' % kw)
# workcenter_list = ast.literal_eval(kw['workcenter_list'])
# begin_time_str = kw['begin_time'].strip('"')
# end_time_str = kw['end_time'].strip('"')
# begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
# end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# print('workcenter_list: %s' % workcenter_list)
# # 连接数据库
# conn = psycopg2.connect(**db_config)
# cur = conn.cursor()
# # 查询并计算OEE平均值
# oee_data = {}
# for workcenter in workcenter_list:
# cur.execute("""
# SELECT AVG(oee) as avg_oee
# FROM oee_data
# WHERE workcenter_name = %s
# AND time BETWEEN %s AND %s
# """, (workcenter, begin_time, end_time))
#
# result = cur.fetchone()
# avg_oee = result[0] if result else 0.0
# oee_data[workcenter] = avg_oee
#
# # 返回数据
# res['data'] = oee_data
# return json.dumps(res)
@http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OEEByTime(self, **kw):
"""
获取某段时间的OEE根据用户指定的时间单位day或hour返回对应的平均值。
如果不传time_unit则默认按天返回并补全没有数据的时间段填充0值。
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求获取某段时间的OEE的参数为:%s' % kw)
# 获取并解析参数
workcenter_list = ast.literal_eval(kw['workcenter_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
time_unit = kw.get('time_unit', 'day') # 默认单位为天
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
# 根据时间单位选择不同的时间格式
if time_unit == 'hour':
time_format = 'YYYY-MM-DD HH24:00:00'
time_delta = timedelta(hours=1)
else: # 默认为'day'
time_format = 'YYYY-MM-DD'
time_delta = timedelta(days=1)
# 查询并计算OEE平均值
oee_data = {}
for workcenter in workcenter_list:
cur.execute(f"""
SELECT to_char(time, '{time_format}') as time_unit, AVG(oee) as avg_oee
FROM oee_data
WHERE workcenter_name = %s
AND time BETWEEN %s AND %s
GROUP BY time_unit
ORDER BY time_unit
""", (workcenter, begin_time, end_time))
results = cur.fetchall()
# 初始化当前产线的OEE数据字典
workcenter_oee = {row[0]: row[1] for row in results}
# 补全缺失的时间段
current_time = begin_time
if time_unit != 'hour':
while current_time <= end_time:
time_key = current_time.strftime('%Y-%m-%d')
if time_key not in workcenter_oee:
workcenter_oee[time_key] = 0
current_time += time_delta
# 按时间排序
oee_data[workcenter] = dict(sorted(workcenter_oee.items()))
# 关闭数据库连接
cur.close()
conn.close()
# 返回数据
res['data'] = oee_data
return json.dumps(res)