Files
test/sf_manufacturing/models/purchase_order.py
2025-05-12 08:41:26 +08:00

234 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import re
from collections import defaultdict
from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import OrderedSet
# _get_surface_technics_purchase_ids
class PurchaseOrder(models.Model):
_inherit = 'purchase.order'
production_count = fields.Integer(
"关联制造订单",
compute='_compute_workorder_count',
)
def button_cancel(self):
account_moves = set() # 使用集合以避免重复,并提高查找速度
accounts = self.env['account.move'].search(
[('id', 'in', self.invoice_ids.ids), ('state', 'not in', ['cancel', False])])
# 直接筛选掉状态为'cancel'或False的记录避免多次迭代
for account in accounts:
account_moves.add(account.name) # 使用set的add方法避免重复添加
# 如果你需要list形式的结果可以将set转换为list
account_moves = list(account_moves)
if account_moves:
raise UserError(_("请联系工厂生产经理对该采购单的供应商账单进行取消"))
return super(PurchaseOrder, self).button_cancel()
def action_view_production(self):
origins = [order.name for order in self.picking_ids]
production_id = self.env['mrp.production'].search([('origin', 'in', origins)])
if not production_id:
return
action = {
'res_model': 'mrp.production',
'type': 'ir.actions.act_window',
}
if len(production_id) == 1:
action.update({
'view_mode': 'form',
'res_id': production_id.id,
})
else:
action.update({
'name': _("制造订单列表"),
'domain': [('id', 'in', production_id.ids)],
'view_mode': 'tree,form',
})
return action
def _compute_workorder_count(self):
for purchase in self:
origins = [order.name for order in purchase.picking_ids]
production_id = self.env['mrp.production'].search([('origin', 'in', origins)])
purchase.production_count = len(production_id)
def process_replenish(self,production,total_qty):
record = self
bom_line_id = production.bom_id.bom_line_ids
replenish = self.env['stock.warehouse.orderpoint'].search([
('product_id', '=', bom_line_id.product_id.id),
(
'location_id', '=', self.env.ref('sf_stock.stock_location_outsourcing_material_receiving_area').id),
# ('state', 'in', ['draft', 'confirmed'])
], limit=1)
if not replenish:
replenish_model = self.env['stock.warehouse.orderpoint']
replenish = replenish_model.create({
'product_id': bom_line_id.product_id.id,
'location_id': self.env.ref(
'sf_stock.stock_location_outsourcing_material_receiving_area').id,
'route_id': self.env.ref('sf_stock.stock_route_process_outsourcing').id,
'group_id': record.group_id.id,
'qty_to_order': total_qty,
'origin': record.name,
})
else:
replenish.write({
'product_id': bom_line_id.product_id.id,
'location_id': self.env.ref(
'sf_stock.stock_location_outsourcing_material_receiving_area').id,
'route_id': self.env.ref('sf_stock.stock_route_process_outsourcing').id,
'group_id': record.group_id.id,
'qty_to_order': total_qty + replenish.qty_to_order,
'origin': record.name + ',' + replenish.origin,
})
replenish.action_replenish()
def outsourcing_service_replenishment(self):
record = self
if record.purchase_type != 'consignment':
return
grouped_lines = {}
for line in record.order_line:
if line.related_product.id not in grouped_lines:
grouped_lines[line.related_product.id] = []
grouped_lines[line.related_product.id].append(line)
for product_id,lines in grouped_lines.items():
production = self.env['mrp.production'].search([('product_id', '=', product_id)], limit=1)
if not production:
continue
total_qty = sum(line.product_qty for line in lines)
record.process_replenish(production,total_qty)
for product_id,lines in grouped_lines.items():
productions = self.env['mrp.production'].search([('product_id', '=', product_id)], limit=1)
if not productions:
continue
# production.bom_id.bom_line_ids.product_id
location_id = self.env['stock.location'].search([('name', '=', '制造前')])
quants = self.env['stock.quant'].search([
('product_id', '=', productions.bom_id.bom_line_ids.product_id.id),
('location_id', '=', location_id.id)
])
total_qty = sum(quants.mapped('quantity')) # 计算该位置的总库存量
is_available = total_qty > 0
if not is_available:
raise UserError('请先完成坯料入库')
for production_id in productions:
work_ids = production_id.workorder_ids.filtered(
lambda wk: wk.state not in ['done', 'rework', 'cancel'])
if not work_ids:
continue
min_sequence_wk = min(work_ids, key=lambda wk: wk.sequence)
if min_sequence_wk.is_subcontract:
picking_id = production_id.picking_ids.filtered(
lambda wk: wk.location_id.name == '制造前' and wk.location_dest_id.name == '外协加工区')
move_out = picking_id.move_ids
for mo in move_out:
if mo.state != 'done':
mo.write({'state': 'assigned', 'production_id': False})
if not mo.move_line_ids:
self.env['stock.move.line'].create(
mo.get_move_line(production_id, min_sequence_wk))
# product = self.env['mrp.production'].search([('product_id', '=', product_id)], limit=1)
# match = re.search(r'(S\d{5}-\d)',product.name)
# pass
def button_confirm(self):
for record in self:
for line in record.order_line:
if line.product_qty <= 0:
raise UserError('请对【产品】中的【数量】进行输入')
if line.price_unit <= 0:
raise UserError('请对【产品】中的【单价】进行输入')
record.outsourcing_service_replenishment()
res = super(PurchaseOrder, self).button_confirm()
for line in self.order_line:
# 将产品不追踪序列号的行项目设置qty_done
if line.move_ids and line.move_ids[0].product_id.tracking == 'none':
line.move_ids[0].quantity_done = line.move_ids[0].product_qty
return res
origin_sale_id = fields.Many2one('sale.order', string='销售订单号', store=True, compute='_compute_origin_sale_id')
origin_sale_ids = fields.Many2many('sale.order', string='销售订单号(多个)', store=True,
compute='_compute_origin_sale_id')
@api.depends('origin')
def _compute_origin_sale_id(self):
for purchase in self:
if not purchase.origin:
continue
elif 'MO' in purchase.origin:
mp_name_list = [name.strip() for name in purchase['origin'].split(',')]
os_ids = list({mp_id.sale_order_id.id for mp_id in self.env['mrp.production'].sudo().search([
('name', 'in', mp_name_list)])})
if len(os_ids) == 1:
purchase.origin_sale_id = os_ids[0]
elif len(os_ids) >= 2:
purchase.origin_sale_ids = os_ids
elif 'S' in purchase.origin:
os_name_list = [name.strip() for name in purchase['origin'].split(',')]
os_ids = self.env['sale.order'].sudo().search([('name', 'in', os_name_list)])
if len(os_ids) == 1:
purchase.origin_sale_id = os_ids.id
elif len(os_ids) >= 2:
purchase.origin_sale_ids = os_ids.ids
elif 'IN' in purchase.origin:
sp_name_list = [name.strip() for name in purchase['origin'].split(',')]
os_ids = list({sp_id.sale_order_id.id for sp_id in self.env['stock.picking'].sudo().search([
('name', 'in', sp_name_list)])})
if len(os_ids) == 1:
purchase.origin_sale_id = os_ids[0]
elif len(os_ids) >= 2:
purchase.origin_sale_ids = os_ids
class PurchaseOrderLine(models.Model):
_inherit = 'purchase.order.line'
part_number = fields.Char('零件图号', store=True, compute='_compute_part_number')
part_name = fields.Char('零件名称', store=True, compute='_compute_part_number')
related_product = fields.Many2one('product.product', string='关联产品',
help='经此产品工艺加工成的成品')
manual_part_number = fields.Char()
manual_part_name = fields.Char()
@api.depends('product_id')
def _compute_part_number(self):
for record in self:
if record.part_number and record.part_name:
continue
if record.product_id.categ_id.name == '坯料':
product_name = ''
match = re.search(r'(S\d{5}-\d)', record.product_id.name)
# 如果匹配成功,提取结果
if match:
product_name = match.group(0)
sale_order_name = ''
match_sale = re.search(r'S(\d+)', record.product_id.name)
if match_sale:
sale_order_name = match_sale.group(0)
sale_order = self.env['sale.order'].sudo().search(
[('name', '=', sale_order_name)])
if sale_order:
filtered_order_line = sale_order.order_line.filtered(
lambda order_line: re.search(f'{product_name}$', order_line.product_id.name)
)
record.part_number = filtered_order_line.product_id.part_number
record.part_name = filtered_order_line.product_id.part_name
else:
record.part_number = record.product_id.part_number
record.part_name = record.product_id.part_name
if record.manual_part_name:
# 如果手动设置了 part_name使用手动设置的值
record.part_name = record.manual_part_name
if record.manual_part_number:
record.part_number = record.manual_part_number