# Files
# jikimo_sf/sf_manufacturing/models/mrp_workorder.py
# 2024-02-27 17:33:08 +08:00
#
# 929 lines
# 46 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import base64
from datetime import date
from datetime import datetime, timedelta
import requests
import os
import math
from dateutil.relativedelta import relativedelta
# import subprocess
from odoo import api, fields, models, SUPERUSER_ID, _
from odoo.addons.sf_base.commons.common import Common
from odoo.exceptions import UserError
from odoo.addons.sf_mrs_connect.models.ftp_operate import FtpController
class ResMrpWorkOrder(models.Model):
    """Extension of ``mrp.workorder`` for the smart-factory manufacturing flow.

    Adds blank (raw stock) dimensions, clamping/fixture data, CNC programming
    state, subcontracting pickings, work-piece delivery lines and RFID
    binding to the standard work order, and overrides ``button_start`` /
    ``button_finish``.
    """
    _inherit = 'mrp.workorder'
    _order = 'sequence asc,create_date desc'

    # Blank dimensions / material, mirrored from the production's product template.
    product_tmpl_id_length = fields.Float(related='production_id.product_tmpl_id.length', readonly=True, store=True,
                                          string="坯料长度(mm)")
    product_tmpl_id_width = fields.Float(related='production_id.product_tmpl_id.width', readonly=True, store=True,
                                         string="坯料宽度(mm)")
    product_tmpl_id_height = fields.Float(related='production_id.product_tmpl_id.height', readonly=True, store=True,
                                          string="坯料高度(mm)")
    product_tmpl_id_materials_id = fields.Many2one(related='production_id.product_tmpl_id.materials_id', readonly=True,
                                                   store=True, check_company=True, string="材料")
    product_tmpl_id_materials_type_id = fields.Many2one(related='production_id.product_tmpl_id.materials_type_id',
                                                        readonly=True, store=True, check_company=True, string="型号")
    workcenter_id = fields.Many2one('mrp.workcenter', string='工作中心', required=False)
    users_ids = fields.Many2many("res.users", 'users_workorder', related="workcenter_id.users_ids")
    processing_panel = fields.Char('加工面')
    sequence = fields.Integer(string='工序')
    routing_type = fields.Selection([
        # ('获取CNC加工程序', '获取CNC加工程序'),
        ('装夹预调', '装夹预调'),
        # ('前置三元定位检测', '前置三元定位检测'),
        ('CNC加工', 'CNC加工'),
        # ('后置三元质量检测', '后置三元质量检测'),
        ('解除装夹', '解除装夹'),
        ('切割', '切割'), ('表面工艺', '表面工艺')
    ], string="工序类型")
    results = fields.Char('结果')

    # The result depends on both the work center's users and the current
    # user, so both are declared as dependencies of the compute.
    @api.depends('users_ids')
    @api.depends_context('uid')
    def get_user_permissions(self):
        """Compute ``user_permissions``: True when the current user is one of ``users_ids``."""
        uid = self.env.uid
        for workorder in self:
            workorder.user_permissions = uid in workorder.users_ids.ids

    user_permissions = fields.Boolean('用户权限', compute='get_user_permissions')
    programming_no = fields.Char('编程单号')
    work_state = fields.Char('业务状态')
    programming_state = fields.Char('编程状态')
    cnc_worksheet = fields.Binary(
        '工作指令', readonly=True)
    material_center_point = fields.Char(string='坯料中心点')
    # Probed measurement points; getcenter() reads X1..X8 / Y1..Y8 and Z9.
    X1_axis = fields.Float(default=0)
    Y1_axis = fields.Float(default=0)
    Z1_axis = fields.Float(default=0)
    X2_axis = fields.Float(default=0)
    Y2_axis = fields.Float(default=0)
    Z2_axis = fields.Float(default=0)
    X3_axis = fields.Float(default=0)
    Y3_axis = fields.Float(default=0)
    Z3_axis = fields.Float(default=0)
    X4_axis = fields.Float(default=0)
    Y4_axis = fields.Float(default=0)
    Z4_axis = fields.Float(default=0)
    X5_axis = fields.Float(default=0)
    Y5_axis = fields.Float(default=0)
    Z5_axis = fields.Float(default=0)
    X6_axis = fields.Float(default=0)
    Y6_axis = fields.Float(default=0)
    Z6_axis = fields.Float(default=0)
    X7_axis = fields.Float(default=0)
    Y7_axis = fields.Float(default=0)
    Z7_axis = fields.Float(default=0)
    X8_axis = fields.Float(default=0)
    Y8_axis = fields.Float(default=0)
    Z8_axis = fields.Float(default=0)
    X9_axis = fields.Float(default=0)
    Y9_axis = fields.Float(default=0)
    Z9_axis = fields.Float(default=0)
    X10_axis = fields.Float(default=0)
    Y10_axis = fields.Float(default=0)
    Z10_axis = fields.Float(default=0)
    X_deviation_angle = fields.Integer(string="X轴偏差度", default=0)
    test_results = fields.Selection([("合格", "合格"), ("返工", "返工"), ("报废", "报废")], default='合格',
                                    string="检测结果")
    cnc_ids = fields.One2many("sf.cnc.processing", 'workorder_id', string="CNC加工程序")
    cmm_ids = fields.One2many("sf.cmm.program", 'workorder_id', string="CMM程序")
    tray_code = fields.Char(string="托盘编码")
    glb_file = fields.Binary("glb模型文件")
    is_subcontract = fields.Boolean(string='是否外协')
    surface_technics_parameters_id = fields.Many2one('sf.production.process.parameter', string="表面工艺可选参数")
    picking_in_id = fields.Many2one('stock.picking', string='外协入库单')
    picking_out_id = fields.Many2one('stock.picking', string='外协出库单')
    supplier_id = fields.Many2one('res.partner', string='外协供应商')
    equipment_id = fields.Many2one('maintenance.equipment', string='加工设备')
    is_ok = fields.Boolean(string='是否合格')
    # Operator who processed the work piece
    processing_user_id = fields.Many2one('res.users', string='加工人')
    # Inspector
    inspection_user_id = fields.Many2one('res.users', string='检测人')
    schedule_state = fields.Selection(related='production_id.schedule_state', store=True)
    # Work-piece clamping information
    functional_fixture_code = fields.Char(string="功能夹具编码", readonly=True)
    functional_fixture_serial_number = fields.Char(string="功能夹具序列号", readonly=True)
    functional_fixture_id = fields.Many2one('sf.functional.fixture', string="功能夹具")
    functional_fixture_type_id = fields.Many2one('sf.functional.fixture.type', string="功能夹具类型", readonly=True)
    chuck_serial_number = fields.Char(string="卡盘序列号")
    chuck_name = fields.Char(string="卡盘名称")
    chuck_brand_id = fields.Many2one('sf.machine.brand', string="卡盘品牌")
    chuck_type_id = fields.Char(string="卡盘类型")
    chuck_model_id = fields.Char(string="卡盘型号")
    tray_serial_number = fields.Char(string="托盘序列号")
    tray_product_id = fields.Many2one('product.product', string="托盘名称")
    tray_brand_id = fields.Many2one('sf.machine.brand', string="托盘品牌")
    tray_type_id = fields.Many2one('sf.fixture.material', string="托盘类型")
    tray_model_id = fields.Many2one('sf.fixture.model', string="托盘型号")
    total_wight = fields.Float(string="总重量")
    maximum_carrying_weight = fields.Char(string="最大承载重量[kg]")
    maximum_clamping_force = fields.Char(string="最大夹持力[n]")
    preset_program_information = fields.Char(string="预调程序信息")
    workpiece_delivery_ids = fields.One2many('sf.workpiece.delivery', 'workorder_id', '工件配送')
    is_delivery = fields.Boolean('是否配送完成', default=False)
    rfid_code = fields.Char('RFID码')
    production_line_id = fields.Many2one('sf.production.line', related='production_id.production_line_id',
                                         string='生产线', store=True)
    production_line_state = fields.Selection(related='production_id.production_line_state',
                                             string='上/下产线', store=True)
    detection_report = fields.Binary('检测报告', readonly=True)

    def get_plan_workorder(self, production_line):
        """Return a search domain matching work orders planned entirely within tomorrow.

        :param production_line: optional ``production_line_id`` value used to
            narrow the query; a falsy value means no production-line filter.
        :return: domain ``[('id', 'in', ids)]``.
        """
        tomorrow = (date.today() + timedelta(days=+1)).strftime("%Y-%m-%d")
        # The lower bound on date_planned_start must be ">=": the original
        # "=" only matched work orders starting exactly at midnight.
        sql = """
            SELECT id
            FROM mrp_workorder
            WHERE date_planned_start >= %s::timestamp
              AND date_planned_start < (%s::timestamp + interval '1 day')
              AND date_planned_finished >= %s::timestamp
              AND date_planned_finished < (%s::timestamp + interval '1 day')
        """
        params = [tomorrow, tomorrow, tomorrow, tomorrow]
        if production_line:
            sql += " AND production_line_id = %s"
            params.append(production_line)
        self.env.cr.execute(sql, params)
        ids = [row[0] for row in self.env.cr.fetchall()]
        return [('id', 'in', ids)]

    @api.onchange('is_ok')
    def _onchange_inspection_user_id(self):
        """Toggle the inspector when ``is_ok`` is edited.

        Sets ``inspection_user_id`` to the current user when it is empty and
        clears it otherwise.
        NOTE(review): the original docstring only described the "assign the
        current user" half; confirm that clearing on the second change is intended.
        """
        if not self.inspection_user_id:
            self.inspection_user_id = self.env.user.id
        else:
            self.inspection_user_id = False

    @api.onchange('functional_fixture_id')
    def _onchange_functional_fixture_id(self):
        """Copy code and type from the selected functional fixture."""
        if self.functional_fixture_id:
            self.functional_fixture_code = self.functional_fixture_id.code
            self.functional_fixture_type_id = self.functional_fixture_id.type_id.id

    def get_no_data(self, production_id):
        """Return the work orders of ``production_id`` that carry a surface-process parameter."""
        return self.search(
            [('surface_technics_parameters_id', '!=', False), ('production_id', '=', production_id)])

    def getcenter(self):
        """Compute the blank centre point and its angle, then finish the work order.

        The centre is the mean of two line intersections: line (P1, P2) with
        line (P3, P4), and line (P5, P6) with line (P7, P8). Z is half of
        ``Z9_axis``. The angle is ``atan2(x5 - x6, y5 - y6)`` in degrees.
        The rounded X/Y values are written as compensation values on every
        work order of the production.

        :raises UserError: "参数计算有误" when the computation fails (for
            example parallel lines giving a zero denominator); a ``UserError``
            raised by ``button_finish`` is propagated unchanged.
        """
        try:
            x1, x2, x3, x4 = self.X1_axis, self.X2_axis, self.X3_axis, self.X4_axis
            x5, x6, x7, x8 = self.X5_axis, self.X6_axis, self.X7_axis, self.X8_axis
            y1, y2, y3, y4 = self.Y1_axis, self.Y2_axis, self.Y3_axis, self.Y4_axis
            y5, y6, y7, y8 = self.Y5_axis, self.Y6_axis, self.Y7_axis, self.Y8_axis
            z_top = self.Z9_axis
            # Intersection of line (P1, P2) with line (P3, P4).
            first_x = ((x3 - x4) * (x2 * y1 - x1 * y2) - (x1 - x2) * (x4 * y3 - x3 * y4)) / (
                    (x3 - x4) * (y1 - y2) - (x1 - x2) * (y3 - y4))
            first_y = ((y3 - y4) * (y2 * x1 - y1 * x2) - (y1 - y2) * (y4 * x3 - y3 * x4)) / (
                    (y3 - y4) * (x1 - x2) - (y1 - y2) * (x3 - x4))
            # Intersection of line (P5, P6) with line (P7, P8).
            second_x = ((x7 - x8) * (x6 * y5 - x5 * y6) - (x5 - x6) * (x8 * y7 - x7 * y8)) / (
                    (x7 - x8) * (y5 - y6) - (x5 - x6) * (y7 - y8))
            second_y = ((y7 - y8) * (y6 * x5 - y5 * x6) - (y5 - y6) * (y8 * x7 - y7 * x8)) / (
                    (y7 - y8) * (x5 - x6) - (y5 - y6) * (x7 - x8))
            x = (first_x + second_x) / 2
            y = (first_y + second_y) / 2
            z = z_top / 2
            angle_deg = math.atan2((x5 - x6), (y5 - y6)) * 180 / math.pi
            logging.info("(%.2f,%.2f)", x, y)
            self.material_center_point = ("(%.2f,%.2f,%.2f)" % (x, y, z))
            self.X_deviation_angle = angle_deg
            # Write the compensation values to the production's work orders.
            # Round through the same "%.2f" formatting as the stored string
            # (previously obtained by eval()-ing that string).
            workorder = self.env['mrp.workorder'].browse(self.ids)
            work = workorder.production_id.workorder_ids
            work.compensation_value_x = float("%.2f" % x)
            work.compensation_value_y = float("%.2f" % y)
            workorder.button_finish()
        except UserError:
            # Keep the specific message raised further down (e.g. by button_finish).
            raise
        except Exception as exc:
            logging.exception("getcenter failed")
            raise UserError("参数计算有误") from exc

    def button_workpiece_delivery(self):
        """Dispatch every work-piece delivery line of a clamping-preset work order.

        :raises UserError: when a delivery line has no start feeder station.
        """
        if self.routing_type == '装夹预调':
            for item in self.workpiece_delivery_ids:
                if not item.feeder_station_start:
                    raise UserError('【工件配送】明细中请输入起点接驳站')
                item.write({'task_delivery_time': fields.Datetime.now(), 'status': '待配送'})

    def json_workorder_str(self, k, production, route):
        """Build the ``[0, '', vals]`` create command for one work order.

        :param k: processing panel stored on the work order.
        :param production: manufacturing order the work order belongs to.
        :param route: routing record providing ``routing_type``,
            ``route_workcenter_id`` and ``workcenter_ids``.
        :return: list ``[0, '', vals]``.
        """
        # Expected duration comes from the routing operation named after the
        # routing type; other types default to 60.
        if route.routing_type in ('切割', '装夹预调', 'CNC加工', '解除装夹'):
            duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
                [('name', '=', route.routing_type)]).time_cycle
        else:
            duration_expected = 60
        # NOTE(review): for '装夹预调' a sf.workpiece.delivery record is created
        # here as a side effect and the recordset (not an x2many command) is
        # passed as the One2many value - behaviour kept as in the original.
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': route.route_workcenter_id.name,
            'processing_panel': k,
            'quality_point_ids': route.route_workcenter_id.quality_point_ids,
            'routing_type': route.routing_type,
            'work_state': '待发起',
            'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
                                                                              route.routing_type,
                                                                              production.product_id),
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': duration_expected,
            'duration': 0,
            'workpiece_delivery_ids': False if not route.routing_type == '装夹预调' else self.env[
                'sf.workpiece.delivery'].create(
                {'production_id': production.id})
        }]
        return workorders_values_str

    def _json_workorder_surface_process_str(self, production, route, process_parameter, supplier_id):
        """Build the ``[0, '', vals]`` create command for a surface-process work order.

        The work order is flagged as subcontracted, and routed to the
        outsourcing work center, when ``process_parameter.gain_way`` is '外协'.
        """
        is_outsourced = process_parameter.gain_way == '外协'
        if is_outsourced:
            workcenter = self.env['mrp.workcenter'].get_process_outsourcing_workcenter()
        else:
            workcenter = self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
                                                                          route.routing_type,
                                                                          production.product_id)
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': '%s-%s' % (route.name, process_parameter.name),
            'processing_panel': '',
            'routing_type': '表面工艺',
            'surface_technics_parameters_id': process_parameter.id,
            'work_state': '',
            'supplier_id': supplier_id,
            'is_subcontract': is_outsourced,
            'workcenter_id': workcenter,
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0
        }]
        return workorders_values_str

    def button_maintenance_req(self):
        """Open the maintenance-request form pre-filled for this work order."""
        self.ensure_one()
        return {
            'name': _('New Maintenance Request'),
            'view_mode': 'form',
            'views': [(self.env.ref('mrp_maintenance.maintenance_request_view_form_inherit_mrp').id, 'form')],
            'res_model': 'maintenance.request',
            'type': 'ir.actions.act_window',
            'context': {
                'default_company_id': self.company_id.id,
                'default_workorder_id': self.id,
                'default_production_id': self.production_id.id,
                'discard_on_footer_button': True,
            },
            'target': 'new',
            'domain': [('workorder_id', '=', self.id)]
        }

    # NOTE: the legacy, commented-out tray helpers (tray_id field, gettray,
    # gettray_auto, unbindtray) that used the 'sf.tray' model were removed
    # from this spot; they remain available in version-control history.

    def pro_code_is_ok(self, barcode):
        """Check a scanned blank serial number against ``pro_code``.

        :return: True when ``barcode`` equals ``pro_code``; None when
            ``barcode`` is False.
        :raises UserError: when ``barcode`` differs from ``pro_code``.
        """
        if barcode is False:
            return None
        if barcode != self.pro_code:
            raise UserError('坯料序列号错误')
        return True

    pro_code = fields.Char(string='坯料序列号')
    pro_code_ok = fields.Boolean(default=False)

    def recreateManufacturingOrWorkerOrder(self):
        """Re-create the manufacturing order or the work orders from the inspection result.

        * '报废' (scrap): create and confirm a new manufacturing order.
        * '返工' (rework): create rework work orders for this processing panel.
        * otherwise: set ``results`` to '合格'.
        """
        if self.test_results == '报废':
            values = self.env['mrp.production'].create_production1_values(self.production_id)
            productions = self.env['mrp.production'].with_user(SUPERUSER_ID).sudo().with_company(
                self.production_id.company_id).create(
                values)
            self.env['stock.move'].sudo().create(productions._get_moves_raw_values())
            self.env['stock.move'].sudo().create(productions._get_moves_finished_values())
            productions._create_workorder()
            productions.filtered(lambda p: (not p.orderpoint_id and p.move_raw_ids) or
                                           (p.move_dest_ids.procure_method != 'make_to_order' and
                                            not p.move_raw_ids and not p.workorder_ids)).action_confirm()
            for production in productions:
                origin_production = production.move_dest_ids and production.move_dest_ids[
                    0].raw_material_production_id or False
                orderpoint = production.orderpoint_id
                if orderpoint and orderpoint.create_uid.id == SUPERUSER_ID and orderpoint.trigger == 'manual':
                    production.message_post(
                        body=_('This production order has been created from Replenishment Report.'),
                        message_type='comment',
                        subtype_xmlid='mail.mt_note')
                elif orderpoint:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': orderpoint},
                        subtype_id=self.env.ref('mail.mt_note').id)
                elif origin_production:
                    production.message_post_with_view(
                        'mail.message_origin_link',
                        values={'self': production, 'origin': origin_production},
                        subtype_id=self.env.ref('mail.mt_note').id)
        # "elif": with a plain "if" here, a scrapped work order also fell into
        # the final "else" and was marked '合格'.
        elif self.test_results == '返工':
            productions = self.production_id
            productions._create_workorder2(self.processing_panel)
        else:
            self.results = '合格'

    def fetchCNC(self):
        """Request a CNC programming order from the remote programming service.

        Posts the data of the production's first 'CNC加工' work order to
        ``/api/intelligent_programming/create`` and, on ``status == 1``,
        stores the returned programming number and sets the state to '编程中'.

        :raises UserError: with the service's ``message`` when it reports a
            failure, or with a generic message on any other error.
        """
        try:
            cnc = self.env['mrp.workorder'].search(
                [('routing_type', '=', 'CNC加工'), ('production_id', '=', self.production_id.id)], limit=1)
            res = {'model_code': '' if not cnc.product_id.model_code else cnc.product_id.model_code,
                   'production_no': self.production_id.name,
                   'machine_tool_code': cnc.workcenter_id.equipment_id.code,
                   'material_code': cnc.env['sf.production.materials'].search(
                       [('id', '=', cnc.product_id.materials_id.id)]).materials_no,
                   'material_type_code': cnc.env['sf.materials.model'].search(
                       [('id', '=', cnc.product_id.materials_type_id.id)]).materials_no,
                   'machining_processing_panel': cnc.product_id.model_processing_panel,
                   'machining_precision': cnc.product_id.model_machining_precision,
                   'embryo_long': cnc.product_id.bom_ids.bom_line_ids.product_id.length,
                   'embryo_height': cnc.product_id.bom_ids.bom_line_ids.product_id.height,
                   'embryo_width': cnc.product_id.bom_ids.bom_line_ids.product_id.width,
                   'order_no': cnc.production_id.origin,
                   'model_order_no': cnc.product_id.default_code.rsplit(' -', 1)[0],
                   'user': self.env.user.name,
                   'model_file': '' if not cnc.product_id.model_file else base64.b64encode(
                       cnc.product_id.model_file).decode('utf-8')
                   }
            logging.info('res:%s', res)
            configsettings = self.env['res.config.settings'].get_values()
            config_header = Common.get_headers(self, configsettings['token'], configsettings['sf_secret_key'])
            url = '/api/intelligent_programming/create'
            config_url = configsettings['sf_url'] + url
            # NOTE(review): both json= and data= are passed and no timeout is
            # set; kept as in the original - confirm against the service contract.
            ret = requests.post(config_url, json={}, data=res, headers=config_header)
            ret = ret.json()
            logging.info('fetchCNC-ret:%s', ret)
            if ret['status'] == 1:
                self.write(
                    {'programming_no': ret['programming_no'], 'programming_state': '编程中', 'work_state': '编程中'})
            else:
                raise UserError(ret['message'])
        except UserError:
            # Let the service's own error message reach the user instead of
            # replacing it with the generic one below.
            raise
        except Exception as e:
            logging.info('fetchCNC error:%s', e)
            raise UserError("cnc程序获取编程单失败,请联系管理员")

    def json_workorder_str1(self, k, production, route):
        """Build the ``[0, '', vals]`` create command for a rework work order."""
        workorders_values_str = [0, '', {
            'product_uom_id': production.product_uom_id.id,
            'qty_producing': 0,
            'operation_id': False,
            'name': '%s(返工)' % route.route_workcenter_id.name,
            'processing_panel': k,
            'routing_type': route.routing_type,
            'work_state': '' if not route.routing_type == '获取CNC加工程序' else '待发起',
            'workcenter_id': self.env['mrp.routing.workcenter'].get_workcenter(route.workcenter_ids.ids,
                                                                              route.routing_type,
                                                                              production.product_id),
            'date_planned_start': False,
            'date_planned_finished': False,
            'duration_expected': 60,
            'duration': 0,
        }]
        return workorders_values_str

    def button_start(self):
        """Start the work order (override of the standard start button).

        Before the start logic this records the blank serial number for
        clamping-preset work orders and moves a subcontracting delivery
        picking from 'confirmed' to 'assigned'.

        :raises UserError: when the work order is not in state
            'waiting', 'ready' or 'progress'.
        """
        if self.routing_type == '装夹预调':
            # [:1] instead of [0]: an empty recordset yields False rather
            # than raising IndexError when there is no raw move / move line.
            lot_name = self.production_id.move_raw_ids[:1].move_line_ids[:1].lot_id.name
            if lot_name:
                self.pro_code = lot_name
        # Subcontracting delivery picking: from "waiting" to "ready".
        if self.is_subcontract is True:
            picking_out = self.env['stock.picking'].search([('id', '=', self.picking_out_id.id)])
            if picking_out.state == 'confirmed':
                picking_out.write({'state': 'assigned'})
        # NOTE(review): nesting reconstructed from the flattened source - the
        # whole start logic is assumed to sit under this state check.
        if self.state not in ('waiting', 'ready', 'progress'):
            raise UserError(_('请先完成上一步工单'))
        self.move_raw_ids = self.production_id.move_raw_ids
        self.ensure_one()
        if any(not time.date_end for time in self.time_ids.filtered(lambda t: t.user_id.id == self.env.user.id)):
            return True
        # As button_start is automatically called in the new view
        if self.state in ('done', 'cancel'):
            return True
        if self.product_tracking == 'serial':
            self.qty_producing = 1.0
        else:
            self.qty_producing = self.qty_remaining
        self.env['mrp.workcenter.productivity'].create(
            self._prepare_timeline_vals(self.duration, datetime.now())
        )
        if self.production_id.state != 'progress':
            self.production_id.write({
                'date_start': datetime.now(),
            })
        if self.state == 'progress':
            return True
        start_date = datetime.now()
        vals = {
            'state': 'progress',
            'date_start': start_date,
        }
        if not self.leave_id:
            leave = self.env['resource.calendar.leaves'].create({
                'name': self.display_name,
                'calendar_id': self.workcenter_id.resource_calendar_id.id,
                'date_from': start_date,
                'date_to': start_date + relativedelta(minutes=self.duration_expected),
                'resource_id': self.workcenter_id.resource_id.id,
                'time_type': 'other'
            })
            vals['leave_id'] = leave.id
            return self.write(vals)
        # An unset planned start (False) cannot be compared with a datetime.
        if not self.date_planned_start or self.date_planned_start > start_date:
            vals['date_planned_start'] = start_date
        if self.date_planned_finished and self.date_planned_finished < start_date:
            vals['date_planned_finished'] = start_date
        return self.write(vals)

    def button_finish(self):
        """Finish the work order (override of the standard finish button).

        For a subcontracted work order with a delivery picking, a draft
        purchase order is created with one service line per work order of
        that picking. When this is the '解除装夹' work order and all work
        orders of the production are done, RFID codes are cleared, raw moves
        are set fully done and the production is marked done.

        :raises UserError: when the clamping-preset positioning has not been
            computed, or when no service product is configured for a surface
            process.
        """
        if self.routing_type == '装夹预调':
            if not self.material_center_point and self.X_deviation_angle > 0:
                raise UserError("请对前置三元检测定位参数进行计算定位")
        if self.picking_out_id:
            picking_out = self.env['stock.picking'].search([('id', '=', self.picking_out_id.id)])
            if picking_out.workorder_out_id:
                order_line_ids = []
                for item in picking_out.workorder_out_id:
                    server_product = self.env['product.template'].search(
                        [('server_product_process_parameters_id', '=', item.surface_technics_parameters_id.id),
                         ('detailed_type', '=', 'service')])
                    if not server_product:
                        raise UserError(
                            '请先在产品中配置表面工艺为%s相关的外协服务产品' % item.surface_technics_parameters_id.name)
                    order_line_ids.append((0, 0, {
                        'product_id': server_product.product_variant_id.id,
                        'product_qty': 1,
                        'product_uom': server_product.uom_id.id
                    }))
                # NOTE(review): the vendor is taken from the service product
                # of the last loop iteration, as in the original.
                self.env['purchase.order'].create({
                    'partner_id': server_product.seller_ids.partner_id.id,
                    'state': 'draft',
                    'order_line': order_line_ids,
                })
        super().button_finish()
        all_done = all(workorder.state == 'done' for workorder in self.production_id.workorder_ids)
        if all_done and self.name == '解除装夹':
            for workorder in self.production_id.workorder_ids:
                workorder.rfid_code = None
            for move_raw_id in self.production_id.move_raw_ids:
                move_raw_id.quantity_done = move_raw_id.product_uom_qty
            self.production_id.button_mark_done1()

    def download_reportfile_tmp(self, workorder, reportpath):
        """Download the FTP detection-report tree of a work order into ``/tmp``.

        :return: the value returned by ``FtpController.download_reportfile_tree``.
        """
        production_no = workorder.production_id.name.replace('/', '_')
        remotepath = os.path.join('/', production_no, 'detection')
        serverdir = os.path.join('/tmp', production_no, 'detection')
        ftp_resconfig = self.env['res.config.settings'].get_values()
        ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']),
                            ftp_resconfig['ftp_user'],
                            ftp_resconfig['ftp_password'])
        download_state = ftp.download_reportfile_tree(remotepath, serverdir, reportpath)
        logging.info('download_state:%s', download_state)
        return download_state

    def get_detection_file(self, workorder, reportPath):
        """Store every PDF found under ``/tmp/<reportPath>`` as the detection report.

        Each PDF found overwrites ``workorder.detection_report``, so the last
        one visited by ``os.walk`` is the one kept.
        """
        logging.info('workorder:%s', workorder.name)
        logging.info('制造订单:%s', workorder.production_id.name)
        logging.info('reportPath:%s', reportPath)
        serverdir = os.path.join('/tmp', reportPath).replace('//', '/')
        logging.info('serverdir:%s', serverdir)
        for root, dirs, files in os.walk(serverdir):
            for f in files:
                logging.info('f:%s', f)
                if os.path.splitext(f)[1] == ".pdf":
                    # root already starts with serverdir, so join root and f only.
                    full_path = os.path.join(root, f)
                    logging.info('检测文件路径:%s', full_path)
                    # Context manager so the file handle is always closed.
                    with open(full_path, 'rb') as report_file:
                        workorder.detection_report = base64.b64encode(report_file.read())
