Files
test/sf_manufacturing/wizard/sale_order_cancel.py
2025-03-21 18:15:37 +08:00

746 lines
34 KiB
Python

from odoo import models, fields, api
from odoo.exceptions import UserError
class SFSaleOrderCancelWizard(models.TransientModel):
    """Wizard that previews a sale order's related documents and cancels them.

    ``default_get`` pre-creates one ``sf.sale.order.cancel.line`` per related
    document line.  ``action_confirm_cancel`` rebuilds that list, refuses to
    continue when ``has_movement`` is set, and otherwise cancels the sale
    order together with the documents it finds through their ``origin``.
    """

    _name = 'sf.sale.order.cancel.wizard'
    _description = '销售订单取消向导'

    order_id = fields.Many2one('sale.order', string='销售订单')
    related_docs = fields.One2many('sf.sale.order.cancel.line', 'wizard_id', string='相关单据')
    has_movement = fields.Boolean(compute='_compute_has_movement', string='是否有异动')
    display_message = fields.Char(compute='_compute_display_message', string='显示消息')

    @api.model
    def default_get(self, fields_list):
        """Prefill the wizard from the sale order given by ``active_id``.

        When the context carries an ``active_id``, a wizard record is created
        immediately so that the related-document lines can be generated and
        attached to it; the ids of those lines are returned as the default
        for ``related_docs``.

        :param fields_list: field names forwarded to ``super().default_get``.
        :return: dict of default values.
        """
        defaults = super().default_get(fields_list)
        active_id = self._context.get('active_id')
        if active_id:
            order = self.env['sale.order'].browse(active_id)
            defaults['order_id'] = order.id
            # Create the wizard right away so the related lines have a parent.
            wizard = self.create(defaults)
            self.env['sf.sale.order.cancel.line'].create_from_order(wizard.id, order)
            defaults['related_docs'] = wizard.related_docs.ids
        return defaults

    # 'order_id.state' is listed because the compute reads it; without the
    # dependency a state change on the order would not trigger a recompute.
    @api.depends('related_docs.cancel_reason', 'order_id.state')
    def _compute_has_movement(self):
        """Set ``has_movement`` when cancellation must be refused.

        True when at least one related line carries a ``cancel_reason`` or
        when the sale order is already in state ``cancel``.
        """
        for wizard in self:
            blocked = any(doc.cancel_reason for doc in wizard.related_docs)
            wizard.has_movement = blocked or wizard.order_id.state == 'cancel'

    @api.depends('has_movement', 'related_docs', 'related_docs.doc_state')
    def _compute_display_message(self):
        """Pick the headline shown above the related-document list."""
        for wizard in self:
            docs = wizard.related_docs
            if not docs:
                # No related lines at all.
                wizard.display_message = '无下游单据'
            elif all(doc.doc_state == '已取消' for doc in docs):
                # Every related line is already labelled as cancelled.
                wizard.display_message = '取消的下游单据如下:'
            elif wizard.has_movement:
                wizard.display_message = '部分或全部下游单据存在异动,无法取消,详情如下:'
            else:
                wizard.display_message = '确认所有下游单据全部取消?'

    def _write_by_origin(self, model_name, origin, vals):
        """Write ``vals`` on the ``model_name`` records whose ``origin`` equals ``origin``."""
        records = self.env[model_name].search([('origin', '=', origin)])
        if records:
            records.write(vals)

    def action_confirm_cancel(self):
        """Re-check the related documents, then cancel the order and its documents.

        :return: the value returned by ``sale.order.action_cancel``.
        :raises UserError: when ``has_movement`` is set after the related
            lines have been regenerated.
        """
        self.ensure_one()

        # Drop the existing lines and rebuild them so the check below uses
        # the current state of the related documents.
        self.related_docs.unlink()
        self.env['sf.sale.order.cancel.line'].create_from_order(self.id, self.order_id)

        # Force the check fields to be recomputed on the fresh lines.
        self._compute_has_movement()
        self._compute_display_message()

        if self.has_movement:
            raise UserError(
                "存在下游单据异动,无法取消订单!\n"
                "请关闭向导重新进入,以查看最新状态!"
            )

        order_name = self.order_id.name

        # Purchase orders, purchase request lines and purchase requests
        # whose origin is the sale order.
        self._write_by_origin('purchase.order', order_name, {'state': 'cancel'})
        self._write_by_origin('purchase.request.line', order_name, {'request_state': 'cancel'})
        self._write_by_origin('purchase.request', order_name, {'state': 'cancel'})

        # The sale order itself.
        result = self.order_id.action_cancel()

        # Production plan (scheduling) orders whose origin is the sale order.
        self._write_by_origin('sf.production.plan', order_name, {'state': 'cancel'})

        production_model = self.env['mrp.production']
        purchase_model = self.env['purchase.order']

        # Manufacturing orders whose origin is the sale order, and what hangs off them.
        for mo in production_model.search([('origin', '=', order_name)]):
            # Purchase orders of the manufacturing order (the link is kept,
            # only the state is written).
            self._write_by_origin('purchase.order', mo.name, {'state': 'cancel'})

            # Quality checks of the manufacturing order.
            checks = self.env['quality.check'].search([('production_id', '=', mo.id)])
            if checks:
                checks.write({'quality_state': 'cancel'})

            # Child manufacturing orders (origin is this manufacturing order).
            for child_mo in production_model.search([('origin', '=', mo.name)]):
                child_mo.action_cancel()

            # Pickings linked to the work orders (subcontracting).
            for workorder in mo.workorder_ids:
                for picking in workorder.picking_ids:
                    picking.write({'state': 'cancel'})

            # The manufacturing order, then its programming order.
            mo.action_cancel()
            mo._change_programming_state()

            # Purchase orders of the component manufacturing orders.
            for comp_mo in production_model.search([('origin', '=', mo.name)]):
                comp_purchase_orders = purchase_model.search([('origin', '=', comp_mo.name)])
                if comp_purchase_orders:
                    comp_purchase_orders.button_cancel()

        return result
