Files
test/sf_sale/models/sale_order.py

544 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import base64
from odoo import Command
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
READONLY_FIELD_STATES = {
state: [('readonly', True)]
for state in {'sale', 'done', 'cancel'}
}
class ReSaleOrder(models.Model):
_inherit = 'sale.order'
mrp_production_count = fields.Integer(
"Count of MO generated",
compute='_compute_mrp_production_ids',
groups='mrp.group_mrp_user,sf_base.group_sale_salemanager,sf_base.group_sale_director')
logistics_way = fields.Selection([('自提', '自提'), ('到付', '到付'), ('在线支付', '在线支付')], string='物流方式')
state = fields.Selection(
selection=[
('draft', "报价"),
('sent', "报价已发送"),
('sale', "销售订单"),
('done', "已锁定"),
('cancel', '已废弃'),
],
string="状态",
readonly=True, copy=False, index=True,
tracking=3,
default='draft')
deadline_of_delivery = fields.Date('订单交期', tracking=True)
# person_of_delivery = fields.Char('收货人')
# telephone_of_delivery = fields.Char('电话号码')
# address_of_delivery = fields.Char('联系地址')
payments_way = fields.Selection([('现结', '现结'), ('月结', '月结')], '结算方式', default='现结', tracking=True)
pay_way = fields.Selection([('转账', '转账'), ('微信', '微信'), ('支付宝', '支付宝')], '支付方式')
check_status = fields.Selection([('pending', '待审核'), ('approved', '已审核'), ('fail', '不通过')], '审核状态')
# schedule_status = fields.Selection(
# [('to schedule', '待排程'), ('to process', '待加工'), ('to deliver', '待发货'), ('to receive', '待收货'),
# ('received', '已收货')],
# '进度状态')
payment_term_id = fields.Many2one(
comodel_name='account.payment.term',
string="交付条件",
compute='_compute_payment_term_id',
store=True, readonly=False, precompute=True, check_company=True, tracking=True,
domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]")
remark = fields.Text('备注')
validity_date = fields.Date(
string="Expiration",
compute='_compute_validity_date',
store=True, readonly=False, copy=False, precompute=True,
states=READONLY_FIELD_STATES, default=fields.Datetime.now)
delivery_warning = fields.Selection([('normal', '正常'), ('warning', '告警'), ('overdue', '逾期')],
default='normal',
string='时效', tracking=True)
order_code = fields.Char('平台订单号', readonly=True)
# 业务平台分配工厂后在智能工厂先创建销售订单
def sale_order_create(self, company_id, delivery_name, delivery_telephone, delivery_address,
deadline_of_delivery, payments_way, pay_way, order_number, state='sale'):
now_time = datetime.datetime.now()
partner = self.get_customer()
data = {
'company_id': company_id.id,
'date_order': now_time,
'name': self.env['ir.sequence'].next_by_code('sale.order', sequence_date=now_time),
'partner_id': partner.id,
'check_status': 'approved',
'state': state,
'user_id': partner.user_id.id,
'person_of_delivery': delivery_name,
'telephone_of_delivery': delivery_telephone,
'address_of_delivery': delivery_address,
'payments_way': payments_way,
'pay_way': pay_way,
'order_code': order_number,
}
if deadline_of_delivery:
# deadline_of_delivery字段存在为false字符串情况
if not isinstance(deadline_of_delivery, str):
data.update({'deadline_of_delivery': deadline_of_delivery})
else:
if deadline_of_delivery != "False":
data.update({'deadline_of_delivery': deadline_of_delivery})
order_id = self.env['sale.order'].sudo().create(data)
return order_id
def write(self, vals):
if self.env.user.has_group('sf_base.group_sale_director'):
if vals.get('check_status'):
if vals['check_status'] in ('pending', False):
vals['check_status'] = 'approved'
return super().write(vals)
# 提交
def submit(self):
self.check_status = 'pending'
def get_customer(self):
partner_tag = self.env['res.partner.category'].sudo().search([('name', '=', '业务平台')], limit=1,
order='id asc')
if not partner_tag:
partner_tag = self.env['res.partner.category'].sudo().create({'name': '平台客户'})
customer = self.env['res.partner'].search([('name', '=', '业务平台')], limit=1, order='id asc')
if customer:
if not customer.vat:
customer.write({'name': '业务平台', 'vat': '91430103MA7BRH9K4M', 'phone': '0731-85115515',
'email': 'jikimo@jikimo.com', 'category_id': [Command.set([partner_tag.id])]})
return customer
else:
partner = self.env['res.partner'].create(
{'name': '业务平台', 'vat': '91430103MA7BRH9K4M', 'phone': '0731-85115515',
'email': 'jikimo@jikimo.com', 'category_id': [Command.set([partner_tag.id])]})
return partner
# 业务平台分配工厂时在创建完产品后再创建销售明细信息
def sale_order_create_line(self, product, item):
machining_accuracy_name = ''
if product.model_machining_precision:
machining_accuracy_name = self.env['sf.machining.accuracy'].sudo().search(
[('sync_id', '=', product.model_machining_precision)]).name
vals = {
'order_id': self.id,
'product_id': product.id,
'name': '%s/%s/%s/%s/%s/%s' % (
self.format_float(product.model_long),
self.format_float(product.model_width),
self.format_float(product.model_height),
self.format_float(product.model_volume),
machining_accuracy_name,
product.materials_id.name),
'price_unit': product.list_price,
'product_uom_qty': item['number'],
# 'model_glb_file': base64.b64decode(item['model_file']) if item['model_file'] else None,
'model_url': item['model_url'],
'glb_url': item['glb_url'],
'remark': item.get('remark'),
'embryo_redundancy_id': item.get('embryo_redundancy_id'),
'is_incoming_material': True if item.get('embryo_redundancy_id') else False,
'manual_quotation': item.get('manual_quotation'),
'model_id': item['model_id'],
}
return self.env['sale.order.line'].with_context(skip_procurement=True).create(vals)
def format_float(self, value):
# 将浮点数转换为字符串
value_str = str(value)
# 检查小数点的位置
if '.' in value_str:
# 获取小数部分
decimal_part = value_str.split('.')[1]
# 判断小数位数是否超过2位
if len(decimal_part) > 2:
# 超过2位则保留2位小数
return "{:.2f}".format(value)
# 否则保持原来的位数
return float(value_str)
@api.constrains('order_line')
def check_order_line(self):
for item in self:
if not item.order_line:
raise ValidationError('请选择【订单行】中的【产品】')
for line in item.order_line:
if not line.product_template_id:
raise UserError('请对【订单行】中的【产品】进行选择')
if not line.name:
raise UserError('请对【订单行】中的【说明】进行输入')
if line.product_qty == 0:
raise UserError('请对【订单行】中的【数量】进行输入')
if not line.product_uom:
raise UserError('请对【订单行】中的【计量单位】进行选择')
if line.price_unit == 0:
raise UserError('请对【订单行】中的【单价】进行输入')
if not line.tax_id:
raise UserError('请对【订单行】中的【税】进行选择')
consignment_purchase_order_count = fields.Integer(
"Number of consignment Purchase Order Generated",
compute='_compute_purchase_order_count')
@api.depends('order_line.purchase_line_ids.order_id')
def _compute_purchase_order_count(self):
for order in self:
order.purchase_order_count = len(order._get_purchase_orders().filtered(
lambda po: po.purchase_type not in ['outsourcing']))
order.consignment_purchase_order_count = len(order._get_purchase_orders().filtered(
lambda po: po.purchase_type in ['outsourcing']))
def action_view_purchase_orders(self):
"""
采购
"""
self.ensure_one()
purchase_order_ids = self._get_purchase_orders().filtered(
lambda po: po.purchase_type not in ['outsourcing']).ids
action = {
'res_model': 'purchase.order',
'type': 'ir.actions.act_window',
}
if len(purchase_order_ids) == 1:
action.update({
'view_mode': 'form',
'res_id': purchase_order_ids[0],
})
else:
action.update({
'name': _("%s生成采购订单", self.name),
'domain': [('id', 'in', purchase_order_ids)],
'view_mode': 'tree,form',
})
return action
def action_view_consignment_purchase_orders(self):
"""
委外加工
"""
self.ensure_one()
outsourcing_purchase_order_ids = self._get_purchase_orders().filtered(
lambda po: po.purchase_type in ['outsourcing']).ids
action = {
'res_model': 'purchase.order',
'type': 'ir.actions.act_window',
}
if len(outsourcing_purchase_order_ids) == 1:
action.update({
'view_mode': 'form',
'res_id': outsourcing_purchase_order_ids[0],
})
else:
action.update({
'name': _("%s生成委外加工订单", self.name),
'domain': [('id', 'in', outsourcing_purchase_order_ids)],
'view_mode': 'tree,form',
})
return action
class ResaleOrderLine(models.Model):
_inherit = 'sale.order.line'
# part_number = fields.Char('零件图号', related='product_id.part_number', readonly=True)
part_name = fields.Char('零件名称', related='product_id.part_name', readonly=True)
model_glb_file = fields.Binary('模型的glb文件', compute='_compute_model_glb_file', store=True)
glb_url = fields.Char('glb文件地址', compute='_compute_model_glb_file', store=True)
# product_template_id = fields.Many2one(
# string="产品",
# comodel_name='product.template',
# compute='_compute_product_template_id',
# readonly=False,
# search='_search_product_template_id',
# # previously related='product_id.product_tmpl_id'
# # not anymore since the field must be considered editable for product configurator logic
# # without modifying the related product_id when updated.
# domain=[('sale_ok', '=', True), ('categ_type', '=', '成品')])
check_status = fields.Selection(related='order_id.check_status')
remark = fields.Char('备注')
is_incoming_material = fields.Boolean('客供料', compute='_compute_is_incoming_material', store=True)
embryo_redundancy_id = fields.Many2one('sf.embryo.redundancy', '坯料冗余')
manual_quotation = fields.Boolean('人工编程', default=False)
model_url = fields.Char('模型文件地址')
model_id = fields.Char('模型id')
@api.depends('embryo_redundancy_id')
def _compute_is_incoming_material(self):
for line in self:
line.is_incoming_material = True if line.embryo_redundancy_id else False
@api.depends('product_template_id')
def _compute_model_glb_file(self):
for line in self:
if line.product_template_id:
if not line.model_glb_file:
line.model_glb_file = line.product_id.product_tmpl_id.model_file
if not line.glb_url:
line.glb_url = line.product_id.product_tmpl_id.glb_url
if not line.price_unit:
line.price_unit = line.product_id.product_tmpl_id.list_price
class ProductTemplate(models.Model):
_inherit = 'product.template'
manual_quotation = fields.Boolean('人工编程', default=False)
part_name = fields.Char(string='零件名称', readonly=True)
class RePurchaseOrder(models.Model):
_inherit = 'purchase.order'
mrp_production_count = fields.Integer(
"Count of MO Source",
compute='_compute_mrp_production_count',
groups='mrp.group_mrp_user,sf_base.group_purchase,sf_base.group_purchase_director')
remark = fields.Text('备注')
user_id = fields.Many2one(
'res.users', string='买家', index=True, tracking=True,
compute='_compute_user_id',
store=True)
purchase_type = fields.Selection(
[('standard', '标准采购'), ('consignment', '工序外协'), ('outsourcing', '委外加工'), ('outside', '外购订单')],
string='采购类型', default='standard', store=True, compute='_compute_purchase_type')
# 合同编号
contract_number = fields.Char(string='合同编号', size=20)
# 合同概况
contract_summary = fields.Text(string='合同概况')
# 选择是否为紧急采购
urgent_purchase = fields.Selection([('no', ''), ('yes', '')], string='紧急采购', default='no')
@api.depends('origin')
def _compute_purchase_type(self):
for purchase in self:
order_id = self.env['sale.order'].sudo().search([('name', '=', purchase.origin)])
if order_id:
product_list = [line.product_id.id for line in purchase.order_line]
for order_line in order_id.order_line:
if order_line.supply_method == 'purchase':
if order_line.product_id.id in product_list:
purchase.purchase_type = 'outside'
break
elif order_line.supply_method == 'outsourcing':
if order_line.product_id.id in product_list:
purchase.purchase_type = 'outsourcing'
break
delivery_warning = fields.Selection([('normal', '正常'), ('warning', '预警'), ('overdue', '已逾期')],
string='交期状态', default='normal',
tracking=True)
@api.depends('partner_id')
def _compute_user_id(self):
if not self.user_id:
if self.partner_id:
self.user_id = self.partner_id.purchase_user_id.id
# self.state = 'purchase'
else:
self.user_id = self.env.user.id
@api.constrains('order_line')
def check_order_line(self):
for item in self:
if not item.order_line:
raise UserError('该询价单未添加【产品】,请进行添加')
for line in item.order_line:
if not line.product_id:
raise UserError('【产品】未添加,请进行添加')
if not line.name:
raise UserError('请对【产品】中的【说明】进行输入')
if line.product_qty == 0:
raise UserError('请对【产品】中的【数量】进行输入')
if not line.product_uom:
raise UserError('请对【产品】中的【计量单位】进行选择')
if line.price_unit == 0:
raise UserError('请对【产品】中的【单价】进行输入')
if not line.taxes_id:
raise UserError('请对【产品】中的【税】进行选择')
def get_purchase_order(self, consecutive_process_parameters, production, product_id_to_production_names):
for pp in consecutive_process_parameters:
server_product_process = []
purchase_order = pp._get_surface_technics_purchase_ids()
if purchase_order:
# purchase_order.write({'state': 'draft'})
pp.purchase_id = [(6, 0, [purchase_order.id])]
else:
server_template = self.env['product.template'].search(
[('server_product_process_parameters_id', '=', pp.surface_technics_parameters_id.id),
('detailed_type', '=', 'service')])
server_product_process.append((0, 0, {
'product_id': server_template.product_variant_id.id,
'product_qty': 1,
'product_uom': server_template.uom_id.id,
'related_product': production.product_id.id,
'part_number': pp.part_number,
'part_name': pp.part_name,
}))
# 获取服务商品最后一个供应商的采购员
purchase_user_id = server_template.seller_ids[-1].partner_id.purchase_user_id
purchase_order = self.env['purchase.order'].sudo().create({
'partner_id': server_template.seller_ids[0].partner_id.id,
'origin': production.name,
'state': 'draft',
'purchase_type': 'consignment',
'order_line': server_product_process,
'user_id': purchase_user_id.id
})
pp.purchase_id = [(6, 0, [purchase_order.id])]
# self.env.cr.commit()
# @api.onchange('order_line')
# def _onchange_order_line(self):
# for order in self:
# if order.order_line:
# line = order.order_line
# product = line.product_id
# product_id = product.ids
# if len(product_id) != len(line):
# raise ValidationError('【%s】已存在请勿重复添加' % product[-1].name)
def button_confirm(self):
result = super(RePurchaseOrder, self).button_confirm()
for item in self:
for line in item.order_line:
if line.product_id.categ_type == '表面工艺':
if item.origin:
for production_name in item.origin.split(','):
production = self.env['mrp.production'].search([('name', '=', production_name.strip())])
for workorder in production.workorder_ids.filtered(
lambda wd: wd.routing_type == '表面工艺' and wd.state == 'waiting' and line.product_id.server_product_process_parameters_id == wd.surface_technics_parameters_id):
workorder.state = 'ready'
return result
# # 采购订单逾期预警和已逾期
def _overdue_or_warning_func(self):
purchase_order = self.sudo().search(
[('state', 'in', ['purchase']), ('date_planned', '!=', False),
('receipt_status', 'in', ('partial', 'pending'))])
last_overdue_order = None
last_warning_order = None
for item in purchase_order:
current_time = datetime.datetime.now()
if item.date_planned <= current_time: # 已逾期
item.delivery_warning = 'overdue'
last_overdue_order = item
elif (item.date_planned - current_time).total_seconds() < 48 * 3600: # 预警
item.delivery_warning = 'warning'
last_warning_order = item
purchase_order_done = self.sudo().search([('state', 'in', ['purchase']), ('receipt_status', '=', 'full')])
purchase_order_overdue = purchase_order_done.filtered(lambda x: x.delivery_warning in ['overdue', 'warning'])
if purchase_order_overdue:
purchase_order_overdue.write({'delivery_warning': 'normal'})
return last_overdue_order, last_warning_order
class PurchaseOrderLine(models.Model):
_inherit = 'purchase.order.line'
part_name = fields.Char('零件名称', related='product_id.part_name', readonly=True)
# part_number = fields.Char('零件图号',related='product_id.part_number', readonly=True)
class ResPartnerToSale(models.Model):
_inherit = 'res.partner'
purchase_user_id = fields.Many2one('res.users', '采购员')
@api.constrains('name')
def _check_name(self):
obj = self.sudo().search([('name', '=', self.name), ('id', '!=', self.id), ('active', '=', True)])
if obj:
raise UserError('该名称已存在,请重新输入')
@api.constrains('vat')
def _check_vat(self):
obj = self.sudo().search([('vat', '=', self.vat), ('id', '!=', self.id), ('active', '=', True)])
if obj:
raise UserError('该税ID已存在,请重新输入')
# @api.constrains('email')
# def _check_email(self):
# if self.customer_rank > 0:
# obj = self.sudo().search([('email', '=', self.email), ('id', '!=', self.id), ('active', '=', True)])
# if obj:
# raise UserError('该邮箱已存在,请重新输入')
# @api.model
# def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
# if self._context.get('is_customer'):
# if self.env.user.has_group('sf_base.group_sale_director'):
# domain = [('customer_rank', '>', 0)]
# elif self.env.user.has_group('sf_base.group_sale_salemanager'):
# customer = self.env['res.partner'].search(
# [('customer_rank', '>', 0), ('user_id', '=', self.env.user.id)])
# if customer:
# ids = [t.id for t in customer]
# domain = [('id', 'in', ids)]
# else:
# domain = [('id', '=', False)]
# return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
# elif self._context.get('is_supplier') or self.env.user.has_group('sf_base.group_purchase_director'):
# if self.env.user.has_group('sf_base.group_purchase_director'):
# domain = [('supplier_rank', '>', 0)]
# elif self.env.user.has_group('sf_base.group_purchase'):
# supplier = self.env['res.partner'].search(
# [('supplier_rank', '>', 0), ('purchase_user_id', '=', self.env.user.id)])
# if supplier:
# ids = [t.id for t in supplier]
# domain = [('id', 'in', ids)]
# else:
# domain = [('id', '=', False)]
# return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
# return super()._name_search(name, args, operator, limit, name_get_uid)
@api.onchange('user_id')
def _get_salesman(self):
if self.customer_rank > 0:
if self.env.user.has_group('sf_base.group_sale_salemanager'):
self.user_id = self.env.user.id
@api.onchange('purchase_user_id')
def _get_purchaseman(self):
if self.supplier_rank > 0:
if self.env.user.has_group('sf_base.group_purchase'):
self.purchase_user_id = self.env.user.id
class ResUserToSale(models.Model):
_inherit = 'res.users'
@api.model
def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
domain = []
if self._context.get('is_sale'):
if self.env.user.has_group('sf_base.group_sale_director'):
pass
elif self.env.user.has_group('sf_base.group_sale_salemanager'):
if self.id != self.env.user.id:
domain = [('id', '=', self.id)]
else:
domain = [('id', '=', self.env.user.id)]
return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
elif self._context.get('supplier_rank'):
if self.env.user.has_group('sf_base.group_purchase_director'):
pass
elif self.env.user.has_group('sf_base.group_purchase'):
if self.id != self.env.user.id:
domain = [('id', '=', self.id)]
else:
domain = [('id', '=', self.env.user.id)]
return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
return super()._name_search(name, args, operator, limit, name_get_uid)