class CNCprocessing(models.Model):
    """One CNC program line of a work order (model ``sf.cnc.processing``).

    Holds the tooling/processing data of the program and a link to the
    attachment carrying the NC file.
    """
    _name = 'sf.cnc.processing'
    _description = "CNC加工"
    _rec_name = 'program_name'

    cnc_id = fields.Many2one('ir.attachment')
    sequence_number = fields.Char('序号')
    program_name = fields.Char('程序名')
    functional_tool_type_id = fields.Many2one('sf.functional.cutting.tool.model', string='功能刀具类型')
    cutting_tool_name = fields.Char('刀具名称')
    cutting_tool_no = fields.Char('刀号')
    processing_type = fields.Char('加工类型')
    margin_x_y = fields.Char('余量_X/Y')
    margin_z = fields.Char('余量_Z')
    depth_of_processing_z = fields.Char('加工深度(Z)')
    cutting_tool_extension_length = fields.Char('刀具伸出长度')
    cutting_tool_handle_type = fields.Char('刀柄型号')
    estimated_processing_time = fields.Char('预计加工时间')
    remark = fields.Text('备注')
    workorder_id = fields.Many2one('mrp.workorder', string="工单")
    production_id = fields.Many2one('mrp.production', string="制造订单")
    button_state = fields.Boolean(string='是否已经下发')
    program_path = fields.Char('程序文件路径')

    def cnc_processing_create(self, cnc_workorder, ret, program_path, program_path_tmp):
        """Create CNC program records from a programming order pushed by MRS.

        For every entry of ``ret['programming_list']`` whose ``program_name``
        occurs in ``program_path``, a record is created on the matching
        'CNC加工' work order and its files are loaded from ``program_path_tmp``.
        Finally ``cnc_workorder`` is flagged as programmed ('已编程').
        """
        logging.info('ret:%s', ret)
        logging.info('program_path_tmp:%s', program_path_tmp)
        logging.info('program_path:%s', program_path)
        for obj in ret['programming_list']:
            workorder = self.env['mrp.workorder'].search([('production_id.name', '=', ret['production_order_no']),
                                                          ('processing_panel', '=', obj['processing_panel']),
                                                          ('routing_type', '=', 'CNC加工')])
            logging.info('workorder:%s', workorder.id)
            logging.info('obj:%s', obj)
            if obj['program_name'] in program_path:
                logging.info('obj:%s', obj['program_name'])
                cnc_processing = self.env['sf.cnc.processing'].create({
                    'workorder_id': workorder.id,
                    'sequence_number': obj['sequence_number'],
                    'program_name': obj['program_name'],
                    'cutting_tool_name': obj['cutting_tool_name'],
                    'cutting_tool_no': obj['cutting_tool_no'],
                    'processing_type': obj['processing_type'],
                    'margin_x_y': obj['margin_x_y'],
                    'margin_z': obj['margin_z'],
                    'depth_of_processing_z': obj['depth_of_processing_z'],
                    'cutting_tool_extension_length': obj['cutting_tool_extension_length'],
                    'cutting_tool_handle_type': obj['cutting_tool_handle_type'],
                    'estimated_processing_time': obj['estimated_processing_time'],
                    'remark': obj['remark'],
                    'program_path': program_path.replace('/tmp', '')
                })
                cnc_processing.get_cnc_processing_file(program_path_tmp, cnc_processing, program_path)
        # NOTE(review): placed after the loop; the flattened source does not
        # show whether these two assignments were inside it.
        cnc_workorder.work_state = '已编程'
        cnc_workorder.programming_state = '已编程'

    def get_cnc_processing_file(self, serverdir, cnc_processing, program_path):
        """Load the downloaded files under ``serverdir`` for one CNC program.

        A ``.pdf`` file becomes the work order's ``cnc_worksheet`` if none is
        set yet; any other file whose name occurs in ``program_path`` is
        stored as the program's NC attachment.
        """
        logging.info('serverdir:%s', serverdir)
        for root, dirs, files in os.walk(serverdir):
            for f in files:
                logging.info('f:%s', f)
                # os.walk roots already start with serverdir; joining
                # serverdir again only worked because the path was absolute.
                file_path = os.path.join(root, f)
                if os.path.splitext(f)[1] == ".pdf":
                    logging.info('pdf:%s', file_path)
                    if not cnc_processing.workorder_id.cnc_worksheet:
                        # Context manager so the file handle is always closed.
                        with open(file_path, 'rb') as worksheet:
                            cnc_processing.workorder_id.cnc_worksheet = base64.b64encode(worksheet.read())
                elif f in program_path:
                    logging.info('cnc_file_path:%s', file_path)
                    logging.info('program_path:%s', program_path)
                    logging.info('f:%s', f)
                    self.write_file(file_path, cnc_processing)

    def attachment_create(self, name, data):
        """Create and return an attachment holding an NC program file.

        NOTE(review): the attachment is created with ``public=True``;
        confirm that NC programs are meant to be publicly readable.
        """
        attachment = self.env['ir.attachment'].create({
            'datas': base64.b64encode(data),
            'type': 'binary',
            'public': True,
            'description': '程序文件',
            'name': name
        })
        return attachment

    def download_file_tmp(self, production_no, processing_panel):
        """Download the FTP NC-file tree of a production/panel into ``/tmp``.

        :return: the value returned by ``FtpController.download_file_tree``.
        """
        remotepath = os.path.join('/', production_no, 'return', processing_panel)
        serverdir = os.path.join('/tmp', production_no, 'return', processing_panel)
        ftp_resconfig = self.env['res.config.settings'].get_values()
        ftp = FtpController(str(ftp_resconfig['ftp_host']), int(ftp_resconfig['ftp_port']), ftp_resconfig['ftp_user'],
                            ftp_resconfig['ftp_password'])
        download_state = ftp.download_file_tree(remotepath, serverdir)
        logging.info('download_state:%s', download_state)
        return download_state

    def write_file(self, nc_file_path, cnc):
        """Store the NC file at ``nc_file_path`` as the attachment of ``cnc``.

        The attachment is named ``<program_name><file name>``.

        :return: True on success, False when the file does not exist.
        """
        nc_file_name = nc_file_path.split('/')[-1]
        logging.info('nc_file_name:%s', nc_file_name)
        if not os.path.exists(nc_file_path):
            return False
        # The "with" block closes the file; the former explicit close() was redundant.
        with open(nc_file_path, 'rb') as file:
            data_bytes = file.read()
        attachment = self.attachment_create(cnc.program_name + nc_file_name, data_bytes)
        cnc.write({'cnc_id': attachment.id})
        return True
