import base64
import json
import logging
import math
import os
# import subprocess  (only referenced by the disabled ``to_pdf`` helper below)
from datetime import datetime

import requests
from dateutil.relativedelta import relativedelta

from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.addons.sf_base.commons.common import Common
from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController
from odoo.exceptions import UserError
from odoo.exceptions import ValidationError


def _line_intersection(xa, ya, xb, yb, xc, yc, xd, yd):
    """Return the intersection of line A-B with line C-D.

    :param xa, ya: coordinates of point A
    :param xb, yb: coordinates of point B
    :param xc, yc: coordinates of point C
    :param xd, yd: coordinates of point D
    :return: ``(x, y)`` tuple, or ``None`` when either denominator is zero
        (parallel lines or coincident points), i.e. exactly the inputs for
        which the division would raise ``ZeroDivisionError``.
    """
    x_denominator = (xc - xd) * (ya - yb) - (xa - xb) * (yc - yd)
    y_denominator = (yc - yd) * (xa - xb) - (ya - yb) * (xc - xd)
    if x_denominator == 0 or y_denominator == 0:
        return None
    x = ((xc - xd) * (xb * ya - xa * yb) - (xa - xb) * (xd * yc - xc * yd)) / x_denominator
    y = ((yc - yd) * (yb * xa - ya * xb) - (ya - yb) * (yd * xc - yc * xd)) / y_denominator
    return x, y


