增加设备运行时长效率接口

This commit is contained in:
mgw
2024-09-11 14:57:42 +08:00
parent 322718d19a
commit 763d64a1ca

View File

@@ -22,7 +22,11 @@ db_config = {
def convert_to_seconds(time_str):
# 修改正则表达式,使 H、M、S 部分可选
if time_str is None:
return 0
pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
match = re.match(pattern, time_str)
if match:
@@ -362,7 +366,8 @@ class Sf_Dashboard_Connect(http.Controller):
# print('detection_nums: %s' % detection_nums)
# 检测量
detection_data = len(plan_data_finish_orders.mapped('production_id.detection_result_ids').filtered(lambda r: r))
detection_data = len(
plan_data_finish_orders.mapped('production_id.detection_result_ids').filtered(lambda r: r))
# 检测合格量
pass_nums = plan_data_finish_orders.filtered(lambda p: any(
@@ -372,10 +377,12 @@ class Sf_Dashboard_Connect(http.Controller):
# 质量合格率
pass_rate = 1
if pass_nums:
pass_rate = round((len(pass_nums) / len(plan_data_finish_orders) if len(plan_data_finish_orders) > 0 else 0), 3)
pass_rate = round(
(len(pass_nums) / len(plan_data_finish_orders) if len(plan_data_finish_orders) > 0 else 0), 3)
# 返工率
rework_rate = round((plan_data_rework_counts / plan_data_finish_counts if plan_data_finish_counts > 0 else 0), 3)
rework_rate = round(
(plan_data_rework_counts / plan_data_finish_counts if plan_data_finish_counts > 0 else 0), 3)
# 交付准时率
delay_num = 0
@@ -928,3 +935,166 @@ class Sf_Dashboard_Connect(http.Controller):
else:
# 如果没有图片数据返回404
return request.not_found()
# 设备运行时长
@http.route('/api/RunningTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def RunningTime(self, **kw):
"""
获取设备运行时长
"""
res = {'status': 1, 'message': '成功', 'data': {}}
# 连接数据库
conn = psycopg2.connect(**db_config)
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
def fetch_result_as_dict(cursor):
"""辅助函数:将查询结果转为字典"""
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, cursor.fetchone())) if cursor.rowcount != 0 else None
# 初始化当天、当月和有记录以来的总时长
day_total_running_time = 0
day_total_process_time = 0
day_work_rate = 0
month_total_running_time = 0
month_total_process_time = 0
month_work_rate = 0
all_time_total_running_time = 0
all_time_total_process_time = 0
all_time_work_rate = 0
for item in machine_list:
# 获取当天第一条记录排除device_state等于离线的记录
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND time::date = CURRENT_DATE
AND device_state != '离线'
ORDER BY time ASC
LIMIT 1;
""", (item,))
first_today = fetch_result_as_dict(cur)
print("当天第一条记录(非离线):", first_today)
# 获取当天最新一条记录排除device_state等于离线的记录
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND time::date = CURRENT_DATE
AND device_state != '离线'
ORDER BY time DESC
LIMIT 1;
""", (item,))
last_today = fetch_result_as_dict(cur)
print("当天最新一条记录(非离线):", last_today)
# 计算当天运行时长
if first_today and last_today:
running_time = convert_to_seconds(last_today['run_time']) - convert_to_seconds(first_today['run_time'])
process_time = convert_to_seconds(last_today['process_time']) - convert_to_seconds(
first_today['process_time'])
day_total_running_time += abs(running_time)
day_total_process_time += abs(process_time)
# 获取当月第一条记录排除device_state等于离线的记录
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND EXTRACT(YEAR FROM time) = EXTRACT(YEAR FROM CURRENT_DATE)
AND EXTRACT(MONTH FROM time) = EXTRACT(MONTH FROM CURRENT_DATE)
AND device_state != '离线'
ORDER BY time ASC
LIMIT 1;
""", (item,))
first_month = fetch_result_as_dict(cur)
print("当月第一条记录(非离线):", first_month)
# 获取当月最新一条记录排除device_state等于离线的记录
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND EXTRACT(YEAR FROM time) = EXTRACT(YEAR FROM CURRENT_DATE)
AND EXTRACT(MONTH FROM time) = EXTRACT(MONTH FROM CURRENT_DATE)
AND device_state != '离线'
ORDER BY time DESC
LIMIT 1;
""", (item,))
last_month = fetch_result_as_dict(cur)
print("当月最新一条记录(非离线):", last_month)
# 计算当月运行时长
if first_month and last_month:
month_running_time = convert_to_seconds(last_month['run_time']) - convert_to_seconds(
first_month['run_time'])
month_process_time = convert_to_seconds(last_month['process_time']) - convert_to_seconds(
first_month['process_time'])
month_total_running_time += abs(month_running_time)
month_total_process_time += abs(month_process_time)
# 获取有记录以来的第一条记录排除device_state等于离线的记录
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND device_state != '离线'
ORDER BY time ASC
LIMIT 1;
""", (item,))
first_all_time = fetch_result_as_dict(cur)
print("有记录以来的第一条记录(非离线):", first_all_time)
# 获取有记录以来的最新一条记录排除device_state等于离线的记录
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND device_state != '离线'
ORDER BY time DESC
LIMIT 1;
""", (item,))
last_all_time = fetch_result_as_dict(cur)
print("有记录以来的最新一条记录(非离线):", last_all_time)
# 计算有记录以来的运行时长
if first_all_time and last_all_time:
all_time_running_time = convert_to_seconds(last_all_time['run_time']) - convert_to_seconds(
first_all_time['run_time'])
all_time_process_time = convert_to_seconds(last_all_time['process_time']) - convert_to_seconds(
first_all_time['process_time'])
all_time_total_running_time += abs(all_time_running_time)
all_time_total_process_time += abs(all_time_process_time)
# 计算当天工作效率
if day_total_running_time > day_total_process_time:
day_work_rate = day_total_process_time / day_total_running_time if day_total_running_time != 0 else 0
else:
day_work_rate = day_total_running_time / day_total_process_time if day_total_process_time != 0 else 0
print("当天工作效率: %s" % day_work_rate)
# 计算当月工作效率
if month_total_running_time > month_total_process_time:
month_work_rate = month_total_process_time / month_total_running_time if month_total_running_time != 0 else 0
else:
month_work_rate = month_total_running_time / month_total_process_time if month_total_process_time != 0 else 0
print("当月工作效率: %s" % month_work_rate)
# 计算有记录以来的工作效率
if all_time_total_running_time > all_time_total_process_time:
all_time_work_rate = all_time_total_process_time / all_time_total_running_time if all_time_total_running_time != 0 else 0
else:
all_time_work_rate = all_time_total_running_time / all_time_total_process_time if all_time_total_process_time != 0 else 0
print("有记录以来的工作效率: %s" % all_time_work_rate)
conn.close()
# 返回数据
res['data']['day_work_rate'] = day_work_rate
res['data']['month_work_rate'] = month_work_rate
res['data']['all_time_work_rate'] = all_time_work_rate
return json.dumps(res)