Files
test/sf_manufacturing/models/mrp_workorder.py
2024-05-17 11:00:58 +08:00

1531 lines
77 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import logging
import base64
import urllib.parse
from datetime import date
from datetime import datetime, timedelta
import requests
import os
import math
from lxml import etree
from dateutil.relativedelta import relativedelta
# import subprocess
from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.addons.sf_base.commons.common import Common
from odoo.exceptions import UserError
from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController
class ResMrpWorkOrder(models.Model):
_inherit = 'mrp.workorder'
_order = 'sequence asc,create_date desc'
product_tmpl_id_length = fields.Float(related='production_id.product_tmpl_id.length', readonly=True, store=True,
string="坯料长度(mm)")
product_tmpl_id_width = fields.Float(related='production_id.product_tmpl_id.width', readonly=True, store=True,
string="坯料宽度(mm)")
product_tmpl_id_height = fields.Float(related='production_id.product_tmpl_id.height', readonly=True, store=True,
string="坯料高度(mm)")
product_tmpl_id_materials_id = fields.Many2one(related='production_id.product_tmpl_id.materials_id', readonly=True,
store=True, check_company=True, string="材料")
product_tmpl_id_materials_type_id = fields.Many2one(related='production_id.product_tmpl_id.materials_type_id',
readonly=True, store=True, check_company=True, string="型号")
workcenter_id = fields.Many2one('mrp.workcenter', string='工作中心', required=False)
users_ids = fields.Many2many("res.users", 'users_workorder', related="workcenter_id.users_ids")
processing_panel = fields.Char('加工面')
sequence = fields.Integer(string='工序')
routing_type = fields.Selection([
# ('获取CNC加工程序', '获取CNC加工程序'),
('装夹预调', '装夹预调'),
# ('前置三元定位检测', '前置三元定位检测'),
('CNC加工', 'CNC加工'),
# ('后置三元质量检测', '后置三元质量检测'),
('解除装夹', '解除装夹'),
('切割', '切割'), ('表面工艺', '表面工艺')
], string="工序类型")
results = fields.Char('结果')
manual_quotation = fields.Boolean('人工编程', default=False, readonly=True)
@api.onchange('users_ids')
def get_user_permissions(self):
uid = self.env.uid
for workorder in self:
if workorder.users_ids:
list_user_id = []
for item in workorder.users_ids:
list_user_id.append(item.id)
if uid in list_user_id:
workorder.user_permissions = True
else:
workorder.user_permissions = False
else:
workorder.user_permissions = False
user_permissions = fields.Boolean('用户权限', compute='get_user_permissions')
programming_no = fields.Char('编程单号')
work_state = fields.Char('业务状态')
programming_state = fields.Char('编程状态')
cnc_worksheet = fields.Binary(
'工作指令', readonly=True)
material_center_point = fields.Char(string='坯料中心点')
X1_axis = fields.Float(default=0)
Y1_axis = fields.Float(default=0)
Z1_axis = fields.Float(default=0)
X2_axis = fields.Float(default=0)
Y2_axis = fields.Float(default=0)
Z2_axis = fields.Float(default=0)
X3_axis = fields.Float(default=0)
Y3_axis = fields.Float(default=0)
Z3_axis = fields.Float(default=0)
X4_axis = fields.Float(default=0)
Y4_axis = fields.Float(default=0)
Z4_axis = fields.Float(default=0)
X5_axis = fields.Float(default=0)
Y5_axis = fields.Float(default=0)
Z5_axis = fields.Float(default=0)
X6_axis = fields.Float(default=0)
Y6_axis = fields.Float(default=0)
Z6_axis = fields.Float(default=0)
X7_axis = fields.Float(default=0)
Y7_axis = fields.Float(default=0)
Z7_axis = fields.Float(default=0)
X8_axis = fields.Float(default=0)
Y8_axis = fields.Float(default=0)
Z8_axis = fields.Float(default=0)
X9_axis = fields.Float(default=0)
Y9_axis = fields.Float(default=0)
Z9_axis = fields.Float(default=0)
X10_axis = fields.Float(default=0)
Y10_axis = fields.Float(default=0)
Z10_axis = fields.Float(default=0)
X_deviation_angle = fields.Integer(string="X轴偏差度", default=0)
test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], default='合格',
string="检测结果")
cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工程序")
cmm_ids = fields.One2many("sf.cmm.program", 'workorder_id', string="CMM程序")
tray_code = fields.Char(string="托盘编码")
glb_file = fields.Binary("glb模型文件", related='production_id.model_file')
is_subcontract = fields.Boolean(string='是否外协')
surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数")
picking_ids = fields.Many2many('stock.picking', string='外协出入库单')
surface_technics_picking_count = fields.Integer("外协出入库", compute='_compute_surface_technics_picking_ids')
@api.depends('name', 'production_id.name')
def _compute_surface_technics_picking_ids(self):
for order in self:
picking_ids = self.env['stock.picking'].search([('id', 'in', order.picking_ids.ids)])
order.surface_technics_picking_count = len(picking_ids)
def action_view_surface_technics_picking(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("stock.action_picking_tree_all")
if len(self.picking_ids) > 1:
action['domain'] = [('id', 'in', self.picking_ids.ids)]
elif self.picking_ids:
# action['name'] = '工艺外协'
action['res_id'] = self.picking_ids.id
action['views'] = [(self.env.ref('stock.view_picking_form').id, 'form')]
if 'views' in action:
action['views'] += [(state, view) for state, view in action['views'] if view != 'form']
action['context'] = dict(self._context, default_origin=self.name)
return action
supplier_id = fields.Many2one('res.partner', string='外协供应商')
equipment_id = fields.Many2one('maintenance.equipment', string='加工设备')
is_ok = fields.Boolean(string='是否合格')
# 加工人
processing_user_id = fields.Many2one('res.users', string='加工人')
# 检测人
inspection_user_id = fields.Many2one('res.users', string='检测人')
# 保存名称
save_name = fields.Char(string='检测文件保存名称', compute='_compute_save_name')
# 获取数据状态
data_state = fields.Boolean(string='获取数据状态', default=False)
# 坯料长宽高
material_length = fields.Float(string='')
material_width = fields.Float(string='')
material_height = fields.Float(string='')
# 零件图号
part_number = fields.Char(string='零件图号')
# 工序状态
process_state = fields.Selection([
('待装夹', '待装夹'),
('待检测', '待检测'),
('待加工', '待加工'),
('待解除装夹', '待解除装夹'),
('已完工', '已完工'),
], string='工序状态', default='待装夹')
# 加工图纸
processing_drawing = fields.Binary(string='加工图纸', related='production_id.part_drawing')
@api.depends('production_id')
def _compute_save_name(self):
"""
保存名称
"""
for record in self:
record.save_name = record.production_id.name.replace('/', '_')
schedule_state = fields.Selection(related='production_id.schedule_state', store=True)
# 工件装夹信息
functional_fixture_code = fields.Char(string="功能夹具编码", readonly=True)
functional_fixture_serial_number = fields.Char(string="功能夹具序列号", readonly=True)
functional_fixture_id = fields.Many2one('sf.functional.fixture', string="功能夹具")
functional_fixture_type_id = fields.Many2one('sf.functional.fixture.type', string="功能夹具类型", readonly=True)
chuck_serial_number = fields.Char(string="卡盘序列号")
chuck_name = fields.Char(string="卡盘名称")
chuck_brand_id = fields.Many2one('sf.machine.brand', string="卡盘品牌")
chuck_type_id = fields.Char(string="卡盘类型")
chuck_model_id = fields.Char(string="卡盘型号")
tray_serial_number = fields.Char(string="托盘序列号")
tray_product_id = fields.Many2one('product.product', string="托盘名称")
tray_brand_id = fields.Many2one('sf.machine.brand', string="托盘品牌")
tray_type_id = fields.Many2one('sf.fixture.material', string="托盘类型")
tray_model_id = fields.Many2one('sf.fixture.model', string="托盘型号")
total_wight = fields.Float(string="总重量")
maximum_carrying_weight = fields.Char(string="最大承载重量[kg]")
maximum_clamping_force = fields.Char(string="最大夹持力[n]")
preset_program_information = fields.Char(string="预调程序信息")
workpiece_delivery_ids = fields.One2many('sf.workpiece.delivery', 'workorder_id', '工件配送')
is_delivery = fields.Boolean('是否配送完成', default=False)
rfid_code = fields.Char('RFID码')
rfid_code_old = fields.Char('RFID码(已解除)')
production_line_id = fields.Many2one('sf.production.line', related='production_id.production_line_id',
string='生产线', store=True)
production_line_state = fields.Selection(related='production_id.production_line_state',
string='上/下产线', store=True)
detection_report = fields.Binary('检测报告', readonly=True)
is_remanufacture = fields.Boolean(string='是否重新生成制造订单', default=True)
@api.onchange('rfid_code')
def _onchange(self):
if self.rfid_code and self.state == 'progress':
self.workpiece_delivery_ids[0].write({'rfid_code': self.rfid_code})
def get_plan_workorder(self, production_line):
tomorrow = (date.today() + timedelta(days=+1)).strftime("%Y-%m-%d")
tomorrow_start = tomorrow + ' 00:00:00'
tomorrow_end = tomorrow + ' 23:59:59'
logging.info('tomorrow:%s' % tomorrow)
sql = """
SELECT *
FROM mrp_workorder
WHERE
to_char(date_planned_start::timestamp + '8 hour','YYYY-MM-DD HH:mm:SS')>= %s
AND to_char(date_planned_finished::timestamp + '8 hour','YYYY-MM-DD HH:mm:SS')<= %s
"""
params = [tomorrow_start, tomorrow_end]
if production_line:
sql += "AND production_line_id = %s"
params.append(production_line)
self.env.cr.execute(sql, params)
ids = [t[0] for t in self.env.cr.fetchall()]
return [('id', 'in', ids)]
@api.onchange('is_ok')
def _onchange_inspection_user_id(self):
"""
检测is_ok(是否合格)被修改的话就将当前用户赋值给inspection_user_id
"""
if not self.inspection_user_id:
self.inspection_user_id = self.env.user.id
else:
self.inspection_user_id = False
@api.onchange('functional_fixture_id')
def _onchange_functional_fixture_id(self):
if self.functional_fixture_id:
self.functional_fixture_code = self.functional_fixture_id.code
self.functional_fixture_type_id = self.functional_fixture_id.type_id.id
def get_no_data(self, production_id):
process_parameter_workorder = self.search(
[('surface_technics_parameters_id', '!=', False), ('production_id', '=', production_id)])
return process_parameter_workorder
# 获取三次元检测点数据
def get_three_check_datas(self):
factory_nick_name = 'XT'
ftp_resconfig = self.env['res.config.settings'].get_values()
ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']),
ftp_resconfig['ftp_user'],
ftp_resconfig['ftp_password'])
# ftp.connect()
local_dir_path = '/ftp/before'
os.makedirs(local_dir_path, exist_ok=True)
local_filename = self.save_name + '.xls'
local_file_path = os.path.join(local_dir_path, local_filename)
logging.info('local_file_path:%s' % local_file_path)
remote_path = '/home/ftp/ftp_root/ThreeTest/XT/Before/' + local_filename
logging.info('remote_path:%s' % remote_path)
if not ftp.file_exists(remote_path):
raise UserError(f"文件不存在: {remote_path}")
with open(local_file_path, 'wb') as local_file:
ftp.ftp.retrbinary('RETR ' + remote_path, local_file.write)
logging.info('下载文件成功')
# 解析本地文件
# file_path = 'WH_MO_00099.xls' # 使用下载的实际文件路径
parser = etree.XMLParser(recover=True) # Using recover to handle errors
tree = etree.parse(local_file_path, parser)
logging.info('tree:%s' % tree)
root = tree.getroot()
logging.info('root:%s' % root)
# 准备一个外部字典来存储以PT为键的坐标字典
pt_coordinates = {}
# 遍历每个工作表和行
for worksheet in root.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Worksheet'):
sheet_name = worksheet.attrib.get('{urn:schemas-microsoft-com:office:spreadsheet}Name')
logging.info('sheet_name:%s' % sheet_name)
if sheet_name == "Sheet1": # 确保我们只查看包含数据的工作表
current_pt = None
for row in worksheet.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Row'):
cells = list(row.iterfind('.//{urn:schemas-microsoft-com:office:spreadsheet}Cell'))
for i, cell in enumerate(cells):
data_cell = cell.find('.//{urn:schemas-microsoft-com:office:spreadsheet}Data')
if data_cell is not None and data_cell.text is not None: # 添加检查以确保data_cell.text不为空
# 检查是否是PT标识
logging.info(f"Data in cell: {data_cell.text}") # 输出单元格数据
if "PT" in data_cell.text:
current_pt = data_cell.text
pt_coordinates[current_pt] = []
elif data_cell.text in ["X", "Y", "Z"] and current_pt is not None:
# 确保当前单元格后面还有单元格存在,以获取理论值
if i + 1 < len(cells):
next_cell = cells[i + 1]
theory_value = next_cell.find(
'.//{urn:schemas-microsoft-com:office:spreadsheet}Data')
if theory_value is not None:
# 为当前PT键添加坐标数据
pt_coordinates[current_pt].append({
data_cell.text: float(theory_value.text)
})
logging.info(f"PT: {current_pt} - {data_cell.text}: {theory_value.text}")
logging.info('pt_coordinates=====%s' % pt_coordinates)
# pt_coordinates:{'PT1': [{'X': 38.9221}, {'Y': -18.7304}, {'Z': 128.0783}],
# 'PT2': [{'X': 39.2456}, {'Y': -76.9169}, {'Z': 123.7541}]}
# 检查是否存在PT1等键
if 'PT1' in pt_coordinates and pt_coordinates['PT1']:
self.X1_axis = pt_coordinates['PT3'][0]['X']
self.Y1_axis = pt_coordinates['PT3'][1]['Y']
self.Z1_axis = pt_coordinates['PT3'][2]['Z']
else:
raise UserError('PT1点未测或数据错误')
if 'PT2' in pt_coordinates and pt_coordinates['PT2']:
self.X2_axis = pt_coordinates['PT4'][0]['X']
self.Y2_axis = pt_coordinates['PT4'][1]['Y']
self.Z2_axis = pt_coordinates['PT4'][2]['Z']
else:
raise UserError('PT2点未测或数据错误')
if 'PT3' in pt_coordinates and pt_coordinates['PT3']:
self.X3_axis = pt_coordinates['PT5'][0]['X']
self.Y3_axis = pt_coordinates['PT5'][1]['Y']
self.Z3_axis = pt_coordinates['PT5'][2]['Z']
else:
raise UserError('PT3点未测或数据错误')
if 'PT4' in pt_coordinates and pt_coordinates['PT4']:
self.X4_axis = pt_coordinates['PT6'][0]['X']
self.Y4_axis = pt_coordinates['PT6'][1]['Y']
self.Z4_axis = pt_coordinates['PT6'][2]['Z']
else:
raise UserError('PT4点未测或数据错误')
if 'PT5' in pt_coordinates and pt_coordinates['PT5']:
self.X5_axis = pt_coordinates['PT7'][0]['X']
self.Y5_axis = pt_coordinates['PT7'][1]['Y']
self.Z5_axis = pt_coordinates['PT7'][2]['Z']
else:
raise UserError('PT5点未测或数据错误')
if 'PT6' in pt_coordinates and pt_coordinates['PT6']:
self.X6_axis = pt_coordinates['PT8'][0]['X']
self.Y6_axis = pt_coordinates['PT8'][1]['Y']
self.Z6_axis = pt_coordinates['PT8'][2]['Z']
else:
raise UserError('PT6点未测或数据错误')
if 'PT7' in pt_coordinates and pt_coordinates['PT7']:
self.X7_axis = pt_coordinates['PT9'][0]['X']
self.Y7_axis = pt_coordinates['PT9'][1]['Y']
self.Z7_axis = pt_coordinates['PT9'][2]['Z']
else:
raise UserError('PT7点未测或数据错误')
if 'PT8' in pt_coordinates and pt_coordinates['PT8']:
self.X8_axis = pt_coordinates['PT10'][0]['X']
self.Y8_axis = pt_coordinates['PT10'][1]['Y']
self.Z8_axis = pt_coordinates['PT10'][2]['Z']
else:
raise UserError('PT8点未测或数据错误')
if 'PT9' in pt_coordinates and pt_coordinates['PT9']:
self.X9_axis = pt_coordinates['PT1'][0]['X']
self.Y9_axis = pt_coordinates['PT1'][1]['Y']
self.Z9_axis = pt_coordinates['PT1'][2]['Z']
else:
raise UserError('PT9点未测或数据错误')
if 'PT10' in pt_coordinates and pt_coordinates['PT10']:
self.X10_axis = pt_coordinates['PT2'][0]['X']
self.Y10_axis = pt_coordinates['PT2'][1]['Y']
self.Z10_axis = pt_coordinates['PT2'][2]['Z']
else:
raise UserError('PT10点未测或数据错误')
self.data_state = True
return True
# ftp.download_file('three_check_datas.xls', '/home/ftpuser/three_check_datas.xls')
# ftp.close()
# data = xlrd.open_workbook('/home/ftpuser/three_check_datas.xls')
# table = data.sheets()[0]
# nrows = table.nrows
# # 点坐标列表
# point_list = []
# datas = []
# for i in range(1, nrows):
# datas.append(table.row_values(i))
# return datas
# 计算配料中心点和与x轴倾斜度方法
def getcenter(self):
try:
x1 = self.X1_axis
x2 = self.X2_axis
x3 = self.X3_axis
x4 = self.X4_axis
x5 = self.X5_axis
x6 = self.X6_axis
x7 = self.X7_axis
x8 = self.X8_axis
y1 = self.Y1_axis
y2 = self.Y2_axis
y3 = self.Y3_axis
y4 = self.Y4_axis
y5 = self.Y5_axis
y6 = self.Y6_axis
y7 = self.Y7_axis
y8 = self.Y8_axis
z1 = self.Z9_axis
z2 = self.Z10_axis
x0 = ((x3 - x4) * (x2 * y1 - x1 * y2) - (x1 - x2) * (x4 * y3 - x3 * y4)) / (
(x3 - x4) * (y1 - y2) - (x1 - x2) * (y3 - y4))
y0 = ((y3 - y4) * (y2 * x1 - y1 * x2) - (y1 - y2) * (y4 * x3 - y3 * x4)) / (
(y3 - y4) * (x1 - x2) - (y1 - y2) * (x3 - x4))
x1 = ((x7 - x8) * (x6 * y5 - x5 * y6) - (x5 - x6) * (x8 * y7 - x7 * y8)) / (
(x7 - x8) * (y5 - y6) - (x5 - x6) * (y7 - y8))
y1 = ((y7 - y8) * (y6 * x5 - y5 * x6) - (y5 - y6) * (y8 * x7 - y7 * x8)) / (
(y7 - y8) * (x5 - x6) - (y5 - y6) * (x7 - x8))
x = (x0 + x1) / 2
y = (y0 + y1) / 2
z = (z1 + z2) / 2
jd = math.atan2((x5 - x6), (y5 - y6))
jdz = jd * 180 / math.pi
print("(%.2f,%.2f)" % (x, y))
self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z))
self.X_deviation_angle = jdz
# 将补偿值写入CNC加工工单
workorder = self.env['mrp.workorder'].browse(self.ids)
work = workorder.production_id.workorder_ids
work.compensation_value_x = eval(self.material_center_point)[0]
work.compensation_value_y = eval(self.material_center_point)[1]
# work.process_state = '待加工'
# self.sudo().production_id.process_state = '待加工'
self.date_finished = datetime.now()
workorder.button_finish()
except Exception as e:
# 重新抛出捕获到的异常信息
raise UserError(str(e))
def button_workpiece_delivery(self):
if self.routing_type == '装夹预调':
for item in self.workpiece_delivery_ids:
if not item.route_id:
raise UserError('【工件配送】明细中请选择【任务路线】')
else:
if self.state == 'done':
if item.is_cnc_program_down is True:
if item.status == '待下发':
return {
'name': _('确认'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'sf.workpiece.delivery.wizard',
'target': 'new',
'context': {
'default_workorder_id': self.id,
}}
else:
raise UserError(_("该制造订单还未下发CNC程序单无法进行工件配送"))
else:
raise UserError(_("该工单暂未完成,无法进行工件配送"))
# 拼接工单对象属性值
def json_workorder_str(self, k, production, route):
# 计算预计时长duration_expected
if route.routing_type == '切割':
duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', '切割')]).time_cycle
# elif route.routing_type == '获取CNC加工程序':
# duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
# [('name', '=', '获取CNC加工程序')]).time_cycle
elif route.routing_type == '装夹预调':
duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', '装夹预调')]).time_cycle
# elif route.routing_type == '前置三元定位检测':
# duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
# [('name', '=', '前置三元定位检测')]).time_cycle
elif route.routing_type == 'CNC加工':
duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle
# elif route.routing_type == '后置三元质量检测':
# duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
# [('name', '=', '后置三元质量检测')]).time_cycle
elif route.routing_type == '解除装夹':
duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', '解除装夹')]).time_cycle
else:
duration_expected = 60
workorders_values_str = [0, '', {
'product_uom_id': production.product_uom_id.id,
'qty_producing': 0,
'operation_id': False,
'name': route.route_workcenter_id.name,
'processing_panel': k,
'quality_point_ids': route.route_workcenter_id.quality_point_ids,
'routing_type': route.routing_type,
'work_state': '待发起',
'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
route.routing_type,
production.product_id),
# 设定初始化值避免出现变成bool问题
'date_planned_start': datetime.now(),
'date_planned_finished': datetime.now() + timedelta(days=1),
'duration_expected': duration_expected,
'duration': 0,
'workpiece_delivery_ids': False if not route.routing_type == '装夹预调' else self._json_workpiece_delivery_list(
production)
}]
return workorders_values_str
def _json_workpiece_delivery_list(self, production):
up_route = self.env['sf.agv.task.route'].search([('route_type', '=', '上产线')], limit=1, order='id asc')
down_route = self.env['sf.agv.task.route'].search([('route_type', '=', '下产线')], limit=1, order='id asc')
return [
[0, '', {'production_id': production.id, 'type': '上产线', 'route_id': up_route.id,
'feeder_station_start_id': up_route.start_site_id.id,
'feeder_station_destination_id': up_route.end_site_id.id}],
[0, '', {'production_id': production.id, 'type': '下产线', 'route_id': down_route.id,
'feeder_station_start_id': down_route.start_site_id.id,
'feeder_station_destination_id': down_route.end_site_id.id}]]
# 拼接工单对象属性值(表面工艺)
def _json_workorder_surface_process_str(self, production, route, process_parameter, supplier_id):
workorders_values_str = [0, '', {
'product_uom_id': production.product_uom_id.id,
'qty_producing': 0,
'operation_id': False,
'name': '%s-%s' % (route.name, process_parameter.name),
'processing_panel': '',
'routing_type': '表面工艺',
'surface_technics_parameters_id': process_parameter.id,
'work_state': '',
'supplier_id': supplier_id,
'is_subcontract': True if process_parameter.gain_way == '外协' else False,
'workcenter_id': self.env[
'mrp.workcenter'].get_process_outsourcing_workcenter() if process_parameter.gain_way == '外协' else
self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
route.routing_type,
production.product_id),
'date_planned_start': datetime.now(),
'date_planned_finished': datetime.now() + timedelta(days=1),
'duration_expected': 60,
'duration': 0
}]
return workorders_values_str
# 维修模块按钮
def button_maintenance_req(self):
self.ensure_one()
return {
'name': _('New Maintenance Request'),
'view_mode': 'form',
'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')],
'res_model': 'maintenance.request',
'type': 'ir.actions.act_window',
'context': {
'default_company_id': self.company_id.id,
'default_workorder_id': self.id,
'default_production_id': self.production_id.id,
'discard_on_footer_button': True,
},
'target': 'new',
'domain': [('workorder_id', '=', self.id)]
}
# tray_id = fields.Many2one('sf.tray', string="托盘信息", tracking=True)
# 扫码绑定托盘方法
# def gettray(self):
# if self.tray_code != False:
# values = self.env['sf.tray'].search([("code", "=", self.tray_code)])
# if values:
# if values.state == "占用":
# raise UserError('该托盘已占用')
# if values.state == "报损":
# raise UserError('该托盘已损坏')
# else:
# values.update({
# 'workorder_id': self,
# 'production_id': self.production_id,
# 'state': '占用',
# })
# self.work_state = "已绑定"
# orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)])
# for a in orders:
# a.tray_id = values
# else:
# raise UserError('该托盘编码已失效')
# else:
# raise UserError('托盘码不能为空')
# 验证坯料序列号是否正确
def pro_code_is_ok(self, barcode):
if barcode is not False:
if barcode != self.pro_code:
raise UserError('坯料序列号错误')
return False
else:
return True
pro_code = fields.Char(string='坯料序列号')
pro_code_ok = fields.Boolean(default=False)
# 托盘扫码绑定
# def gettray_auto(self, barcode):
# if barcode != False:
# values = self.env['sf.tray'].search([("code", "=", barcode)])
#
# if values:
# if values.state == "占用":
# raise UserError('该托盘已占用')
# if values.state == "报损":
# raise UserError('该托盘已损坏')
# else:
# values.update({
# 'workorder_id': self,
# 'production_id': self.production_id,
# 'state': '占用',
# })
# self.work_state = "已绑定"
# orders = self.env['mrp.workorder'].search([('production_id', '=', self.production_id.id)])
# for a in orders:
# a.tray_id = values
#
# return values
#
# # return {
# # 'name': _('New Maintenance Request'),
# # 'view_mode': 'form',
# # 'res_model': 'maintenance.request',
# # 'type': 'ir.actions.act_window',
# # 'context': {
# # 'default_company_id': self.company_id.id,
# # 'default_production_id': self.id,
# # },
# # 'domain': [('production_id', '=', self.id)],
# # }
# else:
# raise UserError('该托盘编码已失效')
# else:
# raise UserError('托盘码不能为空')
# 解除托盘绑定
# def unbindtray(self):
# tray = self.env['sf.tray'].search([("production_id", "=", self.production_id.id)])
# if tray:
# tray.unclamp()
# self.tray_id = False
# return {
# 'name': _('New Maintenance Request'),
# 'view_mode': 'form',
# 'res_model': 'maintenance.request',
# 'res_id':self.id,
# 'type': 'ir.actions.act_window',
# 'context': {
# 'default_company_id': self.company_id.id,
# 'default_production_id': self.id,
# },
# 'domain': [('production_id', '=', self.id)],
# 'target':'new'
# }
@api.depends('production_availability', 'blocked_by_workorder_ids', 'blocked_by_workorder_ids.state')
def _compute_state(self):
for workorder in self:
if workorder.routing_type == '装夹预调':
cnc_workorder = self.search(
[('production_id', '=', workorder.production_id.id), ('routing_type', '=', 'CNC加工')],
limit=1, order='id asc')
if not cnc_workorder:
workorder.state = 'waiting'
else:
for item in cnc_workorder.cnc_ids:
functional_cutting_tool = self.env['sf.functional.cutting.tool.entity'].search(
[('tool_name_id.name', '=', item.cutting_tool_name)])
if not functional_cutting_tool:
workorder.state = 'waiting'
if workorder.state == 'pending':
if all([wo.state in ('done', 'cancel') for wo in workorder.blocked_by_workorder_ids]):
workorder.state = 'ready' if workorder.production_id.reservation_state == 'assigned' else 'waiting'
continue
if workorder.state not in ('waiting', 'ready'):
continue
if not all([wo.state in ('done', 'cancel') for wo in workorder.blocked_by_workorder_ids]):
workorder.state = 'pending'
continue
if workorder.production_id.reservation_state not in ('waiting', 'confirmed', 'assigned'):
continue
if workorder.production_id.reservation_state == 'assigned' and workorder.state == 'waiting':
workorder.state = 'ready'
elif workorder.production_id.reservation_state != 'assigned' and workorder.state == 'ready':
workorder.state = 'waiting'
def recreateManufacturingOrWorkerOrder(self):
"""
重新生成制造订单或者重新生成工单
"""
if self.test_results == '报废':
values = self.env['mrp.production'].create_production1_values(self.production_id)
productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
self.production_id.company_id).create(
values)
self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
productions._create_workorder()
productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or \
(
p.move_dest_ids.procure_method != 'make_to_order' and
not p.move_raw_ids and not p.workorder_ids)).action_confirm()
for production in productions:
origin_production = production.move_dest_ids and production.move_dest_ids[
0].raw_material_production_id or False
orderpoint = production.orderpoint_id
if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
production.message_post(
body=_('This production order has been created from Replenishment Report.'),
message_type='comment',
subtype_xmlid='mail.mt_note')
elif orderpoint:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': orderpoint},
subtype_id=self.env.ref('mail.mt_note').id)
elif origin_production:
production.message_post_with_view(
'mail.message_origin_link',
values={'self': production, 'origin': origin_production},
subtype_id=self.env.ref('mail.mt_note').id)
if self.test_results == '返工':
productions = self.production_id
# self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
# self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
productions._create_workorder2(self.processing_panel)
else:
self.results = '合格'
def json_workorder_str1(self, k, production, route):
workorders_values_str = [0, '', {
'product_uom_id': production.product_uom_id.id,
'qty_producing': 0,
'operation_id': False,
'name': '%s(返工)' % route.route_workcenter_id.name,
'processing_panel': k,
'routing_type': route.routing_type,
'work_state': '' if not route.routing_type == '获取CNC加工程序' else '待发起',
'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
route.routing_type,
production.product_id),
'date_planned_start': datetime.now(),
'date_planned_finished': datetime.now() + timedelta(days=1),
'duration_expected': 60,
'duration': 0,
}]
return workorders_values_str
# 重写工单开始按钮方法
def button_start(self):
if self.routing_type == '装夹预调':
# 判断是否有坯料的序列号信息
boolean = False
if self.production_id.move_raw_ids[0].move_line_ids:
if self.production_id.move_raw_ids[0].move_line_ids[0].lot_id.name:
boolean = True
if not boolean:
raise UserError('制造订单【%s】缺少组件的序列号信息!' % self.production_id.name)
self.pro_code = self.production_id.move_raw_ids[0].move_line_ids[0].lot_id.name
# cnc校验
cnc_workorder = self.search(
[('production_id', '=', self.production_id.id), ('routing_type', '=', 'CNC加工')],
limit=1, order='id asc')
if not cnc_workorder:
raise UserError(_('该制造订单还未下发CNC程序请稍后再试'))
else:
for item in cnc_workorder.cnc_ids:
functional_cutting_tool = self.env['sf.functional.cutting.tool.entity'].search(
[('tool_name_id.name', '=', item.cutting_tool_name)])
if not functional_cutting_tool:
raise UserError(_('该制造订单的CNC程序为%s没有对应的功能刀具' % item.cutting_tool_name))
if self.routing_type == '解除装夹':
'''
记录开始时间
'''
self.date_start = datetime.now()
# 表面工艺外协出库单
if self.routing_type == '表面工艺':
if self.is_subcontract is True:
move_out = self.env['stock.move'].search(
[('location_id', '=', self.env['stock.location'].search(
[('barcode', 'ilike', 'WH-PREPRODUCTION')]).id),
('location_dest_id', '=', self.env.ref(
'sf_manufacturing.stock_location_locations_virtual_outcontract').id),
('origin', '=', self.production_id.name)])
purchase = self.env['purchase.order'].search([('origin', '=', self.production_id.name)])
if purchase and move_out:
move_out.write({'state': 'assigned'})
self.env['stock.move.line'].create(move_out.get_move_line(purchase, self))
# move_out._action_assign()
if self.state == 'waiting' or self.state == 'ready' or self.state == 'progress':
self.move_raw_ids = self.production_id.move_raw_ids
self.move_raw_ids[0].write({
'materiel_length': self.move_raw_ids[0].product_id.length,
'materiel_width': self.move_raw_ids[0].product_id.width,
'materiel_height': self.move_raw_ids[0].product_id.height
})
self.write({
'material_length': self.move_raw_ids[0].product_id.length,
'material_width': self.move_raw_ids[0].product_id.width,
'material_height': self.move_raw_ids[0].product_id.height
})
self.ensure_one()
if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)):
return True
# As button_start is automatically called in the new view
if self.state in ('done', 'cancel'):
return True
if self.product_tracking == 'serial':
self.qty_producing = 1.0
else:
self.qty_producing = self.qty_remaining
self.env['mrp.workcenter.productivity'].create(
self._prepare_timeline_vals(self.duration, datetime.now())
)
if self.production_id.state != 'progress':
self.production_id.write({
'date_start': datetime.now(),
})
if self.state == 'progress':
return True
start_date = datetime.now()
vals = {
'state': 'progress',
'date_start': start_date,
}
if not self.leave_id:
leave = self.env['resource.calendar.leaves'].create({
'name': self.display_name,
'calendar_id': self.workcenter_id.resource_calendar_id.id,
'date_from': start_date,
'date_to': start_date + relativedelta(minutes=self.duration_expected),
'resource_id': self.workcenter_id.resource_id.id,
'time_type': 'other'
})
vals['leave_id'] = leave.id
return self.write(vals)
else:
if self.date_planned_start > start_date:
vals['date_planned_start'] = start_date
# if self.date_planned_finished and self.date_planned_finished < start_date:
# vals['date_planned_finished'] = start_date
return self.write(vals)
else:
raise UserError(_('请先完成上一步工单'))
def button_finish(self):
for record in self:
if record.routing_type == '装夹预调':
if not record.material_center_point and record.X_deviation_angle > 0:
raise UserError("请对前置三元检测定位参数进行计算定位")
if not record.rfid_code:
raise UserError("请扫RFID码进行绑定")
record.process_state = '待加工'
# record.write({'process_state': '待加工'})
record.production_id.process_state = '待加工'
if record.routing_type == 'CNC加工':
record.process_state = '待解除装夹'
# record.write({'process_state': '待加工'})
record.production_id.process_state = '待解除装夹'
if record.routing_type == '解除装夹':
'''
记录结束时间
'''
record.date_finished = datetime.now()
if record.routing_type == '表面工艺':
if record.picking_ids[0]:
picking_out = record.env['stock.move.line'].search(
[('picking_id', '=', record.picking_ids[0].id), ('workorder_id', '=', record.id)])
if picking_out:
order_line_ids = []
for item in picking_out.workorder_id:
server_product = self.env['product.template'].search(
[('server_product_process_parameters_id', '=', item.surface_technics_parameters_id.id),
('detailed_type', '=', 'service')])
if server_product:
order_line_ids.append((0, 0, {
'product_id': server_product.product_variant_id.id,
'product_qty': 1,
'origin': record.production_id.name,
'product_uom': server_product.uom_id.id
}))
else:
raise UserError(
'请先在产品中配置表面工艺为%s相关的外协服务产品' % item.surface_technics_parameters_id.name)
if order_line_ids:
self.env['purchase.order'].create({
'partner_id': server_product.seller_ids.partner_id.id,
'state': 'draft',
'order_line': order_line_ids,
})
tem_date_planned_finished = record.date_planned_finished
super().button_finish()
record.write({
'date_planned_finished': tem_date_planned_finished # 保持原值
})
is_production_id = True
for workorder in record.production_id.workorder_ids:
if workorder.state != 'done':
is_production_id = False
if is_production_id == True and record.name == '解除装夹':
for workorder in record.production_id.workorder_ids:
workorder.rfid_code_old = workorder.rfid_code
workorder.rfid_code = None
for move_raw_id in record.production_id.move_raw_ids:
move_raw_id.quantity_done = move_raw_id.product_uom_qty
record.process_state = '已完工'
record.production_id.process_state = '已完工'
record.production_id.button_mark_done1()
# self.production_id.state = 'done'
# 将FTP的检测报告文件下载到临时目录
def download_reportfile_tmp(self, workorder, reportpath):
logging.info('reportpath:%s' % reportpath)
production_no_ftp = reportpath.split('/')
production_no = workorder.production_id.name.replace('/', '_')
# ftp地址
remotepath = os.path.join('/NC', production_no_ftp[1], 'detection')
logging.info('ftp地址:%s' % remotepath)
if reportpath.find(production_no) != -1:
# 服务器内临时地址
serverdir = os.path.join('/tmp', production_no_ftp[1], 'detection')
ftp_resconfig = self.env['res.config.settings'].get_values()
ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']),
ftp_resconfig['ftp_user'],
ftp_resconfig['ftp_password'])
download_state = ftp.download_reportfile_tree(remotepath, serverdir, reportpath)
logging.info('download_state:%s' % download_state)
else:
download_state = 2
return download_state
# 根据中控系统提供的检测文件地址去ftp里对应的制造订单里获取
def get_detection_file(self, workorder, reportPath):
# if reportPath.startswith('/'):
# reportPath = reportPath[4:]
# serverdir = os.path.join('/tmp', reportPath)
serverdir = '/tmp' + reportPath
logging.info('get_detection_file-serverdir:%s' % serverdir)
serverdir_prefix = os.path.dirname(serverdir)
logging.info('serverdir_prefix-serverdir:%s' % serverdir_prefix)
for root, dirs, files in os.walk(serverdir_prefix):
for filename in files:
logging.info('filename:%s' % filename)
logging.info('reportPath:%s' % os.path.basename(reportPath))
if filename == os.path.basename(reportPath):
report_file_path = os.path.join(root, filename)
logging.info('get_detection_file-report_file_path:%s' % report_file_path)
workorder.detection_report = base64.b64encode(open(report_file_path, 'rb').read())
return True
class CNCprocessing(models.Model):
_name = 'sf.cnc.processing'
_description = "CNC加工"
_rec_name = 'program_name'
_order = 'sequence_number,id'
cnc_id = fields.Many2one('ir.attachment')
sequence_number = fields.Char('序号')
program_name = fields.Char('程序名')
functional_tool_type_id = fields.Many2one('sf.functional.cutting.tool.model', string='功能刀具类型')
cutting_tool_name = fields.Char('刀具名称')
cutting_tool_no = fields.Char('刀号')
processing_type = fields.Char('加工类型')
margin_x_y = fields.Char('余量_X/Y')
margin_z = fields.Char('余量_Z')
depth_of_processing_z = fields.Char('加工深度(Z)')
cutting_tool_extension_length = fields.Char('刀具伸出长度')
cutting_tool_handle_type = fields.Char('刀柄型号')
estimated_processing_time = fields.Char('预计加工时间')
remark = fields.Text('备注')
workorder_id = fields.Many2one('mrp.workorder', string="工单")
production_id = fields.Many2one('mrp.production', string="制造订单")
button_state = fields.Boolean(string='是否已经下发')
program_path = fields.Char('程序文件路径')
# mrs下发编程单创建CNC加工
def cnc_processing_create(self, cnc_workorder, ret, program_path, program_path_tmp):
cnc_processing = None
for obj in ret['programming_list']:
workorder = self.env['mrp.workorder'].search(
[('production_id.name', '=', cnc_workorder.name),
('processing_panel', '=', obj['processing_panel']),
('routing_type', '=', 'CNC加工')])
logging.info('workorder:%s' % workorder.id)
if obj['program_name'] in program_path:
logging.info('obj:%s' % obj['program_name'])
cnc_processing = self.env['sf.cnc.processing'].create({
'workorder_id': workorder.id,
'sequence_number': obj['sequence_number'],
'program_name': obj['program_name'],
'cutting_tool_name': obj['cutting_tool_name'],
'cutting_tool_no': obj['cutting_tool_no'],
'processing_type': obj['processing_type'],
'margin_x_y': obj['margin_x_y'],
'margin_z': obj['margin_z'],
'depth_of_processing_z': obj['depth_of_processing_z'],
'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
'estimated_processing_time': obj['estimated_processing_time'],
'remark': obj['remark'],
'program_path': program_path.replace('/tmp', '/home/ftp/ftp_root/NC')
})
cnc_processing.get_cnc_processing_file(program_path_tmp, cnc_processing, program_path)
cnc_workorder.write({'programming_state': '已编程', 'work_state': '已编程'})
return cnc_processing
def _json_cnc_processing(self, obj):
cnc_processing_str = (0, 0, {
'sequence_number': obj['sequence_number'],
'program_name': obj['program_name'],
'cutting_tool_name': obj['cutting_tool_name'],
'cutting_tool_no': obj['cutting_tool_no'],
'processing_type': obj['processing_type'],
'margin_x_y': obj['margin_x_y'],
'margin_z': obj['margin_z'],
'depth_of_processing_z': obj['depth_of_processing_z'],
'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
'estimated_processing_time': obj['estimated_processing_time'],
'remark': obj['remark']
})
return cnc_processing_str
# 根据程序名和加工面匹配到ftp里对应的Nc程序名,可优化为根据cnc_processing.program_path进行匹配
def get_cnc_processing_file(self, serverdir, cnc_processing, program_path):
logging.info('serverdir:%s' % serverdir)
for root, dirs, files in os.walk(serverdir):
for f in files:
if os.path.splitext(f)[1] == ".pdf":
full_path = os.path.join(serverdir, root, f)
if full_path is not False:
if not cnc_processing.workorder_id.cnc_worksheet:
cnc_processing.workorder_id.cnc_worksheet = base64.b64encode(
open(full_path, 'rb').read())
else:
if f in program_path:
# if cnc_processing.program_name == f.split('.')[0]:
cnc_file_path = os.path.join(serverdir, root, f)
self.write_file(cnc_file_path, cnc_processing)
# 创建附件(nc文件)
def attachment_create(self, name, data):
attachment = self.env['ir.attachment'].create({
'datas': base64.b64encode(data),
'type': 'binary',
'public': True,
'description': '程序文件',
'name': name
})
return attachment
# 将FTP的nc文件下载到临时目录
def download_file_tmp(self, production_no, processing_panel):
remotepath = os.path.join('/NC', production_no, 'return', processing_panel)
serverdir = os.path.join('/tmp', production_no, 'return', processing_panel)
ftp_resconfig = self.env['res.config.settings'].get_values()
ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'],
ftp_resconfig['ftp_password'])
download_state = ftp.download_file_tree(remotepath, serverdir)
logging.info('download_state:%s' % download_state)
return download_state
# 将nc文件存到attach的datas里
def write_file(self, nc_file_path, cnc):
nc_file_name = nc_file_path.split('/')
logging.info('file_name:%s' % nc_file_name[-1])
if os.path.exists(nc_file_path):
with open(nc_file_path, 'rb') as file:
data_bytes = file.read()
attachment = self.attachment_create(cnc.program_name + nc_file_name[-1], data_bytes)
cnc.write({'cnc_id': attachment.id})
file.close()
else:
return False
# end_date = datetime.now()
# for workorder in self:
# if workorder.state in ('done', 'cancel'):
# continue
# workorder.end_all()
# vals = {
# 'qty_produced': workorder.qty_produced or workorder.qty_producing or workorder.qty_production,
# 'state': 'done',
# 'date_finished': end_date,
# 'date_planned_finished': end_date,
# 'costs_hour': workorder.workcenter_id.costs_hour
# }
# if not workorder.date_start:
# vals['date_start'] = end_date
# if not workorder.date_planned_start or end_date < workorder.date_planned_start:
# vals['date_planned_start'] = end_date
# workorder.with_context(bypass_duration_calculation=True).write(vals)
return True
class SfWorkOrderBarcodes(models.Model):
"""
智能工厂工单处扫码绑定托盘
"""
_name = "mrp.workorder"
_inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"]
def on_barcode_scanned(self, barcode):
workorder = self.env['mrp.workorder'].browse(self.ids)
# workorder_preset = self.env['mrp.workorder'].search(
# [('routing_type', '=', '装夹预调'), ('rfid_code', '=', barcode)])
workorder_olds = self.env['mrp.workorder'].search(
[('routing_type', '=', '装夹预调'), ('rfid_code', '=', barcode)])
if workorder_olds:
name = ''
for workorder in workorder_olds:
name = '%s %s' % (name, workorder.production_id.name)
raise UserError('该托盘已绑定【%s】制造订单,请先解除绑定!!!' % name)
if workorder:
if workorder.routing_type == '装夹预调':
if workorder.state in ['done']:
work_state = {'done': '已完工'}
raise UserError('装夹%s,请勿重复扫码' % work_state.get(workorder.state))
lots = self.env['stock.lot'].sudo().search([('rfid', '=', barcode)])
logging.info("托盘信息:%s" % lots)
if lots:
for lot in lots:
if lot.product_id.categ_type == '夹具':
val = {
'tray_serial_number': lot.name,
'tray_product_id': lot.product_id.id,
'tray_brand_id': lot.product_id.brand_id.id,
'tray_type_id': lot.product_id.fixture_material_id.id,
'tray_model_id': lot.product_id.fixture_model_id.id,
'rfid_code': barcode
}
workorder.write(val)
self.write(val)
workorder_rfid = self.env['mrp.workorder'].search(
[('production_id', '=', workorder.production_id.id)])
if workorder_rfid:
for item in workorder_rfid:
item.write({'rfid_code': barcode})
logging.info("Rfid绑定成功")
else:
raise UserError('该Rfid【%s】绑定的是【%s】, 不是托盘!!!' % (barcode, lot.product_id.name))
self.process_state = '待检测'
self.date_start = datetime.now()
else:
raise UserError('该托盘信息不存在!!!')
# stock_move_line = self.env['stock.move.line'].search([('lot_name', '=', barcode)])
# if stock_move_line.product_id.categ_type == '夹具':
# workorder.write({
# 'tray_serial_number': stock_move_line.lot_name,
# 'tray_product_id': stock_move_line.product_id.id,
# 'tray_brand_id': stock_move_line.product_id.brand_id.id,
# 'tray_type_id': stock_move_line.product_id.fixture_material_id.id,
# 'tray_model_id': stock_move_line.product_id.fixture_model_id.id
# })
# workorder.button_start()
# # return {
# # 'type': 'ir.actions.act_window',
# # 'res_model': 'mrp.workorder',
# # 'view_mode': 'form',
# # 'domain': [('id', 'in', workorder.id)],
# # 'target': 'current'
# # }
# else:
# embryo_stock_lot = self.env['stock.lot'].search([('name', '=', barcode)])
# if embryo_stock_lot:
# embryo_stock_move_line = self.env['stock.move.line'].search(
# [('product_id', '=', embryo_stock_lot.product_id.id),
# ('reference', '=', workorder.production_id.name),
# ('lot_id', '=', embryo_stock_lot.id),
# ('product_category_name', '=', '坯料')])
# if embryo_stock_move_line:
# bom_production = self.env['mrp.production'].search(
# [('product_id', '=', embryo_stock_lot.product_id.id),
# ('origin', '=', workorder.production_id.name)], limit=1, order='id asc')
# workpiece_delivery = self.env['sf.workpiece.delivery'].search(
# [('workorder_id', '=', workorder.id)], limit=1, order='id asc')
# if workpiece_delivery:
# embryo_workpiece_code = workpiece_delivery.workpiece_code
# if bom_production:
# if workpiece_delivery.workpiece_code and bom_production.name not in \
# workpiece_delivery.workpiece_code:
# embryo_workpiece_code = workpiece_delivery.workpiece_code + ',' + \
# bom_production.name
# if not workpiece_delivery.workpiece_code:
# embryo_workpiece_code = bom_production.name
# workpiece_delivery.write({'workpiece_code': embryo_workpiece_code})
# else:
# raise UserError('工件生产线不一致,请重新确认')
# else:
# workorder_rfid = self.env['mrp.workorder'].search(
# [('production_id', '=', workorder.production_id.id)])
# if workorder_rfid:
# for item in workorder_rfid:
# item.write({'rfid_code': barcode})
class WorkPieceDelivery(models.Model):
_name = "sf.workpiece.delivery"
_description = '工件配送'
name = fields.Char('单据编号')
workorder_id = fields.Many2one('mrp.workorder', string='工单', readonly=True)
workorder_state = fields.Selection(related='workorder_id.state', string='工单状态')
rfid_code = fields.Char(related='workorder_id.rfid_code', string='rfid码', store=True)
production_id = fields.Many2one('mrp.production', string='制造订单号', readonly=True)
production_line_id = fields.Many2one('sf.production.line', string='目的生产线')
plan_start_processing_time = fields.Datetime('计划开始加工时间', readonly=True)
route_id = fields.Many2one('sf.agv.task.route', '任务路线')
feeder_station_start_id = fields.Many2one('sf.agv.site', '起点接驳站')
feeder_station_destination_id = fields.Many2one('sf.agv.site', '目的接驳站')
task_delivery_time = fields.Datetime('任务下发时间')
task_completion_time = fields.Datetime('任务完成时间')
type = fields.Selection(
[('上产线', '上产线'), ('下产线', '下产线'), ('运送空料架', '运送空料架')], string='类型')
delivery_duration = fields.Float('配送时长', compute='_compute_delivery_duration')
status = fields.Selection(
[('待下发', '待下发'), ('待配送', '待配送'), ('已配送', '已配送')], string='状态', default='待下发')
is_cnc_program_down = fields.Boolean('程序是否下发', default=False)
active = fields.Boolean(string="有效", default=True)
@api.model
def create(self, vals):
if vals.get('name', '/') == '/' or vals.get('name', '/') is False:
vals['name'] = self.env['ir.sequence'].next_by_code('sf.workpiece.delivery') or '/'
obj = super(WorkPieceDelivery, self).create(vals)
return obj
def action_delivery_history(self):
return {
'name': _('配送历史'),
'type': 'ir.actions.act_window',
'view_mode': 'tree',
'res_model': 'sf.workpiece.delivery',
'view_id': self.env.ref('sf_manufacturing.sf_workpiece_delivery_empty_racks_tree').id,
'domain': [('type', '=', '运送空料架'), ('route_id', '=', self.route_id.id), ('name', 'ilike', 'WDO')]
}
@api.onchange('route_id')
def onchange_route(self):
if self.route_id:
self.feeder_station_start_id = self.route_id.start_site_id.id
self.feeder_station_destination_id = self.route_id.end_site_id.id
# 工件配送
def button_delivery(self):
delivery_ids = []
production_ids = []
is_cnc_down = 0
is_not_production_line = 0
is_not_route = 0
same_production_line_id = None
same_route_id = None
down_status = '待下发'
production_type = None
num = 0
for item in self:
num += 1
if production_type is None:
production_type = item.type
if item.type == "运送空料架":
if num >= 2:
raise UserError('仅选择一条路线进行配送,请重新选择')
else:
delivery_ids.append(item.id)
else:
if num > 4:
raise UserError('仅限于配送1-4个制造订单请重新选择')
if item.status in ['待配送', '已配送']:
raise UserError('请选择状态为【待下发】的制造订单进行配送')
if item.route_id:
if same_route_id is None:
same_route_id = item.route_id.id
if item.route_id.id != same_route_id:
is_not_route += 1
# else:
# raise UserError('请选择【任务路线】再进行配送')
if production_type != item.type:
raise UserError('请选择类型为%s的制造订单进行配送' % production_type)
if down_status != item.status:
up_workpiece = self.search([('type', '=', '上产线'), ('production_id', '=', item.production_id),
('status', '=', '待下发')])
if up_workpiece:
raise UserError('您所选择的制造订单暂未上产线,请在上产线后再进行配送')
else:
raise UserError('请选择状态为【待下发】的制造订单进行配送')
if same_production_line_id is None:
same_production_line_id = item.production_line_id.id
if item.production_line_id.id != same_production_line_id:
is_not_production_line += 1
if item.is_cnc_program_down is False:
is_cnc_down += 1
if is_cnc_down == 0 and is_not_production_line == 0 and is_not_route == 0:
delivery_ids.append(item.id)
production_ids.append(item.production_id.id)
if is_cnc_down >= 1:
raise UserError('您所选择制造订单的【CNC程序】暂未下发请在程序下发后再进行配送')
if is_not_production_line >= 1:
raise UserError('您所选择制造订单的【目的生产线】不一致,请重新确认')
if is_not_route >= 1:
raise UserError('您所选择制造订单的【任务路线】不一致,请重新确认')
is_free = self._check_avgsite_state()
if is_free is True:
if delivery_ids:
return {
'name': _('确认'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'sf.workpiece.delivery.wizard',
'target': 'new',
'context': {
'default_delivery_ids': [(6, 0, delivery_ids)],
'default_production_ids': [(6, 0, production_ids)],
'default_destination_production_line_id': same_production_line_id,
'default_route_id': same_route_id,
'default_type': production_type,
}}
else:
if production_type == '运送空料架':
raise UserError("您所选择的【任务路线】的【终点接驳站】已占用,请在该接驳站空闲时进行配送")
else:
raise UserError(
"您所选择制造订单的【任务路线】的【终点接驳站】已占用,请在该接驳站空闲时或选择其他路线进行配送")
# 验证agv站点是否可用
def _check_avgsite_state(self):
is_free = False
agv_site = self.env['sf.agv.site'].search([])
if agv_site:
agv_site.update_site_state()
for item in self:
logging.info('工件配送-起点状态:%s-%s' % (
item.feeder_station_start_id.name, item.feeder_station_start_id.state))
logging.info('工件配送-终点状态:%s-%s' % (
item.feeder_station_destination_id.name, item.feeder_station_destination_id.state))
if item.type in ['上产线', '下产线']:
if (
item.feeder_station_start_id.state == '占用' and item.feeder_station_destination_id.state == '空闲') or (
item.feeder_station_start_id.state == '空闲' and item.feeder_station_destination_id.state == '空闲'):
is_free = True
else:
if item.feeder_station_destination_id.state == '空闲':
is_free = True
logging.info('is_free:%s' % is_free)
return is_free
# 配送至avg小车
def _delivery_avg(self):
config = self.env['res.config.settings'].get_values()
positionCode_Arr = []
delivery_Arr = []
feeder_station_start = None
feeder_station_destination = None
route_id = None
for item in self:
if route_id is None:
route_id = item.route_id.id
if feeder_station_start is None:
feeder_station_start = item.feeder_station_start_id
if feeder_station_destination is None:
feeder_station_destination = item.feeder_station_destination_id
if item.type in ['上产线', '下产线']:
item.route_id = route_id
item.feeder_station_start_id = feeder_station_start.id
item.feeder_station_destination_id = feeder_station_destination.id
delivery_Arr.append(item.name)
else:
self = self.create(
{'name': self.env['ir.sequence'].next_by_code('sf.workpiece.delivery'),
'route_id': self.route_id.id,
'feeder_station_start_id': self.feeder_station_start_id.id,
'feeder_station_destination_id': self.feeder_station_destination_id.id})
delivery_Arr.append(self.name)
delivery_str = ','.join(map(str, delivery_Arr))
if feeder_station_start is not None:
positionCode_Arr.append({
'positionCode': feeder_station_start.name,
'code': '00'
})
if feeder_station_destination is not None:
positionCode_Arr.append({
'positionCode': feeder_station_destination.name,
'code': '00'
})
res = {'reqCode': delivery_str, 'reqTime': '', 'clientCode': '', 'tokenCode': '',
'taskTyp': 'F01', 'ctnrTyp': '', 'ctnrCode': '', 'wbCode': config['wbcode'],
'positionCodePath': positionCode_Arr,
'podCode': '',
'podDir': '', 'materialLot': '', 'priority': '', 'taskCode': '', 'agvCode': '', 'materialLot': '',
'data': ''}
try:
logging.info('AGV请求路径:%s' % config['agv_rcs_url'])
logging.info('AGV-json:%s' % res)
headers = {'Content-Type': 'application/json'}
ret = requests.post((config['agv_rcs_url']), json=res, headers=headers)
ret = ret.json()
logging.info('config-ret:%s' % ret)
if ret['code'] == 0:
req_codes = ret['reqCode'].split(',')
for delivery_item in self:
for req_code in req_codes:
if delivery_item.name == req_code.strip():
logging.info('delivery_item-name:%s' % delivery_item.name)
delivery_item.write({
'task_delivery_time': fields.Datetime.now(),
'status': '待配送'
})
if delivery_item == "上产线":
delivery_item.workorder_id.write({'is_delivery': True})
else:
raise UserError(ret['message'])
except Exception as e:
logging.info('config-e:%s' % e)
raise UserError("工件配送请求agv失败:%s" % e)
@api.depends('task_delivery_time', 'task_completion_time')
def _compute_delivery_duration(self):
for obj in self:
if obj.task_delivery_time and obj.task_completion_time:
obj.delivery_duration = round(
(obj.task_completion_time - obj.task_delivery_time).total_seconds() / 60.0, 2)
else:
obj.delivery_duration = 0.0
class CMMprogram(models.Model):
_name = 'sf.cmm.program'
_description = "CMM程序"
cmm_id = fields.Many2one('ir.attachment')
sequence_number = fields.Integer('序号')
program_name = fields.Char('程序名')
cutting_tool_name = fields.Char('刀具名称')
cutting_tool_no = fields.Char('刀号')
processing_type = fields.Char('加工类型')
margin_x_y = fields.Char('余量_X/Y')
margin_z = fields.Char('余量_Z')
depth_of_processing_z = fields.Char('加工深度(Z)')
cutting_tool_extension_length = fields.Char('刀具伸出长度')
cutting_tool_handle_type = fields.Char('刀柄型号')
estimated_processing_time = fields.Char('预计加工时间')
remark = fields.Text('备注')
workorder_id = fields.Many2one('mrp.workorder', string="工单")
production_id = fields.Many2one('mrp.production', string="制造订单")
program_path = fields.Char('程序文件路径')
def cmm_program_create(self, ret, program_path, program_path_tmp):
cmm_program = None
for obj in ret['programming_list']:
workorder = self.env['mrp.workorder'].search(
[('production_id.name', '=', ret['production_order_no'].split(',')[0]),
('processing_panel', '=', obj['processing_panel']),
('routing_type', '=', 'CNC加工')])
if obj['program_name'] in program_path:
logging.info('obj:%s' % obj['program_name'])
cmm_program = self.sudo().create({
'workorder_id': workorder.id,
'sequence_number': obj['sequence_number'],
'program_name': obj['program_name'],
'cutting_tool_name': obj['cutting_tool_name'],
'cutting_tool_no': obj['cutting_tool_no'],
'processing_type': obj['processing_type'],
'margin_x_y': obj['margin_x_y'],
'margin_z': obj['margin_z'],
'depth_of_processing_z': obj['depth_of_processing_z'],
'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
'estimated_processing_time': obj['estimated_processing_time'],
'remark': obj['remark'],
'program_path': program_path.replace('/tmp', '')
})
cmm_program.get_cmm_program_file(program_path_tmp, cmm_program, program_path)
return cmm_program
# 根据程序名和加工面匹配到ftp里对应的cmm程序名
def get_cmm_program_file(self, serverdir, cmm_program, program_path):
logging.info('cmm-serverdir:%s' % serverdir)
for root, dirs, files in os.walk(serverdir):
for f in files:
if f in program_path:
cmm_program_file_path = os.path.join(serverdir, root, f)
self.write_file_cmm(cmm_program_file_path, cmm_program)
# 创建附件(nc文件)
def attachment_create(self, name, data):
attachment = self.env['ir.attachment'].create({
'datas': base64.b64encode(data),
'type': 'binary',
'public': True,
'description': '程序文件',
'name': name
})
return attachment
# 将cmm文件存到attach的datas里
def write_file_cmm(self, cmm_file_path, cnc):
cmm_file_name = cmm_file_path.split('/')
logging.info('cmm_file_name:%s' % cmm_file_name[-1])
if os.path.exists(cmm_file_path):
with open(cmm_file_path, 'rb') as file:
data_bytes = file.read()
attachment = self.attachment_create(cnc.program_name + cmm_file_name[-1], data_bytes)
cnc.write({'cmm_id': attachment.id})
file.close()
else:
return False