import re import logging import base64 import urllib.parse from datetime import date from datetime import datetime, timedelta import requests import os import math from lxml import etree from dateutil.relativedelta import relativedelta # import subprocess from odoo import api, fields, models, SUPERUSER_ID, _ from odoo.addons.sf_base.commons.common import Common from odoo.exceptions import UserError from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController class ResMrpWorkOrder(models.Model): _inherit = 'mrp.workorder' _order = 'sequence asc,create_date desc' product_tmpl_id_length = fields.Float(related='production_id.product_tmpl_id.length', readonly=True, store=True, string="坯料长度(mm)") product_tmpl_id_width = fields.Float(related='production_id.product_tmpl_id.width', readonly=True, store=True, string="坯料宽度(mm)") product_tmpl_id_height = fields.Float(related='production_id.product_tmpl_id.height', readonly=True, store=True, string="坯料高度(mm)") product_tmpl_id_materials_id = fields.Many2one(related='production_id.product_tmpl_id.materials_id', readonly=True, store=True, check_company=True, string="材料") product_tmpl_id_materials_type_id = fields.Many2one(related='production_id.product_tmpl_id.materials_type_id', readonly=True, store=True, check_company=True, string="型号") workcenter_id = fields.Many2one('mrp.workcenter', string='工作中心', required=False) users_ids = fields.Many2many("res.users", 'users_workorder', related="workcenter_id.users_ids") processing_panel = fields.Char('加工面') sequence = fields.Integer(string='工序') routing_type = fields.Selection([ # ('获取CNC加工程序', '获取CNC加工程序'), ('装夹预调', '装夹预调'), # ('前置三元定位检测', '前置三元定位检测'), ('CNC加工', 'CNC加工'), # ('后置三元质量检测', '后置三元质量检测'), ('解除装夹', '解除装夹'), ('切割', '切割'), ('表面工艺', '表面工艺') ], string="工序类型") results = fields.Char('结果') manual_quotation = fields.Boolean('人工编程', default=False, readonly=True) @api.onchange('users_ids') def get_user_permissions(self): uid = self.env.uid for workorder in self: if workorder.users_ids: list_user_id = [] for item in workorder.users_ids: list_user_id.append(item.id) if uid in list_user_id: workorder.user_permissions = True else: workorder.user_permissions = False else: workorder.user_permissions = False user_permissions = fields.Boolean('用户权限', compute='get_user_permissions') programming_no = fields.Char('编程单号') work_state = fields.Char('业务状态') programming_state = fields.Char('编程状态') cnc_worksheet = fields.Binary( '工作指令', readonly=True) material_center_point = fields.Char(string='坯料中心点') X1_axis = fields.Float(default=0) Y1_axis = fields.Float(default=0) Z1_axis = fields.Float(default=0) X2_axis = fields.Float(default=0) Y2_axis = fields.Float(default=0) Z2_axis = fields.Float(default=0) X3_axis = fields.Float(default=0) Y3_axis = fields.Float(default=0) Z3_axis = fields.Float(default=0) X4_axis = fields.Float(default=0) Y4_axis = fields.Float(default=0) Z4_axis = fields.Float(default=0) X5_axis = fields.Float(default=0) Y5_axis = fields.Float(default=0) Z5_axis = fields.Float(default=0) X6_axis = fields.Float(default=0) Y6_axis = fields.Float(default=0) Z6_axis = fields.Float(default=0) X7_axis = fields.Float(default=0) Y7_axis = fields.Float(default=0) Z7_axis = fields.Float(default=0) X8_axis = fields.Float(default=0) Y8_axis = fields.Float(default=0) Z8_axis = fields.Float(default=0) X9_axis = fields.Float(default=0) Y9_axis = fields.Float(default=0) Z9_axis = fields.Float(default=0) X10_axis = fields.Float(default=0) Y10_axis = fields.Float(default=0) Z10_axis = fields.Float(default=0) X_deviation_angle = fields.Integer(string="X轴偏差度", default=0) test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], default='合格', string="检测结果") cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工程序") cmm_ids = fields.One2many("sf.cmm.program", 'workorder_id', string="CMM程序") tray_code = fields.Char(string="托盘编码") glb_file = fields.Binary("glb模型文件", related='production_id.model_file') is_subcontract = fields.Boolean(string='是否外协') surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数") picking_ids = fields.Many2many('stock.picking', string='外协出入库单') surface_technics_picking_count = fields.Integer("外协出入库", compute='_compute_surface_technics_picking_ids') @api.depends('name', 'production_id.name') def _compute_surface_technics_picking_ids(self): for order in self: picking_ids = self.env['stock.picking'].search([('id', 'in', order.picking_ids.ids)]) order.surface_technics_picking_count = len(picking_ids) def action_view_surface_technics_picking(self): self.ensure_one() action = self.env["ir.actions.actions"]._for_xml_id("stock.action_picking_tree_all") if len(self.picking_ids) > 1: action['domain'] = [('id', 'in', self.picking_ids.ids)] elif self.picking_ids: # action['name'] = '工艺外协' action['res_id'] = self.picking_ids.id action['views'] = [(self.env.ref('stock.view_picking_form').id, 'form')] if 'views' in action: action['views'] += [(state, view) for state, view in action['views'] if view != 'form'] action['context'] = dict(self._context, default_origin=self.name) return action supplier_id = fields.Many2one('res.partner', string='外协供应商') equipment_id = fields.Many2one('maintenance.equipment', string='加工设备') is_ok = fields.Boolean(string='是否合格') # 加工人 processing_user_id = fields.Many2one('res.users', string='加工人') # 检测人 inspection_user_id = fields.Many2one('res.users', string='检测人') # 保存名称 save_name = fields.Char(string='检测文件保存名称', compute='_compute_save_name') # 获取数据状态 data_state = fields.Boolean(string='获取数据状态', default=False) # 坯料长宽高 material_length = fields.Float(string='长') material_width = fields.Float(string='宽') material_height = fields.Float(string='高') # 零件图号 part_number = fields.Char(string='零件图号') # 工序状态 process_state = fields.Selection([ ('待装夹', '待装夹'), ('待检测', '待检测'), ('待加工', '装夹预调完工'), ('待解除装夹', '待解除装夹'), ('已完工', '已完工'), ], string='工序状态', default='待装夹', readonly='True') # 加工图纸 processing_drawing = fields.Binary(string='加工图纸') @api.depends('production_id') def _compute_save_name(self): """ 保存名称 """ for record in self: record.save_name = record.production_id.name.replace('/', '_') schedule_state = fields.Selection(related='production_id.schedule_state', store=True) # 工件装夹信息 functional_fixture_code = fields.Char(string="功能夹具编码", readonly=True) functional_fixture_serial_number = fields.Char(string="功能夹具序列号", readonly=True) functional_fixture_id = fields.Many2one('sf.functional.fixture', string="功能夹具") functional_fixture_type_id = fields.Many2one('sf.functional.fixture.type', string="功能夹具类型", readonly=True) chuck_serial_number = fields.Char(string="卡盘序列号") chuck_name = fields.Char(string="卡盘名称") chuck_brand_id = fields.Many2one('sf.machine.brand', string="卡盘品牌") chuck_type_id = fields.Char(string="卡盘类型") chuck_model_id = fields.Char(string="卡盘型号") tray_serial_number = fields.Char(string="托盘序列号") tray_product_id = fields.Many2one('product.product', string="托盘名称") tray_brand_id = fields.Many2one('sf.machine.brand', string="托盘品牌") tray_type_id = fields.Many2one('sf.fixture.material', string="托盘类型") tray_model_id = fields.Many2one('sf.fixture.model', string="托盘型号") total_wight = fields.Float(string="总重量") maximum_carrying_weight = fields.Char(string="最大承载重量[kg]") maximum_clamping_force = fields.Char(string="最大夹持力[n]") preset_program_information = fields.Char(string="预调程序信息") workpiece_delivery_ids = fields.One2many('sf.workpiece.delivery', 'workorder_id', '工件配送') is_delivery = fields.Boolean('是否配送完成', default=False) rfid_code = fields.Char('RFID码') rfid_code_old = fields.Char('RFID码(已解除)') production_line_id = fields.Many2one('sf.production.line', related='production_id.production_line_id', string='生产线', store=True) production_line_state = fields.Selection(related='production_id.production_line_state', string='上/下产线', store=True) detection_report = fields.Binary('检测报告', readonly=True) is_remanufacture = fields.Boolean(string='是否重新生成制造订单', default=True) @api.onchange('rfid_code') def _onchange(self): if self.rfid_code and self.state == 'progress': self.workpiece_delivery_ids[0].write({'rfid_code': self.rfid_code}) def get_plan_workorder(self, production_line): tomorrow = (date.today() + timedelta(days=+1)).strftime("%Y-%m-%d") tomorrow_start = tomorrow + ' 00:00:00' tomorrow_end = tomorrow + ' 23:59:59' logging.info('tomorrow:%s' % tomorrow) sql = """ SELECT * FROM mrp_workorder WHERE to_char(date_planned_start::timestamp + '8 hour','YYYY-MM-DD HH:mm:SS')>= %s AND to_char(date_planned_finished::timestamp + '8 hour','YYYY-MM-DD HH:mm:SS')<= %s """ params = [tomorrow_start, tomorrow_end] if production_line: sql += "AND production_line_id = %s" params.append(production_line) self.env.cr.execute(sql, params) ids = [t[0] for t in self.env.cr.fetchall()] return [('id', 'in', ids)] @api.onchange('is_ok') def _onchange_inspection_user_id(self): """ 检测is_ok(是否合格)被修改的话,就将当前用户赋值给inspection_user_id """ if not self.inspection_user_id: self.inspection_user_id = self.env.user.id else: self.inspection_user_id = False @api.onchange('functional_fixture_id') def _onchange_functional_fixture_id(self): if self.functional_fixture_id: self.functional_fixture_code = self.functional_fixture_id.code self.functional_fixture_type_id = self.functional_fixture_id.type_id.id def get_no_data(self, production_id): process_parameter_workorder = self.search( [('surface_technics_parameters_id', '!=', False), ('production_id', '=', production_id)]) return process_parameter_workorder # 获取三次元检测点数据 def get_three_check_datas(self): factory_nick_name = 'XT' ftp_resconfig = self.env['res.config.settings'].get_values() ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'], ftp_resconfig['ftp_password']) # ftp.connect() local_dir_path = '/ftp/before' os.makedirs(local_dir_path, exist_ok=True) local_filename = self.save_name + '.xls' local_file_path = os.path.join(local_dir_path, local_filename) logging.info('local_file_path:%s' % local_file_path) remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename logging.info('remote_path:%s' % remote_path) if not ftp.file_exists(remote_path): raise UserError(f"文件不存在: {remote_path}") with open(local_file_path, 'wb') as local_file: ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write) logging.info('下载文件成功') # 解析本地文件 # file_path = 'WH_MO_00099.xls' # 使用下载的实际文件路径 parser = etree.XMLParser(recover=True) # Using recover to handle errors tree = etree.parse(local_file_path, parser) logging.info('tree:%s' % tree) root = tree.getroot() logging.info('root:%s' % root) # 准备一个外部字典来存储以PT为键的坐标字典 pt_coordinates = {} # 遍历每个工作表和行 for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'): sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name') logging.info('sheet_name:%s' % sheet_name) if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表 current_pt = None for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'): cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell')) for i, cell in enumerate(cells): data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data') if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空 # 检查是否是PT标识 logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据 if "PT" in data_cell.text: current_pt = data_cell.text pt_coordinates[current_pt] = [] elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None: # 确保当前单元格后面还有单元格存在,以获取理论值 if i + 1 < len(cells): next_cell = cells[i + 1] theory_value = next_cell.find( './/{urn:schemas-microsoft-com:office:spreadsheet}Data') if theory_value is not None: # 为当前PT键添加坐标数据 pt_coordinates[current_pt].append({ data_cell.text: float(theory_value.text) }) logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}") logging.info('pt_coordinates=====%s' % pt_coordinates) # pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}], # 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]} # 检查是否存在PT1等键 if 'PT1' in pt_coordinates and pt_coordinates['PT1']: self.X1_axis = pt_coordinates['PT3'][0]['X'] self.Y1_axis = pt_coordinates['PT3'][1]['Y'] self.Z1_axis = pt_coordinates['PT3'][2]['Z'] else: raise UserError('PT1点未测或数据错误') if 'PT2' in pt_coordinates and pt_coordinates['PT2']: self.X2_axis = pt_coordinates['PT4'][0]['X'] self.Y2_axis = pt_coordinates['PT4'][1]['Y'] self.Z2_axis = pt_coordinates['PT4'][2]['Z'] else: raise UserError('PT2点未测或数据错误') if 'PT3' in pt_coordinates and pt_coordinates['PT3']: self.X3_axis = pt_coordinates['PT5'][0]['X'] self.Y3_axis = pt_coordinates['PT5'][1]['Y'] self.Z3_axis = pt_coordinates['PT5'][2]['Z'] else: raise UserError('PT3点未测或数据错误') if 'PT4' in pt_coordinates and pt_coordinates['PT4']: self.X4_axis = pt_coordinates['PT6'][0]['X'] self.Y4_axis = pt_coordinates['PT6'][1]['Y'] self.Z4_axis = pt_coordinates['PT6'][2]['Z'] else: raise UserError('PT4点未测或数据错误') if 'PT5' in pt_coordinates and pt_coordinates['PT5']: self.X5_axis = pt_coordinates['PT7'][0]['X'] self.Y5_axis = pt_coordinates['PT7'][1]['Y'] self.Z5_axis = pt_coordinates['PT7'][2]['Z'] else: raise UserError('PT5点未测或数据错误') if 'PT6' in pt_coordinates and pt_coordinates['PT6']: self.X6_axis = pt_coordinates['PT8'][0]['X'] self.Y6_axis = pt_coordinates['PT8'][1]['Y'] self.Z6_axis = pt_coordinates['PT8'][2]['Z'] else: raise UserError('PT6点未测或数据错误') if 'PT7' in pt_coordinates and pt_coordinates['PT7']: self.X7_axis = pt_coordinates['PT9'][0]['X'] self.Y7_axis = pt_coordinates['PT9'][1]['Y'] self.Z7_axis = pt_coordinates['PT9'][2]['Z'] else: raise UserError('PT7点未测或数据错误') if 'PT8' in pt_coordinates and pt_coordinates['PT8']: self.X8_axis = pt_coordinates['PT10'][0]['X'] self.Y8_axis = pt_coordinates['PT10'][1]['Y'] self.Z8_axis = pt_coordinates['PT10'][2]['Z'] else: raise UserError('PT8点未测或数据错误') if 'PT9' in pt_coordinates and pt_coordinates['PT9']: self.X9_axis = pt_coordinates['PT1'][0]['X'] self.Y9_axis = pt_coordinates['PT1'][1]['Y'] self.Z9_axis = pt_coordinates['PT1'][2]['Z'] else: raise UserError('PT9点未测或数据错误') if 'PT10' in pt_coordinates and pt_coordinates['PT10']: self.X10_axis = pt_coordinates['PT2'][0]['X'] self.Y10_axis = pt_coordinates['PT2'][1]['Y'] self.Z10_axis = pt_coordinates['PT2'][2]['Z'] else: raise UserError('PT10点未测或数据错误') self.data_state = True return True # ftp.download_file('three_check_datas.xls', '/home/ftpuser/three_check_datas.xls') # ftp.close() # data = xlrd.open_workbook('/home/ftpuser/three_check_datas.xls') # table = data.sheets()[0] # nrows = table.nrows # # 点坐标列表 # point_list = [] # datas = [] # for i in range(1, nrows): # datas.append(table.row_values(i)) # return datas # 计算配料中心点和与x轴倾斜度方法 def getcenter(self): try: x1 = self.X1_axis x2 = self.X2_axis x3 = self.X3_axis x4 = self.X4_axis x5 = self.X5_axis x6 = self.X6_axis x7 = self.X7_axis x8 = self.X8_axis y1 = self.Y1_axis y2 = self.Y2_axis y3 = self.Y3_axis y4 = self.Y4_axis y5 = self.Y5_axis y6 = self.Y6_axis y7 = self.Y7_axis y8 = self.Y8_axis z1 = self.Z9_axis z2 = self.Z10_axis x0 = ((x3 - x4) * (x2 * y1 - x1 * y2) - (x1 - x2) * (x4 * y3 - x3 * y4)) / ( (x3 - x4) * (y1 - y2) - (x1 - x2) * (y3 - y4)) y0 = ((y3 - y4) * (y2 * x1 - y1 * x2) - (y1 - y2) * (y4 * x3 - y3 * x4)) / ( (y3 - y4) * (x1 - x2) - (y1 - y2) * (x3 - x4)) x1 = ((x7 - x8) * (x6 * y5 - x5 * y6) - (x5 - x6) * (x8 * y7 - x7 * y8)) / ( (x7 - x8) * (y5 - y6) - (x5 - x6) * (y7 - y8)) y1 = ((y7 - y8) * (y6 * x5 - y5 * x6) - (y5 - y6) * (y8 * x7 - y7 * x8)) / ( (y7 - y8) * (x5 - x6) - (y5 - y6) * (x7 - x8)) x = (x0 + x1) / 2 y = (y0 + y1) / 2 z = (z1 + z2) / 2 jd = math.atan2((x5 - x6), (y5 - y6)) jdz = jd * 180 / math.pi print("(%.2f,%.2f)" % (x, y)) self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z)) self.X_deviation_angle = jdz # 将补偿值写入CNC加工工单 workorder = self.env['mrp.workorder'].browse(self.ids) work = workorder.production_id.workorder_ids work.compensation_value_x = eval(self.material_center_point)[0] work.compensation_value_y = eval(self.material_center_point)[1] # work.process_state = '待加工' # self.sudo().production_id.process_state = '待加工' self.date_finished = datetime.now() workorder.button_finish() except Exception as e: # 重新抛出捕获到的异常信息 raise UserError(str(e)) def button_workpiece_delivery(self): if self.routing_type == '装夹预调': for item in self.workpiece_delivery_ids: if not item.route_id: raise UserError('【工件配送】明细中请选择【任务路线】') else: if self.state == 'done': if item.is_cnc_program_down is True: if item.status == '待下发': return { 'name': _('确认'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.workpiece.delivery.wizard', 'target': 'new', 'context': { 'default_workorder_id': self.id, }} else: raise UserError(_("该制造订单还未下发CNC程序单,无法进行工件配送")) else: raise UserError(_("该工单暂未完成,无法进行工件配送")) # 拼接工单对象属性值 def json_workorder_str(self, k, production, route): # 计算预计时长duration_expected if route.routing_type == '切割': duration_expected = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', '切割')]).time_cycle # elif route.routing_type == '获取CNC加工程序': # duration_expected = self.env['mrp.routing.workcenter'].sudo().search( # [('name', '=', '获取CNC加工程序')]).time_cycle elif route.routing_type == '装夹预调': duration_expected = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', '装夹预调')]).time_cycle # elif route.routing_type == '前置三元定位检测': # duration_expected = self.env['mrp.routing.workcenter'].sudo().search( # [('name', '=', '前置三元定位检测')]).time_cycle elif route.routing_type == 'CNC加工': duration_expected = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', 'CNC加工')]).time_cycle # elif route.routing_type == '后置三元质量检测': # duration_expected = self.env['mrp.routing.workcenter'].sudo().search( # [('name', '=', '后置三元质量检测')]).time_cycle elif route.routing_type == '解除装夹': duration_expected = self.env['mrp.routing.workcenter'].sudo().search( [('name', '=', '解除装夹')]).time_cycle else: duration_expected = 60 workorders_values_str = [0, '', { 'product_uom_id': production.product_uom_id.id, 'qty_producing': 0, 'operation_id': False, 'name': route.route_workcenter_id.name, 'processing_panel': k, 'quality_point_ids': route.route_workcenter_id.quality_point_ids, 'routing_type': route.routing_type, 'work_state': '待发起', 'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids, route.routing_type, production.product_id), # 设定初始化值,避免出现变成bool问题 'date_planned_start': datetime.now(), 'date_planned_finished': datetime.now() + timedelta(days=1), 'duration_expected': duration_expected, 'duration': 0, 'workpiece_delivery_ids': False if not route.routing_type == '装夹预调' else self._json_workpiece_delivery_list( production) }] return workorders_values_str def _json_workpiece_delivery_list(self, production): up_route = self.env['sf.agv.task.route'].search([('route_type', '=', '上产线')], limit=1, order='id asc') down_route = self.env['sf.agv.task.route'].search([('route_type', '=', '下产线')], limit=1, order='id asc') return [ [0, '', {'production_id': production.id, 'type': '上产线', 'route_id': up_route.id, 'feeder_station_start_id': up_route.start_site_id.id, 'feeder_station_destination_id': up_route.end_site_id.id}], [0, '', {'production_id': production.id, 'type': '下产线', 'route_id': down_route.id, 'feeder_station_start_id': down_route.start_site_id.id, 'feeder_station_destination_id': down_route.end_site_id.id}]] # 拼接工单对象属性值(表面工艺) def _json_workorder_surface_process_str(self, production, route, process_parameter, supplier_id): workorders_values_str = [0, '', { 'product_uom_id': production.product_uom_id.id, 'qty_producing': 0, 'operation_id': False, 'name': '%s-%s' % (route.name, process_parameter.name), 'processing_panel': '', 'routing_type': '表面工艺', 'surface_technics_parameters_id': process_parameter.id, 'work_state': '', 'supplier_id': supplier_id, 'is_subcontract': True if process_parameter.gain_way == '外协' else False, 'workcenter_id': self.env[ 'mrp.workcenter'].get_process_outsourcing_workcenter() if process_parameter.gain_way == '外协' else self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids, route.routing_type, production.product_id), 'date_planned_start': datetime.now(), 'date_planned_finished': datetime.now() + timedelta(days=1), 'duration_expected': 60, 'duration': 0 }] return workorders_values_str # 维修模块按钮 def button_maintenance_req(self): self.ensure_one() return { 'name': _('New Maintenance Request'), 'view_mode': 'form', 'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')], 'res_model': 'maintenance.request', 'type': 'ir.actions.act_window', 'context': { 'default_company_id': self.company_id.id, 'default_workorder_id': self.id, 'default_production_id': self.production_id.id, 'discard_on_footer_button': True, }, 'target': 'new', 'domain': [('workorder_id', '=', self.id)] } # tray_id = fields.Many2one('sf.tray', string="托盘信息", tracking=True) # 扫码绑定托盘方法 # def gettray(self): # if self.tray_code != False: # values = self.env['sf.tray'].search([("code", "=", self.tray_code)]) # if values: # if values.state == "占用": # raise UserError('该托盘已占用') # if values.state == "报损": # raise UserError('该托盘已损坏') # else: # values.update({ # 'workorder_id': self, # 'production_id': self.production_id, # 'state': '占用', # }) # self.work_state = "已绑定" # orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)]) # for a in orders: # a.tray_id = values # else: # raise UserError('该托盘编码已失效') # else: # raise UserError('托盘码不能为空') # 验证坯料序列号是否正确 def pro_code_is_ok(self, barcode): if barcode is not False: if barcode != self.pro_code: raise UserError('坯料序列号错误') return False else: return True pro_code = fields.Char(string='坯料序列号') pro_code_ok = fields.Boolean(default=False) # 托盘扫码绑定 # def gettray_auto(self, barcode): # if barcode != False: # values = self.env['sf.tray'].search([("code", "=", barcode)]) # # if values: # if values.state == "占用": # raise UserError('该托盘已占用') # if values.state == "报损": # raise UserError('该托盘已损坏') # else: # values.update({ # 'workorder_id': self, # 'production_id': self.production_id, # 'state': '占用', # }) # self.work_state = "已绑定" # orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)]) # for a in orders: # a.tray_id = values # # return values # # # return { # # 'name': _('New Maintenance Request'), # # 'view_mode': 'form', # # 'res_model': 'maintenance.request', # # 'type': 'ir.actions.act_window', # # 'context': { # # 'default_company_id': self.company_id.id, # # 'default_production_id': self.id, # # }, # # 'domain': [('production_id', '=', self.id)], # # } # else: # raise UserError('该托盘编码已失效') # else: # raise UserError('托盘码不能为空') # 解除托盘绑定 # def unbindtray(self): # tray = self.env['sf.tray'].search([("production_id", "=", self.production_id.id)]) # if tray: # tray.unclamp() # self.tray_id = False # return { # 'name': _('New Maintenance Request'), # 'view_mode': 'form', # 'res_model': 'maintenance.request', # 'res_id':self.id, # 'type': 'ir.actions.act_window', # 'context': { # 'default_company_id': self.company_id.id, # 'default_production_id': self.id, # }, # 'domain': [('production_id', '=', self.id)], # 'target':'new' # } def recreateManufacturingOrWorkerOrder(self): """ 重新生成制造订单或者重新生成工单 """ if self.test_results == '报废': values = self.env['mrp.production'].create_production1_values(self.production_id) productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company( self.production_id.company_id).create( values) self.env['stock.move'].sudo().create(productions._get_moves_raw_values()) self.env['stock.move'].sudo().create(productions._get_moves_finished_values()) productions._create_workorder() productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \ ( p.move_dest_ids.procure_method != 'make_to_order' and not p.move_raw_ids and not p.workorder_ids)).action_confirm() for production in productions: origin_production = production.move_dest_ids and production.move_dest_ids[ 0].raw_material_production_id or False orderpoint = production.orderpoint_id if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual': production.message_post( body=_('This production order has been created from Replenishment Report.'), message_type='comment', subtype_xmlid='mail.mt_note') elif orderpoint: production.message_post_with_view( 'mail.message_origin_link', values={'self': production, 'origin': orderpoint}, subtype_id=self.env.ref('mail.mt_note').id) elif origin_production: production.message_post_with_view( 'mail.message_origin_link', values={'self': production, 'origin': origin_production}, subtype_id=self.env.ref('mail.mt_note').id) if self.test_results == '返工': productions = self.production_id # self.env['stock.move'].sudo().create(productions._get_moves_raw_values()) # self.env['stock.move'].sudo().create(productions._get_moves_finished_values()) productions._create_workorder2(self.processing_panel) else: self.results = '合格' def json_workorder_str1(self, k, production, route): workorders_values_str = [0, '', { 'product_uom_id': production.product_uom_id.id, 'qty_producing': 0, 'operation_id': False, 'name': '%s(返工)' % route.route_workcenter_id.name, 'processing_panel': k, 'routing_type': route.routing_type, 'work_state': '' if not route.routing_type == '获取CNC加工程序' else '待发起', 'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids, route.routing_type, production.product_id), 'date_planned_start': datetime.now(), 'date_planned_finished': datetime.now() + timedelta(days=1), 'duration_expected': 60, 'duration': 0, }] return workorders_values_str # 重写工单开始按钮方法 def button_start(self): if self.routing_type == '装夹预调': # 判断是否有坯料的序列号信息 boolean = False if self.production_id.move_raw_ids[0].move_line_ids: if self.production_id.move_raw_ids[0].move_line_ids[0].lot_id.name: boolean = True if not boolean: raise UserError('制造订单【%s】缺少组件的序列号信息!' % self.production_id.name) self.pro_code = self.production_id.move_raw_ids[0].move_line_ids[0].lot_id.name # cnc校验 cnc_workorder = self.search( [('production_id', '=', self.production_id.id), ('routing_type', '=', 'CNC加工')], limit=1, order='id asc') if not cnc_workorder.cnc_ids: raise UserError(_('该制造订单还未下发CNC程序,请稍后再试')) else: for item in cnc_workorder.cnc_ids: functional_cutting_tool = self.env['sf.functional.cutting.tool.entity'].search( [('tool_name_id.name', '=', item.cutting_tool_name)]) if not functional_cutting_tool: raise UserError(_('该制造订单的CNC程序为%s没有对应的功能刀具' % item.cutting_tool_name)) if self.routing_type == '解除装夹': ''' 记录开始时间 ''' self.date_start = datetime.now() # 表面工艺外协出库单 if self.routing_type == '表面工艺': if self.is_subcontract is True: move_out = self.env['stock.move'].search( [('location_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'WH-PREPRODUCTION')]).id), ('location_dest_id', '=', self.env['stock.location'].search( [('barcode', 'ilike', 'VL-SPOC')]).id), ('origin', '=', self.production_id.name)]) purchase = self.env['purchase.order'].search([('origin', '=', self.production_id.name)]) if purchase and move_out: move_out.write({'state': 'assigned'}) self.env['stock.move.line'].create(move_out.get_move_line(purchase, self)) # move_out._action_assign() if self.state == 'waiting' or self.state == 'ready' or self.state == 'progress': self.move_raw_ids = self.production_id.move_raw_ids self.move_raw_ids[0].write({ 'materiel_length': self.move_raw_ids[0].product_id.length, 'materiel_width': self.move_raw_ids[0].product_id.width, 'materiel_height': self.move_raw_ids[0].product_id.height }) self.write({ 'material_length': self.move_raw_ids[0].product_id.length, 'material_width': self.move_raw_ids[0].product_id.width, 'material_height': self.move_raw_ids[0].product_id.height }) self.ensure_one() if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)): return True # As button_start is automatically called in the new view if self.state in ('done', 'cancel'): return True if self.product_tracking == 'serial': self.qty_producing = 1.0 else: self.qty_producing = self.qty_remaining self.env['mrp.workcenter.productivity'].create( self._prepare_timeline_vals(self.duration, datetime.now()) ) if self.production_id.state != 'progress': self.production_id.write({ 'date_start': datetime.now(), }) if self.state == 'progress': return True start_date = datetime.now() vals = { 'state': 'progress', 'date_start': start_date, } if not self.leave_id: leave = self.env['resource.calendar.leaves'].create({ 'name': self.display_name, 'calendar_id': self.workcenter_id.resource_calendar_id.id, 'date_from': start_date, 'date_to': start_date + relativedelta(minutes=self.duration_expected), 'resource_id': self.workcenter_id.resource_id.id, 'time_type': 'other' }) vals['leave_id'] = leave.id return self.write(vals) else: if self.date_planned_start > start_date: vals['date_planned_start'] = start_date # if self.date_planned_finished and self.date_planned_finished < start_date: # vals['date_planned_finished'] = start_date return self.write(vals) else: raise UserError(_('请先完成上一步工单')) def button_finish(self): for record in self: if record.routing_type == '装夹预调': if not record.material_center_point and record.X_deviation_angle > 0: raise UserError("请对前置三元检测定位参数进行计算定位") if not record.rfid_code: raise UserError("请扫RFID码进行绑定") record.process_state = '待加工' # record.write({'process_state': '待加工'}) record.production_id.process_state = '待加工' if record.routing_type == 'CNC加工': record.process_state = '待解除装夹' # record.write({'process_state': '待加工'}) record.production_id.process_state = '待解除装夹' if record.routing_type == '解除装夹': ''' 记录结束时间 ''' record.date_finished = datetime.now() if record.routing_type == '表面工艺': logging.info('record.picking_ids:%s' % record.picking_ids) logging.info('record.picking_out:%s' % record.picking_ids[0]) if record.picking_ids: for pick_item in record.picking_ids: if pick_item.state not in ['done']: raise UserError( '请先完成该工单的工艺外协再进行操作') picking_out = record.env['stock.move.line'].search( [('picking_id', '=', record.picking_ids[0].id)]) logging.info('picking_out:%s' % picking_out.picking_id.name) if picking_out: order_line_ids = [] logging.info('surface_technics_parameters_id:%s' % record.surface_technics_parameters_id.name) server_product = self.env['product.template'].search( [('server_product_process_parameters_id', '=', record.surface_technics_parameters_id.id), ('detailed_type', '=', 'service')]) logging.info('server_product:%s' % server_product.name) if server_product: order_line_ids.append((0, 0, { 'product_id': server_product.product_variant_id.id, 'product_qty': 1, 'product_uom': server_product.uom_id.id })) self.env['purchase.order'].sudo().create({ 'partner_id': server_product.seller_ids.partner_id.id, 'origin': record.production_id.name, 'state': 'draft', 'order_line': order_line_ids, }) else: raise UserError( '请先在产品中配置表面工艺为%s相关的外协服务产品' % item.surface_technics_parameters_id.name) tem_date_planned_finished = record.date_planned_finished logging.info('routing_type:%s' % record.routing_type) super().button_finish() logging.info('date_planned_finished:%s' % record.date_planned_finished) # 表面工艺工单完成不走该修改 if record.routing_type != '表面工艺': record.write({ 'date_planned_finished': tem_date_planned_finished # 保持原值 }) is_production_id = True for workorder in record.production_id.workorder_ids: if workorder.state != 'done': is_production_id = False if is_production_id is True and record.routing_type in ['解除装夹', '表面工艺']: for workorder in record.production_id.workorder_ids: workorder.rfid_code_old = workorder.rfid_code workorder.rfid_code = None for move_raw_id in record.production_id.move_raw_ids: move_raw_id.quantity_done = move_raw_id.product_uom_qty record.process_state = '已完工' record.production_id.process_state = '已完工' if record.routing_type in ['表面工艺']: raw_move = self.env['stock.move'].sudo().search( [('origin', '=', record.production_id.name), ('procure_method', '=', 'make_to_order'), ('state', '!=', 'done')]) if raw_move: raw_move.write({'state': 'done'}) record.production_id.button_mark_done1() # self.production_id.state = 'done' # 将FTP的检测报告文件下载到临时目录 def download_reportfile_tmp(self, workorder, reportpath): logging.info('reportpath:%s' % reportpath) production_no_ftp = reportpath.split('/') production_no = workorder.production_id.name.replace('/', '_') # ftp地址 remotepath = os.path.join('/NC', production_no_ftp[1], 'detection') logging.info('ftp地址:%s' % remotepath) if reportpath.find(production_no) != -1: # 服务器内临时地址 serverdir = os.path.join('/tmp', production_no_ftp[1], 'detection') ftp_resconfig = self.env['res.config.settings'].get_values() ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'], ftp_resconfig['ftp_password']) download_state = ftp.download_reportfile_tree(remotepath, serverdir, reportpath) logging.info('download_state:%s' % download_state) else: download_state = 2 return download_state # 根据中控系统提供的检测文件地址去ftp里对应的制造订单里获取 def get_detection_file(self, workorder, reportPath): # if reportPath.startswith('/'): # reportPath = reportPath[4:] # serverdir = os.path.join('/tmp', reportPath) serverdir = '/tmp' + reportPath logging.info('get_detection_file-serverdir:%s' % serverdir) serverdir_prefix = os.path.dirname(serverdir) logging.info('serverdir_prefix-serverdir:%s' % serverdir_prefix) for root, dirs, files in os.walk(serverdir_prefix): for filename in files: logging.info('filename:%s' % filename) logging.info('reportPath:%s' % os.path.basename(reportPath)) if filename == os.path.basename(reportPath): report_file_path = os.path.join(root, filename) logging.info('get_detection_file-report_file_path:%s' % report_file_path) workorder.detection_report = base64.b64encode(open(report_file_path, 'rb').read()) return True class CNCprocessing(models.Model): _name = 'sf.cnc.processing' _description = "CNC加工" _rec_name = 'program_name' _order = 'sequence_number,id' cnc_id = fields.Many2one('ir.attachment') sequence_number = fields.Integer('序号') program_name = fields.Char('程序名') functional_tool_type_id = fields.Many2one('sf.functional.cutting.tool.model', string='功能刀具类型') cutting_tool_name = fields.Char('刀具名称') cutting_tool_no = fields.Char('刀号') processing_type = fields.Char('加工类型') margin_x_y = fields.Char('余量_X/Y') margin_z = fields.Char('余量_Z') depth_of_processing_z = fields.Char('加工深度(Z)') cutting_tool_extension_length = fields.Char('刀具伸出长度') cutting_tool_handle_type = fields.Char('刀柄型号') estimated_processing_time = fields.Char('预计加工时间') remark = fields.Text('备注') workorder_id = fields.Many2one('mrp.workorder', string="工单") production_id = fields.Many2one('mrp.production', string="制造订单") button_state = fields.Boolean(string='是否已经下发') program_path = fields.Char('程序文件路径') # mrs下发编程单创建CNC加工 def cnc_processing_create(self, cnc_workorder, ret, program_path, program_path_tmp): cnc_processing = None for obj in ret['programming_list']: workorder = self.env['mrp.workorder'].search( [('production_id.name', '=', cnc_workorder.name), ('processing_panel', '=', obj['processing_panel']), ('routing_type', '=', 'CNC加工')]) logging.info('workorder:%s' % workorder.id) if obj['program_name'] in program_path: logging.info('obj:%s' % obj['program_name']) cnc_processing = self.env['sf.cnc.processing'].create({ 'workorder_id': workorder.id, 'sequence_number': obj['sequence_number'], 'program_name': obj['program_name'], 'cutting_tool_name': obj['cutting_tool_name'], 'cutting_tool_no': obj['cutting_tool_no'], 'processing_type': obj['processing_type'], 'margin_x_y': obj['margin_x_y'], 'margin_z': obj['margin_z'], 'depth_of_processing_z': obj['depth_of_processing_z'], 'cutting_tool_extension_length': obj['cutting_tool_extension_length'], 'cutting_tool_handle_type': obj['cutting_tool_handle_type'], 'estimated_processing_time': obj['estimated_processing_time'], 'remark': obj['remark'], 'program_path': program_path.replace('/tmp', '/home/ftp/ftp_root/NC') }) cnc_processing.get_cnc_processing_file(program_path_tmp, cnc_processing, program_path) cnc_workorder.write({'programming_state': '已编程', 'work_state': '已编程'}) return cnc_processing def _json_cnc_processing(self, obj): cnc_processing_str = (0, 0, { 'sequence_number': obj['sequence_number'], 'program_name': obj['program_name'], 'cutting_tool_name': obj['cutting_tool_name'], 'cutting_tool_no': obj['cutting_tool_no'], 'processing_type': obj['processing_type'], 'margin_x_y': obj['margin_x_y'], 'margin_z': obj['margin_z'], 'depth_of_processing_z': obj['depth_of_processing_z'], 'cutting_tool_extension_length': obj['cutting_tool_extension_length'], 'cutting_tool_handle_type': obj['cutting_tool_handle_type'], 'estimated_processing_time': obj['estimated_processing_time'], 'program_path': obj['program_path'], 'cnc_id': obj['cnc_id'].id, 'remark': obj['remark'] }) return cnc_processing_str # 根据程序名和加工面匹配到ftp里对应的Nc程序名,可优化为根据cnc_processing.program_path进行匹配 def get_cnc_processing_file(self, serverdir, cnc_processing, program_path): logging.info('serverdir:%s' % serverdir) for root, dirs, files in os.walk(serverdir): for f in files: if os.path.splitext(f)[1] == ".pdf": full_path = os.path.join(serverdir, root, f) cnc_processing.workorder_id.cnc_worksheet = base64.b64encode( open(full_path, 'rb').read()) else: if f in program_path: # if cnc_processing.program_name == f.split('.')[0]: cnc_file_path = os.path.join(serverdir, root, f) self.write_file(cnc_file_path, cnc_processing) # 创建附件(nc文件) def attachment_create(self, name, data): attachment = self.env['ir.attachment'].create({ 'datas': base64.b64encode(data), 'type': 'binary', 'public': True, 'description': '程序文件', 'name': name }) return attachment # 将FTP的nc文件下载到临时目录 def download_file_tmp(self, production_no, processing_panel): remotepath = os.path.join('/NC', production_no, 'return', processing_panel) serverdir = os.path.join('/tmp', production_no, 'return', processing_panel) ftp_resconfig = self.env['res.config.settings'].get_values() ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'], ftp_resconfig['ftp_password']) download_state = ftp.download_file_tree(remotepath, serverdir) logging.info('download_state:%s' % download_state) return download_state # 将nc文件存到attach的datas里 def write_file(self, nc_file_path, cnc): nc_file_name = nc_file_path.split('/') logging.info('file_name:%s' % nc_file_name[-1]) if os.path.exists(nc_file_path): with open(nc_file_path, 'rb') as file: data_bytes = file.read() attachment = self.attachment_create(cnc.program_name + nc_file_name[-1], data_bytes) cnc.write({'cnc_id': attachment.id}) file.close() else: return False # end_date = datetime.now() # for workorder in self: # if workorder.state in ('done', 'cancel'): # continue # workorder.end_all() # vals = { # 'qty_produced': workorder.qty_produced or workorder.qty_producing or workorder.qty_production, # 'state': 'done', # 'date_finished': end_date, # 'date_planned_finished': end_date, # 'costs_hour': workorder.workcenter_id.costs_hour # } # if not workorder.date_start: # vals['date_start'] = end_date # if not workorder.date_planned_start or end_date < workorder.date_planned_start: # vals['date_planned_start'] = end_date # workorder.with_context(bypass_duration_calculation=True).write(vals) return True class SfWorkOrderBarcodes(models.Model): """ 智能工厂工单处扫码绑定托盘 """ _name = "mrp.workorder" _inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"] def on_barcode_scanned(self, barcode): workorder = self.env['mrp.workorder'].browse(self.ids) # workorder_preset = self.env['mrp.workorder'].search( # [('routing_type', '=', '装夹预调'), ('rfid_code', '=', barcode)]) workorder_olds = self.env['mrp.workorder'].search( [('routing_type', '=', '装夹预调'), ('rfid_code', '=', barcode)]) if workorder_olds: name = '' for workorder in workorder_olds: name = '%s %s' % (name, workorder.production_id.name) raise UserError('该托盘已绑定【%s】制造订单,请先解除绑定!!!' % name) if workorder: if workorder.routing_type == '装夹预调': if workorder.state in ['done']: work_state = {'done': '已完工'} raise UserError('装夹%s,请勿重复扫码' % work_state.get(workorder.state)) lots = self.env['stock.lot'].sudo().search([('rfid', '=', barcode)]) logging.info("托盘信息:%s" % lots) if lots: for lot in lots: if lot.product_id.categ_type == '夹具': val = { 'tray_serial_number': lot.name, 'tray_product_id': lot.product_id.id, 'tray_brand_id': lot.product_id.brand_id.id, 'tray_type_id': lot.product_id.fixture_material_id.id, 'tray_model_id': lot.product_id.fixture_model_id.id, 'rfid_code': barcode } workorder.write(val) self.write(val) workorder_rfid = self.env['mrp.workorder'].search( [('production_id', '=', workorder.production_id.id)]) if workorder_rfid: for item in workorder_rfid: item.write({'rfid_code': barcode}) logging.info("Rfid绑定成功!!!") else: raise UserError('该Rfid【%s】绑定的是【%s】, 不是托盘!!!' % (barcode, lot.product_id.name)) self.process_state = '待检测' self.date_start = datetime.now() else: raise UserError('该托盘信息不存在!!!') # stock_move_line = self.env['stock.move.line'].search([('lot_name', '=', barcode)]) # if stock_move_line.product_id.categ_type == '夹具': # workorder.write({ # 'tray_serial_number': stock_move_line.lot_name, # 'tray_product_id': stock_move_line.product_id.id, # 'tray_brand_id': stock_move_line.product_id.brand_id.id, # 'tray_type_id': stock_move_line.product_id.fixture_material_id.id, # 'tray_model_id': stock_move_line.product_id.fixture_model_id.id # }) # workorder.button_start() # # return { # # 'type': 'ir.actions.act_window', # # 'res_model': 'mrp.workorder', # # 'view_mode': 'form', # # 'domain': [('id', 'in', workorder.id)], # # 'target': 'current' # # } # else: # embryo_stock_lot = self.env['stock.lot'].search([('name', '=', barcode)]) # if embryo_stock_lot: # embryo_stock_move_line = self.env['stock.move.line'].search( # [('product_id', '=', embryo_stock_lot.product_id.id), # ('reference', '=', workorder.production_id.name), # ('lot_id', '=', embryo_stock_lot.id), # ('product_category_name', '=', '坯料')]) # if embryo_stock_move_line: # bom_production = self.env['mrp.production'].search( # [('product_id', '=', embryo_stock_lot.product_id.id), # ('origin', '=', workorder.production_id.name)], limit=1, order='id asc') # workpiece_delivery = self.env['sf.workpiece.delivery'].search( # [('workorder_id', '=', workorder.id)], limit=1, order='id asc') # if workpiece_delivery: # embryo_workpiece_code = workpiece_delivery.workpiece_code # if bom_production: # if workpiece_delivery.workpiece_code and bom_production.name not in \ # workpiece_delivery.workpiece_code: # embryo_workpiece_code = workpiece_delivery.workpiece_code + ',' + \ # bom_production.name # if not workpiece_delivery.workpiece_code: # embryo_workpiece_code = bom_production.name # workpiece_delivery.write({'workpiece_code': embryo_workpiece_code}) # else: # raise UserError('工件生产线不一致,请重新确认') # else: # workorder_rfid = self.env['mrp.workorder'].search( # [('production_id', '=', workorder.production_id.id)]) # if workorder_rfid: # for item in workorder_rfid: # item.write({'rfid_code': barcode}) class WorkPieceDelivery(models.Model): _name = "sf.workpiece.delivery" _description = '工件配送' name = fields.Char('单据编号') workorder_id = fields.Many2one('mrp.workorder', string='工单', readonly=True) workorder_state = fields.Selection(related='workorder_id.state', string='工单状态') rfid_code = fields.Char(related='workorder_id.rfid_code', string='rfid码', store=True) production_id = fields.Many2one('mrp.production', string='制造订单号', readonly=True) production_line_id = fields.Many2one('sf.production.line', string='目的生产线') plan_start_processing_time = fields.Datetime('计划开始加工时间', readonly=True) route_id = fields.Many2one('sf.agv.task.route', '任务路线') feeder_station_start_id = fields.Many2one('sf.agv.site', '起点接驳站') feeder_station_destination_id = fields.Many2one('sf.agv.site', '目的接驳站') task_delivery_time = fields.Datetime('任务下发时间') task_completion_time = fields.Datetime('任务完成时间') type = fields.Selection( [('上产线', '上产线'), ('下产线', '下产线'), ('运送空料架', '运送空料架')], string='类型') delivery_duration = fields.Float('配送时长', compute='_compute_delivery_duration') status = fields.Selection( [('待下发', '待下发'), ('待配送', '待配送'), ('已配送', '已配送')], string='状态', default='待下发') is_cnc_program_down = fields.Boolean('程序是否下发', default=False) is_manual_work = fields.Boolean('人工操作', default=False) active = fields.Boolean(string="有效", default=True) @api.model def create(self, vals): if vals.get('name', '/') == '/' or vals.get('name', '/') is False: vals['name'] = self.env['ir.sequence'].next_by_code('sf.workpiece.delivery') or '/' else: vals['type'] = '运送空料架' obj = super(WorkPieceDelivery, self).create(vals) return obj @api.constrains('name') def _check_name(self): if self.type == '运送空料架': wd = self.sudo().search([('name', '=', self.name), ('id', '!=', self.id)]) if wd: raise UserError("该名称已存在") def action_delivery_history(self): return { 'name': _('配送历史'), 'type': 'ir.actions.act_window', 'view_mode': 'tree', 'res_model': 'sf.workpiece.delivery', 'view_id': self.env.ref('sf_manufacturing.sf_workpiece_delivery_empty_racks_tree').id, 'domain': [('type', '=', '运送空料架'), ('route_id', '=', self.route_id.id), ('name', 'ilike', 'WDO')] } @api.onchange('route_id') def onchange_route(self): if self.route_id: self.feeder_station_start_id = self.route_id.start_site_id.id self.feeder_station_destination_id = self.route_id.end_site_id.id # 工件配送 def button_delivery(self): delivery_ids = [] production_ids = [] is_cnc_down = 0 is_not_production_line = 0 is_not_route = 0 same_production_line_id = None same_route_id = None down_status = '待下发' production_type = None num = 0 for item in self: num += 1 if production_type is None: production_type = item.type if item.type == "运送空料架": if num >= 2: raise UserError('仅选择一条路线进行配送,请重新选择') else: delivery_ids.append(item.id) else: if num > 4: raise UserError('仅限于配送1-4个制造订单,请重新选择') if item.status in ['待配送', '已配送']: raise UserError('请选择状态为【待下发】的制造订单进行配送') if item.route_id: if same_route_id is None: same_route_id = item.route_id.id if item.route_id.id != same_route_id: is_not_route += 1 # else: # raise UserError('请选择【任务路线】再进行配送') # if item.production_id.production_line_state == '已下产线' and item.state == '待下发' and item.type == '下产线': # raise UserError('该制造订单已下产线,无需配送') if production_type != item.type: raise UserError('请选择类型为%s的制造订单进行配送' % production_type) if down_status != item.status: up_workpiece = self.search([('type', '=', '上产线'), ('production_id', '=', item.production_id), ('status', '=', '待下发')]) if up_workpiece: raise UserError('您所选择的制造订单暂未上产线,请在上产线后再进行配送') else: raise UserError('请选择状态为【待下发】的制造订单进行配送') if same_production_line_id is None: same_production_line_id = item.production_line_id.id if item.production_line_id.id != same_production_line_id: is_not_production_line += 1 if item.is_cnc_program_down is False: is_cnc_down += 1 if is_cnc_down == 0 and is_not_production_line == 0 and is_not_route == 0: delivery_ids.append(item.id) production_ids.append(item.production_id.id) if is_cnc_down >= 1: raise UserError('您所选择制造订单的【CNC程序】暂未下发,请在程序下发后再进行配送') if is_not_production_line >= 1: raise UserError('您所选择制造订单的【目的生产线】不一致,请重新确认') if is_not_route >= 1: raise UserError('您所选择制造订单的【任务路线】不一致,请重新确认') is_free = self._check_avgsite_state() if is_free is True: if delivery_ids: return { 'name': _('确认'), 'type': 'ir.actions.act_window', 'view_mode': 'form', 'res_model': 'sf.workpiece.delivery.wizard', 'target': 'new', 'context': { 'default_delivery_ids': [(6, 0, delivery_ids)], 'default_production_ids': [(6, 0, production_ids)], 'default_destination_production_line_id': same_production_line_id, 'default_route_id': same_route_id, 'default_type': production_type, }} else: if production_type == '运送空料架': raise UserError("您所选择的【任务路线】的【终点接驳站】已占用,请在该接驳站空闲时进行配送") else: raise UserError( "您所选择制造订单的【任务路线】的【终点接驳站】已占用,请在该接驳站空闲时或选择其他路线进行配送") # 验证agv站点是否可用 def _check_avgsite_state(self): is_free = False agv_site = self.env['sf.agv.site'].search([]) if agv_site: has_site = agv_site.update_site_state() if has_site is True: for item in self: if item.type in ['上产线', '下产线']: logging.info('工件配送-起点状态:%s-%s' % ( item.feeder_station_start_id.name, item.feeder_station_start_id.state)) logging.info('工件配送-终点状态:%s-%s' % ( item.feeder_station_destination_id.name, item.feeder_station_destination_id.state)) if ( item.feeder_station_start_id.state == '占用' and item.feeder_station_destination_id.state == '空闲') or ( item.feeder_station_start_id.state == '空闲' and item.feeder_station_destination_id.state == '空闲'): is_free = True else: if item.feeder_station_destination_id.state == '空闲': is_free = True logging.info('is_free:%s' % is_free) return is_free else: raise UserError("接驳站暂未反馈站点实时状态,请稍后再试") # 配送至avg小车 def _delivery_avg(self): config = self.env['res.config.settings'].get_values() positionCode_Arr = [] delivery_Arr = [] feeder_station_start = None feeder_station_destination = None route_id = None for item in self: if route_id is None: route_id = item.route_id.id if feeder_station_start is None: feeder_station_start = item.feeder_station_start_id if feeder_station_destination is None: feeder_station_destination = item.feeder_station_destination_id if item.type in ['上产线', '下产线']: item.route_id = route_id item.feeder_station_start_id = feeder_station_start.id item.feeder_station_destination_id = feeder_station_destination.id delivery_Arr.append(item.name) else: self = self.create( {'name': self.env['ir.sequence'].next_by_code('sf.workpiece.delivery'), 'route_id': self.route_id.id, 'feeder_station_start_id': self.feeder_station_start_id.id, 'feeder_station_destination_id': self.feeder_station_destination_id.id}) delivery_Arr.append(self.name) delivery_str = ','.join(map(str, delivery_Arr)) if feeder_station_start is not None: positionCode_Arr.append({ 'positionCode': feeder_station_start.name, 'code': '00' }) if feeder_station_destination is not None: positionCode_Arr.append({ 'positionCode': feeder_station_destination.name, 'code': '00' }) res = {'reqCode': delivery_str, 'reqTime': '', 'clientCode': '', 'tokenCode': '', 'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'], 'positionCodePath': positionCode_Arr, 'podCode': '', 'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '', 'materialLot': '', 'data': ''} try: logging.info('AGV请求路径:%s' % config['agv_rcs_url']) logging.info('AGV-json:%s' % res) headers = {'Content-Type': 'application/json'} ret = requests.post((config['agv_rcs_url']), json=res, headers=headers) ret = ret.json() logging.info('config-ret:%s' % ret) if ret['code'] == 0: req_codes = ret['reqCode'].split(',') for delivery_item in self: for req_code in req_codes: if delivery_item.name == req_code.strip(): logging.info('delivery_item-name:%s' % delivery_item.name) delivery_item.write({ 'task_delivery_time': fields.Datetime.now(), 'status': '待配送' }) if delivery_item.type == "上产线": delivery_item.workorder_id.write({'is_delivery': True}) else: raise UserError(ret['message']) except Exception as e: logging.info('config-e:%s' % e) raise UserError("工件配送请求agv失败:%s" % e) @api.depends('task_delivery_time', 'task_completion_time') def _compute_delivery_duration(self): for obj in self: if obj.task_delivery_time and obj.task_completion_time: obj.delivery_duration = round( (obj.task_completion_time - obj.task_delivery_time).total_seconds() / 60.0, 2) else: obj.delivery_duration = 0.0 class CMMprogram(models.Model): _name = 'sf.cmm.program' _description = "CMM程序" cmm_id = fields.Many2one('ir.attachment') sequence_number = fields.Integer('序号') program_name = fields.Char('程序名') cutting_tool_name = fields.Char('刀具名称') cutting_tool_no = fields.Char('刀号') processing_type = fields.Char('加工类型') margin_x_y = fields.Char('余量_X/Y') margin_z = fields.Char('余量_Z') depth_of_processing_z = fields.Char('加工深度(Z)') cutting_tool_extension_length = fields.Char('刀具伸出长度') cutting_tool_handle_type = fields.Char('刀柄型号') estimated_processing_time = fields.Char('预计加工时间') remark = fields.Text('备注') workorder_id = fields.Many2one('mrp.workorder', string="工单") production_id = fields.Many2one('mrp.production', string="制造订单") program_path = fields.Char('程序文件路径') def cmm_program_create(self, ret, program_path, program_path_tmp): cmm_program = None for obj in ret['programming_list']: workorder = self.env['mrp.workorder'].search( [('production_id.name', '=', ret['production_order_no'].split(',')[0]), ('processing_panel', '=', obj['processing_panel']), ('routing_type', '=', 'CNC加工')]) if obj['program_name'] in program_path: logging.info('obj:%s' % obj['program_name']) cmm_program = self.sudo().create({ 'workorder_id': workorder.id, 'sequence_number': obj['sequence_number'], 'program_name': obj['program_name'], 'cutting_tool_name': obj['cutting_tool_name'], 'cutting_tool_no': obj['cutting_tool_no'], 'processing_type': obj['processing_type'], 'margin_x_y': obj['margin_x_y'], 'margin_z': obj['margin_z'], 'depth_of_processing_z': obj['depth_of_processing_z'], 'cutting_tool_extension_length': obj['cutting_tool_extension_length'], 'cutting_tool_handle_type': obj['cutting_tool_handle_type'], 'estimated_processing_time': obj['estimated_processing_time'], 'remark': obj['remark'], 'program_path': program_path.replace('/tmp', '') }) cmm_program.get_cmm_program_file(program_path_tmp, cmm_program, program_path) return cmm_program # 根据程序名和加工面匹配到ftp里对应的cmm程序名 def get_cmm_program_file(self, serverdir, cmm_program, program_path): logging.info('cmm-serverdir:%s' % serverdir) for root, dirs, files in os.walk(serverdir): for f in files: if f in program_path: cmm_program_file_path = os.path.join(serverdir, root, f) self.write_file_cmm(cmm_program_file_path, cmm_program) # 创建附件(nc文件) def attachment_create(self, name, data): attachment = self.env['ir.attachment'].create({ 'datas': base64.b64encode(data), 'type': 'binary', 'public': True, 'description': '程序文件', 'name': name }) return attachment # 将cmm文件存到attach的datas里 def write_file_cmm(self, cmm_file_path, cnc): cmm_file_name = cmm_file_path.split('/') logging.info('cmm_file_name:%s' % cmm_file_name[-1]) if os.path.exists(cmm_file_path): with open(cmm_file_path, 'rb') as file: data_bytes = file.read() attachment = self.attachment_create(cnc.program_name + cmm_file_name[-1], data_bytes) cnc.write({'cmm_id': attachment.id}) file.close() else: return False