Files
test/sf_maintenance/models/sf_maintenance_oee.py
2024-11-20 10:29:32 +08:00

425 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import re
import json
import logging
import datetime
import requests
from odoo import api, fields, models, _
from odoo.exceptions import UserError
def convert_to_seconds(time_str):
# 修改正则表达式,使 H、M、S 部分可选
if time_str is None:
return 0
if time_str == 0:
return 0
pattern = r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
if total_seconds == 0:
# return None
pattern = r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?"
match = re.match(pattern, time_str)
if match:
# 提取各时间单位如果某个单位缺失则默认设为0
hours = int(match.group(1)) if match.group(1) else 0
minutes = int(match.group(2)) if match.group(2) else 0
seconds = int(match.group(3)) if match.group(3) else 0
# 计算总秒数
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
else:
return None
return total_seconds
class SfMaintenanceEquipmentOEE(models.Model):
_name = 'maintenance.equipment.oee'
_description = '设备OEE'
name = fields.Char('设备oee')
equipment_id = fields.Many2one('maintenance.equipment', '机台号',
domain="[('category_id.equipment_type', '=', '机床'),('state_zc', '=', '已注册')]")
equipment_code = fields.Char('设备编码', related='equipment_id.code', store=True)
type_id = fields.Many2one('sf.machine_tool.type', '型号', related='equipment_id.type_id')
machine_tool_picture = fields.Binary('设备图片', related='equipment_id.machine_tool_picture')
state = fields.Selection(
[("正常", "正常"), ("故障停机", "故障停机"), ("计划维保", "计划维保"), ("空闲", "空闲"),
("封存(报废)", "封存(报废)")],
default='正常', string="机床状态", related='equipment_id.state')
online_time = fields.Char('开机时长(小时)', readonly='True')
offline_time = fields.Char('关机时长(小时)', readonly='True')
idle_nums = fields.Integer('待机次数', readonly='True')
# 待机时长
idle_time = fields.Char('待机时长(小时)', readonly='True')
# 待机率
idle_rate = fields.Char('待机率(%)', readonly='True')
work_time = fields.Char('加工时长(小时)', readonly='True')
work_rate = fields.Char('可用率(%)', readonly='True')
fault_time = fields.Char('故障时长(小时)', readonly='True')
fault_rate = fields.Char('故障率(%)', readonly='True')
fault_nums = fields.Integer('故障次数', readonly='True')
# 设备故障日志
sf_maintenance_logs_ids = fields.One2many('sf.maintenance.logs', 'maintenance_equipment_oee_id', '设备故障日志',
related='equipment_id.sf_maintenance_logs_ids')
day_logs_detail = fields.Html('日运行日志详情')
history_logs_detail = fields.Html('历史运行日志详情')
begin_time = fields.Date('开始时间')
end_time = fields.Date('结束时间')
def get_running_datas(self):
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
logging.info("base_url=============:%s" % base_url)
# 只有当原始 URL 使用 http 时才替换为 https
if base_url.startswith("http://"):
secure_base_url = base_url.replace("http://", "https://")
else:
secure_base_url = base_url
url_time = secure_base_url + '/api/RunningTimeDetail'
logging.info("url_time=============:%s" % url_time)
cnc_list_obj = self.env['maintenance.equipment'].sudo().search(
[('function_type', '!=', False), ('active', '=', True)])
machine_list = list(map(lambda x: x.code, cnc_list_obj))
logging.info("machine_list=============:%s" % machine_list)
data_time = {
"machine_list": str(machine_list)
}
# 发送POST请求
response_time = requests.post(url_time, json={}, data=data_time)
# print(response_time.json())
if response_time.status_code == 200:
result_time = response_time.json()
if result_time['status'] == 1:
real_dict = result_time['data']
for key in real_dict:
# print(key)
equipment_obj = self.env['maintenance.equipment.oee'].sudo().search([('equipment_code', '=', key)])
if real_dict[key]['power_on_time'] == 0:
equipment_obj.online_time = 0
equipment_obj.idle_time = 0
equipment_obj.idle_rate = 0
equipment_obj.work_rate = 0
equipment_obj.fault_time = 0
equipment_obj.fault_rate = 0
equipment_obj.fault_nums = 0
equipment_obj.idle_nums = 0
equipment_obj.work_time = 0
else:
equipment_obj.online_time = round(convert_to_seconds(real_dict[key]['power_on_time']) / 3600, 2)
equipment_obj.work_time = round(convert_to_seconds(real_dict[key]['cut_time']) / 3600, 2)
equipment_obj.fault_nums = real_dict[key]['alarm_all_nums']
equipment_obj.idle_nums = real_dict[key]['idle_count']
equipment_obj.fault_time = round((float(real_dict[key]['alarm_all_time']) if real_dict[key][
'alarm_all_time'] else 0) / 3600, 2)
equipment_obj.idle_time = float(equipment_obj.online_time) - float(
equipment_obj.work_time) if equipment_obj.online_time and equipment_obj.work_time else 0
equipment_obj.idle_rate = round(
float(equipment_obj.idle_time) / (
float(equipment_obj.online_time) if equipment_obj.online_time else 1) * 100, 2)
equipment_obj.work_rate = round(
float(equipment_obj.work_time) / (
float(equipment_obj.online_time) if equipment_obj.online_time else 1) * 100, 2)
equipment_obj.fault_rate = round(
float(equipment_obj.fault_time) / (
float(equipment_obj.online_time) if equipment_obj.online_time else 1) * 100, 2)
# 获取当前时间的时间戳
current_timestamp = datetime.datetime.now().timestamp()
# 机床上线时间段
first_online_duration = current_timestamp - int(equipment_obj.equipment_id.first_online_time.timestamp())
if equipment_obj.online_time:
equipment_obj.offline_time = round((first_online_duration - float(equipment_obj.online_time)) / 3600, 2)
else:
equipment_obj.offline_time = False
# equipment_obj.offline_time = equipment_obj.equipment_id.first_online_time - (
# float(equipment_obj.online_time) if equipment_obj.online_time else 0)
# 获取日志详情
def get_day_logs(self):
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
print(base_url)
config = self.env['ir.config_parameter'].sudo()
# url = 'http://172.16.10.112:8069/api/logs/list'
# url_time = 'http://localhost:9069/api/RunningTimeDetail'
url = base_url + '/api/logs/list'
url_time = base_url + '/api/RunningTimeDetail'
machine_list = [self.equipment_code]
begin_time = datetime.datetime.now().strftime('%Y-%m-%d') + ' 00:00:00'
end_time = datetime.datetime.now().strftime('%Y-%m-%d') + ' 23:59:59'
# 请求的数据
data = {
"machine_list": str(machine_list),
"begin_time": begin_time,
"end_time": end_time
}
data_time = {
"machine_list": str(machine_list)
}
print(data)
# 发送POST请求
response = requests.post(url, json={}, data=data)
response_time = requests.post(url_time, json={}, data=data_time)
# print(response.json()) # 输出服务器返回的响应
print(response_time.json())
if response_time.status_code == 200:
result_time = response_time.json()
real_dict = result_time['data'][self.equipment_code]
print('=', result_time)
if result_time['status'] == 1:
if real_dict['power_on_24_time'] == 0:
self.online_time = 0
self.idle_time = 0
self.idle_rate = 0
self.work_rate = 0
self.fault_time = 0
self.fault_rate = 0
self.fault_nums = 0
self.idle_nums = 0
self.work_time = 0
else:
self.online_time = round((convert_to_seconds(real_dict['power_on_time']) - convert_to_seconds(
real_dict['power_on_24_time'])) / 3600, 2)
self.idle_time = float(self.online_time) - float(
self.work_time) if self.online_time and self.work_time else 0
self.idle_rate = round(
float(self.idle_time) / (float(self.online_time) if self.online_time else 1) * 100, 2)
self.work_rate = round(
float(self.work_time) / (float(self.online_time) if self.online_time else 1) * 100, 2)
self.fault_time = (float(real_dict['alarm_last_24_time']) if real_dict[
'alarm_last_24_time'] else 0) / 3600
self.fault_rate = round(
float(self.fault_time) / (float(self.online_time) if self.online_time else 1) * 100, 2)
self.fault_nums = real_dict['alarm_last_24_nums']
self.idle_nums = real_dict['idle_count']
self.work_time = round(
(convert_to_seconds(real_dict['cut_time']) - convert_to_seconds(
real_dict['cut_24_time'])) / 3600,
2)
self.offline_time = 24 - (float(self.online_time) if self.online_time else 0)
if response.status_code == 200:
result = response.json()
print('============', result)
if result['status'] == 1:
logs_list = result['data'][self.equipment_code]
logs_detail = ''
log_state = ''
for log in logs_list:
if log['state'] != log_state:
print('loooooooooooooooooooogs', log)
production_name = log['production_name'] if log['production_name'] else ' '
logs_detail += '<tr><td>' + log['time'] + '</td><td>' + log[
'state'] + '</td><td>' + production_name + '</td></tr>'
log_state = log['state']
# self.day_logs_detail = '<table><tr><th>时间</th><th>事件/状态</th><th>加工工单</th></tr>' + logs_detail + '</table>'
self.day_logs_detail = '''
<table border="1" style="border-collapse: collapse; width: 100%; text-align: center;">
<tr style="background-color: #f2f2f2;">
<th style="padding: 8px; border: 1px solid #ddd;">时间</th>
<th style="padding: 8px; border: 1px solid #ddd;">事件/状态</th>
<th style="padding: 8px; border: 1px solid #ddd;">加工工单</th>
</tr>
{logs_detail}
</table>
'''.format(logs_detail=logs_detail)
else:
self.day_logs_detail = '获取日志失败'
else:
self.day_logs_detail = '获取日志失败'
# 获取历史日志详情
def get_history_logs(self):
config = self.env['ir.config_parameter'].sudo()
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
# url = 'http://172.16.10.112:8069/api/logs/list'
url = base_url + '/api/logs/list'
url_time = base_url + '/api/RunningTimeDetail'
machine_list = [self.equipment_code]
if not self.begin_time:
raise UserError('请选择开始时间')
if not self.end_time:
raise UserError('请选择结束时间')
begin_time = self.begin_time.strftime('%Y-%m-%d') + ' 00:00:00'
end_time = self.end_time.strftime('%Y-%m-%d') + ' 23:59:59'
# 请求的数据
data = {
"machine_list": str(machine_list),
"begin_time": begin_time,
"end_time": end_time
}
print(data)
# 发送POST请求
response = requests.post(url, json={}, data=data)
print(response.json()) # 输出服务器返回的响应
if response.status_code == 200:
result = response.json()
print('============', result)
if result['status'] == 1:
logs_list = result['data'][self.equipment_code]
logs_detail = ''
log_state = ''
for log in logs_list:
if log['state'] != log_state:
production_name = log['production_name'] if log['production_name'] else ' '
logs_detail += '<tr><td>' + log['time'] + '</td><td>' + log[
'state'] + '</td><td>' + production_name + '</td></tr>'
log_state = log['state']
# self.day_logs_detail = '<table><tr><th>时间</th><th>事件/状态</th><th>加工工单</th></tr>' + logs_detail + '</table>'
self.history_logs_detail = '''
<table border="1" style="border-collapse: collapse; width: 100%; text-align: center;">
<tr style="background-color: #f2f2f2;">
<th style="padding: 8px; border: 1px solid #ddd;">时间</th>
<th style="padding: 8px; border: 1px solid #ddd;">事件/状态</th>
<th style="padding: 8px; border: 1px solid #ddd;">加工工单</th>
</tr>
{logs_detail}
</table>
'''.format(logs_detail=logs_detail)
else:
self.history_logs_detail = '获取日志失败'
else:
self.history_logs_detail = '获取日志失败'
# 下载历史日志
def download_history_logs(self):
config = self.env['ir.config_parameter'].sudo()
url = 'http://172.16.10.112:8069/api/logs/list'
machine_list = [self.equipment_code]
if not self.begin_time:
raise UserError('请选择开始时间')
if not self.end_time:
raise UserError('请选择结束时间')
begin_time = self.begin_time.strftime('%Y-%m-%d') + ' 00:00:00'
end_time = self.end_time.strftime('%Y-%m-%d') + ' 23:59:59'
# 请求的数据
data = {
"machine_list": str(machine_list),
"begin_time": begin_time,
"end_time": end_time
}
print(data)
# 发送POST请求
response = requests.post(url, json={}, data=data)
print(response.json()) # 输出服务器返回的响应
if response.status_code == 200:
result = response.json()
print('============', result)
if result['status'] == 1:
logs_list = result['data'][self.equipment_code]
logs_detail = ''
for log in logs_list:
production_name = log['production_name'] if log['production_name'] else ' '
# todo 下载日志
else:
self.history_logs_detail = '下载日志失败'
else:
self.history_logs_detail = '下载日志失败'
def name_get(self):
result = []
for parameter in self:
if parameter.equipment_id:
name = parameter.equipment_id.name
result.append((parameter.id, name))
return result
class SfMaintenanceEquipmentOEELog(models.Model):
_name = 'maintenance.equipment.oee.logs'
_description = '设备运行日志'
equipment_id = fields.Many2one('maintenance.equipment', '机台号', readonly='True')
equipment_code = fields.Char('设备编码', readonly='True')
name = fields.Char('设备名称', readonly='True')
function_type = fields.Selection(
[("ZXJGZX", "钻铣加工中心"), ("CXJGZX", "车削加工中心"), ("FHJGZX", "复合加工中心")],
default="", string="功能类型")
machine_tool_picture = fields.Binary('设备图片')
type_id = fields.Many2one('sf.machine_tool.type', '品牌型号', readonly='True')
state = fields.Selection([("加工", "加工"), ("关机", "关机"), ("待机", "待机"), ("故障", "故障"),
("检修", "检修"), ("保养", "保养")], default="", string="实时状态")
online_time = fields.Char('开机时长', readonly='True')
offline_time = fields.Char('关机时长', readonly='True')
offline_nums = fields.Integer('关机次数', readonly='True')
# 待机时长
idle_time = fields.Char('待机时长', readonly='True')
# 待机率
idle_rate = fields.Char('待机率', readonly='True')
work_time = fields.Char('加工时长', readonly='True')
work_rate = fields.Char('可用率', readonly='True')
fault_time = fields.Char('故障时长', readonly='True')
fault_rate = fields.Char('故障率', readonly='True')
fault_nums = fields.Integer('故障次数', readonly='True')
detail_ids = fields.One2many('maintenance.equipment.oee.log.detail', 'log_id', string='日志详情')
# maintenance_time = fields.Char('维保时长')
# work_nums = fields.Integer('加工件数')
equipment_oee_id = fields.Many2one('maintenance.equipment.oee', '设备OEE')
@api.onchange('equipment_id')
def get_name(self):
self.name = self.equipment_id.name
self.equipment_code = self.equipment_id.code
# 设备运行日志详情
class SfMaintenanceEquipmentOEELogDetail(models.Model):
_name = 'maintenance.equipment.oee.log.detail'
_description = '设备运行日志详情'
_order = 'time desc'
# sequence = fields.Integer('序号', related='id')
time = fields.Datetime('时间')
state = fields.Selection([("加工", "加工"), ("关机", "关机"), ("待机", "待机"), ("故障", "故障"),
("检修", "检修"), ("保养", "保养")], default="", string="事件/状态")
production_name = fields.Char('加工工单')
log_id = fields.Many2one('maintenance.equipment.oee.logs', '日志')
# equipment_code = fields.Char('设备编码', related='log_id.equipment_code')
equipment_code = fields.Char('设备编码', readonly='True')