Files
test/sf_manufacturing/models/mrp_routing_workcenter.py
jinling.yang ecf683e663 修复采购
2024-11-27 17:54:31 +08:00

98 lines
4.7 KiB
Python

import logging
from odoo import fields, models, api
from odoo.exceptions import UserError
class ResMrpRoutingWorkcenter(models.Model):
_inherit = 'mrp.routing.workcenter'
routing_type = fields.Selection([
('装夹预调', '装夹预调'),
('CNC加工', 'CNC加工'),
('解除装夹', '解除装夹'),
('切割', '切割'),
('表面工艺', '表面工艺'),
('线切割', '线切割'),
('人工线下加工', '人工线下加工')
], string="工序类型")
routing_tag = fields.Selection([
('standard', '标准'),
('special', '特殊')
], string="标签")
is_repeat = fields.Boolean('重复', default=False)
workcenter_id = fields.Many2one('mrp.workcenter', required=False)
workcenter_ids = fields.Many2many('mrp.workcenter', 'rel_workcenter_route', required=True)
bom_id = fields.Many2one('mrp.bom', required=False)
surface_technics_id = fields.Many2one('sf.production.process', string="表面工艺")
reserved_duration = fields.Float('预留时长', default=30, tracking=True)
def get_no(self):
international_standards = self.search(
[('code', '!=', ''), ('active', 'in', [True, False])],
limit=1,
order="id desc")
if not international_standards:
num = "%03d" % 1
else:
m = int(international_standards.code) + 1
num = "%03d" % m
return num
code = fields.Char('编码', default=get_no)
# 获得当前登陆者公司
def get_company_id(self):
self.company_id = self.env.user.company_id.id
company_id = fields.Many2one('res.company', compute="get_company_id", related=False, store=True)
# 排产的时候, 根据坯料的长宽高比对一下机床的最大加工尺寸.不符合就不要分配给这个加工中心(机床).
# 工单对应的工作中心,根据工序中的工作中心去匹配,
# 如果只配置了一个工作中心,则默认采用该工作中心;
# 如果有多个工作中心,
# 则根据该工作中心的工单个数进行分配(优先分配给工单个数最少的);
def get_workcenter(self, workcenter_ids, routing_type, product):
if workcenter_ids:
if len(workcenter_ids) == 1:
return workcenter_ids[0]
elif len(workcenter_ids) >= 2:
# workcenter_ids_str = ','.join([str(s) for s in workcenter_ids])
if routing_type == 'CNC加工':
workcenter = self.env['mrp.workcenter'].search([('id', 'in', workcenter_ids)])
workcenter_ids = []
for item in workcenter:
logging.info('get_workcenter-vals:%s' % item.machine_tool_id.name)
if item.machine_tool_id:
machine_tool = self.env['sf.machine_tool'].search(
[('x_axis', '>', product.bom_ids.bom_line_ids.product_id.length),
('y_axis', '>', product.bom_ids.bom_line_ids.product_id.width),
('z_axis', '>', product.bom_ids.bom_line_ids.product_id.height),
('id', '=', item.machine_tool_id.id)])
if machine_tool:
workcenter_ids.append(item.id)
if len(workcenter_ids) == 1:
return workcenter_ids[0]
self.env.cr.execute("""
SELECT workcenter_id FROM mrp_workorder where workcenter_id
in %s group by workcenter_id
order by count(*),workcenter_id asc limit 1 """, [tuple(workcenter_ids)])
res = self.env.cr.fetchone()
if res:
workcenter_id = res[0]
else:
workcenter_id = workcenter_ids[0]
return workcenter_id
@api.model
def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
if self._context.get('production_id'):
route_ids = []
technology_design = self.env['sf.technology.design'].search(
[('production_id', '=', self._context.get('production_id'))])
for t in technology_design.filtered(lambda a: a.routing_tag == 'special'):
if not t.process_parameters_id:
route_ids.append(t.route_id.surface_technics_id.id)
domain = [('id', 'not in', route_ids), ('routing_tag', '=', 'special')]
return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
return super()._name_search(name, args, operator, limit, name_get_uid)