class SfWorkOrderBarcodes(models.Model):
    """Barcode scanning on smart-factory work orders (tray / blank / RFID binding)."""
    _name = "mrp.workorder"
    _inherit = ["mrp.workorder", "barcodes.barcode_events_mixin"]

    def on_barcode_scanned(self, barcode):
        """Handle a barcode scanned on a clamping-preset ('装夹预调') work order.

        * A lot of a fixture ('夹具') product binds the tray data and starts
          the work order.
        * A known ``stock.lot`` is treated as a blank: the related production
          name is appended to the delivery line's work-piece codes.
        * Anything else is stored as the RFID code on all work orders of the
          production.

        NOTE(review): the branch nesting was reconstructed from a source
        whose indentation was lost; confirm the ``else`` pairing.

        :raises UserError: when the scanned blank lot has no move line on
            this production.
        """
        workorder = self.env['mrp.workorder'].browse(self.ids)
        if not workorder or workorder.routing_type != '装夹预调':
            return

        scanned_line = self.env['stock.move.line'].search([('lot_name', '=', barcode)])
        if scanned_line.product_id.categ_type == '夹具':
            tray_product = scanned_line.product_id
            workorder.write({
                'tray_serial_number': scanned_line.lot_name,
                'tray_product_id': tray_product.id,
                'tray_brand_id': tray_product.brand_id.id,
                'tray_type_id': tray_product.fixture_material_id.id,
                'tray_model_id': tray_product.fixture_model_id.id
            })
            workorder.button_start()
            return

        blank_lot = self.env['stock.lot'].search([('name', '=', barcode)])
        if not blank_lot:
            # Not a known lot: store the barcode as RFID code on every work
            # order of the production.
            sibling_workorders = self.env['mrp.workorder'].search(
                [('production_id', '=', workorder.production_id.id)])
            for sibling in sibling_workorders:
                sibling.write({'rfid_code': barcode})
            return

        blank_move_line = self.env['stock.move.line'].search(
            [('product_id', '=', blank_lot.product_id.id),
             ('reference', '=', workorder.production_id.name),
             ('lot_id', '=', blank_lot.id),
             ('product_category_name', '=', '坯料')])
        if not blank_move_line:
            raise UserError('工件生产线不一致,请重新确认')

        bom_production = self.env['mrp.production'].search(
            [('product_id', '=', blank_lot.product_id.id),
             ('origin', '=', workorder.production_id.name)], limit=1, order='id asc')
        delivery = self.env['sf.workpiece.delivery'].search(
            [('workorder_id', '=', workorder.id)], limit=1, order='id asc')
        if not delivery:
            return

        current_codes = delivery.workpiece_code
        merged_codes = current_codes
        if bom_production:
            if current_codes and bom_production.name not in current_codes:
                merged_codes = current_codes + ',' + bom_production.name
            if not current_codes:
                merged_codes = bom_production.name
        delivery.write({'workpiece_code': merged_codes})
