# -*- coding: utf-8 -*-
import datetime
import html
import json
import logging
import re

import requests

from odoo import api, fields, models, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Seconds to wait for the monitoring API before giving up, so a stalled
# endpoint cannot block the calling worker forever.
REQUEST_TIMEOUT = 30

# Duration notations accepted by ``convert_to_seconds``: "1H2M3S" and the
# Chinese equivalent.  Every unit is optional in both notations.
_DURATION_PATTERNS = (
    re.compile(r"(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"),
    re.compile(r"(?:(\d+)小时)?(?:(\d+)分钟)?(?:(\d+)秒)?"),
)

# Values written when the API reports that the machine was never powered on.
_ZERO_METRICS = {
    'online_time': 0,
    'idle_time': 0,
    'idle_rate': 0,
    'work_rate': 0,
    'fault_time': 0,
    'fault_rate': 0,
    'fault_nums': 0,
    'idle_nums': 0,
    'work_time': 0,
}


def convert_to_seconds(time_str):
    """Convert a duration such as ``"1H2M3S"`` or ``"1小时2分钟3秒"`` to seconds.

    :param time_str: duration text; ``None``, ``0`` and ``""`` mean "no time".
    :return: total number of seconds as an ``int``; ``0`` when no unit could
        be parsed from the start of the text.
    """
    if not time_str:
        return 0
    for pattern in _DURATION_PATTERNS:
        # Every group is optional, so ``match`` always succeeds (possibly on
        # an empty string); missing units simply count as zero.
        hours, minutes, seconds = (
            int(part or 0) for part in pattern.match(time_str).groups()
        )
        total_seconds = hours * 3600 + minutes * 60 + seconds
        if total_seconds:
            return total_seconds
    return 0


def _post_form(url, payload):
    """POST ``payload`` as form data and return the decoded JSON object.

    :return: the response body as a ``dict``, or ``None`` when the request
        fails, the status code is not 200, or the body is not a JSON object.
    """
    try:
        response = requests.post(url, data=payload, timeout=REQUEST_TIMEOUT)
    except requests.RequestException:
        _logger.exception("Request to %s failed", url)
        return None
    if response.status_code != 200:
        _logger.warning("Request to %s returned HTTP %s", url, response.status_code)
        return None
    try:
        body = response.json()
    except ValueError:
        _logger.warning("Request to %s returned a non-JSON body", url)
        return None
    return body if isinstance(body, dict) else None


def _usage_metrics(online_hours, work_hours, fault_hours, fault_count, idle_count):
    """Build the field values derived from power-on, cutting and alarm time.

    Rates are percentages of the power-on time.  When the power-on time is
    zero the divisor falls back to 1 so the computation cannot divide by zero.
    """
    divisor = online_hours or 1
    idle_hours = round(online_hours - work_hours, 2)
    return {
        'online_time': online_hours,
        'work_time': work_hours,
        'idle_time': idle_hours,
        'idle_rate': round(idle_hours / divisor * 100, 2),
        'work_rate': round(work_hours / divisor * 100, 2),
        'fault_time': fault_hours,
        'fault_rate': round(fault_hours / divisor * 100, 2),
        'fault_nums': fault_count,
        'idle_nums': idle_count,
    }


def _render_logs_table(logs):
    """Render log entries as an HTML table with one row per state change.

    Consecutive entries that repeat the previous state are skipped.  Cell
    values come from the remote API and are HTML-escaped.

    :param logs: iterable of dicts with ``time``, ``state`` and
        ``production_name`` keys.
    :return: HTML markup as a ``str``.
    """
    rows = []
    previous_state = ''
    for entry in logs:
        state = entry['state']
        if state == previous_state:
            continue
        previous_state = state
        cells = (entry['time'], state, entry['production_name'] or ' ')
        rows.append('<tr>%s</tr>' % ''.join(
            '<td>%s</td>' % html.escape(str(cell)) for cell in cells))
    return (
        '<table border="1">'
        '<tr><th>时间</th><th>事件/状态</th><th>加工工单</th></tr>'
        '%s'
        '</table>'
    ) % ''.join(rows)


