Files
test/sf_machine_connect/controllers/controllers.py
2025-06-20 15:39:11 +08:00

1913 lines
86 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import re
import ast
import json
import base64
import logging
import psycopg2
from datetime import datetime, timedelta, timezone
from odoo import http, fields
from odoo.http import request
# 数据库连接配置
db_config = {
"database": "timeseries_db",
"user": "postgres",
"password": "postgres",
"port": "5432",
"host": "172.16.10.131"
}
def convert_to_seconds(time_str):
# 修改正则表达式,使 H、M、S 部分可选
if time_str is None:
return 0
if time_str == 0:
return 0
pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
if total_seconds == 0:
# return None
pattern = r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
else:
return None
return total_seconds
class Sf_Dashboard_Connect(http.Controller):
@http.route('/api/get_machine_datas/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def get_machine_datas_list(self, **kw):
"""
拿到机床数据返回给大屏展示
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': []}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 获取当前时间的时间戳
current_timestamp = datetime.now().timestamp()
# print(current_timestamp)
# tem_list = [
# "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"
# ]
try:
equipment_obj = request.env['maintenance.equipment'].sudo()
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
for item in machine_list:
machine_data = equipment_obj.search([('code', '=', item)])
# 机床上线时间段
first_online_duration = current_timestamp - int(machine_data.first_online_time.timestamp())
power_off_time = None
power_off_rate = None
if machine_data.machine_power_on_time:
power_off_time = first_online_duration - convert_to_seconds(machine_data.machine_power_on_time)
power_off_rate = round((power_off_time / first_online_duration), 3)
else:
power_off_time = False
power_off_rate = False
if machine_data:
res['data'].append({
'active': machine_data.status,
'id': machine_data.id,
'name': machine_data.name,
'brand': machine_data.type_id.name,
'code': machine_data.code,
'status': machine_data.status,
'run_status': machine_data.run_status,
'run_time': machine_data.run_time,
'system_date': machine_data.system_date,
'system_time': machine_data.system_time,
'cut_time': machine_data.cut_time,
'cut_status': machine_data.cut_status,
'program': machine_data.program,
'program_name': machine_data.program_name,
'program_status': machine_data.program_status,
'tool_num': machine_data.tool_num,
'machine_power_on_time': machine_data.machine_power_on_time,
'product_counts': machine_data.product_counts,
'mode': machine_data.mode,
'start_time': machine_data.start_time,
'end_time': machine_data.end_time,
'program_start_time': machine_data.program_start_time,
'program_end_time': machine_data.program_end_time,
'standby_start_time': machine_data.standby_start_time,
'standby_end_time': machine_data.standby_end_time,
'offline_start_time': machine_data.offline_start_time,
'offline_end_time': machine_data.offline_end_time,
'emg_status': machine_data.emg_status,
'current_program': machine_data.current_program,
'current_program_seq': machine_data.current_program_seq,
'x_abs_pos': machine_data.x_abs_pos,
'y_abs_pos': machine_data.y_abs_pos,
'z_abs_pos': machine_data.z_abs_pos,
'feed_speed_set': machine_data.feed_speed_set,
'act_feed_speed': machine_data.act_feed_speed,
'spindle_speed_set': machine_data.spindle_speed_set,
'act_spindle_speed': machine_data.act_spindle_speed,
'spindle_load': machine_data.spindle_load,
'x_axis_load': machine_data.x_axis_load,
'y_axis_load': machine_data.y_axis_load,
'z_axis_load': machine_data.z_axis_load,
'rapid_feed': machine_data.rapid_feed,
'feed_rate': machine_data.feed_rate,
'x_mach_coord': machine_data.x_mach_coord,
'y_mach_coord': machine_data.y_mach_coord,
'z_mach_coord': machine_data.z_mach_coord,
'x_rel_coord': machine_data.x_rel_coord,
'y_rel_coord': machine_data.y_rel_coord,
'z_rel_coord': machine_data.z_rel_coord,
'x_dis_coord': machine_data.x_dis_coord,
'y_dis_coord': machine_data.y_dis_coord,
'z_dis_coord': machine_data.z_dis_coord,
'alarm_time': machine_data.alarm_time,
'alarm_msg': machine_data.alarm_msg,
'clear_time': machine_data.clear_time,
# 计算出来的数据
# 开动率:运行时间/通电时间
'run_rate': machine_data.run_rate,
# 关机时长:初次上线时间 - 通电时间
'power_off_time': power_off_time,
# 关机率:关机时长/初次上线时间
# 'power_off_rate': power_off_rate,
'first_online_duration': first_online_duration,
# 停机时间:关机时间 - 运行时间
# 停机时长:关机时间 - 初次上线时间
# 'img': f'data:image/png;base64,{machine_data.machine_tool_picture.decode("utf-8")}',
'img': f'https://xt.sf.jikimo.com/equipment/get_image/{machine_data.id}',
'equipment_type': machine_data.category_id.name,
})
return json.dumps(res)
except Exception as e:
logging.info('前端请求机床数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求机床数据失败,原因:%s' % e
return json.JSONEncoder().encode(res)
@http.route('/api/logs/list', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def logs_list(self, **kw):
"""
拿到日志数据返回给大屏展示
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求日志数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
machine_list = ast.literal_eval(kw['machine_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# print('begin_time: %s' % begin_time)
for item in machine_list:
sql = '''
SELECT time, device_state, program_name
FROM device_data
WHERE device_name = %s AND time >= %s AND time <= %s
ORDER BY time DESC;
'''
# 执行SQL命令使用参数绑定
cur.execute(sql, (item, begin_time, end_time))
results = cur.fetchall()
# 将数据按照 equipment_code 进行分组
if item not in res['data']:
res['data'][item] = []
for result in results:
res['data'][item].append({
'time': result[0].strftime('%Y-%m-%d %H:%M:%S'),
'state': result[1],
'production_name': result[2],
})
return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode()
except Exception as e:
logging.info('前端请求日志数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求日志数据失败,原因:%s' % e
return json.dumps(res)
finally:
if cur:
cur.close()
if conn:
conn.close()
@http.route('/api/logs/page_data', type='http', auth='public', methods=['GET', 'POST'],
csrf=False, cors="*")
def logs_page_data(self, **kw):
"""
拿到日志数据返回给大屏展示(支持时间戳分页)
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求日志数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
# 获取并解析传递的参数
machine_list = ast.literal_eval(kw.get('machine_list', '[]'))
begin_time_str = kw.get('begin_time', '').strip('"')
end_time_str = kw.get('end_time', '').strip('"')
page = int(kw.get('page', 1)) # 默认页码为1
page_size = int(kw.get('page_size', 80)) # 默认每页条数为10
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# 计算分页的 offset
offset = (page - 1) * page_size
# 先查询符合条件的总记录数
total_records = 0
for item in machine_list:
count_sql = '''
SELECT COUNT(*)
FROM device_data
WHERE device_name = %s AND time >= %s AND time <= %s;
'''
# 执行总记录数查询
cur.execute(count_sql, (item, begin_time, end_time))
record_count = cur.fetchone()[0] # 获取总记录数
total_records += record_count
# 计算总页数
if total_records > 0:
total_pages = (total_records + page_size - 1) // page_size # 向上取整
else:
total_pages = 0
# 将总页数和总记录数返回到响应中
res['total_records'] = total_records
res['total_pages'] = total_pages
for item in machine_list:
sql = '''
SELECT time, device_state, program_name
FROM device_data
WHERE device_name = %s AND time >= %s AND time <= %s
ORDER BY time DESC
LIMIT %s OFFSET %s;
'''
# 执行SQL命令使用参数绑定
cur.execute(sql, (item, begin_time, end_time, page_size, offset))
results = cur.fetchall()
# 将数据按照 equipment_code 进行分组
if item not in res['data']:
res['data'][item] = []
for result in results:
res['data'][item].append({
'time': result[0].strftime('%Y-%m-%d %H:%M:%S'),
'state': result[1],
'production_name': result[2],
})
return json.dumps(res) # 返回分页数据
except Exception as e:
logging.info('前端请求日志数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求日志数据失败,原因:%s' % e
return json.dumps(res)
finally:
if cur:
cur.close()
if conn:
conn.close()
# 返回CNC机床列表
@http.route('/api/CNCList', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def CNCList(self, **kw):
"""
获取CNC机床列表
:param kw:
:return:
"""
# logging.info('CNCList:%s' % kw)
try:
res = {'Succeed': True}
# cnc_list = request.env['sf.cnc.equipment'].sudo().search([])
# cnc_list = ["XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-1", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-3",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-4", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-5",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-6", "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-7",
# "XT-GNJC-LSZX-X800-Y550-Z550-T24-A3-8", "XT-GNJC-WZZX-X800-Y550-Z550-T24-A5-2",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-9", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-10",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-11", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-12",
# "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-13", "XT-GNJC-GSZG-X600-Y400-Z350-T21-A3-14"]
cnc_list_obj = request.env['maintenance.equipment'].sudo().search(
[('function_type', '!=', False), ('active', '=', True)])
cnc_list = list(map(lambda x: x.code, cnc_list_obj))
print('cnc_list: %s' % cnc_list)
res['CNCList'] = cnc_list
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': e}
logging.info('CNCList error:%s' % e)
return json.JSONEncoder().encode(res)
# 返回产线列表
@http.route('/api/LineList', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def LineList(self, **kw):
"""
获取产线列表
:param kw:
:return:
"""
try:
res = {'Succeed': True}
line_list_obj = request.env['sf.production.line'].sudo().search([('name', 'ilike', 'CNC')])
line_list = list(map(lambda x: x.name, line_list_obj))
# print('line_list: %s' % line_list)
res['LineList'] = ['业绩总览']
res['LineList'] += line_list
res['LineList'].append('人工线下加工中心')
# 增加“业绩总览”与“人工线下加工中心”
except Exception as e:
res = {'Succeed': False, 'ErrorCode': 202, 'Error': e}
logging.info('LineList error:%s' % e)
return json.JSONEncoder().encode(res)
# 获取产线产量相关
@http.route('/api/LineProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False,
cors="*")
def LineProduct(self, **kw):
"""
获取产线产量相关
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求产线产量数据的参数为:%s' % kw)
try:
plan_obj = request.env['sf.production.plan'].sudo()
# production_obj = request.env['mrp.production'].sudo()
work_order_obj = request.env['mrp.workorder'].sudo()
line_list = ast.literal_eval(kw['line_list'])
line_list_obj = request.env['sf.production.line'].sudo().search([('name', 'ilike', 'CNC')])
cnc_line_list = list(map(lambda x: x.name, line_list_obj))
# print('line_list: %s' % line_list)
for line in line_list:
if line == '业绩总览':
work_order_domain = [('routing_type', 'in', ['人工线下加工', 'CNC加工'])]
plan_domain = []
elif line == '人工线下加工中心':
work_order_domain = [('routing_type', '=', '人工线下加工')]
plan_domain = [('production_type', '=', '人工线下加工')]
else:
work_order_domain = [
('production_line_id.name', '=', line),
('routing_type', '=', 'CNC加工')
]
plan_domain = [('production_line_id.name', '=', line)]
# # 工单计划量
# plan_data_total_counts = production_obj.search_count(
# [('production_line_id.name', '=', line), ('state', 'not in', ['cancel']),
# ('active', '=', True)])
# 工单计划量切换为CNC工单
plan_data_total = work_order_obj.search(work_order_domain + [
('id', '!=', 8061),
('state', 'in', ['ready', 'progress', 'done'])
])
plan_data_total_counts = sum(plan_data_total.mapped('qty_production'))
# # 工单完成量
# plan_data_finish_counts = plan_obj.search_count(
# [('production_line_id.name', '=', line), ('state', 'in', ['finished'])])
# 工单完成量切换为CNC工单
plan_data_finish = work_order_obj.search(work_order_domain + [
('state', 'in', ['done'])
])
plan_data_finish_counts = sum(plan_data_finish.mapped('qty_produced'))
# 超期完成量
# 搜索所有已经完成的工单
plan_data_overtime = work_order_obj.search(work_order_domain + [
('state', 'in', ['done'])
])
# 使用 filtered 进行字段比较
plan_data_overtime_counts = plan_data_overtime.filtered(
lambda order: order.date_finished > order.date_planned_finished
)
# 获取数量
# plan_data_overtime_counts = len(plan_data_overtime_counts)
plan_data_overtime_counts = sum(plan_data_overtime_counts.mapped('qty_produced'))
# 查找符合条件的生产计划记录
# plan_data = plan_obj.search(plan_domain)
# 过滤出那些检测结果状态为 '返工' 或 '报废' 的记录
# faulty_plans = plan_data.filtered(lambda p: any(
# result.test_results in ['返工', '报废'] for result in p.production_id.detection_result_ids
# ))
faulty_plans = work_order_obj.search(work_order_domain + [
('state', 'in', ['scrap', 'rework'])
])
# 查找制造订单取消与归档的数量
# cancel_order_count = production_obj.search_count(
# [('production_line_id.name', '=', line), ('state', 'in', ['cancel']),
# ('active', '=', False)])
# 计算符合条件的记录数量
# plan_data_fault_counts = len(faulty_plans) + cancel_order_count
# plan_data_fault_counts = len(faulty_plans)
plan_data_fault_counts = sum(faulty_plans.mapped('qty_produced'))
# 工单返工数量
plan_data_rework = work_order_obj.search(work_order_domain + [
('state', 'in', ['rework'])
])
plan_data_rework_counts = sum(plan_data_rework.mapped('qty_produced'))
# 工单完成率
finishe_rate = round(
(plan_data_finish_counts / plan_data_total_counts if plan_data_total_counts > 0 else 0), 3)
# 工单进度偏差
plan_data_progress_deviation = plan_data_total_counts - plan_data_finish_counts - plan_data_fault_counts
# 完成记录
plan_data_finish_orders = plan_obj.search(plan_domain + [
('state', 'in', ['finished'])
])
# # 检测量
# detection_nums = 0
# for i in plan_data_finish_orders:
# print('i: %s' % i)
# if i.production_id.detection_result_ids:
# detection_nums += 1
#
# print('detection_nums: %s' % detection_nums)
# 检测量
detection_data = len(
plan_data_finish_orders.mapped('production_id.detection_result_ids').filtered(lambda r: r))
# 检测合格量
pass_nums = plan_data_finish_orders.filtered(lambda p: any(
result.test_results in ['合格'] for result in p.production_id.detection_result_ids
))
# 质量合格率
pass_rate = 1
if pass_nums:
pass_rate = round(
# (len(pass_nums) / detection_data if len(plan_data_finish_orders) > 0 else 0), 3)
(len(pass_nums) / len(plan_data_finish_orders) if len(plan_data_finish_orders) > 0 else 0), 3)
# 返工率
rework_rate = round(
(plan_data_rework_counts / plan_data_finish_counts if plan_data_finish_counts > 0 else 0), 3)
# 交付准时率
delay_num = 0
for plan in plan_data_finish_orders:
sale_obj = request.env['sale.order'].sudo().search([('name', '=', plan.origin)])
if sale_obj:
if sale_obj.deadline_of_delivery and plan.actual_end_time:
# 将 deadline_of_delivery 转换为字符串
date_as_string = sale_obj.deadline_of_delivery.strftime('%Y-%m-%d')
# 然后使用 strptime 将字符串转换为 datetime 对象
date_as_datetime = datetime.strptime(date_as_string, '%Y-%m-%d')
# 将 actual_end_time 转换为 datetime 对象
datetime_value = fields.Datetime.from_string(plan.actual_end_time)
# 给 datetime_value 加1天
new_datetime_value = datetime_value + timedelta(days=1)
# 比较 new_datetime_value 和 date_as_datetime 的大小
if new_datetime_value.date() > date_as_datetime.date():
delay_num += 1
delay_rate = round((delay_num / plan_data_finish_counts if plan_data_finish_counts > 0 else 0), 3)
on_time_rate = 1 - delay_rate
# if plan_data:
data = {
'plan_data_total_counts': plan_data_total_counts,
'plan_data_finish_counts': plan_data_finish_counts,
'plan_data_plan_counts': plan_data_total_counts,
'plan_data_fault_counts': plan_data_fault_counts,
'nopass_orders_counts': detection_data - len(pass_nums),
'finishe_rate': finishe_rate,
'plan_data_progress_deviation': plan_data_progress_deviation,
'plan_data_rework_counts': plan_data_rework_counts,
'on_time_rate': on_time_rate,
# 'detection_data': detection_data,
'detection_data': plan_data_finish_counts,
'pass_rate': (plan_data_finish_counts - plan_data_fault_counts) / plan_data_finish_counts,
'plan_data_overtime_counts': plan_data_overtime_counts,
'overtime_rate': plan_data_overtime_counts / plan_data_finish_counts
if plan_data_finish_counts > 0 else 0,
}
res['data'][line] = data
return json.dumps(res) # 注意使用 json.dumps 而不是直接用 json.JSONEncoder().encode()
except Exception as e:
logging.info('前端请求产线产量数据失败,原因:%s' % e)
res['status'] = -1
res['message'] = '前端请求产线产量数据失败,原因:%s' % e
return json.dumps(res)
# 日完成量统计
@http.route('/api/DailyFinishCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def DailyFinishCount(self, **kw):
"""
获取日完成量统计
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
# plan_obj = request.env['sf.production.plan'].sudo()
# plan_obj = request.env['mrp.workorder'].sudo().search([('routing_type', '=', 'CNC加工')])
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# print('line_list: %s' % line_list)
print('kw', kw)
time_unit = kw.get('time_unit', 'day').strip('"') # 默认单位为天
print('time_unit: %s' % time_unit)
# 日期或小时循环生成器根据time_unit决定是按天还是按小时
def get_time_intervals(start_time, end_time, time_unit):
intervals = []
current_time = start_time
if time_unit == 'hour':
delta = timedelta(hours=1)
else:
delta = timedelta(days=1)
while current_time < end_time:
next_time = current_time + delta
# 确保最后一个时间段不会超出end_time
if next_time > end_time:
next_time = end_time
intervals.append((current_time, next_time))
current_time = next_time
return intervals
def get_date_list(start_date, end_date):
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date)
current_date += timedelta(days=1)
return date_list
if time_unit == 'hour':
for line in line_list:
date_field_name = 'date_finished' # 替换为你模型中的实际字段名
order_counts = []
if line == '业绩总览':
work_order_domain = [('routing_type', 'in', ['人工线下加工', 'CNC加工'])]
elif line == '人工线下加工中心':
work_order_domain = [('routing_type', '=', '人工线下加工')]
else:
work_order_domain = [
('production_line_id.name', '=', line),
('routing_type', '=', 'CNC加工')
]
time_intervals = get_time_intervals(begin_time, end_time, time_unit)
print('============================= %s' % time_intervals)
time_count_dict = {}
plan_count_dict = {}
orders = request.env['mrp.workorder'].sudo().search(work_order_domain + [
('state', 'in', ['done']),
(date_field_name, '>=', begin_time.strftime('%Y-%m-%d %H:%M:%S')),
(date_field_name, '<=', end_time.strftime('%Y-%m-%d %H:%M:%S'))
])
for time_interval in time_intervals:
start_time, end_time = time_interval
# orders = plan_obj.search([
# ('production_line_id.name', '=', line),
# ('state', 'in', ['done']),
# (date_field_name, '>=', start_time.strftime('%Y-%m-%d %H:%M:%S')),
# (date_field_name, '<=', end_time.strftime('%Y-%m-%d %H:%M:%S')) # 包括结束时间
# ])
interval_orders = orders.filtered(
lambda o: o[date_field_name] >= start_time
and o[date_field_name] <= end_time
)
# 使用小时和分钟作为键,确保每个小时的数据有独立的键
key = start_time.strftime('%H:%M:%S') # 只取小时:分钟:秒作为键
# time_count_dict[key] = len(orders)
time_count_dict[key] = sum(interval_orders.mapped('qty_produced'))
# 计划量目前只能从mail.message中筛选出
plan_order_messages = request.env['mail.message'].sudo().search([
('model', '=', 'mrp.workorder'),
('create_date', '>=', begin_time.strftime('%Y-%m-%d %H:%M:%S')),
('create_date', '<=', end_time.strftime('%Y-%m-%d %H:%M:%S')),
('tracking_value_ids.field_desc', '=', '状态'),
('tracking_value_ids.new_value_char', '=', '就绪')
])
for time_interval in time_intervals:
start_time, end_time = time_interval
# orders = plan_obj.search([
# ('production_line_id.name', '=', line),
# ('state', 'in', ['done']),
# (date_field_name, '>=', start_time.strftime('%Y-%m-%d %H:%M:%S')),
# (date_field_name, '<=', end_time.strftime('%Y-%m-%d %H:%M:%S')) # 包括结束时间
# ])
interval_plan_orders = plan_order_messages.filtered(
lambda o: o.create_date >= start_time
and o.create_date <= end_time
)
interval_orders = request.env['mrp.workorder'].sudo().browse(interval_plan_orders.mapped('res_id'))
if line == '业绩总览':
interval_orders = interval_orders.filtered(lambda o: o.routing_type in ['人工线下加工', 'CNC加工'])
elif line == '人工线下加工中心':
interval_orders = interval_orders.filtered(lambda o: o.routing_type == '人工线下加工')
else:
interval_orders = interval_orders.filtered(lambda o: o.routing_type == 'CNC加工' and o.production_line_id.name == line)
# 使用小时和分钟作为键,确保每个小时的数据有独立的键
key = start_time.strftime('%H:%M:%S') # 只取小时:分钟:秒作为键
# time_count_dict[key] = len(orders)
plan_count_dict[key] = sum(interval_orders.mapped('qty_production'))
# order_counts.append()
res['data'][line] = {
'finish_order_nums': time_count_dict,
'plan_order_nums': plan_count_dict
}
else:
for line in line_list:
date_field_name = 'date_finished' # 替换为你模型中的实际字段名
order_counts = []
if line == '业绩总览':
work_order_domain = [('routing_type', 'in', ['人工线下加工', 'CNC加工'])]
elif line == '人工线下加工中心':
work_order_domain = [('routing_type', '=', '人工线下加工')]
else:
work_order_domain = [
('production_line_id.name', '=', line),
('routing_type', '=', 'CNC加工')
]
date_list = get_date_list(begin_time, end_time)
for date in date_list:
next_day = date + timedelta(days=1)
orders = request.env['mrp.workorder'].sudo().search(work_order_domain + [
('state', 'in', ['done']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
rework_orders = request.env['mrp.workorder'].sudo().search(work_order_domain + [
('state', 'in', ['rework']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
not_passed_orders = request.env['mrp.workorder'].sudo().search(work_order_domain + [
('state', 'in', ['scrap', 'cancel']),
(date_field_name, '>=', date.strftime('%Y-%m-%d 00:00:00')),
(date_field_name, '<', next_day.strftime('%Y-%m-%d 00:00:00'))
])
order_counts.append({
'date': date.strftime('%Y-%m-%d'),
'order_count': sum(orders.mapped('qty_produced')),
'rework_orders': sum(rework_orders.mapped('qty_produced')),
'not_passed_orders': sum(not_passed_orders.mapped('qty_produced'))
})
# 外面包一层没什么是包一层不能解决的包一层就能区分了类似于包一层div
# 外面包一层的好处是,可以把多个数据结构打包在一起,方便前端处理
# date_list_dict = {line: order_counts}
res['data'][line] = order_counts
return json.dumps(res)
# 实时产量
@http.route('/api/RealTimeProduct', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def RealTimeProduct(self, **kw):
"""
获取实时产量(作废)
:param kw:
:return:
"""
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
def get_hourly_intervals(start_time, end_time):
intervals = []
current_time = start_time
while current_time < end_time:
next_hour = current_time + timedelta(hours=1)
intervals.append((current_time, min(next_hour, end_time)))
current_time = next_hour
return intervals
# 当班计划量
for line in line_list:
if line == '业绩总览':
work_order_domain = [('routing_type', 'in', ['人工线下加工', 'CNC加工'])]
plan_domain = []
elif line == '人工线下加工中心':
work_order_domain = [('routing_type', '=', '人工线下加工')]
plan_domain = [('production_type', '=', '人工线下加工')]
else:
work_order_domain = [
('production_line_id.name', '=', line),
('routing_type', '=', 'CNC加工')
]
plan_domain = [('production_line_id.name', '=', line)]
plan_order_nums = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'not in', ['draft']),
('date_planned_start', '>=', begin_time),
('date_planned_start', '<', end_time)
])
finish_order_nums = plan_obj.search_count(
[('production_line_id.name', '=', line), ('state', 'in', ['finished']),
('date_planned_start', '>=', begin_time),
('date_planned_start', '<', end_time)
])
hourly_intervals = get_hourly_intervals(begin_time, end_time)
production_counts = []
for start, end in hourly_intervals:
orders = plan_obj.search([
('actual_end_time', '>=', start.strftime('%Y-%m-%d %H:%M:%S')),
('actual_end_time', '<', end.strftime('%Y-%m-%d %H:%M:%S')),
('production_line_id.name', '=', line)
])
production_counts.append({
'start_time': start.strftime('%Y-%m-%d %H:%M:%S'),
'end_time': end.strftime('%Y-%m-%d %H:%M:%S'),
'production_count': len(orders)
})
production_counts_dict = {'production_counts': production_counts,
'plan_order_nums': plan_order_nums,
'finish_order_nums': finish_order_nums,
}
res['data'][line] = production_counts_dict
# res['data'].append({line: production_counts})
return json.dumps(res)
# 工单明细
@http.route('/api/OrderDetail', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OrderDetail(self, **kw):
"""
获取工单明细
:param kw:
:return:
"""
# res = {'status': 1, 'message': '成功', 'not_done_data': [], 'done_data': []}
res = {'status': 1, 'message': '成功', 'data': {}}
plan_obj = request.env['sf.production.plan'].sudo()
work_order_obj = request.env['mrp.workorder'].sudo()
line_list = ast.literal_eval(kw['line_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# print('line_list: %s' % line_list)
not_done_data = []
done_data = []
final_data = {}
not_done_index = 1
done_index = 1
for line in line_list:
if line == '业绩总览':
work_order_domain = [('routing_type', 'in', ['人工线下加工', 'CNC加工'])]
elif line == '人工线下加工中心':
work_order_domain = [('routing_type', '=', '人工线下加工')]
else:
work_order_domain = [
('production_line_id.name', '=', line),
('routing_type', '=', 'CNC加工')
]
# 未完成订单
# not_done_orders = plan_obj.search(
# [('production_line_id.name', '=', line), ('state', 'not in', ['finished']),
# ('production_id.state', 'not in', ['cancel', 'done']), ('active', '=', True)
# ])
not_done_orders = work_order_obj.search(work_order_domain +
[('state', 'in', ['ready', 'progress'])], order='id asc'
)
# 完成订单
# 获取当前时间并计算24小时前的时间
current_time = datetime.now()
time_24_hours_ago = current_time - timedelta(hours=24)
finish_orders = work_order_obj.search(work_order_domain + [
('state', 'in', ['finished']),
('production_id.state', 'not in', ['cancel']),
('date_finished', '>=', time_24_hours_ago)
], order='id asc')
# print(finish_orders)
# 获取所有未完成订单的ID列表
order_ids = [order.id for order in not_done_orders]
# 获取所有已完成订单的ID列表
finish_order_ids = [order.id for order in finish_orders]
# 对ID进行排序
# sorted_order_ids = sorted(order_ids)
# finish_sorted_order_ids = sorted(finish_order_ids)
# 创建ID与序号的对应关系
# id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(sorted_order_ids)}
# finish_id_to_sequence = {order_id: index + 1 for index, order_id in enumerate(finish_sorted_order_ids)}
# # 输出结果或进一步处理
# for order_id, sequence in id_to_sequence.items():
# print(f"Order ID: {order_id} - Sequence: {sequence}")
for order in not_done_orders:
blank_name = ''
try:
blank_name = order.production_id.move_raw_ids[0].product_id.name
except:
continue
# blank_name = 'R-S00109-1 [碳素结构钢 Q235-118.0 * 72.0 * 21.0]'
# 正则表达式
material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止
dimensions = blank_name.split('-')[-1].split(']')[0]
# 匹配材料名称
material_match = re.search(material_pattern, blank_name)
material = material_match.group(1) if material_match else 'No match found'
state_dict = {
'draft': '待排程',
'done': '已排程',
'processing': '生产中',
'finished': '已完成',
'ready': '待加工',
'progress': '生产中',
}
line_dict = {
'sequence': not_done_index,
'workorder_name': order.production_id.name,
'blank_name': blank_name,
'material': material,
'dimensions': dimensions,
'order_qty': order.qty_production,
'state': state_dict[order.state],
}
not_done_data.append(line_dict)
not_done_index += 1
for finish_order in finish_orders:
if not finish_order.actual_end_time:
continue
blank_name = ''
try:
blank_name = finish_order.production_id.move_raw_ids[0].product_id.name
except:
continue
material_pattern = r'\[(.*?)-' # 从 [ 开始,碰到 - 停止
dimensions = blank_name.split('-')[-1].split(']')[0]
# 匹配材料名称
material_match = re.search(material_pattern, blank_name)
material = material_match.group(1) if material_match else 'No match found'
line_dict = {
'sequence': done_index,
'workorder_name': finish_order.name,
'blank_name': blank_name,
'material': material,
'dimensions': dimensions,
'order_qty': order.qty_produced,
'finish_time': finish_order.actual_end_time.strftime(
'%Y-%m-%d %H:%M:%S') if finish_order.actual_end_time else ' '
}
done_data.append(line_dict)
done_index += 1
# 开始包一层
res['data'][line] = {'not_done_data': not_done_data, 'done_data': done_data}
return json.dumps(res)
# 查询pg库来获得待机次数
@http.route('/api/IdleAlarmCount', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def idle_alarm_count(self, **kw):
"""
查询设备的待机次数
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求机床数据的参数为:%s' % kw)
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
try:
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
total_alarm_time = 0
alarm_count_num = 0
for item in machine_list:
sql = '''
SELECT COUNT(*)
FROM (
SELECT DISTINCT ON (idle_start_time) idle_start_time
FROM device_data
WHERE device_name = %s AND idle_start_time IS NOT NULL
ORDER BY idle_start_time, time
) subquery;
'''
sql2 = '''
SELECT DISTINCT ON (alarm_start_time) alarm_time, alarm_start_time
FROM device_data
WHERE device_name = %s AND alarm_start_time IS NOT NULL
ORDER BY alarm_start_time, time;
'''
# 执行SQL命令
cur.execute(sql, (item,))
result = cur.fetchall()
# print('result========', result)
cur.execute(sql2, (item,))
result2 = cur.fetchall()
for row in result:
res['data'][item] = {'idle_count': row[0]}
alarm_count = []
for row in result2:
alarm_count.append(row[1])
if row[0]:
if float(row[0]) >= 28800:
continue
# total_alarm_time += abs(float(row[0]))
total_alarm_time += float(row[0])
else:
total_alarm_time += 0.0
if len(list(set(alarm_count))) == 1:
if list(set(alarm_count))[0] is None:
alarm_count_num = 0
else:
alarm_count_num = 1
else:
alarm_count_num = len(list(set(alarm_count)))
res['data'][item]['total_alarm_time'] = total_alarm_time / 3600
res['data'][item]['alarm_count_num'] = alarm_count_num
# 返回统计结果
return json.dumps(res)
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
finally:
cur.close()
conn.close()
# 查询pg库来获得异常情况
@http.route('/api/alarm/logs', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def alarm_logs(self, **kw):
"""
查询设备的异常情况
"""
res = {'status': 1, 'message': '成功', 'data': []}
logging.info('前端请求机床数据的参数为:%s' % kw)
try:
maintenance_logs_obj = request.env['sf.maintenance.logs'].sudo()
# # 获取请求的机床数据
# machine_list = ast.literal_eval(kw['machine_list'])
# for item in machine_list:
# machine_data = equipment_obj.search([('code', '=', item)])
for log in maintenance_logs_obj.search([], order='id desc', limit=30):
res['data'].append({
'name': log.name,
'alarm_time': log.alarm_time.strftime('%Y-%m-%d %H:%M:%S'),
'fault_alarm_info': log.fault_alarm_info if log.fault_alarm_info else ' ',
'fault_process': log.fault_process if log.fault_process else ' ',
})
except Exception as e:
logging.error(f"An error occurred: {e}")
return json.dumps(res)
# 设备oee
@http.route('/api/OEE', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OEE(self, **kw):
"""
获取产线等oee
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求oee数据的参数为:%s' % kw)
try:
count_oee = 1
workcenter_obj = request.env['mrp.workcenter'].sudo()
workcenter_list = ast.literal_eval(kw['workcenter_list'])
print('workcenter_list: %s' % workcenter_list)
for line in workcenter_list:
res['data'][line] = workcenter_obj.search([('name', '=', line)]).oee
count_oee *= workcenter_obj.search([('name', '=', line)]).oee
res['data']['综合oee'] = count_oee / 1000000
except Exception as e:
print(f"An error occurred: {e}")
return json.dumps(res)
# # 查询某段时间的设备oee
# @http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
# def OEEByTime(self, **kw):
# """
# 获取某段时间的oee
# """
# res = {'status': 1, 'message': '成功', 'data': {}}
# logging.info('前端请求获取某段时间的oee的参数为:%s' % kw)
# workcenter_list = ast.literal_eval(kw['workcenter_list'])
# begin_time_str = kw['begin_time'].strip('"')
# end_time_str = kw['end_time'].strip('"')
# begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
# end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# print('workcenter_list: %s' % workcenter_list)
# # 连接数据库
# conn = psycopg2.connect(**db_config)
# cur = conn.cursor()
# # 查询并计算OEE平均值
# oee_data = {}
# for workcenter in workcenter_list:
# cur.execute("""
# SELECT AVG(oee) as avg_oee
# FROM oee_data
# WHERE workcenter_name = %s
# AND time BETWEEN %s AND %s
# """, (workcenter, begin_time, end_time))
#
# result = cur.fetchone()
# avg_oee = result[0] if result else 0.0
# oee_data[workcenter] = avg_oee
#
# # 返回数据
# res['data'] = oee_data
# return json.dumps(res)
@http.route('/api/OEEByTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def OEEByTime(self, **kw):
"""
获取某段时间的OEE根据用户指定的时间单位day或hour返回对应的平均值。
如果不传time_unit则默认按天返回并补全没有数据的时间段填充0值。
"""
res = {'status': 1, 'message': '成功', 'data': {}}
logging.info('前端请求获取某段时间的OEE的参数为:%s' % kw)
# 获取并解析参数
workcenter_list = ast.literal_eval(kw['workcenter_list'])
begin_time_str = kw['begin_time'].strip('"')
end_time_str = kw['end_time'].strip('"')
time_unit = kw.get('time_unit', 'day') # 默认单位为天
begin_time = datetime.strptime(begin_time_str, '%Y-%m-%d %H:%M:%S')
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
# 连接数据库
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
# 根据时间单位选择不同的时间格式
if time_unit == 'hour':
time_format = 'YYYY-MM-DD HH24:00:00'
time_delta = timedelta(hours=1)
else: # 默认为'day'
time_format = 'YYYY-MM-DD'
time_delta = timedelta(days=1)
# 查询并计算OEE平均值
oee_data = {}
for workcenter in workcenter_list:
cur.execute(f"""
SELECT to_char(time, '{time_format}') as time_unit, AVG(oee) as avg_oee
FROM oee_data
WHERE workcenter_name = %s
AND time BETWEEN %s AND %s
GROUP BY time_unit
ORDER BY time_unit
""", (workcenter, begin_time, end_time))
results = cur.fetchall()
# 初始化当前产线的OEE数据字典
workcenter_oee = {row[0]: row[1] for row in results}
# 补全缺失的时间段
current_time = begin_time
if time_unit != 'hour':
while current_time <= end_time:
time_key = current_time.strftime('%Y-%m-%d')
if time_key not in workcenter_oee:
workcenter_oee[time_key] = 0
current_time += time_delta
# 按时间排序
oee_data[workcenter] = dict(sorted(workcenter_oee.items()))
# 关闭数据库连接
cur.close()
conn.close()
# 返回数据
res['data'] = oee_data
return json.dumps(res)
@http.route(['/equipment/get_image/<int:record_id>'], type='http', auth="public", website=True)
def get_image(self, record_id, **kwargs):
# 获取模型中的记录
record = request.env['maintenance.equipment'].sudo().browse(record_id)
# 获取图片字段的数据
image_data_base64 = record.machine_tool_picture
if image_data_base64:
# 将Base64解码为二进制数据
image_data_binary = base64.b64decode(image_data_base64)
# 返回图片数据并设置正确的Content-Type
return request.make_response(image_data_binary, headers=[('Content-Type', 'image/png')])
else:
# 如果没有图片数据返回404
return request.not_found()
# 设备运行率
# @http.route('/api/RunningTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
# def RunningTime(self, **kw):
# """
# 获取设备运行时长
# """
# res = {'status': 1, 'message': '成功', 'data': {}}
# # 连接数据库
# conn = psycopg2.connect(**db_config)
# # 获取请求的机床数据
# machine_list = ast.literal_eval(kw['machine_list'])
#
# def fetch_result_as_dict(cursor):
# """辅助函数:将查询结果转为字典"""
# columns = [desc[0] for desc in cursor.description]
# return dict(zip(columns, cursor.fetchone())) if cursor.rowcount != 0 else None
#
# # 初始化当天、当月和有记录以来的总时长
# day_total_running_time = 0
# day_total_process_time = 0
# day_work_rate = 0
# month_total_running_time = 0
# month_total_process_time = 0
# month_work_rate = 0
# all_time_total_running_time = 0
# all_time_total_process_time = 0
# all_time_work_rate = 0
#
# for item in machine_list:
# # 获取当天第一条记录排除device_state等于离线的记录
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND time >= CURRENT_DATE -- 今日 00:00:00
# AND time < CURRENT_DATE + 1 -- 明日 00:00:00
# AND device_state in ('待机', '警告', '运行中')
# ORDER BY time ASC
# LIMIT 1;
# """, (item,))
# first_today = fetch_result_as_dict(cur)
# # print("当天第一条记录(非离线):", first_today)
#
# # 获取当天最新一条记录排除device_state等于离线的记录
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND time >= CURRENT_DATE -- 今日 00:00:00
# AND time < CURRENT_DATE + 1 -- 明日 00:00:00
# AND device_state in ('待机', '警告', '运行中')
# ORDER BY time DESC
# LIMIT 1;
# """, (item,))
# last_today = fetch_result_as_dict(cur)
# # print("当天最新一条记录(非离线):", last_today)
#
# # 计算当天运行时长
# if first_today and last_today:
# running_time = convert_to_seconds(last_today['run_time']) - convert_to_seconds(first_today['run_time'])
# process_time = convert_to_seconds(last_today['process_time']) - convert_to_seconds(
# first_today['process_time'])
# day_total_running_time += abs(running_time)
# day_total_process_time += abs(process_time)
#
# # 获取当月第一条记录排除device_state等于离线的记录
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND time >= DATE_TRUNC('MONTH', CURRENT_DATE)
# AND time < DATE_TRUNC('MONTH', CURRENT_DATE) + INTERVAL '1 MONTH'
# AND device_state in ('待机', '警告', '运行中')
# ORDER BY time ASC
# LIMIT 1;
# """, (item,))
# first_month = fetch_result_as_dict(cur)
# # print("当月第一条记录:", first_month)
#
# # 获取当月最新一条记录
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND time >= DATE_TRUNC('MONTH', CURRENT_DATE)
# AND time < DATE_TRUNC('MONTH', CURRENT_DATE) + INTERVAL '1 MONTH'
# AND device_state in ('待机', '警告', '运行中')
# ORDER BY time DESC
# LIMIT 1;
# """, (item,))
# last_month = fetch_result_as_dict(cur)
# # print("当月最新一条记录(非离线):", last_month)
#
# # 计算当月运行时长
# if first_month and last_month:
# month_running_time = convert_to_seconds(last_month['run_time']) - convert_to_seconds(
# first_month['run_time'])
# month_process_time = convert_to_seconds(last_month['process_time']) - convert_to_seconds(
# first_month['process_time'])
# month_total_running_time += abs(month_running_time)
# month_total_process_time += abs(month_process_time)
#
# # 获取有记录以来的第一条记录排除device_state等于离线的记录
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND device_state in ('待机', '警告', '运行中')
# ORDER BY time ASC
# LIMIT 1;
# """, (item,))
# first_all_time = fetch_result_as_dict(cur)
# # print("有记录以来的第一条记录(非离线):", first_all_time)
#
# # 获取有记录以来的最新一条记录排除device_state等于离线的记录
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND device_state in ('待机', '警告', '运行中')
# ORDER BY time DESC
# LIMIT 1;
# """, (item,))
# last_all_time = fetch_result_as_dict(cur)
# # print("有记录以来的最新一条记录(非离线):", last_all_time)
#
# # 计算有记录以来的运行时长
# if first_all_time and last_all_time:
# all_time_running_time = convert_to_seconds(last_all_time['run_time']) - convert_to_seconds(
# first_all_time['run_time'])
# all_time_process_time = convert_to_seconds(last_all_time['process_time']) - convert_to_seconds(
# first_all_time['process_time'])
# all_time_total_running_time += abs(all_time_running_time)
# all_time_total_process_time += abs(all_time_process_time)
#
# # 计算当天工作效率
# if day_total_running_time > day_total_process_time:
# day_work_rate = day_total_process_time / day_total_running_time if day_total_running_time != 0 else 0
# else:
# day_work_rate = day_total_running_time / day_total_process_time if day_total_process_time != 0 else 0
# print("当天工作效率: %s" % day_work_rate)
#
# # 计算当月工作效率
# if month_total_running_time > month_total_process_time:
# month_work_rate = month_total_process_time / month_total_running_time if month_total_running_time != 0 else 0
# else:
# month_work_rate = month_total_running_time / month_total_process_time if month_total_process_time != 0 else 0
# print("当月工作效率: %s" % month_work_rate)
#
# # 计算有记录以来的工作效率
# if all_time_total_running_time > all_time_total_process_time:
# all_time_work_rate = all_time_total_process_time / all_time_total_running_time if all_time_total_running_time != 0 else 0
# else:
# all_time_work_rate = all_time_total_running_time / all_time_total_process_time if all_time_total_process_time != 0 else 0
# print("有记录以来的工作效率: %s" % all_time_work_rate)
#
# conn.close()
#
# # 返回数据
# res['data']['day_work_rate'] = day_work_rate
# res['data']['month_work_rate'] = month_work_rate
# res['data']['all_time_work_rate'] = all_time_work_rate
#
# return json.dumps(res)
# 设备运行时长
@http.route('/api/RunningTimeDetail', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def RunningTimeDetail(self, **kw):
"""
获取
"""
logging.info("kw=============:%s" % kw)
res = {'status': 1, 'message': '成功', 'data': {}}
# 连接数据库
conn = psycopg2.connect(**db_config)
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
time_threshold = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
def fetch_result_as_dict(cursor):
"""辅助函数:将查询结果转为字典"""
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, cursor.fetchone())) if cursor.rowcount != 0 else None
# 获取当前时间的时间戳
current_timestamp = datetime.now().timestamp()
for item in machine_list:
alarm_last_24_time = 0.0
alarm_all_time = 0.0
euipment_obj = request.env['maintenance.equipment'].sudo().search([('code', '=', item)])
# 机床上线时间段
first_online_duration = current_timestamp - euipment_obj.first_online_time.timestamp()
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND device_state in ('待机', '警告', '运行中') AND process_time IS NOT NULL
ORDER BY time DESC
LIMIT 1;
""", (item,))
last_all_time = fetch_result_as_dict(cur)
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM device_data
WHERE device_name = %s
AND device_state in ('待机', '警告', '运行中') AND time <= %s AND process_time IS NOT NULL
ORDER BY time DESC
LIMIT 1;
""", (item, time_threshold))
last_24_time = fetch_result_as_dict(cur)
with conn.cursor() as cur:
cur.execute("""
SELECT COUNT(*)
FROM (
SELECT DISTINCT ON (idle_start_time) idle_start_time
FROM device_data
WHERE device_name = %s AND idle_start_time IS NOT NULL AND time >= %s
ORDER BY idle_start_time, time
) subquery;
""", (item, time_threshold))
idle_count = cur.fetchone()[0]
alarm_last_24_nums = []
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (alarm_start_time, alarm_time) alarm_time, alarm_start_time
FROM device_data
WHERE device_name = %s
AND alarm_start_time IS NOT NULL AND alarm_start_time::timestamp >= %s;
""", (item, time_threshold))
results = cur.fetchall()
logging.info("results============:%s" % results)
for result in results:
alarm_last_24_nums.append(result[1])
if result[0]:
if float(result[0]) >= 28800:
continue
alarm_last_24_time += float(result[0])
else:
alarm_last_24_time += 0.0
alarm_all_nums = []
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (alarm_start_time) alarm_time, alarm_start_time
FROM device_data
WHERE device_name = %s
AND alarm_start_time IS NOT NULL;
""", (item,))
results = cur.fetchall()
for result in results:
alarm_all_nums.append(result[1])
if result[0]:
if float(result[0]) >= 28800:
continue
alarm_all_time += float(result[0])
else:
alarm_all_time += 0.0
# with conn.cursor() as cur:
# cur.execute("""
# SELECT * FROM device_data
# WHERE device_name = %s
# AND total_count IS NOT NULL
# ORDER BY time ASC
# LIMIT 1;
# """, (item, ))
# total_count = fetch_result_as_dict(cur)
# 返回数据
res['data'][item] = {
'wait_time': last_all_time['run_time'] if last_all_time['run_time'] is not None else 0,
'cut_time': last_all_time['process_time'] if last_all_time['process_time'] is not None else 0,
'cut_24_time': last_24_time['process_time'] if last_24_time else 0,
'power_on_time': last_all_time['power_on_time'] if last_all_time['power_on_time'] is not None else 0,
'power_on_24_time': last_24_time['power_on_time'] if last_24_time else 0,
'alarm_last_24_time': alarm_last_24_time,
'alarm_last_24_nums': len(list(set(alarm_last_24_nums))),
'idle_count': idle_count,
'first_online_time': first_online_duration,
'alarm_all_time': alarm_all_time,
'alarm_all_nums': len(list(set(alarm_all_nums)))
# 'total_count': total_count['total_count'] if total_count else 0
}
conn.close()
return json.dumps(res)
# @http.route('/api/utilization/rate', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
# def UtilizationRate(self, **kw):
# """
# 获取稼动率
# """
# logging.info("kw=:%s" % kw)
# res = {'status': 1, 'message': '成功', 'data': {}}
# # 获取请求的机床数据
# machine_list = ast.literal_eval(kw['machine_list'])
# line = kw['line']
# orders = request.env['mrp.workorder'].sudo().search([
# ('routing_type', '=', 'CNC加工'), # 将第一个条件合并进来
# ('production_line_id.name', '=', line),
# ('state', 'in', ['done'])
# ])
#
# faulty_plans = request.env['quality.check'].sudo().search([
# ('operation_id.name', '=', 'CNC加工'),
# ('quality_state', 'in', ['fail'])
# ])
#
# # 计算时间范围
# now = datetime.now()
# today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
#
# total_power_on_time = 0
# month_power_on_time = 0
# today_power_on_time = 0
# today_power_on_dict = {}
# today_alarm_dict = {}
# single_machine_dict = {}
#
# today_order_data = []
# month_order_data = []
# today_check_ng = []
# month_check_ng = []
#
# total_alarm_time = 0
# today_alarm_time = 0
# month_alarm_time = 0
#
# for order in orders:
# time = order.date_finished
# if time >= today_start:
# today_order_data.append(order)
# if time >= month_start:
# month_order_data.append(order)
#
# for faulty_plan in faulty_plans:
# time = faulty_plan.write_date
# if time >= today_start:
# today_check_ng.append(faulty_plan)
# if time >= month_start:
# month_check_ng.append(faulty_plan)
#
# # 连接数据库
# conn = psycopg2.connect(**db_config)
# for item in machine_list:
# with conn.cursor() as cur:
# cur.execute("""
# (
# SELECT power_on_time, 'latest' AS record_type
# FROM device_data
# WHERE device_name = %s
# AND power_on_time IS NOT NULL
# ORDER BY time DESC
# LIMIT 1
# )
# UNION ALL
# (
# SELECT power_on_time, 'month_first' AS record_type
# FROM device_data
# WHERE device_name = %s
# AND power_on_time IS NOT NULL
# AND time >= date_trunc('month', CURRENT_DATE) -- ✅ 修复日期函数
# AND time < (date_trunc('month', CURRENT_DATE) + INTERVAL '1 month')::date
# ORDER BY time ASC
# LIMIT 1
# )
# UNION ALL
# (
# SELECT power_on_time, 'day_first' AS record_type
# FROM device_data
# WHERE device_name = %s
# AND power_on_time IS NOT NULL
# AND time::date = CURRENT_DATE -- ✅ 更高效的写法
# ORDER BY time ASC
# LIMIT 1
# );
# """, (item, item, item))
# results = cur.fetchall()
# if len(results) >= 1:
# total_power_on_time += convert_to_seconds(results[0][0])
# else:
# total_power_on_time += 0
# if len(results) >= 2:
# month_power_on_time += convert_to_seconds(results[1][0])
# else:
# month_power_on_time += 0
# if len(results) >= 3:
# today_power_on_time += convert_to_seconds(results[2][0])
# today_power_on_dict[item] = today_power_on_time
# else:
# today_power_on_time += 0
# today_power_on_dict[item] = 0
#
# with conn.cursor() as cur:
# cur.execute("""
# SELECT DISTINCT ON (alarm_start_time) alarm_time, alarm_start_time
# FROM device_data
# WHERE device_name = %s AND alarm_start_time IS NOT NULL
# ORDER BY alarm_start_time, time;
# """, (item,))
# results = cur.fetchall()
# today_data = []
# month_data = []
#
# for record in results:
# if record[0]:
# if float(record[0]) >= 28800:
# continue
# total_alarm_time += float(record[0])
# else:
# total_alarm_time += 0.0
# alarm_start = datetime.strptime(record[1], "%Y-%m-%d %H:%M:%S")
# if alarm_start >= today_start:
# today_data.append(record)
# if alarm_start >= month_start:
# month_data.append(record)
# if today_data:
# for today in today_data:
# if today[0]:
# if float(today[0]) >= 28800:
# continue
# today_alarm_time += float(today[0])
# today_alarm_dict[item] = today_alarm_time
# else:
# today_alarm_time += 0.0
# today_alarm_dict[item] = 0
# else:
# today_alarm_dict[item] = 0
# for month in month_data:
# if month[0]:
# if float(month[0]) >= 28800:
# continue
# month_alarm_time += float(month[0])
# else:
# month_alarm_time += 0.0
#
# conn.close()
#
# logging.info('报警时间总月日=============%s, %s, %s' % (total_alarm_time, month_alarm_time, today_alarm_time))
# # 计算时间开动率(累计、月、日)
# if total_power_on_time:
# total_power_on_rate = (total_power_on_time - total_alarm_time) / total_power_on_time
# else:
# total_power_on_rate = 0
# if month_power_on_time:
# month_power_on_rate = (total_power_on_time - month_power_on_time - month_alarm_time) / (
# total_power_on_time - month_power_on_time)
# else:
# month_power_on_rate = 0
# if today_power_on_time:
# today_power_on_rate = (total_power_on_time - today_power_on_time - today_alarm_time) / (
# total_power_on_time - today_power_on_time)
# else:
# today_power_on_rate = 0
# logging.info("总开动率: %s" % total_power_on_rate)
# logging.info("月开动率: %s" % month_power_on_rate)
# logging.info("日开动率: %s" % today_power_on_rate)
#
# # 计算性能开动率(累计、月、日)
# logging.info('完成工单数量: %s' % len(orders))
# total_performance_rate = len(orders) * 30 * 60 / (total_power_on_time - total_alarm_time)
# month_performance_rate = len(month_data) * 30 * 60 / (
# total_power_on_time - month_power_on_time - month_alarm_time)
# today_performance_rate = len(today_data) * 30 * 60 / (
# total_power_on_time - today_power_on_time - today_alarm_time) if today_power_on_time != 0 else 0
# logging.info("总性能率: %s" % total_performance_rate)
# logging.info("月性能率: %s" % month_performance_rate)
# logging.info("日性能率: %s" % today_performance_rate)
#
# # 计算累计合格率
# total_pass_rate = (len(orders) - len(today_check_ng)) / len(orders) if len(orders) != 0 else 0
# month_pass_rate = (len(month_order_data) - len(month_check_ng)) / len(month_order_data) if len(month_order_data) != 0 else 0
# today_pass_rate = (len(today_order_data) - len(today_check_ng)) / len(today_order_data) if len(today_order_data) != 0 else 0
# logging.info("总合格率: %s" % total_pass_rate)
# logging.info("月合格率: %s" % month_pass_rate)
# logging.info("日合格率: %s" % today_pass_rate)
#
# # # 返回数据
# # res['data'][item] = {
# # 'total_utilization_rate': total_power_on_rate * total_performance_rate * total_pass_rate,
# # 'month_utilization_rate': month_power_on_rate * month_performance_rate * month_pass_rate,
# # 'today_utilization_rate': today_power_on_rate * today_performance_rate * today_pass_rate,
# # }
# for i in machine_list:
# single_machine_utilization_rate = total_power_on_time - today_power_on_dict[i] - today_alarm_dict[i] / (
# total_power_on_time - today_power_on_dict[i])
# single_machine_dict[i] = single_machine_utilization_rate * today_performance_rate * today_pass_rate
#
# res['data'] = {
# 'total_utilization_rate': total_power_on_rate * total_performance_rate * total_pass_rate,
# 'month_utilization_rate': month_power_on_rate * month_performance_rate * month_pass_rate,
# 'today_utilization_rate': today_power_on_rate * today_performance_rate * today_pass_rate,
# 'single_machine_dict': single_machine_dict
# }
#
# return json.dumps(res)
@http.route('/api/RunningTime', type='http', auth='public', methods=['GET', 'POST'], csrf=False, cors="*")
def UtilizationRate(self, **kw):
"""
获取稼动率
"""
logging.info("kw=:%s" % kw)
res = {'status': 1, 'message': '成功', 'data': {}}
# 获取请求的机床数据
machine_list = ast.literal_eval(kw['machine_list'])
faulty_plans = request.env['quality.check'].sudo().search([
('operation_id.name', '=', 'CNC加工'),
('quality_state', 'in', ['fail'])
])
# 计算时间范围
now = datetime.now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
total_power_on_time = 0
month_power_on_time = 0
today_power_on_time = 0
today_power_on_dict = {}
today_alarm_dict = {}
single_machine_dict = {}
today_order_data = []
month_order_data = []
today_check_ng = []
month_check_ng = []
total_alarm_time = 0
today_alarm_time = 0
month_alarm_time = 0
# 连接数据库
conn = psycopg2.connect(**db_config)
for item in machine_list:
with conn.cursor() as cur:
cur.execute("""
(
SELECT power_on_time, 'latest' AS record_type
FROM device_data
WHERE device_name = %s
AND power_on_time IS NOT NULL
ORDER BY time DESC
LIMIT 1
)
UNION ALL
(
SELECT power_on_time, 'month_first' AS record_type
FROM device_data
WHERE device_name = %s
AND power_on_time IS NOT NULL
AND time >= date_trunc('month', CURRENT_DATE) -- ✅ 修复日期函数
AND time < (date_trunc('month', CURRENT_DATE) + INTERVAL '1 month')::date
ORDER BY time ASC
LIMIT 1
)
UNION ALL
(
SELECT power_on_time, 'day_first' AS record_type
FROM device_data
WHERE device_name = %s
AND power_on_time IS NOT NULL
AND time >= CURRENT_DATE
AND time < CURRENT_DATE + INTERVAL '1 day'
ORDER BY time ASC
LIMIT 1
);
""", (item, item, item))
results = cur.fetchall()
logging.info('====================%s' % results)
if len(results) >= 1:
total_power_on_time += convert_to_seconds(results[0][0])
else:
total_power_on_time += 0
if len(results) >= 2:
month_power_on_time += convert_to_seconds(results[1][0])
else:
month_power_on_time += 0
if len(results) >= 3:
today_power_on_time += convert_to_seconds(results[2][0])
# today_power_on_dict[item] = today_power_on_time
else:
today_power_on_time += convert_to_seconds(results[0][0])
# today_power_on_dict[item] = 0
if results[0][0] == '0H0M':
logging.info('=woshide=%s' % results[0][0])
logging.info(type(results[0][0]))
total_power_on_time += convert_to_seconds(results[1][0])
today_power_on_time += convert_to_seconds(results[1][0])
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (alarm_start_time) alarm_time, alarm_start_time
FROM device_data
WHERE device_name = %s AND alarm_start_time IS NOT NULL
ORDER BY alarm_start_time, time;
""", (item,))
results = cur.fetchall()
today_data = []
month_data = []
for record in results:
if record[0]:
if float(record[0]) >= 86400:
continue
total_alarm_time += abs(float(record[0]))
else:
total_alarm_time += 0.0
alarm_start = datetime.strptime(record[1], "%Y-%m-%d %H:%M:%S")
if alarm_start >= today_start:
today_data.append(record)
if alarm_start >= month_start:
month_data.append(record)
if today_data:
for today in today_data:
if today[0]:
if float(today[0]) >= 28800:
continue
today_alarm_time += abs(float(today[0]))
today_alarm_dict[item] = today_alarm_time
else:
today_alarm_time += 0.0
today_alarm_dict[item] = 0
else:
today_alarm_dict[item] = 0
for month in month_data:
if month[0]:
if float(month[0]) >= 28800:
continue
month_alarm_time += abs(float(month[0]))
else:
month_alarm_time += 0.0
conn.close()
logging.info('在线时间总月日=============%s, %s, %s' % (total_power_on_time, month_power_on_time, today_power_on_time))
logging.info('报警时间总月日=============%s, %s, %s' % (total_alarm_time, month_alarm_time, today_alarm_time))
# 计算时间开动率(累计、月、日)
if total_power_on_time:
total_power_on_rate = (total_power_on_time - total_alarm_time) / total_power_on_time
else:
total_power_on_rate = 0
if month_power_on_time:
month_power_on_rate = (total_power_on_time - month_power_on_time - month_alarm_time) / (
total_power_on_time - month_power_on_time)
else:
month_power_on_rate = 0
if today_power_on_time:
today_power_on_rate = (total_power_on_time - today_power_on_time - today_alarm_time) / (
total_power_on_time - today_power_on_time)
else:
today_power_on_rate = 0
logging.info("总开动率: %s" % total_power_on_rate)
logging.info("月开动率: %s" % month_power_on_rate)
logging.info("日开动率: %s" % today_power_on_rate)
res['data'] = {
'all_time_work_rate': total_power_on_rate,
'month_work_rate': month_power_on_rate,
'day_work_rate': today_power_on_rate,
}
return json.dumps(res)