class SFSaleOrderCancelLine(models.TransientModel):
    """One row of the sale order cancel wizard, describing a related document line."""

    _name = 'sf.sale.order.cancel.line'
    _description = '销售订单取消行'

    # Wizard this row belongs to.
    wizard_id = fields.Many2one(comodel_name='sf.sale.order.cancel.wizard')

    # Descriptive columns shown for the related document.
    sequence = fields.Integer(string='序号')
    category = fields.Char(string='大类')
    doc_name = fields.Char(string='单据名称')
    operation_type = fields.Char(string='作业类型')
    doc_number = fields.Char(string='单据编号')
    line_number = fields.Char(string='行号')
    product_name = fields.Char(string='产品名称')
    quantity = fields.Float(string='数量')
    doc_state = fields.Char(string='单据状态')

    # Filled when the document must not be cancelled; empty otherwise.
    cancel_reason = fields.Char(string='禁止取消原因')

    # Text rendering of ``quantity``; not stored (storing would only be
    # needed for searching or sorting).
    quantity_str = fields.Char(
        compute="_compute_quantity_str",
        string="数量(字符串)",
        store=False,
    )
@api.depends("quantity")
def _compute_quantity_str(self):
    """Render ``quantity`` as an integer string, or as '' when it is zero/unset."""
    for line in self:
        # Membership uses equality, so False, 0 and 0.0 all count as "empty".
        if line.quantity in [False, 0]:
            line.quantity_str = ""
        else:
            line.quantity_str = str(int(line.quantity))