class SfMaintenanceEquipmentOEE(models.Model):
    """OEE figures and running logs of one machine tool."""

    _name = 'maintenance.equipment.oee'
    _description = '设备OEE'

    name = fields.Char('设备oee')
    equipment_id = fields.Many2one(
        'maintenance.equipment', '机台号',
        domain="[('category_id.equipment_type', '=', '机床'),('state_zc', '=', '已注册')]")
    equipment_code = fields.Char('设备编码', related='equipment_id.code', store=True)
    type_id = fields.Many2one('sf.machine_tool.type', '型号', related='equipment_id.type_id')
    machine_tool_picture = fields.Binary('设备图片', related='equipment_id.machine_tool_picture')
    state = fields.Selection(
        [("正常", "正常"), ("故障停机", "故障停机"), ("计划维保", "计划维保"), ("空闲", "空闲"),
         ("封存(报废)", "封存(报废)")],
        default='正常', string="机床状态", related='equipment_id.state')
    online_time = fields.Char('开机时长(小时)', readonly=True)
    offline_time = fields.Char('关机时长(小时)', readonly=True)
    idle_nums = fields.Integer('待机次数', readonly=True)
    # Idle duration
    idle_time = fields.Char('待机时长(小时)', readonly=True)
    # Idle rate
    idle_rate = fields.Char('待机率(%)', readonly=True)
    work_time = fields.Char('加工时长(小时)', readonly=True)
    work_rate = fields.Char('可用率(%)', readonly=True)
    fault_time = fields.Char('故障时长(小时)', readonly=True)
    fault_rate = fields.Char('故障率(%)', readonly=True)
    fault_nums = fields.Integer('故障次数', readonly=True)

    # Equipment fault logs
    sf_maintenance_logs_ids = fields.One2many(
        'sf.maintenance.logs', 'maintenance_equipment_oee_id', '设备故障日志',
        related='equipment_id.sf_maintenance_logs_ids')

    day_logs_detail = fields.Html('日运行日志详情')
    history_logs_detail = fields.Html('历史运行日志详情')
    begin_time = fields.Date('开始时间')
    end_time = fields.Date('结束时间')

    def _api_url(self, path, force_https=False):
        """Return ``web.base.url`` joined with ``path``.

        :param force_https: rewrite a leading ``http://`` to ``https://``.
        """
        base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url') or ''
        if force_https and base_url.startswith('http://'):
            base_url = 'https://' + base_url[len('http://'):]
        return base_url + path

    def get_running_datas(self):
        """Refresh the cumulative OEE figures of every machine known to the API.

        Queries ``/api/RunningTimeDetail`` for all active equipment that has a
        function type and writes the figures on the OEE records whose
        ``equipment_code`` matches.  Nothing is written when the request fails
        or the API does not answer with ``status == 1``.
        """
        url_time = self._api_url('/api/RunningTimeDetail', force_https=True)
        _logger.info("url_time=============:%s", url_time)
        machines = self.env['maintenance.equipment'].sudo().search(
            [('function_type', '!=', False), ('active', '=', True)])
        machine_list = [machine.code for machine in machines]
        _logger.info("machine_list=============:%s", machine_list)

        result_time = _post_form(url_time, {"machine_list": str(machine_list)})
        if not result_time or result_time.get('status') != 1:
            return

        oee_model = self.env['maintenance.equipment.oee'].sudo()
        # Odoo datetimes are naive UTC, so compare them with a naive UTC "now".
        now = fields.Datetime.now()
        for code, stats in (result_time.get('data') or {}).items():
            records = oee_model.search([('equipment_code', '=', code)])
            if not records:
                continue
            if stats.get('power_on_time') == 0:
                online_hours = 0.0
                vals = dict(_ZERO_METRICS)
            else:
                online_hours = round(convert_to_seconds(stats.get('power_on_time')) / 3600, 2)
                work_hours = round(convert_to_seconds(stats.get('cut_time')) / 3600, 2)
                fault_hours = round(float(stats.get('alarm_all_time') or 0) / 3600, 2)
                vals = _usage_metrics(
                    online_hours, work_hours, fault_hours,
                    stats['alarm_all_nums'], stats['idle_count'])
            for record in records:
                first_online = record.equipment_id.first_online_time
                if first_online:
                    # Both operands are expressed in hours before subtracting.
                    elapsed_hours = (now - first_online).total_seconds() / 3600
                    offline_time = round(elapsed_hours - online_hours, 2)
                else:
                    offline_time = False
                record.write(dict(vals, offline_time=offline_time))

    def _refresh_daily_metrics(self):
        """Update this record's figures for the last 24 hours from the API.

        The figures are the difference between the cumulative counters and the
        ``*_24_*`` counters returned by ``/api/RunningTimeDetail``.
        """
        payload = {"machine_list": str([self.equipment_code])}
        result_time = _post_form(self._api_url('/api/RunningTimeDetail'), payload)
        if not result_time or result_time.get('status') != 1:
            return
        stats = (result_time.get('data') or {}).get(self.equipment_code)
        if not stats:
            return
        if stats.get('power_on_24_time') == 0:
            online_hours = 0.0
            vals = dict(_ZERO_METRICS)
        else:
            online_hours = round(
                (convert_to_seconds(stats.get('power_on_time'))
                 - convert_to_seconds(stats.get('power_on_24_time'))) / 3600, 2)
            # Work time is computed before the rates that depend on it.
            work_hours = round(
                (convert_to_seconds(stats.get('cut_time'))
                 - convert_to_seconds(stats.get('cut_24_time'))) / 3600, 2)
            fault_hours = round(float(stats.get('alarm_last_24_time') or 0) / 3600, 2)
            vals = _usage_metrics(
                online_hours, work_hours, fault_hours,
                stats['alarm_last_24_nums'], stats['idle_count'])
        vals['offline_time'] = round(24 - online_hours, 2)
        self.write(vals)

    def _fetch_logs(self, begin_time, end_time):
        """Fetch this machine's log entries between two timestamps.

        :param begin_time: ``'YYYY-MM-DD HH:MM:SS'`` lower bound.
        :param end_time: ``'YYYY-MM-DD HH:MM:SS'`` upper bound.
        :return: list of log dicts, or ``None`` when the request failed or the
            API did not answer with ``status == 1``.
        """
        payload = {
            "machine_list": str([self.equipment_code]),
            "begin_time": begin_time,
            "end_time": end_time,
        }
        result = _post_form(self._api_url('/api/logs/list'), payload)
        if not result or result.get('status') != 1:
            return None
        return (result.get('data') or {}).get(self.equipment_code) or []

    def _history_period(self):
        """Return the selected period as ``(begin, end)`` timestamp strings.

        :raises UserError: when the begin or end date is not set.
        """
        if not self.begin_time:
            raise UserError('请选择开始时间')
        if not self.end_time:
            raise UserError('请选择结束时间')
        return (
            self.begin_time.strftime('%Y-%m-%d') + ' 00:00:00',
            self.end_time.strftime('%Y-%m-%d') + ' 23:59:59',
        )

    # Fetch today's log details
    def get_day_logs(self):
        """Refresh the last-24-hours figures and today's running log table."""
        self.ensure_one()
        today = datetime.datetime.now().strftime('%Y-%m-%d')
        self._refresh_daily_metrics()
        logs_list = self._fetch_logs(today + ' 00:00:00', today + ' 23:59:59')
        if logs_list is None:
            self.day_logs_detail = '获取日志失败'
        else:
            self.day_logs_detail = _render_logs_table(logs_list)

    # Fetch historical log details
    def get_history_logs(self):
        """Fill ``history_logs_detail`` with the logs of the selected period.

        :raises UserError: when the begin or end date is not set.
        """
        self.ensure_one()
        begin_time, end_time = self._history_period()
        logs_list = self._fetch_logs(begin_time, end_time)
        if logs_list is None:
            self.history_logs_detail = '获取日志失败'
        else:
            self.history_logs_detail = _render_logs_table(logs_list)

    # Download historical logs
    def download_history_logs(self):
        """Fetch the logs of the selected period for download.

        Only the fetch and its failure message are implemented; producing the
        downloadable file is still to be done.

        :raises UserError: when the begin or end date is not set.
        """
        self.ensure_one()
        begin_time, end_time = self._history_period()
        logs_list = self._fetch_logs(begin_time, end_time)
        if logs_list is None:
            self.history_logs_detail = '下载日志失败'
            return
        # TODO: build the downloadable file from ``logs_list``.

    def name_get(self):
        """Display each record under its equipment name.

        Records without equipment fall back to their own ``name`` so that
        every record in ``self`` gets an entry.
        """
        result = []
        for parameter in self:
            if parameter.equipment_id:
                label = parameter.equipment_id.name
            else:
                label = parameter.name
            result.append((parameter.id, label or ''))
        return result


