"""Work-order extensions for the smart-factory MRP flow.

Contains the ``mrp.workorder`` extension (clamping, tray binding, CNC
programme request, start/finish overrides), the ``sf.cnc.processing`` model
holding the CNC programme lines attached to a work order, and the barcode
mixin hook used to bind trays by scanning.
"""
import base64
import json
import logging
import math
import os
# import subprocess
from datetime import datetime

import requests
from dateutil.relativedelta import relativedelta

from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.exceptions import ValidationError
from odoo.exceptions import UserError
from odoo.addons.sf_base.commons.common import Common
from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController


def _line_intersection(xa, ya, xb, yb, xc, yc, xd, yd):
    """Return the intersection ``(x, y)`` of line A-B with line C-D.

    Raises:
        ZeroDivisionError: when the two lines are parallel (zero determinant).
    """
    x = ((xc - xd) * (xb * ya - xa * yb) - (xa - xb) * (xd * yc - xc * yd)) / (
        (xc - xd) * (ya - yb) - (xa - xb) * (yc - yd))
    y = ((yc - yd) * (yb * xa - ya * xb) - (ya - yb) * (yd * xc - yc * xd)) / (
        (yc - yd) * (xa - xb) - (ya - yb) * (xc - xd))
    return x, y


class ResMrpWorkOrder(models.Model):
    """Extension of ``mrp.workorder`` with smart-factory specific data."""
    _inherit = 'mrp.workorder'
    _order = 'sequence'

    workcenter_id = fields.Many2one('mrp.workcenter', required=False)
    users_ids = fields.Many2many("res.users", 'users_workorder', related="workcenter_id.users_ids")
    processing_panel = fields.Char('加工面')
    sequence = fields.Integer(string='工序')
    routing_type = fields.Selection([
        ('获取CNC加工程序', '获取CNC加工程序'),
        ('装夹', '装夹'),
        ('前置三元定位检测', '前置三元定位检测'),
        ('CNC加工', 'CNC加工'),
        ('后置三元质量检测', '后置三元质量检测'),
        ('解除装夹', '解除装夹'),
        ('切割', '切割'),
        ('表面工艺', '表面工艺')
    ], string="工序类型")
    results = fields.Char('检测结果')

    # NOTE(review): this is the compute method of ``user_permissions`` but is
    # decorated with ``onchange`` rather than ``depends`` - kept as in the
    # original; confirm whether ``@api.depends`` was intended.
    @api.onchange('users_ids')
    def get_user_permissions(self):
        """Set ``user_permissions`` when the current user is one of ``users_ids``."""
        uid = self.env.uid
        for workorder in self:
            workorder.user_permissions = uid in workorder.users_ids.ids

    user_permissions = fields.Boolean('用户权限', compute='get_user_permissions')
    programming_no = fields.Char('编程单号')
    work_state = fields.Char('业务状态')
    programming_state = fields.Char('编程状态')
    cnc_worksheet = fields.Binary('工作指令', readonly=True)
    material_center_point = fields.Char(string='胚料中心点')
    X1_axis = fields.Float(default=0)
    Y1_axis = fields.Float(default=0)
    Z1_axis = fields.Float(default=0)
    X2_axis = fields.Float(default=0)
    Y2_axis = fields.Float(default=0)
    Z2_axis = fields.Float(default=0)
    X3_axis = fields.Float(default=0)
    Y3_axis = fields.Float(default=0)
    Z3_axis = fields.Float(default=0)
    X4_axis = fields.Float(default=0)
    Y4_axis = fields.Float(default=0)
    Z4_axis = fields.Float(default=0)
    X5_axis = fields.Float(default=0)
    Y5_axis = fields.Float(default=0)
    Z5_axis = fields.Float(default=0)
    X6_axis = fields.Float(default=0)
    Y6_axis = fields.Float(default=0)
    Z6_axis = fields.Float(default=0)
    X7_axis = fields.Float(default=0)
    Y7_axis = fields.Float(default=0)
    Z7_axis = fields.Float(default=0)
    X8_axis = fields.Float(default=0)
    Y8_axis = fields.Float(default=0)
    Z8_axis = fields.Float(default=0)
    X9_axis = fields.Float(default=0)
    Y9_axis = fields.Float(default=0)
    Z9_axis = fields.Float(default=0)
    X10_axis = fields.Float(default=0)
    Y10_axis = fields.Float(default=0)
    Z10_axis = fields.Float(default=0)
    X_deviation_angle = fields.Integer(string="X轴偏差度", default=0)
    test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], default='合格',
                                    string="检测结果")
    cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工")
    tray_code = fields.Char(string="托盘")
    glb_file = fields.Binary("glb模型文件")
    is_subcontract = fields.Boolean(string='是否外协')
    surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数")
    stock_picking_in_ids = fields.One2many('stock.picking', 'workorder_in_id', string='库存外协入库单')
    stock_picking_out_ids = fields.One2many('stock.picking', 'workorder_out_id', string='库存外协出库单')
    supplier_id = fields.Integer('供应商Id')

    def getcenter(self):
        """Compute the blank centre point and its deviation angle.

        The centre is the midpoint of two line intersections: lines
        (P1-P2, P3-P4) and lines (P5-P6, P7-P8), using the X/Y probe fields;
        Z is half of ``Z9_axis``. The result is stored in
        ``material_center_point`` and ``X_deviation_angle``, and the X/Y
        centre (rounded to two decimals, as in the stored string) is written
        as compensation values on every work order of the production.

        Raises:
            UserError: when the computation or the write fails (for example
                parallel probe lines, which make the denominator zero).
        """
        try:
            x0, y0 = _line_intersection(
                self.X1_axis, self.Y1_axis, self.X2_axis, self.Y2_axis,
                self.X3_axis, self.Y3_axis, self.X4_axis, self.Y4_axis)
            x1, y1 = _line_intersection(
                self.X5_axis, self.Y5_axis, self.X6_axis, self.Y6_axis,
                self.X7_axis, self.Y7_axis, self.X8_axis, self.Y8_axis)
            x = (x0 + x1) / 2
            y = (y0 + y1) / 2
            z = self.Z9_axis / 2
            angle_rad = math.atan2(self.X5_axis - self.X6_axis, self.Y5_axis - self.Y6_axis)
            angle_deg = angle_rad * 180 / math.pi
            logging.info("getcenter:(%.2f,%.2f)", x, y)
            self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z))
            self.X_deviation_angle = angle_deg
            # Write the compensation values to the work orders of the
            # production. The values are the two-decimal roundings that the
            # stored string carries (previously re-parsed with eval()).
            workorder = self.env['mrp.workorder'].browse(self.ids)
            work = workorder.production_id.workorder_ids
            work.compensation_value_x = float("%.2f" % x)
            work.compensation_value_y = float("%.2f" % y)
        except Exception as e:
            raise UserError("参数计算有误") from e

    def json_workorder_str(self, k, production, route):
        """Build the one2many ``(0, '', vals)`` create command for a work order.

        Args:
            k: processing panel stored on the work order.
            production: production record providing the UoM and the product.
            route: routing record providing name, routing type and work centers.

        Returns:
            list: ``[0, '', vals]``.
        """
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': route.route_workcenter_id.name,
            'processing_panel': k,
            'routing_type': route.routing_type,
            'work_state': '待发起' if route.routing_type == '获取CNC加工程序' else '',
            'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
                                                                               route.routing_type,
                                                                               production.product_id),
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0
        }]
        return workorders_values_str

    def _json_workorder_surface_process_str(self, production, route, process_parameter, supplier_id):
        """Build the create command for a surface-process (表面工艺) work order.

        When ``process_parameter.gain_way`` is '外协' the work order is flagged
        as subcontracted and uses the outsourcing work center; otherwise the
        work center is resolved from the route.

        Returns:
            list: ``[0, '', vals]``.
        """
        is_outsourced = process_parameter.gain_way == '外协'
        if is_outsourced:
            workcenter = self.env['mrp.workcenter'].get_process_outsourcing_workcenter()
        else:
            workcenter = self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
                                                                           route.routing_type,
                                                                           production.product_id)
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': '%s-%s' % (route.name, process_parameter.name),
            'processing_panel': '',
            'routing_type': '表面工艺',
            'surface_technics_parameters_id': process_parameter.id,
            'work_state': '',
            'supplier_id': supplier_id,
            'is_subcontract': is_outsourced,
            'workcenter_id': workcenter,
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0
        }]
        return workorders_values_str

    def button_maintenance_req(self):
        """Open a new maintenance request form pre-filled from this work order."""
        self.ensure_one()
        return {
            'name': _('New Maintenance Request'),
            'view_mode': 'form',
            'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')],
            'res_model': 'maintenance.request',
            'type': 'ir.actions.act_window',
            'context': {
                'default_company_id': self.company_id.id,
                'default_workorder_id': self.id,
                'default_production_id': self.production_id.id,
                'discard_on_footer_button': True,
            },
            'target': 'new',
            'domain': [('workorder_id', '=', self.id)]
        }

    tray_id = fields.Many2one('sf.tray', string="托盘信息", tracking=True)

    def _bind_tray(self, code):
        """Bind the tray identified by ``code`` to this work order's production.

        Marks the tray as occupied, sets ``work_state`` to '已绑定' and sets
        ``tray_id`` on every work order of the production.

        Returns:
            The bound ``sf.tray`` record.

        Raises:
            UserError: empty code, unknown code, or tray occupied/damaged.
        """
        if not code:
            raise UserError('托盘码不能为空')
        tray = self.env['sf.tray'].search([("code", "=", code)])
        if not tray:
            raise UserError('该托盘编码已失效')
        if tray.state == "占用":
            raise UserError('该托盘已占用')
        if tray.state == "报损":
            raise UserError('该托盘已损坏')
        tray.update({
            'workorder_id': self,
            'production_id': self.production_id,
            'state': '占用',
        })
        self.work_state = "已绑定"
        orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)])
        for order in orders:
            order.tray_id = tray
        return tray

    def gettray(self):
        """Bind the tray whose code is stored in ``tray_code`` (button action)."""
        self._bind_tray(self.tray_code)

    def pro_code_is_ok(self, barcode):
        """Check a scanned blank serial number against ``pro_code``.

        Returns:
            bool: True when the barcode matches, False when no barcode is given.

        Raises:
            UserError: when the barcode does not match ``pro_code``.
        """
        if not barcode:
            return False
        if barcode != self.pro_code:
            raise UserError('坯料序列号错误')
        return True

    pro_code = fields.Char(string='坯料序列号')
    pro_code_ok = fields.Boolean(default=False)

    def gettray_auto(self, barcode):
        """Bind the tray identified by a scanned ``barcode`` and return it."""
        return self._bind_tray(barcode)

    def unbindtray(self):
        """Release the tray bound to this work order's production."""
        tray = self.env['sf.tray'].search([("production_id", "=", self.production_id.id)])
        if tray:
            tray.unclamp()
            self.tray_id = False

    def recreateManufacturingOrWorkerOrder(self):
        """Re-create the manufacturing order or the work orders after inspection.

        * '报废' (scrap): create and confirm a new production copied from the
          current one and post the origin note on it.
        * '返工' (rework): re-create the work orders of the current
          processing panel.
        * otherwise: record the result as '合格'.
        """
        if self.test_results == '报废':
            values = self.env['mrp.production'].create_production1_values(self.production_id)
            productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
                self.production_id.company_id).create(values)
            self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
            self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
            productions._create_workorder()
            productions.filtered(
                lambda p: (not p.orderpoint_id and p.move_raw_ids) or (
                    p.move_dest_ids.procure_method != 'make_to_order'
                    and not p.move_raw_ids and not p.workorder_ids)
            ).action_confirm()
            for production in productions:
                origin_production = production.move_dest_ids and production.move_dest_ids[
                    0].raw_material_production_id or False
                orderpoint = production.orderpoint_id
                if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
                    production.message_post(
                        body=_('This production order has been created from Replenishment Report.'),
                        message_type='comment',
                        subtype_xmlid='mail.mt_note')
                elif orderpoint:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': orderpoint},
                        subtype_id=self.env.ref('mail.mt_note').id)
                elif origin_production:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': origin_production},
                        subtype_id=self.env.ref('mail.mt_note').id)
        # ``elif`` (was a second ``if``): a scrapped work order must not fall
        # through to the final branch and be recorded as '合格'.
        elif self.test_results == '返工':
            productions = self.production_id
            productions._create_workorder2(self.processing_panel)
        else:
            self.results = '合格'

    def fetchCNC(self):
        """Request a CNC programme for this production from the remote service.

        Posts the product/material description of the production's CNC work
        order to ``/api/intelligent_programming/create``; on success stores
        the returned programming number and switches the states to '编程中'.

        Raises:
            UserError: with the message returned by the service when it
                reports a failure, or with a generic message on any other
                error.
        """
        try:
            cnc = self.env['mrp.workorder'].search(
                [('routing_type', '=', 'CNC加工'), ('production_id', '=', self.production_id.id)], limit=1)
            logging.info('fetchCNC-cnc:%s', cnc)
            logging.info('fetchCNC-model_code1:%s', cnc.product_id.model_code)
            product = cnc.product_id
            blank = product.bom_ids.bom_line_ids.product_id
            res = {'model_code': '' if not product.model_code else product.model_code,
                   'production_no': self.production_id.name,
                   'machine_tool_code': cnc.workcenter_id.machine_tool_id.code,
                   'material_code': cnc.env['sf.production.materials'].search(
                       [('id', '=', product.materials_id.id)]).materials_no,
                   'material_type_code': cnc.env['sf.materials.model'].search(
                       [('id', '=', product.materials_type_id.id)]).materials_no,
                   'machining_processing_panel': product.model_processing_panel,
                   'machining_precision': product.model_machining_precision,
                   'embryo_long': blank.length,
                   'embryo_height': blank.height,
                   'embryo_width': blank.width,
                   'order_no': cnc.production_id.origin,
                   'user': self.env.user.name,
                   'model_file': '' if not product.model_file else base64.b64encode(
                       product.model_file).decode('utf-8')
                   }
            logging.info('res:%s', res)
            configsettings = self.env['res.config.settings'].get_values()
            config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
            url = '/api/intelligent_programming/create'
            config_url = configsettings['sf_url'] + url
            # NOTE(review): both ``json`` and ``data`` are passed, as in the
            # original call; kept unchanged - confirm which encoding the
            # endpoint expects.
            ret = requests.post(config_url, json={}, data=res, headers=config_header)
            ret = ret.json()
            logging.info('fetchCNC-ret:%s', ret)
            if ret['status'] == 1:
                self.write(
                    {'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'})
            else:
                raise UserError(ret['message'])
        except UserError:
            # Keep the message returned by the service instead of replacing
            # it with the generic one below.
            raise
        except Exception as e:
            logging.info('fetchCNC error:%s', e)
            raise UserError("cnc程序获取编程单失败,请联系管理员") from e

    def json_workorder_str1(self, k, production, route):
        """Same as :meth:`json_workorder_str` with a '(返工)' suffixed name."""
        workorders_values_str = self.json_workorder_str(k, production, route)
        workorders_values_str[2]['name'] = '%s(返工)' % route.route_workcenter_id.name
        return workorders_values_str

    def button_start(self):
        """Override of the work-order start button.

        For a clamping ('装夹') work order the expected blank serial number is
        first copied from the first raw-material move line. The work order
        may only be started from the waiting/ready/progress states.

        Raises:
            UserError: when the work order is in any other state.
        """
        self.ensure_one()
        if self.routing_type == '装夹':
            # ``[:1]`` yields an empty recordset instead of raising IndexError
            # when there is no raw move / move line yet.
            self.pro_code = self.production_id.move_raw_ids[:1].move_line_ids[:1].lot_id.name
        if self.state not in ('waiting', 'ready', 'progress'):
            raise UserError(_('请先完成上一步工单'))
        # The current user already has a running timer on this work order.
        if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)):
            return True

        if self.product_tracking == 'serial':
            self.qty_producing = 1.0
        else:
            self.qty_producing = self.qty_remaining

        self.env['mrp.workcenter.productivity'].create(
            self._prepare_timeline_vals(self.duration, datetime.now())
        )
        if self.production_id.state != 'progress':
            self.production_id.write({
                'date_start': datetime.now(),
            })
        if self.state == 'progress':
            return True
        start_date = datetime.now()
        vals = {
            'state': 'progress',
            'date_start': start_date,
        }
        if not self.leave_id:
            leave = self.env['resource.calendar.leaves'].create({
                'name': self.display_name,
                'calendar_id': self.workcenter_id.resource_calendar_id.id,
                'date_from': start_date,
                'date_to': start_date + relativedelta(minutes=self.duration_expected),
                'resource_id': self.workcenter_id.resource_id.id,
                'time_type': 'other'
            })
            vals['leave_id'] = leave.id
            return self.write(vals)
        # Work orders of this module are created with an empty planned start;
        # guard the comparison so it cannot fail on ``False > datetime``.
        if not self.date_planned_start or self.date_planned_start > start_date:
            vals['date_planned_start'] = start_date
        if self.date_planned_finished and self.date_planned_finished < start_date:
            vals['date_planned_finished'] = start_date
        return self.write(vals)

    def button_finish(self):
        """Mark the work orders as done and create subcontract purchase orders.

        After each work order is written as done, a draft purchase order is
        created for every finished subcontracted work order of the same
        production.
        """
        end_date = datetime.now()
        for workorder in self:
            if workorder.state in ('done', 'cancel'):
                continue
            workorder.end_all()
            vals = {
                'qty_produced': workorder.qty_produced or workorder.qty_producing or workorder.qty_production,
                'state': 'done',
                'date_finished': end_date,
                'date_planned_finished': end_date,
                'costs_hour': workorder.workcenter_id.costs_hour
            }
            if not workorder.date_start:
                vals['date_start'] = end_date
            if not workorder.date_planned_start or end_date < workorder.date_planned_start:
                vals['date_planned_start'] = end_date
            workorder.with_context(bypass_duration_calculation=True).write(vals)
            # NOTE(review): explicit commit kept from the original code.
            self.env.cr.commit()
            # NOTE(review): every finished subcontracted work order of the
            # production gets a purchase order each time any work order is
            # finished - confirm that repeated orders are intended.
            done_subcontract_workorders = self.env['mrp.workorder'].search(
                [('production_id', '=', workorder.production_id.id),
                 ('is_subcontract', '=', True), ('state', '=', 'done')])
            for item in done_subcontract_workorders:
                server_product = self.env['product.template'].search(
                    [('server_product_process_parameters_id', '=', item.surface_technics_parameters_id.id),
                     ('categ_type', '=', '服务'), ('detailed_type', '=', 'service')])
                order_line_ids = [(0, 0, {
                    'product_id': server_product.product_variant_id.id,
                    'product_qty': 1,
                    'product_uom': server_product.uom_id.id
                })]
                self.env['purchase.order'].create({
                    'partner_id': server_product.seller_ids.partner_id.id,
                    'state': 'draft',
                    'order_line': order_line_ids,
                })