class ResMrpWorkOrder(models.Model):
    """Extension of ``mrp.workorder`` with routing type, probe coordinates,
    inspection result, CNC program lines and tray binding."""
    _inherit = 'mrp.workorder'
    _order = 'sequence'

    workcenter_id = fields.Many2one('mrp.workcenter', required=False)
    processing_panel = fields.Char('加工面')
    sequence = fields.Integer(string='工序')
    routing_type = fields.Selection([
        ('获取CNC加工程序', '获取CNC加工程序'),
        ('装夹', '装夹'),
        ('前置三元定位检测', '前置三元定位检测'),
        ('CNC加工', 'CNC加工'),
        ('后置三元质量检测', '后置三元质量检测'),
        ('解除装夹', '解除装夹'),
    ], string="工序类型")
    cnc_worksheet = fields.Binary('工作指令', readonly=True)
    # Written by ``getcenter`` as the string "(x,y,z)".
    material_center_point = fields.Char(string='配料中心点')

    # Measured points 1..10; ``getcenter`` reads X/Y of points 1-8 and Z of point 9.
    X1_axis = fields.Float(default=0)
    Y1_axis = fields.Float(default=0)
    Z1_axis = fields.Float(default=0)
    X2_axis = fields.Float(default=0)
    Y2_axis = fields.Float(default=0)
    Z2_axis = fields.Float(default=0)
    X3_axis = fields.Float(default=0)
    Y3_axis = fields.Float(default=0)
    Z3_axis = fields.Float(default=0)
    X4_axis = fields.Float(default=0)
    Y4_axis = fields.Float(default=0)
    Z4_axis = fields.Float(default=0)
    X5_axis = fields.Float(default=0)
    Y5_axis = fields.Float(default=0)
    Z5_axis = fields.Float(default=0)
    X6_axis = fields.Float(default=0)
    Y6_axis = fields.Float(default=0)
    Z6_axis = fields.Float(default=0)
    X7_axis = fields.Float(default=0)
    Y7_axis = fields.Float(default=0)
    Z7_axis = fields.Float(default=0)
    X8_axis = fields.Float(default=0)
    Y8_axis = fields.Float(default=0)
    Z8_axis = fields.Float(default=0)
    X9_axis = fields.Float(default=0)
    Y9_axis = fields.Float(default=0)
    Z9_axis = fields.Float(default=0)
    X10_axis = fields.Float(default=0)
    Y10_axis = fields.Float(default=0)
    Z10_axis = fields.Float(default=0)
    # Written by ``getcenter``; declared as Integer although the computed
    # angle is a float.
    X_deviation_angle = fields.Integer(string="X轴偏差度", default=0)
    test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], string="检测结果")
    cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工")
    tray_code = fields.Char(string="托盘")

    # Compute the material center point and the deviation angle.
    def getcenter(self):
        """Compute and store the material center point and deviation angle.

        The center X/Y is the midpoint between two line intersections:
        line (P1, P2) with line (P3, P4), and line (P5, P6) with line
        (P7, P8). Z is half of ``Z9_axis``. The angle is
        ``atan2(X5 - X6, Y5 - Y6)`` converted to degrees.

        :return: the stored ``material_center_point`` string "(x,y,z)"
        :raises UserError: when either pair of lines has no unique
            intersection (previously an unhandled ``ZeroDivisionError``,
            which is what the all-zero default coordinates produce).
        """
        first = _line_intersection(
            self.X1_axis, self.Y1_axis, self.X2_axis, self.Y2_axis,
            self.X3_axis, self.Y3_axis, self.X4_axis, self.Y4_axis)
        second = _line_intersection(
            self.X5_axis, self.Y5_axis, self.X6_axis, self.Y6_axis,
            self.X7_axis, self.Y7_axis, self.X8_axis, self.Y8_axis)
        if first is None or second is None:
            raise UserError('检测点坐标无效，无法计算配料中心点')

        x = (first[0] + second[0]) / 2
        y = (first[1] + second[1]) / 2
        z = self.Z9_axis / 2
        angle_rad = math.atan2(self.X5_axis - self.X6_axis, self.Y5_axis - self.Y6_axis)
        angle_deg = angle_rad * 180 / math.pi
        # Was a bare print(); server code should go through logging.
        logging.info("(%.2f,%.2f)", x, y)
        self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z))
        self.X_deviation_angle = angle_deg
        return self.material_center_point

    def json_workorder_str(self, k, production, route):
        """Build the ``[0, '', vals]`` create command for one work order.

        :param k: value stored in ``processing_panel``
        :param production: record providing ``product_uom_id``
        :param route: record providing ``route_workcenter_id``,
            ``routing_type`` and ``workcenter_ids``
        :return: list ``[0, '', {...}]`` with the work order values
        """
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': route.route_workcenter_id.name,
            'processing_panel': k,
            'routing_type': route.routing_type,
            'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids),
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0
        }]
        return workorders_values_str

    # Work center kanban button.
    def button_maintenance_req(self):
        """Return a window action opening a new maintenance request form
        pre-filled with this work order, its production and company."""
        self.ensure_one()
        return {
            'name': _('New Maintenance Request'),
            'view_mode': 'form',
            'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')],
            'res_model': 'maintenance.request',
            'type': 'ir.actions.act_window',
            'context': {
                'default_company_id': self.company_id.id,
                'default_workorder_id': self.id,
                'default_production_id': self.production_id.id,
                'discard_on_footer_button': True,
            },
            'target': 'new',
            'domain': [('workorder_id', '=', self.id)]
        }

    # Bind a tray using the code stored on the work order.
    def gettray(self):
        """Bind the tray whose code equals ``self.tray_code``.

        Delegates to ``gettray_auto`` (the two methods used to be verbatim
        copies of each other).

        :return: ``""`` when no tray code is set, otherwise ``None``
        :raises UserError: see ``gettray_auto``
        """
        return self.gettray_auto(self.tray_code)

    def gettray_auto(self, barcode):
        """Bind the tray identified by ``barcode`` to this work order.

        :param barcode: tray code to look up in ``sf.tray``; ``False``
            means "nothing to bind"
        :return: ``""`` when ``barcode`` is ``False``, otherwise ``None``
        :raises UserError: when no tray has that code, or the tray state is
            "占用" (occupied) or "报损" (damaged)
        """
        if barcode is False:
            return ""
        tray = self.env['sf.tray'].search([("code", "=", barcode)])
        if not tray:
            raise UserError('该托盘编码已失效')
        if tray.state == "占用":
            raise UserError('该托盘已占用')
        if tray.state == "报损":
            raise UserError('该托盘已损坏')
        tray.update({
            'workorder_id': self,
            'production_id': self.production_id,
            'state': '占用',
        })

    # Release the tray binding.
    def unbindtray(self):
        """Call ``unclamp()`` on the tray(s) linked to this work order's
        production, if any. Always returns ``""``."""
        tray = self.env['sf.tray'].search([("production_id", "=", self.production_id.id)])
        if tray:
            tray.unclamp()
        return ""

    def recreateManufacturingOrWorkerOrder(self):
        """Regenerate the manufacturing order or the work orders.

        * ``test_results == '报废'`` (scrap): create a new production from
          ``create_production1_values``, its stock moves and work orders,
          confirm it and post the origin message.
        * ``test_results == '返工'`` (rework): recreate stock moves and call
          ``_create_workorder2`` for the current ``processing_panel``.

        :return: ``None`` for the rework case, ``True`` otherwise
        """
        if self.test_results == '报废':
            values = self.env['mrp.production'].create_production1_values(self.production_id)
            productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
                self.production_id.company_id).create(values)
            self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
            self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
            productions._create_workorder()
            productions.filtered(
                lambda p: (not p.orderpoint_id and p.move_raw_ids) or
                          (p.move_dest_ids.procure_method != 'make_to_order'
                           and not p.move_raw_ids and not p.workorder_ids)
            ).action_confirm()
            for production in productions:
                origin_production = production.move_dest_ids and production.move_dest_ids[
                    0].raw_material_production_id or False
                orderpoint = production.orderpoint_id
                if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
                    production.message_post(
                        body=_('This production order has been created from Replenishment Report.'),
                        message_type='comment',
                        subtype_xmlid='mail.mt_note')
                elif orderpoint:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': orderpoint},
                        subtype_id=self.env.ref('mail.mt_note').id)
                elif origin_production:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': origin_production},
                        subtype_id=self.env.ref('mail.mt_note').id)
        if self.test_results == '返工':
            productions = self.production_id
            self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
            self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
            productions._create_workorder2(self.processing_panel)
        else:
            return True

    # Request the CNC program.
    def fetchCNC(self):
        """POST a programming request for this production to the remote
        ``/api/intelligent_programming/create`` endpoint.

        The payload is built from the first 'CNC加工' work order of the same
        production. When the decoded response has ``status == 1`` the work
        order is moved to state ``progress``.

        :return: result of ``write`` on success, otherwise ``None``
        """
        cnc = self.env['mrp.workorder'].search(
            [('routing_type', '=', 'CNC加工'), ('production_id', '=', self.production_id.id)], limit=1)
        res = {
            'model_code': cnc.product_id.barcode,
            'production_no': self.production_id.name,
            'machine_tool_code': cnc.workcenter_id.machine_tool_id.code,
            'material_code': cnc.env['sf.production.materials'].search(
                [('id', '=', cnc.product_id.materials_id.id)]).materials_no,
            'material_type_code': cnc.env['sf.materials.model'].search(
                [('id', '=', cnc.product_id.materials_type_id.id)]).materials_no,
            'machining_precision': cnc.product_id.model_machining_precision,
            'embryo_long': cnc.product_id.bom_ids.bom_line_ids.product_id.length,
            'embryo_height': cnc.product_id.bom_ids.bom_line_ids.product_id.height,
            'embryo_width': cnc.product_id.bom_ids.bom_line_ids.product_id.width,
            'order_no': cnc.production_id.origin
            # 'factory_code' is not sent yet; the draft entry based on
            # self.env.user.company_id.partner_id was left unfinished.
        }
        logging.info('res:%s', res)
        configsettings = self.env['res.config.settings'].get_values()
        config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
        url = '/api/intelligent_programming/create'
        config_url = configsettings['sf_url'] + url
        res_str = json.dumps(res)
        # NOTE(review): no timeout is passed, so this call blocks for as long
        # as the remote server takes to answer — confirm that is intended.
        ret = requests.post(config_url, json={"result": res_str}, headers=config_header)
        ret = ret.json()
        # The response carries a JSON-encoded string under 'result'.
        result = json.loads(ret['result'])
        if result['status'] == 1:
            return self.write({'state': 'progress'})

    def json_workorder_str1(self, k, production, route):
        """Alias of ``json_workorder_str`` (the two bodies were identical);
        kept so existing callers continue to work."""
        return self.json_workorder_str(k, production, route)

    # Override of the work order "start" button.
    def button_start(self):
        """Start the work order when it is in state ``waiting`` or ``ready``.

        Opens a productivity timeline entry, stamps the production start
        date, creates a calendar leave for the work center when none
        exists, and writes state ``progress``.

        :return: ``True`` for the early exits, otherwise the result of
            ``write``
        :raises ValidationError: when the work order is in any other state
        """
        # Validate the singleton before any field on ``self`` is read.
        self.ensure_one()
        if self.state == 'waiting' or self.state == 'ready':
            if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)):
                return True
            # As button_start is automatically called in the new view
            if self.state in ('done', 'cancel'):
                return True

            if self.product_tracking == 'serial':
                self.qty_producing = 1.0
            else:
                self.qty_producing = self.qty_remaining

            self.env['mrp.workcenter.productivity'].create(
                self._prepare_timeline_vals(self.duration, datetime.now())
            )
            if self.production_id.state != 'progress':
                self.production_id.write({
                    'date_start': datetime.now(),
                })
            if self.state == 'progress':
                return True
            start_date = datetime.now()
            vals = {
                'state': 'progress',
                'date_start': start_date,
            }
            if not self.leave_id:
                leave = self.env['resource.calendar.leaves'].create({
                    'name': self.display_name,
                    'calendar_id': self.workcenter_id.resource_calendar_id.id,
                    'date_from': start_date,
                    'date_to': start_date + relativedelta(minutes=self.duration_expected),
                    'resource_id': self.workcenter_id.resource_id.id,
                    'time_type': 'other'
                })
                vals['leave_id'] = leave.id
                return self.write(vals)
            else:
                # Pull the planned window to "now" when it starts later or
                # already ended.
                if self.date_planned_start > start_date:
                    vals['date_planned_start'] = start_date
                if self.date_planned_finished and self.date_planned_finished < start_date:
                    vals['date_planned_finished'] = start_date
                return self.write(vals)
        else:
            raise ValidationError(_('请先完成上一步工单'))


