import base64
import logging
import math
import os
from datetime import datetime

import requests
from dateutil.relativedelta import relativedelta

from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.addons.sf_base.commons.common import Common
from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Routing types whose expected duration is read from the
# 'mrp.routing.workcenter' record that carries the same name; every other
# routing type falls back to 60.
# NOTE(review): '工件装夹' is listed here while the `routing_type` selection on
# the work order only offers '装夹' — confirm which spelling routes really use.
_TIMED_ROUTING_TYPES = frozenset((
    '切割',
    '获取CNC加工程序',
    '工件装夹',
    '前置三元定位检测',
    'CNC加工',
    '后置三元质量检测',
    '解除装夹',
))


class ResMrpWorkOrder(models.Model):
    """Extension of ``mrp.workorder`` with blank, clamping, probing,
    CNC-programming and subcontracting data used by the shop floor."""
    _inherit = 'mrp.workorder'
    _order = 'sequence asc,create_date desc'

    # Blank (raw material) dimensions and material, mirrored from the product
    # template of the manufacturing order.
    product_tmpl_id_length = fields.Float(
        related='production_id.product_tmpl_id.length', readonly=True,
        store=True, string="坯料长度(mm)")
    product_tmpl_id_width = fields.Float(
        related='production_id.product_tmpl_id.width', readonly=True,
        store=True, string="坯料宽度(mm)")
    product_tmpl_id_height = fields.Float(
        related='production_id.product_tmpl_id.height', readonly=True,
        store=True, string="坯料高度(mm)")
    product_tmpl_id_materials_id = fields.Many2one(
        related='production_id.product_tmpl_id.materials_id', readonly=True,
        store=True, check_company=True, string="材料")
    product_tmpl_id_materials_type_id = fields.Many2one(
        related='production_id.product_tmpl_id.materials_type_id',
        readonly=True, store=True, check_company=True, string="型号")
    workcenter_id = fields.Many2one('mrp.workcenter', string='工作中心',
                                    required=False)
    users_ids = fields.Many2many("res.users", 'users_workorder',
                                 related="workcenter_id.users_ids")
    processing_panel = fields.Char('加工面')
    sequence = fields.Integer(string='工序')
    routing_type = fields.Selection([
        ('获取CNC加工程序', '获取CNC加工程序'),
        ('装夹', '装夹'),
        ('前置三元定位检测', '前置三元定位检测'),
        ('CNC加工', 'CNC加工'),
        ('后置三元质量检测', '后置三元质量检测'),
        ('解除装夹', '解除装夹'),
        ('切割', '切割'),
        ('表面工艺', '表面工艺')
    ], string="工序类型")
    results = fields.Char('结果')

    @api.onchange('users_ids')
    def get_user_permissions(self):
        """Set ``user_permissions`` to True when the current user is one of
        the users linked to the work order (through its work center)."""
        uid = self.env.uid
        for workorder in self:
            # An empty ``users_ids`` yields an empty id list, hence False.
            workorder.user_permissions = uid in workorder.users_ids.ids

    user_permissions = fields.Boolean('用户权限',
                                      compute='get_user_permissions')
    programming_no = fields.Char('编程单号')
    work_state = fields.Char('业务状态')
    programming_state = fields.Char('编程状态')
    cnc_worksheet = fields.Binary('工作指令', readonly=True)
    material_center_point = fields.Char(string='坯料中心点')
    # Probed point coordinates; see ``getcenter`` for how points 1-8 (X/Y)
    # and Z9 are used.
    X1_axis = fields.Float(default=0)
    Y1_axis = fields.Float(default=0)
    Z1_axis = fields.Float(default=0)
    X2_axis = fields.Float(default=0)
    Y2_axis = fields.Float(default=0)
    Z2_axis = fields.Float(default=0)
    X3_axis = fields.Float(default=0)
    Y3_axis = fields.Float(default=0)
    Z3_axis = fields.Float(default=0)
    X4_axis = fields.Float(default=0)
    Y4_axis = fields.Float(default=0)
    Z4_axis = fields.Float(default=0)
    X5_axis = fields.Float(default=0)
    Y5_axis = fields.Float(default=0)
    Z5_axis = fields.Float(default=0)
    X6_axis = fields.Float(default=0)
    Y6_axis = fields.Float(default=0)
    Z6_axis = fields.Float(default=0)
    X7_axis = fields.Float(default=0)
    Y7_axis = fields.Float(default=0)
    Z7_axis = fields.Float(default=0)
    X8_axis = fields.Float(default=0)
    Y8_axis = fields.Float(default=0)
    Z8_axis = fields.Float(default=0)
    X9_axis = fields.Float(default=0)
    Y9_axis = fields.Float(default=0)
    Z9_axis = fields.Float(default=0)
    X10_axis = fields.Float(default=0)
    Y10_axis = fields.Float(default=0)
    Z10_axis = fields.Float(default=0)
    X_deviation_angle = fields.Integer(string="X轴偏差度", default=0)
    test_results = fields.Selection(
        [("合格", "合格"), ("返工", "返工"), ("报废", "报废")],
        default='合格', string="检测结果")
    cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id',
                              string="CNC加工")
    tray_code = fields.Char(string="托盘编码")
    glb_file = fields.Binary("glb模型文件")
    is_subcontract = fields.Boolean(string='是否外协')
    surface_technics_parameters_id = fields.Many2one(
        'sf.production.process.parameter', string="表面工艺可选参数")
    picking_in_id = fields.Many2one('stock.picking', string='外协入库单')
    picking_out_id = fields.Many2one('stock.picking', string='外协出库单')
    supplier_id = fields.Many2one('res.partner', string='外协供应商')
    equipment_id = fields.Many2one('maintenance.equipment', string='加工设备')
    schedule_state = fields.Selection(related='production_id.schedule_state',
                                      store=True)

    # Workpiece clamping information
    functional_fixture_code = fields.Char(string="功能夹具编码", readonly=True)
    functional_fixture_serial_number = fields.Char(string="功能夹具序列号",
                                                   readonly=True)
    functional_fixture_id = fields.Many2one('sf.functional.fixture',
                                            string="功能夹具")
    functional_fixture_type_id = fields.Many2one(
        'sf.functional.fixture.type', string="功能夹具类型", readonly=True)
    chuck_serial_number = fields.Char(string="卡盘序列号")
    chuck_name = fields.Char(string="卡盘名称")
    chuck_brand_id = fields.Many2one('sf.machine.brand', string="卡盘品牌")
    chuck_type_id = fields.Char(string="卡盘类型")
    chuck_model_id = fields.Char(string="卡盘型号")
    tray_serial_number = fields.Char(string="托盘序列号")
    tray_name = fields.Char(string="托盘名称")
    tray_brand_id = fields.Many2one('sf.machine.brand', string="托盘品牌")
    tray_type_id = fields.Char(string="托盘类型")
    tray_model_id = fields.Char(string="托盘型号")
    total_wight = fields.Float(string="总重量")
    maximum_carrying_weight = fields.Char(string="最大承载重量[kg]")
    maximum_clamping_force = fields.Char(string="最大夹持力[n]")
    production_line = fields.Char(string="生产线")
    preset_program_information = fields.Char(string="预调程序信息")

    @api.onchange('functional_fixture_id')
    def _onchange_functional_fixture_id(self):
        """Copy code and type from the selected functional fixture."""
        fixture = self.functional_fixture_id
        if fixture:
            self.functional_fixture_code = fixture.code
            self.functional_fixture_type_id = fixture.type_id.id

    def get_no_data(self, production_id):
        """Return the work orders of ``production_id`` (a database id) that
        have a surface-process parameter set."""
        return self.search([
            ('surface_technics_parameters_id', '!=', False),
            ('production_id', '=', production_id),
        ])

    def getcenter(self):
        """Compute the blank centre point and its angle to the X axis.

        Two intersection points are computed: the lines through probed
        points 1-2 and 3-4, and the lines through points 5-6 and 7-8. Their
        midpoint is the X/Y centre; Z is half of ``Z9_axis``. The result is
        stored in ``material_center_point`` and ``X_deviation_angle``, and
        the rounded X/Y values are written as compensation values on every
        work order of the manufacturing order.

        :raises UserError: when the computation or the write fails (e.g.
            parallel lines leading to a division by zero).
        """
        try:
            x1, y1 = self.X1_axis, self.Y1_axis
            x2, y2 = self.X2_axis, self.Y2_axis
            x3, y3 = self.X3_axis, self.Y3_axis
            x4, y4 = self.X4_axis, self.Y4_axis
            x5, y5 = self.X5_axis, self.Y5_axis
            x6, y6 = self.X6_axis, self.Y6_axis
            x7, y7 = self.X7_axis, self.Y7_axis
            x8, y8 = self.X8_axis, self.Y8_axis
            z9 = self.Z9_axis

            # Intersection of line (1, 2) with line (3, 4).
            first_x = (
                ((x3 - x4) * (x2 * y1 - x1 * y2)
                 - (x1 - x2) * (x4 * y3 - x3 * y4))
                / ((x3 - x4) * (y1 - y2) - (x1 - x2) * (y3 - y4)))
            first_y = (
                ((y3 - y4) * (y2 * x1 - y1 * x2)
                 - (y1 - y2) * (y4 * x3 - y3 * x4))
                / ((y3 - y4) * (x1 - x2) - (y1 - y2) * (x3 - x4)))
            # Intersection of line (5, 6) with line (7, 8).
            second_x = (
                ((x7 - x8) * (x6 * y5 - x5 * y6)
                 - (x5 - x6) * (x8 * y7 - x7 * y8))
                / ((x7 - x8) * (y5 - y6) - (x5 - x6) * (y7 - y8)))
            second_y = (
                ((y7 - y8) * (y6 * x5 - y5 * x6)
                 - (y5 - y6) * (y8 * x7 - y7 * x8))
                / ((y7 - y8) * (x5 - x6) - (y5 - y6) * (x7 - x8)))

            x = (first_x + second_x) / 2
            y = (first_y + second_y) / 2
            z = z9 / 2
            angle_degrees = math.atan2(x5 - x6, y5 - y6) * 180 / math.pi
            _logger.info("(%.2f,%.2f)", x, y)
            self.material_center_point = "(%.2f,%.2f,%.2f)" % (x, y, z)
            self.X_deviation_angle = angle_degrees

            # Write the compensation values on the work orders of the
            # manufacturing order. The values are re-parsed from their
            # two-decimal rendering so they match the stored centre point.
            workorder = self.env['mrp.workorder'].browse(self.ids)
            work = workorder.production_id.workorder_ids
            work.compensation_value_x = float("%.2f" % x)
            work.compensation_value_y = float("%.2f" % y)
        except Exception as error:
            _logger.exception("getcenter failed")
            raise UserError("参数计算有误") from error

    def json_workorder_str(self, k, production, route):
        """Build the one2many create command for a work order.

        :param k: processing panel stored on the work order
        :param production: manufacturing order the work order belongs to
        :param route: routing record providing ``routing_type``,
            ``route_workcenter_id`` and ``workcenter_ids``
        :return: ``[0, '', values]`` command list
        """
        routing_type = route.routing_type
        # Expected duration: taken from the routing operation named after
        # the routing type when it is a known one, 60 otherwise.
        if routing_type in _TIMED_ROUTING_TYPES:
            duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
                [('name', '=', routing_type)]).time_cycle
        else:
            duration_expected = 60
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': route.route_workcenter_id.name,
            'processing_panel': k,
            'quality_point_ids': route.route_workcenter_id.quality_point_ids,
            'routing_type': routing_type,
            'work_state': '待发起' if routing_type == '获取CNC加工程序' else '',
            'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(
                route.workcenter_ids.ids, routing_type, production.product_id),
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': duration_expected,
            'duration': 0,
        }]
        return workorders_values_str

    def _json_workorder_surface_process_str(self, production, route,
                                            process_parameter, supplier_id):
        """Build the one2many create command for a surface-process work
        order.

        The work order is flagged as subcontracted, and assigned to the
        outsourcing work center, when ``process_parameter.gain_way`` is
        '外协'; otherwise the work center is resolved from the route.

        :return: ``[0, '', values]`` command list
        """
        outsourced = process_parameter.gain_way == '外协'
        if outsourced:
            workcenter = self.env[
                'mrp.workcenter'].get_process_outsourcing_workcenter()
        else:
            workcenter = self.env['mrp.routing.workcenter'].get_workcenter(
                route.workcenter_ids.ids, route.routing_type,
                production.product_id)
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': '%s-%s' % (route.name, process_parameter.name),
            'processing_panel': '',
            'routing_type': '表面工艺',
            'surface_technics_parameters_id': process_parameter.id,
            'work_state': '',
            'supplier_id': supplier_id,
            'is_subcontract': outsourced,
            'workcenter_id': workcenter,
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0
        }]
        return workorders_values_str

    def button_maintenance_req(self):
        """Maintenance button: return the window action that opens a new
        maintenance request form pre-filled with this work order."""
        self.ensure_one()
        form_view = self.env.ref(
            'mrp_maintenance.maintenance_request_view_form_inherit_mrp')
        return {
            'name': _('New Maintenance Request'),
            'view_mode': 'form',
            'views': [(form_view.id, 'form')],
            'res_model': 'maintenance.request',
            'type': 'ir.actions.act_window',
            'context': {
                'default_company_id': self.company_id.id,
                'default_workorder_id': self.id,
                'default_production_id': self.production_id.id,
                'discard_on_footer_button': True,
            },
            'target': 'new',
            'domain': [('workorder_id', '=', self.id)]
        }

    # NOTE: tray binding (the ``tray_id`` field and the gettray /
    # gettray_auto / unbindtray helpers) is currently disabled in this
    # module; only ``tray_code`` is stored on the work order.

    def pro_code_is_ok(self, barcode):
        """Check a scanned blank serial number against ``pro_code``.

        :param barcode: scanned value, or ``False`` when nothing was scanned
        :return: True when the barcode matches, False when ``barcode`` is
            ``False``
        :raises UserError: when the barcode does not match
        """
        if barcode is False:
            return False
        if barcode != self.pro_code:
            raise UserError('坯料序列号错误')
        return True

    pro_code = fields.Char(string='坯料序列号')
    pro_code_ok = fields.Boolean(default=False)

    def recreateManufacturingOrWorkerOrder(self):
        """Re-create the manufacturing order or the work orders, depending
        on ``test_results``.

        * '报废' (scrap): create a new manufacturing order from the current
          one, with its stock moves and work orders, confirm it when
          applicable and post the origin message.
        * '返工' (rework): create rework work orders on the current
          manufacturing order for this processing panel.
        * anything other than '返工': set ``results`` to '合格'.
        """
        if self.test_results == '报废':
            values = self.env['mrp.production'].create_production1_values(
                self.production_id)
            productions = self.env['mrp.production'].with_user(
                SUPERUSER_ID).sudo().with_company(
                self.production_id.company_id).create(values)
            self.env['stock.move'].sudo().create(
                productions._get_moves_raw_values())
            self.env['stock.move'].sudo().create(
                productions._get_moves_finished_values())
            productions._create_workorder()
            productions.filtered(
                lambda p: (not p.orderpoint_id and p.move_raw_ids) or (
                    p.move_dest_ids.procure_method != 'make_to_order'
                    and not p.move_raw_ids and not p.workorder_ids)
            ).action_confirm()

            for production in productions:
                origin_production = production.move_dest_ids and \
                    production.move_dest_ids[0].raw_material_production_id or False
                orderpoint = production.orderpoint_id
                if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID \
                        and orderpoint.trigger == 'manual':
                    production.message_post(
                        body=_('This production order has been created from Replenishment Report.'),
                        message_type='comment',
                        subtype_xmlid='mail.mt_note')
                elif orderpoint:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': orderpoint},
                        subtype_id=self.env.ref('mail.mt_note').id)
                elif origin_production:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production,
                                'origin': origin_production},
                        subtype_id=self.env.ref('mail.mt_note').id)
        # NOTE(review): this ``else`` pairs with the '返工' test only, so a
        # '报废' result also ends up with results = '合格' — confirm this is
        # the intended behaviour.
        if self.test_results == '返工':
            productions = self.production_id
            productions._create_workorder2(self.processing_panel)
        else:
            self.results = '合格'

    def fetchCNC(self):
        """Request a CNC programming order from the remote programming
        service.

        The payload is built from the first 'CNC加工' work order of the
        manufacturing order and posted to
        ``<sf_url>/api/intelligent_programming/create``. On ``status == 1``
        the returned programming number is stored and both states are set
        to '编程中'.

        :raises UserError: with the message returned by the service when it
            reports a failure, or with a generic message on any other error.
        """
        try:
            cnc = self.env['mrp.workorder'].search(
                [('routing_type', '=', 'CNC加工'),
                 ('production_id', '=', self.production_id.id)], limit=1)
            product = cnc.product_id
            blank = product.bom_ids.bom_line_ids.product_id
            res = {
                'model_code': product.model_code or '',
                'production_no': self.production_id.name,
                'machine_tool_code': cnc.workcenter_id.equipment_id.code,
                'material_code': cnc.env['sf.production.materials'].search(
                    [('id', '=', product.materials_id.id)]).materials_no,
                'material_type_code': cnc.env['sf.materials.model'].search(
                    [('id', '=', product.materials_type_id.id)]).materials_no,
                'machining_processing_panel': product.model_processing_panel,
                'machining_precision': product.model_machining_precision,
                'embryo_long': blank.length,
                'embryo_height': blank.height,
                'embryo_width': blank.width,
                'order_no': cnc.production_id.origin,
                'model_order_no': product.default_code.rsplit('-', 1)[0],
                'user': self.env.user.name,
                'model_file': '' if not product.model_file else
                base64.b64encode(product.model_file).decode('utf-8')
            }
            # The base64 model file is left out of the log on purpose.
            _logger.info('res:%s', {key: value for key, value in res.items()
                                    if key != 'model_file'})
            configsettings = self.env['res.config.settings'].get_values()
            config_header = Common.get_headers(
                self, configsettings['token'], configsettings['sf_secret_key'])
            config_url = configsettings['sf_url'] + \
                '/api/intelligent_programming/create'
            response = requests.post(config_url, json={}, data=res,
                                     headers=config_header)
            ret = response.json()
            _logger.info('fetchCNC-ret:%s', ret)
            if ret['status'] != 1:
                raise UserError(ret['message'])
            self.write({'programming_no': ret['programming_no'],
                        'programming_state': '编程中',
                        'work_state': '编程中'})
        except UserError:
            # Keep the service's (or the ORM's) own message visible to the
            # user instead of masking it with the generic one below.
            raise
        except Exception as error:
            _logger.exception('fetchCNC error')
            raise UserError("cnc程序获取编程单失败,请联系管理员") from error

    def json_workorder_str1(self, k, production, route):
        """Build the one2many create command for a rework work order; the
        name gets the '(返工)' suffix and the expected duration is 60.

        :return: ``[0, '', values]`` command list
        """
        routing_type = route.routing_type
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': '%s(返工)' % route.route_workcenter_id.name,
            'processing_panel': k,
            'routing_type': routing_type,
            'work_state': '待发起' if routing_type == '获取CNC加工程序' else '',
            'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(
                route.workcenter_ids.ids, routing_type, production.product_id),
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0,
        }]
        return workorders_values_str

    def button_start(self):
        """Override of the work order start button.

        Before starting, the blank serial number is copied from the first
        raw move line for clamping work orders, and a confirmed
        subcontracting delivery is forced to 'assigned'.

        :raises UserError: when the work order is not in state 'waiting',
            'ready' or 'progress'.
        """
        # Every field access below needs a single record anyway.
        self.ensure_one()
        if self.routing_type == '装夹':
            self.pro_code = self.production_id.move_raw_ids[0] \
                .move_line_ids[0].lot_id.name
        # Subcontracting delivery: move it from "waiting" to "ready".
        if self.is_subcontract is True:
            picking_out = self.env['stock.picking'].search(
                [('id', '=', self.picking_out_id.id)])
            if picking_out.state == 'confirmed':
                picking_out.write({'state': 'assigned'})
        if self.state not in ('waiting', 'ready', 'progress'):
            raise UserError(_('请先完成上一步工单'))

        self.move_raw_ids = self.production_id.move_raw_ids
        # Nothing to do when the current user already has a running timer.
        if any(not time.date_end for time in self.time_ids.filtered(
                lambda t: t.user_id.id == self.env.user.id)):
            return True
        # As button_start is automatically called in the new view
        if self.state in ('done', 'cancel'):
            return True

        if self.product_tracking == 'serial':
            self.qty_producing = 1.0
        else:
            self.qty_producing = self.qty_remaining

        self.env['mrp.workcenter.productivity'].create(
            self._prepare_timeline_vals(self.duration, datetime.now())
        )
        if self.production_id.state != 'progress':
            self.production_id.write({
                'date_start': datetime.now(),
            })
        if self.state == 'progress':
            return True
        start_date = datetime.now()
        vals = {
            'state': 'progress',
            'date_start': start_date,
        }
        if not self.leave_id:
            # Reserve the work center calendar for the expected duration.
            leave = self.env['resource.calendar.leaves'].create({
                'name': self.display_name,
                'calendar_id': self.workcenter_id.resource_calendar_id.id,
                'date_from': start_date,
                'date_to': start_date + relativedelta(
                    minutes=self.duration_expected),
                'resource_id': self.workcenter_id.resource_id.id,
                'time_type': 'other'
            })
            vals['leave_id'] = leave.id
            return self.write(vals)
        if self.date_planned_start > start_date:
            vals['date_planned_start'] = start_date
        if self.date_planned_finished and \
                self.date_planned_finished < start_date:
            vals['date_planned_finished'] = start_date
        return self.write(vals)

    def button_finish(self):
        """Finish the work order.

        When a subcontracting delivery is linked and carries work orders, a
        draft purchase order is created first, with one line per work order
        for the service product configured for its surface-process
        parameter.

        :return: result of the parent ``button_finish``
        :raises UserError: when no service product is configured for one of
            the surface-process parameters.
        """
        if self.picking_out_id:
            picking_out = self.env['stock.picking'].search(
                [('id', '=', self.picking_out_id.id)])
            if picking_out.workorder_out_id:
                order_line_ids = []
                for item in picking_out.workorder_out_id:
                    server_product = self.env['product.template'].search(
                        [('server_product_process_parameters_id', '=',
                          item.surface_technics_parameters_id.id),
                         ('detailed_type', '=', 'service')])
                    if not server_product:
                        raise UserError(
                            '请先在产品中配置表面工艺为%s相关的外协服务产品'
                            % item.surface_technics_parameters_id.name)
                    order_line_ids.append((0, 0, {
                        'product_id': server_product.product_variant_id.id,
                        'product_qty': 1,
                        'product_uom': server_product.uom_id.id
                    }))
                # The vendor is taken from the service product found for
                # the last work order of the loop.
                self.env['purchase.order'].create({
                    'partner_id': server_product.seller_ids.partner_id.id,
                    'state': 'draft',
                    'order_line': order_line_ids,
                })
        return super().button_finish()