class CNCprocessing(models.Model):
    """One CNC programme line attached to a work order."""
    _name = 'sf.cnc.processing'
    _description = "CNC加工"
    _rec_name = 'program_name'

    cnc_id = fields.Many2one('ir.attachment')
    sequence_number = fields.Char('序号')
    program_name = fields.Char('程序名')
    cutting_tool_name = fields.Char('刀具名称')
    cutting_tool_no = fields.Char('刀号')
    processing_type = fields.Char('加工类型')
    margin_x_y = fields.Char('余量_X/Y')
    margin_z = fields.Char('余量_Z')
    depth_of_processing_z = fields.Char('加工深度(Z)')
    cutting_tool_extension_length = fields.Char('刀具伸出长度')
    cutting_tool_handle_type = fields.Char('刀柄型号')
    estimated_processing_time = fields.Char('预计加工时间')
    remark = fields.Text('备注')
    workorder_id = fields.Many2one('mrp.workorder', string="工单")
    button_state = fields.Boolean(string='是否已经下发')

    def cnc_processing_create(self, cnc_workorder, ret):
        """Create the CNC programme lines delivered by the programming service.

        Args:
            cnc_workorder: the programme-fetching work order to close once
                the lines are created.
            ret: payload with ``programming_list``, ``production_order_no``
                and ``folder_name``.
        """
        logging.info('ret:%s', ret)
        for obj in ret['programming_list']:
            workorder = self.env['mrp.workorder'].search([('production_id.name', '=', ret['production_order_no']),
                                                          ('processing_panel', '=', obj['processing_panel']),
                                                          ('routing_type', '=', 'CNC加工')])
            cnc_processing = self.env['sf.cnc.processing'].create({
                'workorder_id': workorder.id,
                'sequence_number': obj['sequence_number'],
                'program_name': obj['program_name'],
                'cutting_tool_name': obj['cutting_tool_name'],
                'cutting_tool_no': obj['cutting_tool_no'],
                'processing_type': obj['processing_type'],
                'margin_x_y': obj['margin_x_y'],
                'margin_z': obj['margin_z'],
                'depth_of_processing_z': obj['depth_of_processing_z'],
                'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
                'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
                'estimated_processing_time': obj['estimated_processing_time'],
                'remark': obj['remark']
            })
            self.get_cnc_processing_file(ret['folder_name'], cnc_processing, workorder.processing_panel)
        cnc_workorder.state = 'done'
        cnc_workorder.work_state = '已编程'
        cnc_workorder.programming_state = '已编程'
        cnc_workorder.time_ids.date_end = datetime.now()
        cnc_workorder.button_finish()

    def get_cnc_processing_file(self, folder_name, cnc_processing, processing_panel):
        """Attach the downloaded files that belong to ``cnc_processing``.

        Walks ``/tmp/<folder_name>/return/<processing_panel>``: the first PDF
        found becomes the work order's worksheet (only when none is set yet),
        and a non-PDF file whose base name equals the programme name is
        stored as the line's NC attachment.
        """
        logging.info('folder_name:%s', folder_name)
        serverdir = os.path.join('/tmp', folder_name, 'return', processing_panel)
        logging.info('serverdir:%s', serverdir)
        for root, dirs, files in os.walk(serverdir):
            for f in files:
                logging.info('f:%s', f)
                # ``root`` is already rooted at ``serverdir``.
                file_path = os.path.join(root, f)
                if os.path.splitext(f)[1] == ".pdf":
                    logging.info('pdf:%s', file_path)
                    if not cnc_processing.workorder_id.cnc_worksheet:
                        with open(file_path, 'rb') as pdf_file:
                            cnc_processing.workorder_id.cnc_worksheet = base64.b64encode(pdf_file.read())
                elif cnc_processing.program_name == f.split('.')[0]:
                    logging.info('cnc_file_path:%s', file_path)
                    self.write_file(file_path, cnc_processing)

    def attachment_create(self, name, data):
        """Create a public binary attachment (NC file) and return it."""
        attachment = self.env['ir.attachment'].create({
            'datas': base64.b64encode(data),
            'type': 'binary',
            'public': True,
            'description': '程序文件',
            'name': name
        })
        return attachment

    def download_file_tmp(self, production_no, processing_panel):
        """Download the NC files of a production from FTP into ``/tmp``.

        Returns:
            The value returned by ``FtpController.download_file_tree``.
        """
        remotepath = os.path.join('/', production_no, 'return', processing_panel)
        serverdir = os.path.join('/tmp', production_no, 'return', processing_panel)
        ftp_resconfig = self.env['res.config.settings'].get_values()
        ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']),
                            ftp_resconfig['ftp_user'],
                            ftp_resconfig['ftp_password'])
        download_state = ftp.download_file_tree(remotepath, serverdir)
        return download_state

    def write_file(self, nc_file_path, cnc):
        """Store the NC file at ``nc_file_path`` as the attachment of ``cnc``.

        Returns:
            bool: False when the file does not exist, True otherwise.
        """
        if not os.path.exists(nc_file_path):
            return False
        with open(nc_file_path, 'rb') as file:
            data_bytes = file.read()
        attachment = self.attachment_create(cnc.program_name + '.NC', data_bytes)
        cnc.write({'cnc_id': attachment.id})
        return True


class SfWorkOrderBarcodes(models.Model):
    """Bind trays to smart-factory work orders by scanning barcodes."""
    _name = "mrp.workorder"
    _inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"]

    def on_barcode_scanned(self, barcode):
        """Handle a scanned barcode.

        A barcode without '*' is treated as a tray code and bound to the work
        order; a barcode containing '*' is checked as the blank serial number.
        """
        workorder = self.env['mrp.workorder'].browse(self.ids)
        if "*" not in barcode:
            tray_code = self.env['sf.tray'].search([('code', '=', barcode)])
            self.tray_code = tray_code.code
            self.tray_id = workorder.gettray_auto(barcode)
        else:
            self.pro_code_ok = workorder.pro_code_is_ok(barcode)