class SfMaintenanceEquipmentOEELog(models.Model):
    """Running-log summary of one machine tool."""

    _name = 'maintenance.equipment.oee.logs'
    _description = '设备运行日志'

    equipment_id = fields.Many2one('maintenance.equipment', '机台号', readonly=True)
    equipment_code = fields.Char('设备编码', readonly=True)
    name = fields.Char('设备名称', readonly=True)
    function_type = fields.Selection(
        [("ZXJGZX", "钻铣加工中心"), ("CXJGZX", "车削加工中心"), ("FHJGZX", "复合加工中心")],
        default="", string="功能类型")
    machine_tool_picture = fields.Binary('设备图片')
    type_id = fields.Many2one('sf.machine_tool.type', '品牌型号', readonly=True)
    state = fields.Selection(
        [("加工", "加工"), ("关机", "关机"), ("待机", "待机"), ("故障", "故障"),
         ("检修", "检修"), ("保养", "保养")],
        default="", string="实时状态")
    online_time = fields.Char('开机时长', readonly=True)
    offline_time = fields.Char('关机时长', readonly=True)
    offline_nums = fields.Integer('关机次数', readonly=True)
    # Idle duration
    idle_time = fields.Char('待机时长', readonly=True)
    # Idle rate
    idle_rate = fields.Char('待机率', readonly=True)
    work_time = fields.Char('加工时长', readonly=True)
    work_rate = fields.Char('可用率', readonly=True)
    fault_time = fields.Char('故障时长', readonly=True)
    fault_rate = fields.Char('故障率', readonly=True)
    fault_nums = fields.Integer('故障次数', readonly=True)
    detail_ids = fields.One2many('maintenance.equipment.oee.log.detail', 'log_id', string='日志详情')
    equipment_oee_id = fields.Many2one('maintenance.equipment.oee', '设备OEE')

    @api.onchange('equipment_id')
    def get_name(self):
        """Copy the selected equipment's name and code onto the log."""
        self.name = self.equipment_id.name
        self.equipment_code = self.equipment_id.code


# Equipment running-log details
class SfMaintenanceEquipmentOEELogDetail(models.Model):
    """One timestamped state entry of a running log, newest first."""

    _name = 'maintenance.equipment.oee.log.detail'
    _description = '设备运行日志详情'
    _order = 'time desc'

    time = fields.Datetime('时间')
    state = fields.Selection(
        [("加工", "加工"), ("关机", "关机"), ("待机", "待机"), ("故障", "故障"),
         ("检修", "检修"), ("保养", "保养")],
        default="", string="事件/状态")
    production_name = fields.Char('加工工单')
    log_id = fields.Many2one('maintenance.equipment.oee.logs', '日志')
    equipment_code = fields.Char('设备编码', readonly=True)