class CNCprocessing(models.Model):
    """One CNC program line of a programming order, linked to a work order
    and to the attachment holding the NC file."""
    _name = 'sf.cnc.processing'
    _description = "CNC加工"
    _rec_name = 'program_name'

    cnc_id = fields.Many2one('ir.attachment')
    sequence_number = fields.Char('序号')
    program_name = fields.Char('程序名')
    cutting_tool_name = fields.Char('刀具名称')
    cutting_tool_no = fields.Char('刀号')
    processing_type = fields.Char('加工类型')
    margin_x_y = fields.Char('余量_X/Y')
    margin_z = fields.Char('余量_Z')
    depth_of_processing_z = fields.Char('加工深度(Z)')
    cutting_tool_extension_length = fields.Char('刀具伸出长度')
    cutting_tool_handle_type = fields.Char('刀柄型号')
    estimated_processing_time = fields.Char('预计加工时间')
    remark = fields.Text('备注')
    workorder_id = fields.Many2one('mrp.workorder', string="工单")
    button_state = fields.Boolean(string='是否已经下发')

    def cnc_processing_create(self, cnc_workorder, ret):
        """Create the CNC processing lines of a programming order issued by
        MRS, attach their files, then mark ``cnc_workorder`` as programmed
        and finish it.

        :param cnc_workorder: work order to close once the lines are created
        :param ret: dict with ``programming_list``, ``production_order_no``
            and ``folder_name``
        """
        _logger.info('ret:%s', ret)
        copied_keys = (
            'sequence_number', 'program_name', 'cutting_tool_name',
            'cutting_tool_no', 'processing_type', 'margin_x_y', 'margin_z',
            'depth_of_processing_z', 'cutting_tool_extension_length',
            'cutting_tool_handle_type', 'estimated_processing_time',
            'remark',
        )
        for obj in ret['programming_list']:
            workorder = self.env['mrp.workorder'].search([
                ('production_id.name', '=', ret['production_order_no']),
                ('processing_panel', '=', obj['processing_panel']),
                ('routing_type', '=', 'CNC加工')])
            vals = {key: obj[key] for key in copied_keys}
            vals['workorder_id'] = workorder.id
            cnc_processing = self.env['sf.cnc.processing'].create(vals)
            self.get_cnc_processing_file(ret['folder_name'], cnc_processing,
                                         workorder.processing_panel)
        cnc_workorder.state = 'done'
        cnc_workorder.work_state = '已编程'
        cnc_workorder.programming_state = '已编程'
        cnc_workorder.time_ids.date_end = datetime.now()
        cnc_workorder.button_finish()

    def get_cnc_processing_file(self, folder_name, cnc_processing,
                                processing_panel):
        """Match the files under ``/tmp/<folder_name>/return/<panel>`` with
        a CNC processing line.

        A ``.pdf`` file becomes the work order worksheet when none is set
        yet; any other file whose name before the first dot equals the
        program name is stored as the NC attachment of the line.
        """
        _logger.info('folder_name:%s', folder_name)
        serverdir = os.path.join('/tmp', folder_name, 'return',
                                 processing_panel)
        _logger.info('serverdir:%s', serverdir)
        for root, _dirs, files in os.walk(serverdir):
            for f in files:
                _logger.info('f:%s', f)
                # ``root`` already starts with the absolute ``serverdir``.
                file_path = os.path.join(root, f)
                if os.path.splitext(f)[1] == ".pdf":
                    _logger.info('pdf:%s', file_path)
                    workorder = cnc_processing.workorder_id
                    if not workorder.cnc_worksheet:
                        with open(file_path, 'rb') as pdf_file:
                            workorder.cnc_worksheet = base64.b64encode(
                                pdf_file.read())
                elif cnc_processing.program_name == f.split('.')[0]:
                    _logger.info('cnc_file_path:%s', file_path)
                    self.write_file(file_path, cnc_processing)

    def attachment_create(self, name, data):
        """Create and return a public binary attachment holding ``data``
        (raw bytes of an NC file) under ``name``."""
        return self.env['ir.attachment'].create({
            'datas': base64.b64encode(data),
            'type': 'binary',
            'public': True,
            'description': '程序文件',
            'name': name
        })

    def download_file_tmp(self, production_no, processing_panel):
        """Download the NC files of a panel from the FTP server into
        ``/tmp/<production_no>/return/<processing_panel>``.

        :return: value returned by ``FtpController.download_file_tree``
        """
        remotepath = os.path.join('/', production_no, 'return',
                                  processing_panel)
        serverdir = os.path.join('/tmp', production_no, 'return',
                                 processing_panel)
        ftp_resconfig = self.env['res.config.settings'].get_values()
        ftp = FtpController(str(ftp_resconfig['ftp_host']),
                            int(ftp_resconfig['ftp_port']),
                            ftp_resconfig['ftp_user'],
                            ftp_resconfig['ftp_password'])
        return ftp.download_file_tree(remotepath, serverdir)

    def write_file(self, nc_file_path, cnc):
        """Store the NC file at ``nc_file_path`` as the attachment of the
        CNC processing line ``cnc``.

        :return: True when the file was stored, False when it does not exist
        """
        if not os.path.exists(nc_file_path):
            return False
        with open(nc_file_path, 'rb') as nc_file:
            data_bytes = nc_file.read()
        attachment = self.attachment_create(cnc.program_name + '.NC',
                                            data_bytes)
        cnc.write({'cnc_id': attachment.id})
        return True


class SfWorkOrderBarcodes(models.Model):
    """Smart-factory work order: adds the barcode events mixin to
    ``mrp.workorder`` (scan-to-bind-tray entry point).

    No ``on_barcode_scanned`` handler is defined here at the moment.
    """
    _name = "mrp.workorder"
    _inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"]