class CNCprocessing(models.Model):
    """One CNC program line attached to a work order."""
    _name = 'sf.cnc.processing'
    _description = "CNC加工"

    cnc_id = fields.Many2one('ir.attachment')
    sequence_number = fields.Char('序号')
    program_name = fields.Char('程序名')
    cutting_tool_name = fields.Char('刀具名称')
    cutting_tool_no = fields.Char('刀号')
    processing_type = fields.Char('加工类型')
    margin_x_y = fields.Char('余量_X/Y')
    margin_z = fields.Char('余量_Z')
    depth_of_processing_z = fields.Char('加工深度(Z)')
    cutting_tool_extension_length = fields.Char('刀具伸出长度')
    cutting_tool_handle_type = fields.Char('刀柄型号')
    estimated_processing_time = fields.Datetime('预计加工时间')
    remark = fields.Text('备注')
    workorder_id = fields.Many2one('mrp.workorder', string="工单")

    # Create a CNC processing line from the programming sheet issued by MRS.
    def cnc_processing_create(self, obj):
        """Create one ``sf.cnc.processing`` record from ``obj``.

        :param obj: mapping that must contain ``production_order_no``,
            ``processing_panel`` and every program field copied below
        :return: the created record
        """
        workorder = self.env['mrp.workorder'].search([
            ('production_id.name', '=', obj['production_order_no']),
            ('processing_panel', '=', obj['processing_panel']),
            ('routing_type', '=', 'CNC加工'),
        ])
        vals = {
            'workorder_id': workorder.id,
            'sequence_number': obj['sequence_number'],
            'program_name': obj['program_name'],
            'cutting_tool_name': obj['cutting_tool_name'],
            'cutting_tool_no': obj['cutting_tool_no'],
            'processing_type': obj['processing_type'],
            'margin_x_y': obj['margin_x_y'],
            'margin_z': obj['margin_z'],
            'depth_of_processing_z': obj['depth_of_processing_z'],
            'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
            'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
            'estimated_processing_time': obj['estimated_processing_time'],
            'remark': obj['remark']
        }
        return self.env['sf.cnc.processing'].create(vals)

    # Create the attachment (NC file).
    def attachment_create(self, name, data):
        """Create a binary ``ir.attachment`` named ``name`` whose ``datas``
        is the base64 encoding of ``data`` (bytes) and return it."""
        attachment = self.env['ir.attachment'].create({
            'datas': base64.b64encode(data),
            'type': 'binary',
            'description': '程序文件',
            'name': name
        })
        return attachment

    # Download the NC files from FTP into a temporary directory.
    def download_file_tmp(self, model_code, processing_panel):
        """Download ``/<model_code>/return/<processing_panel>`` from FTP to
        the same path under ``/tmp`` and return that local directory."""
        remotepath = os.path.join('/', model_code, 'return', processing_panel)
        serverdir = os.path.join('/tmp', model_code, 'return', processing_panel)
        ftp = FtpController()
        ftp.download_file_tree(remotepath, serverdir)
        return serverdir

    # Store the NC file content in an attachment.
    def write_file(self, nc_file_path, cnc):
        """Attach the NC file at ``nc_file_path`` to ``cnc``.

        :param nc_file_path: local path of the NC file
        :param cnc: record providing ``program_name``; its ``cnc_id`` is
            set to the new attachment
        :return: ``False`` when the file does not exist, otherwise ``None``
        """
        if not os.path.exists(nc_file_path):
            return False
        # The context manager closes the file; no explicit close() needed.
        with open(nc_file_path, 'rb') as file:
            data_bytes = file.read()
        attachment = self.attachment_create(cnc.program_name + '.NC', data_bytes)
        cnc.write({'cnc_id': attachment.id})

    # Convert the Excel list that accompanies the NC file to PDF (disabled).
    # def to_pdf(self, excel_path, pdf_path):
    #     """
    #     LibreOffice must be installed on the Linux host.
    #     """
    #     logging.info('pdf_path:%s' % pdf_path)
    #     logging.info('pdf_path:%s' % excel_path)
    #     # The libreoffice command used here must match the one installed on the host.
    #     cmd = 'soffice --headless --convert-to pdf'.split() + [excel_path] + ['--outdir'] + [pdf_path]
    #     p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=1)
    #     # p.wait(timeout=30)  # wait up to 30 seconds for the conversion
    #     # stdout, stderr = p.communicate()
    #     p.communicate()


class SfWorkOrderBarcodes(models.Model):
    """
    Smart factory: bind a tray to the work order by scanning its barcode.
    """
    _name = "mrp.workorder"
    _inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"]

    def on_barcode_scanned(self, barcode):
        """Store the scanned tray code on the record, then bind that tray
        through ``gettray_auto``."""
        tray_code = self.env['sf.tray'].search([('code', '=', barcode)])
        self.tray_code = tray_code.code
        workorder = self.env['mrp.workorder'].browse(self.ids)
        workorder.gettray_auto(barcode)