class WorkPieceDelivery(models.Model):
    """Delivery task that moves a work piece between feeder stations."""
    _name = "sf.workpiece.delivery"
    _description = '工件配送'

    workorder_id = fields.Many2one('mrp.workorder', string='工单', readonly=True)
    production_id = fields.Many2one('mrp.production', string='制造订单', readonly=True)
    production_line_id = fields.Many2one('sf.production.line', compute='_compute_production_line_id',
                                         string='目的生产线', readonly=True,
                                         store=True)
    plan_start_processing_time = fields.Datetime('计划开始加工时间', readonly=True)
    workpiece_code = fields.Char('同运工件编码')
    feeder_station_start = fields.Char('起点接驳站')
    feeder_station_destination = fields.Char('目的接驳站')
    task_delivery_time = fields.Datetime('任务下发时间')
    task_completion_time = fields.Datetime('任务完成时间')
    delivery_duration = fields.Float('配送时长', compute='_compute_delivery_duration')
    status = fields.Selection(
        [('待下发', '待下发'), ('待配送', '待配送'), ('已配送', '已配送')], string='状态',
        default='待下发')

    def button_delivery(self):
        """Open the delivery confirmation wizard for a record in state '待下发'.

        :raises UserError: when the record is in any other state.
        """
        if self.status == '待下发':
            return {
                'name': _('确认'),
                'type': 'ir.actions.act_window',
                'view_mode': 'form',
                'res_model': 'sf.workpiece.delivery.wizard',
                'target': 'new',
                'context': {
                    'default_delivery_id': self.id,
                }}
        else:
            raise UserError('状态为【待下发】的工件记录可进行配送')

    def _delivery_avg(self):
        """Mark the task as dispatched: stamp the dispatch time and set '待配送'."""
        self.write({'task_delivery_time': fields.Datetime.now(), 'status': '待配送'})

    @api.depends('production_id.production_line_id')
    def _compute_production_line_id(self):
        """Mirror the production's line (and planned start time) onto each delivery.

        Iterates over the recordset and always assigns ``production_line_id``:
        the original read ``self`` as a single record and assigned nothing
        when the production had no line.
        """
        for delivery in self:
            production = delivery.production_id
            if production.production_line_id:
                delivery.production_line_id = production.production_line_id.id
                delivery.plan_start_processing_time = production.plan_start_processing_time
            else:
                delivery.production_line_id = False

    @api.depends('task_delivery_time', 'task_completion_time')
    def _compute_delivery_duration(self):
        """Compute the delivery duration in minutes (2 decimals), 0.0 if a timestamp is missing."""
        for obj in self:
            if obj.task_delivery_time and obj.task_completion_time:
                obj.delivery_duration = round(
                    (obj.task_completion_time - obj.task_delivery_time).total_seconds() / 60.0, 2)
            else:
                obj.delivery_duration = 0.0
class CMMprogram(models.Model):
    """CMM program attached to a work order (model ``sf.cmm.program``)."""
    _name = 'sf.cmm.program'
    _description = "CMM程序"

    # Path of the program file
    program_path = fields.Char(string='程序文件路径')
    # Post-processing name
    post_processing_name = fields.Char(string='后处理名称')
    # Program date
    program_date = fields.Datetime(string='程序日期')
    # Owning work order
    workorder_id = fields.Many2one(comodel_name='mrp.workorder', string="工单")