# Build the wizard's preview lines for a sale order.
#
# Looks up the documents related to ``order`` (invoices, pickings, quality
# checks, production plans, component manufacturing orders, purchase orders
# and their pickings, purchase request lines, manufacturing orders with their
# purchase orders / pickings / work orders, and programming orders) and
# creates ``sf.sale.order.cancel.line`` records for them.  ``cancel_reason``
# is set to '已有异动' when the document's state is outside the states listed
# for that document type, and left empty otherwise.
#
# :param wizard_id: value stored in ``wizard_id`` of every created line.
# :param order: sale order whose related documents are listed.
# :return: Python list of the created line records.
@api.model
def create_from_order(self, wizard_id, order):
sequence = 1
lines = []
# State code -> label stored in doc_state (used for pickings, quality checks,
# manufacturing orders, purchase orders and work orders below).
map_dict = {
'waiting': '等待其他作业',
'to approve': '待批准',
'technology_to_confirmed': '待工艺确认',
'confirmed': '已确认',
'pending': '等待其他工单',
'none': '待处理',
'draft': '询价',
'cancel': '已取消',
'pass': '通过的',
'fail': '失败的',
'done': '已完成',
'rework': '返工',
'purchase': '采购订单',
'ready': '就绪',
'approved': '已批准',
'pending_cam': '待加工',
'progress': '加工中',
'assigned': '就绪'
}
# State code -> label for sf.production.plan records.
plan_map_dict = {
'draft': '待排程',
'done': '已排程',
'processing': '加工中',
'finished': '已完成',
'cancel': '已取消'}
# request_state code -> label for purchase request lines.
purchase_request_map_dict = {
'draft': '草稿',
'to_approve': '待批准',
'approved': '已批准',
'done': '已完成',
'cancel': '已取消',
'rejected': '已驳回',
'in_progress': '处理中'
}
# _original_module of a record -> label stored in category.  The lookups
# below index this dict directly, so a module missing here raises KeyError.
module_name_dict = {
'purchase': '采购',
'quality': '质量',
'mrp': '制造',
'stock': '库存',
'account': '会计',
'hr': '员工',
'project': '项目',
'crm': '销售',
'point_of_sale': '销售',
'website': '网站',
'sf_plan': '计划',
'purchase_request': '采购',
}
# Invoices of the sale order; any state other than 'draft' blocks cancellation.
if order.invoice_ids:
a = 0
for invoice in order.invoice_ids:
a += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': '销售',
'doc_name': '销售订单',
'operation_type': '',
'doc_number': invoice.name,
'line_number': a,
'product_name': invoice.product_id.name,
'quantity': invoice.quantity,
'doc_state': invoice.state,
'cancel_reason': '已有异动' if invoice.state != 'draft' else ''
}
lines.append(self.create(vals))
sequence += 1
# Pickings of the sale order, one line per stock move; blocked unless the
# picking state is draft, cancel or waiting.
if order.picking_ids:
for picking in order.picking_ids:
b = 0
for move in picking.move_ids:
b += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
# (disabled) 'category': '库存',
'category': module_name_dict[picking._original_module],
# (disabled) 'doc_name': '交货单',
'doc_name': picking._description,
'operation_type': picking.picking_type_id.name,
'doc_number': picking.name,
'line_number': b,
'product_name': f'[{move.product_id.default_code}] {move.product_id.name}' if move else '',
# (disabled) 'quantity': picking.product_qty if hasattr(picking, 'product_qty') else 0,
'quantity': move.product_uom_qty,
'doc_state': map_dict.get(picking.state, picking.state),
'cancel_reason': '已有异动' if picking.state not in ['draft', 'cancel', 'waiting'] else ''
}
lines.append(self.create(vals))
sequence += 1
# NOTE: a disabled (commented-out) block here used to list, for each delivery
# picking, the quality checks found by picking_id as '质检单' rows.
# All quality checks whose product name contains the sale order name.
quality_checks = self.env['quality.check'].search([
('product_id.name', 'like', f'%{order.name}%')])
if quality_checks:
# b1 is incremented per check but the line number written below is always 1.
b1 = 0
for quality_check in quality_checks:
b1 += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[quality_check._original_module],
'doc_name': quality_check._description,
'operation_type': '',
'doc_number': f'{quality_check.name}-{quality_check.title}',
'line_number': 1,
'product_name': f'[{quality_check.product_id.default_code}] {quality_check.product_id.name}' if quality_check.product_id.default_code else quality_check.product_id.name,
'quantity': 1,
'doc_state': map_dict.get(quality_check.quality_state, quality_check.quality_state),
'cancel_reason': '已有异动' if quality_check.quality_state not in ['none', 'cancel', 'waiting'] else ''
}
lines.append(self.create(vals))
# All production plan (scheduling) orders whose origin is the sale order.
sf_plan_orders = self.env['sf.production.plan'].search([
('origin', '=', order.name)])
if sf_plan_orders:
p1 = 0
for plan_order in sf_plan_orders:
# Prefix the product name with its default_code only when one is set.
if not plan_order.product_id.default_code:
product_name = plan_order.product_id.name
else:
product_name = f'[{plan_order.product_id.default_code}] {plan_order.product_id.name}'
p1 += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': '排程',
'doc_name': '排程单',
'operation_type': '',
'doc_number': plan_order.name,
'line_number': p1,
'product_name': product_name,
'quantity': 1,
'doc_state': plan_map_dict.get(plan_order.state, plan_order.state),
'cancel_reason': '已有异动' if plan_order.state not in ['draft', 'cancel'] else ''
}
lines.append(self.create(vals))
sequence += 1
# Component manufacturing orders, matched by a product name containing
# 'R-<sale order name>' (a lookup by origin = mo.name is disabled).
component_mos = self.env['mrp.production'].search([
('product_id.name', 'like', f'%R-{order.name}%')])
h = 0
if component_mos:
for comp_mo in component_mos:
h += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[comp_mo._original_module],
'doc_name': comp_mo._description,
'operation_type': '',
'doc_number': comp_mo.name,
'line_number': h,
'product_name': f'{comp_mo.product_id.name}',
'quantity': comp_mo.product_qty,
'doc_state': map_dict.get(comp_mo.state, comp_mo.state),
'cancel_reason': '已有异动' if comp_mo.state not in ['technology_to_confirmed',
'cancel'] else ''
}
lines.append(self.create(vals))
sequence += 1
# Pickings of the component manufacturing order, one line per stock move;
# blocked unless the picking state is cancel, waiting or assigned.
for pinking_id in comp_mo.picking_ids:
y = 0
for move in pinking_id.move_ids:
y += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[pinking_id._original_module],
'doc_name': pinking_id._description,
'doc_number': f'{comp_mo.name}-{pinking_id.name}',
'line_number': y,
'operation_type': pinking_id.picking_type_id.name,
'product_name': move.product_id.name if move.product_id else '',
'quantity': move.product_uom_qty,
'doc_state': map_dict.get(pinking_id.state, pinking_id.state),
'cancel_reason': '已有异动' if pinking_id.state not in ['cancel', 'waiting',
'assigned'] else ''
}
lines.append(self.create(vals))
# Purchase orders whose origin contains the sale order name, one line per
# purchase order line; blocked unless the order state is draft or cancel.
purchase_orders = self.env['purchase.order'].search([
('origin', 'like', f'%{order.name}%')
])
if purchase_orders:
c = 0
for po in purchase_orders:
for order_line in po.order_line:
c += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[po._original_module],
'doc_name': po._description,
'operation_type': '',
'doc_number': po.name,
'line_number': c,
'product_name': f'[{order_line.product_id.default_code}] {order_line.product_id.name}',
'quantity': order_line.product_qty if order_line else 0,
'doc_state': map_dict.get(po.state, po.state),
'cancel_reason': '已有异动' if po.state not in ['draft', 'cancel'] else ''
}
lines.append(self.create(vals))
sequence += 1
# Receipts for customer-supplied material: pickings whose origin is one of
# the purchase orders above; blocked unless waiting, cancel or confirmed.
for pod in purchase_orders:
pkds = self.env['stock.picking'].search([
('origin', '=', pod.name)
])
if pkds:
for pkd in pkds:
x3 = 0
for move in pkd.move_ids:
x3 += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[pkd._original_module],
'doc_name': pkd._description,
'doc_number': pkd.name,
'line_number': x3,
'operation_type': pkd.picking_type_id.name,
'product_name': f'[{move.product_id.default_code}] {move.product_id.name}',
'quantity': move.product_uom_qty,
'doc_state': map_dict.get(pkd.state, pkd.state),
'cancel_reason': '已有异动' if pkd.state not in ['waiting', 'cancel', 'confirmed'] else ''
}
lines.append(self.create(vals))
# Follow-up pickings whose origin is the name of the picking pkd.
for child_pkd in self.env['stock.picking'].search([
('origin', '=', pkd.name)
]):
x4 = 0
for child_move in child_pkd.move_ids:
x4 += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[child_pkd._original_module],
'doc_name': child_pkd._description,
'doc_number': child_pkd.name,
'line_number': x4,
'operation_type': child_pkd.picking_type_id.name,
'product_name': child_move.product_id.name if child_move.product_id else '',
'quantity': child_move.product_uom_qty,
'doc_state': map_dict.get(child_pkd.state, child_pkd.state),
'cancel_reason': '已有异动' if child_pkd.state not in ['waiting',
'cancel', 'confirmed'] else ''
}
lines.append(self.create(vals))
# Purchase request lines whose origin is the sale order; blocked unless
# request_state is draft, cancel or approved.
purchase_request_lines = self.env['purchase.request.line'].search([
('origin', '=', order.name)
])
if purchase_request_lines:
prl_count = 0
for purchase_request_line in purchase_request_lines:
prl_count += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[purchase_request_line._original_module],
'doc_name': purchase_request_line.request_id._description,
'doc_number': purchase_request_line.request_id.name,
'line_number': prl_count,
'product_name': f'[{purchase_request_line.product_id.default_code}] {purchase_request_line.product_id.name}',
'quantity': purchase_request_line.product_qty,
'doc_state': purchase_request_map_dict.get(purchase_request_line.request_state, purchase_request_line.request_state),
'cancel_reason': '已有异动' if purchase_request_line.request_state not in ['draft', 'cancel', 'approved'] else ''
}
lines.append(self.create(vals))
# Manufacturing orders whose origin is the sale order.
manufacturing_orders = self.env['mrp.production'].search([
('origin', '=', order.name)
])
d = 0
# Guard so that the picking section below is processed only once.
flag = True
# program_list is assigned here but not read anywhere in this method.
program_list = []
for mo in manufacturing_orders:
# The manufacturing order itself; blocked unless its state is
# technology_to_confirmed or cancel.
d += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[mo._original_module],
'doc_name': mo._description,
'doc_number': mo.name,
'operation_type': '',
'line_number': d,
'product_name': f'[{mo.product_id.default_code}] {mo.product_id.name}',
'quantity': mo.product_qty,
'doc_state': map_dict.get(mo.state, mo.state),
'cancel_reason': '已有异动' if mo.state not in ['technology_to_confirmed', 'cancel'] else ''
}
lines.append(self.create(vals))
sequence += 1
# Purchase orders whose origin contains the manufacturing order name.
purchase_orders = self.env['purchase.order'].search([
('origin', 'like', f'%{mo.name}%')
])
if purchase_orders:
e = 0
for po in purchase_orders:
for order_line in po.order_line:
e += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[po._original_module],
'doc_name': po._description,
'doc_number': po.name,
'line_number': e,
'operation_type': '',
'product_name': order_line.product_id.name if order_line else '',
'quantity': order_line.product_qty if order_line else 0,
'doc_state': map_dict.get(po.state, po.state),
'cancel_reason': '已有异动' if po.state not in ['draft', 'cancel'] else ''
}
lines.append(self.create(vals))
sequence += 1
# Receipts of the manufacturing RFQs: pickings whose origin is one of the
# purchase orders above; blocked unless draft or cancel.
for pod in purchase_orders:
pkds = self.env['stock.picking'].search([
('origin', '=', pod.name)
])
if pkds:
for pkd in pkds:
x1 = 0
for move in pkd.move_ids:
x1 += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[pkd._original_module],
'doc_name': pkd._description,
'doc_number': pkd.name,
'line_number': x1,
'operation_type': pkd.picking_type_id.name,
'product_name': move.product_id.name if move.product_id else '',
'quantity': move.product_uom_qty,
'doc_state': map_dict.get(pkd.state, pkd.state),
'cancel_reason': '已有异动' if pkd.state not in ['draft', 'cancel'] else ''
}
lines.append(self.create(vals))
# Follow-up pickings whose origin is the name of the picking pkd.
for child_pkd in self.env['stock.picking'].search([
('origin', '=', pkd.name)
]):
x2 = 0
for child_move in child_pkd.move_ids:
x2 += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[child_pkd._original_module],
'doc_name': child_pkd._description,
'doc_number': child_pkd.name,
'line_number': x2,
'operation_type': child_pkd.picking_type_id.name,
'product_name': child_move.product_id.name if child_move.product_id else '',
'quantity': child_move.product_uom_qty,
'doc_state': map_dict.get(child_pkd.state, child_pkd.state),
'cancel_reason': '已有异动' if child_pkd.state not in ['draft', 'cancel'] else ''
}
lines.append(self.create(vals))
# Pickings (material requisitions) of the manufacturing order.
if mo.picking_ids and flag:
for picking in mo.picking_ids:
f = 0
for move in picking.move_ids:
f += 1
# Changed unless the picking is draft/cancel/waiting ...
is_changed = False
if picking.state not in ['draft', 'cancel', 'waiting']:
is_changed = True
# ... except that a '客供料入库' picking in cancel/assigned never counts as changed.
if picking.picking_type_id.name == '客供料入库' and picking.state in ['cancel', 'assigned']:
is_changed = False
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[picking._original_module],
'doc_name': picking._description,
'doc_number': picking.name,
'line_number': f,
'operation_type': picking.picking_type_id.name,
'product_name': move.product_id.name if move.product_id else '',
'quantity': move.product_uom_qty,
'doc_state': map_dict.get(picking.state, picking.state),
'cancel_reason': '已有异动' if is_changed else ''
}
lines.append(self.create(vals))
sequence += 1
flag = False
# Work orders of the manufacturing order; blocked unless draft, cancel,
# pending or waiting.
if mo.workorder_ids:
g = 0
for workorder in mo.workorder_ids:
g += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[workorder._original_module],
'doc_name': workorder._description,
'doc_number': f'{mo.name}-{workorder.processing_panel}-{workorder.name}' if workorder.processing_panel else f'{mo.name}-{workorder.name}',
'line_number': g,
'operation_type': '',
'product_name': f'[{mo.product_id.default_code}] {mo.product_id.name}',
'quantity': workorder.qty_production,
'doc_state': map_dict.get(workorder.state, workorder.state),
'cancel_reason': '已有异动' if workorder.state not in ['draft', 'cancel', 'pending',
'waiting'] else ''
}
lines.append(self.create(vals))
sequence += 1
# Subcontracted process: pickings linked to the work order; blocked
# unless cancel or waiting.
if workorder.picking_ids:
for pkd in workorder.picking_ids:
z = 0
for move in pkd.move_ids:
z += 1
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': module_name_dict[pkd._original_module],
'doc_name': pkd._description,
'doc_number': f'{mo.name}-{workorder.name}-{pkd.name}',
'line_number': z,
'operation_type': pkd.picking_type_id.name,
'product_name': move.product_id.name if move.product_id else '',
'quantity': move.product_uom_qty,
'doc_state': map_dict.get(pkd.state, pkd.state),
'cancel_reason': '已有异动' if pkd.state not in ['cancel', 'waiting'] else ''
}
lines.append(self.create(vals))
# NOTE: two disabled (commented-out) blocks were here: one listing purchase
# orders per raw-material move of the manufacturing order ('组件采购单' rows),
# and one listing the quality checks found by production_id ('质检单' rows).
# Programming order of the manufacturing order, taken from the mapping
# returned by mo._cron_get_programming_state().
cloud_programming = mo._cron_get_programming_state()
if cloud_programming:
programming_no = cloud_programming['programming_no']
# Add it only if no line created so far has the same doc_number.
if not any(line.doc_number == programming_no for line in lines):
vals = {
'wizard_id': wizard_id,
'sequence': sequence,
'category': '编程',
'doc_name': '编程单',
'operation_type': '',
'doc_number': programming_no, # use the variable directly
'line_number': 1,
'product_name': '',
'quantity': 0,
'doc_state': cloud_programming['programming_state'],
'cancel_reason': ''
}
lines.append(self.create(vals))
return lines
# NOTE: a disabled (commented-out) variant after the return used to
# de-duplicate the lines by doc_number and return the unique records.