Files
test/sf_sale/models/sale_order.py
廖丹龙 ed3ea32f45 Accept Merge Request #1543: (feature/delivery_status -> develop)
Merge Request: 智能工厂空模型处理

Created By: @廖丹龙
Reviewed By: @胡尧
Approved By: @胡尧 
Accepted By: @廖丹龙
URL: https://jikimo-hn.coding.net/p/jikimo_sfs/d/jikimo_sf/git/merge/1543
2024-11-25 11:52:20 +08:00

406 lines
19 KiB
Python

import datetime
import base64
from odoo import Command
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
READONLY_FIELD_STATES = {
state: [('readonly', True)]
for state in {'sale', 'done', 'cancel'}
}
class ReSaleOrder(models.Model):
_inherit = 'sale.order'
mrp_production_count = fields.Integer(
"Count of MO generated",
compute='_compute_mrp_production_ids',
groups='mrp.group_mrp_user,sf_base.group_sale_salemanager,sf_base.group_sale_director')
logistics_way = fields.Selection([('自提', '自提'), ('到付', '到付'), ('在线支付', '在线支付')], string='物流方式')
state = fields.Selection(
selection=[
('draft', "报价"),
('sent', "报价已发送"),
('sale', "销售订单"),
('done', "已锁定"),
('cancel', '已废弃'),
],
string="状态",
readonly=True, copy=False, index=True,
tracking=3,
default='draft')
deadline_of_delivery = fields.Date('订单交期', tracking=True)
# person_of_delivery = fields.Char('收货人')
# telephone_of_delivery = fields.Char('电话号码')
# address_of_delivery = fields.Char('联系地址')
payments_way = fields.Selection([('现结', '现结'), ('月结', '月结')], '结算方式', default='现结', tracking=True)
pay_way = fields.Selection([('转账', '转账'), ('微信', '微信'), ('支付宝', '支付宝')], '支付方式')
check_status = fields.Selection([('pending', '待审核'), ('approved', '已审核'), ('fail', '不通过')], '审核状态')
# schedule_status = fields.Selection(
# [('to schedule', '待排程'), ('to process', '待加工'), ('to deliver', '待发货'), ('to receive', '待收货'),
# ('received', '已收货')],
# '进度状态')
payment_term_id = fields.Many2one(
comodel_name='account.payment.term',
string="交付条件",
compute='_compute_payment_term_id',
store=True, readonly=False, precompute=True, check_company=True, tracking=True,
domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]")
remark = fields.Text('备注')
validity_date = fields.Date(
string="Expiration",
compute='_compute_validity_date',
store=True, readonly=False, copy=False, precompute=True,
states=READONLY_FIELD_STATES, default=fields.Datetime.now)
delivery_warning = fields.Selection([('normal', '正常'), ('warning', '告警'), ('overdue', '逾期')], string='时效',
tracking=True)
# 业务平台分配工厂后在智能工厂先创建销售订单
def sale_order_create(self, company_id, delivery_name, delivery_telephone, delivery_address,
deadline_of_delivery, payments_way, pay_way, state='sale'):
now_time = datetime.datetime.now()
partner = self.get_customer()
data = {
'company_id': company_id.id,
'date_order': now_time,
'name': self.env['ir.sequence'].next_by_code('sale.order', sequence_date=now_time),
'partner_id': partner.id,
'check_status': 'approved',
'state': state,
'user_id': partner.user_id.id,
'person_of_delivery': delivery_name,
'telephone_of_delivery': delivery_telephone,
'address_of_delivery': delivery_address,
'payments_way': payments_way,
'pay_way': pay_way,
}
if deadline_of_delivery:
# deadline_of_delivery字段存在为false字符串情况
if not isinstance(deadline_of_delivery, str):
data.update({'deadline_of_delivery': deadline_of_delivery})
else:
if deadline_of_delivery != "False":
data.update({'deadline_of_delivery': deadline_of_delivery})
order_id = self.env['sale.order'].sudo().create(data)
return order_id
def write(self, vals):
if self.env.user.has_group('sf_base.group_sale_director'):
if vals.get('check_status'):
if vals['check_status'] in ('pending', False):
vals['check_status'] = 'approved'
return super().write(vals)
# 提交
def submit(self):
self.check_status = 'pending'
def get_customer(self):
partner_tag = self.env['res.partner.category'].sudo().search([('name', '=', '业务平台')], limit=1,
order='id asc')
if not partner_tag:
partner_tag = self.env['res.partner.category'].sudo().create({'name': '平台客户'})
customer = self.env['res.partner'].search([('name', '=', '业务平台')], limit=1, order='id asc')
if customer:
if not customer.vat:
customer.write({'name': '业务平台', 'vat': '91430103MA7BRH9K4M', 'phone': '0731-85115515',
'email': 'jikimo@jikimo.com', 'category_id': [Command.set([partner_tag.id])]})
return customer
else:
partner = self.env['res.partner'].create(
{'name': '业务平台', 'vat': '91430103MA7BRH9K4M', 'phone': '0731-85115515',
'email': 'jikimo@jikimo.com', 'category_id': [Command.set([partner_tag.id])]})
return partner
# 业务平台分配工厂时在创建完产品后再创建销售明细信息
def sale_order_create_line(self, product, item):
machining_accuracy_name = ''
if product.model_machining_precision:
machining_accuracy_name = self.env['sf.machining.accuracy'].sudo().search(
[('sync_id', '=', product.model_machining_precision)]).name
vals = {
'order_id': self.id,
'product_id': product.id,
'name': '%s/%s/%s/%s/%s/%s' % (
product.model_long, product.model_width, product.model_height, product.model_volume,
machining_accuracy_name,
product.materials_id.name),
'price_unit': product.list_price,
'product_uom_qty': item['number'],
'model_glb_file': base64.b64decode(item['model_file']) if item['model_file'] else None,
'remark': item.get('remark'),
'embryo_redundancy_id': item.get('embryo_redundancy_id'),
'is_incoming_material': True if item.get('embryo_redundancy_id') else False,
'manual_quotation': item.get('manual_quotation')
}
return self.env['sale.order.line'].with_context(skip_procurement=True).create(vals)
@api.constrains('order_line')
def check_order_line(self):
for item in self:
if not item.order_line:
raise ValidationError('请选择【订单行】中的【产品】')
for line in item.order_line:
if not line.product_template_id:
raise UserError('请对【订单行】中的【产品】进行选择')
if not line.name:
raise UserError('请对【订单行】中的【说明】进行输入')
if line.product_qty == 0:
raise UserError('请对【订单行】中的【数量】进行输入')
if not line.product_uom:
raise UserError('请对【订单行】中的【计量单位】进行选择')
if line.price_unit == 0:
raise UserError('请对【订单行】中的【单价】进行输入')
if not line.tax_id:
raise UserError('请对【订单行】中的【税】进行选择')
class ResaleOrderLine(models.Model):
_inherit = 'sale.order.line'
model_glb_file = fields.Binary('模型的glb文件', compute='_compute_model_glb_file', store=True)
# product_template_id = fields.Many2one(
# string="产品",
# comodel_name='product.template',
# compute='_compute_product_template_id',
# readonly=False,
# search='_search_product_template_id',
# # previously related='product_id.product_tmpl_id'
# # not anymore since the field must be considered editable for product configurator logic
# # without modifying the related product_id when updated.
# domain=[('sale_ok', '=', True), ('categ_type', '=', '成品')])
check_status = fields.Selection(related='order_id.check_status')
remark = fields.Char('备注')
is_incoming_material = fields.Boolean('客供料', compute='_compute_is_incoming_material', store=True)
embryo_redundancy_id = fields.Many2one('sf.embryo.redundancy', '坯料冗余')
manual_quotation = fields.Boolean('人工编程', default=False)
@api.depends('embryo_redundancy_id')
def _compute_is_incoming_material(self):
for line in self:
line.is_incoming_material = True if line.embryo_redundancy_id else False
@api.depends('product_template_id')
def _compute_model_glb_file(self):
for line in self:
if line.product_template_id:
if not line.model_glb_file:
line.model_glb_file = line.product_id.product_tmpl_id.model_file
if not line.price_unit:
line.price_unit = line.product_id.product_tmpl_id.list_price
class ProductTemplate(models.Model):
_inherit = 'product.template'
manual_quotation = fields.Boolean('人工编程', default=False)
class RePurchaseOrder(models.Model):
_inherit = 'purchase.order'
mrp_production_count = fields.Integer(
"Count of MO Source",
compute='_compute_mrp_production_count',
groups='mrp.group_mrp_user,sf_base.group_purchase,sf_base.group_purchase_director')
remark = fields.Text('备注')
user_id = fields.Many2one(
'res.users', string='买家', index=True, tracking=True,
compute='_compute_user_id',
store=True)
purchase_type = fields.Selection([('standard', '标准采购'), ('consignment', '委外加工')], string='采购类型',
default='standard')
@api.depends('partner_id')
def _compute_user_id(self):
if not self.user_id:
if self.partner_id:
self.user_id = self.partner_id.purchase_user_id.id
# self.state = 'purchase'
else:
self.user_id = self.env.user.id
@api.constrains('order_line')
def check_order_line(self):
for item in self:
if not item.order_line:
raise UserError('该询价单未添加【产品】,请进行添加')
for line in item.order_line:
if not line.product_id:
raise UserError('【产品】未添加,请进行添加')
if not line.name:
raise UserError('请对【产品】中的【说明】进行输入')
if line.product_qty == 0:
raise UserError('请对【产品】中的【数量】进行输入')
if not line.product_uom:
raise UserError('请对【产品】中的【计量单位】进行选择')
if line.price_unit == 0:
raise UserError('请对【产品】中的【单价】进行输入')
if not line.taxes_id:
raise UserError('请对【产品】中的【税】进行选择')
def get_purchase_order(self, consecutive_process_parameters, production, product_id_to_production_names):
server_product_process = []
production_process = product_id_to_production_names.get(
production.product_id.id)
for pp in consecutive_process_parameters:
server_template = self.env['product.template'].search(
[('server_product_process_parameters_id', '=', pp.surface_technics_parameters_id.id),
('detailed_type', '=', 'service')])
purchase_order_line = self.env['purchase.order.line'].search(
[('product_id', '=', server_template.product_variant_id.id),
('product_qty', '=', len(production_process))], limit=1, order='id desc')
if not purchase_order_line:
server_product_process.append((0, 0, {
'product_id': server_template.product_variant_id.id,
'product_qty': len(production_process),
'product_uom': server_template.uom_id.id
}))
else:
if production.name in production_process:
purchase_order = self.env['purchase.order'].search(
[('state', '=', 'draft'), ('origin', '=', ','.join(production_process)),
('purchase_type', '=', 'consignment')])
if not purchase_order:
server_product_process.append((0, 0, {
'product_id': server_template.product_variant_id.id,
'product_qty': len(production_process),
'product_uom': server_template.uom_id.id
}))
if server_product_process:
self.env['purchase.order'].sudo().create({
'partner_id': server_template.seller_ids[0].partner_id.id,
'origin': ','.join(production_process),
'state': 'draft',
'purchase_type': 'consignment',
'order_line': server_product_process})
# self.env.cr.commit()
@api.onchange('order_line')
def _onchange_order_line(self):
for order in self:
if order.order_line:
line = order.order_line
product = line.product_id
product_id = product.ids
if len(product_id) != len(line):
raise ValidationError('%s】已存在,请勿重复添加' % product[-1].name)
def button_confirm(self):
result = super(RePurchaseOrder, self).button_confirm()
for item in self:
# 确认订单时,自动分配序列号
if item.picking_ids:
for picking_id in item.picking_ids:
if picking_id.move_ids:
for move_id in picking_id.move_ids:
move_id.put_move_line()
for line in item.order_line:
if line.product_id.categ_type == '表面工艺':
for production_name in item.origin.split(','):
production = self.env['mrp.production'].search([('name', '=', production_name)])
for workorder in production.workorder_ids.filtered(
lambda wd: wd.routing_type == '表面工艺' and wd.state == 'waiting' and line.product_id.server_product_process_parameters_id == wd.surface_technics_parameters_id):
workorder.state = 'ready'
return result
class ResPartnerToSale(models.Model):
_inherit = 'res.partner'
purchase_user_id = fields.Many2one('res.users', '采购员')
@api.constrains('name')
def _check_name(self):
obj = self.sudo().search([('name', '=', self.name), ('id', '!=', self.id), ('active', '=', True)])
if obj:
raise UserError('该名称已存在,请重新输入')
@api.constrains('vat')
def _check_vat(self):
obj = self.sudo().search([('vat', '=', self.vat), ('id', '!=', self.id), ('active', '=', True)])
if obj:
raise UserError('该税ID已存在,请重新输入')
# @api.constrains('email')
# def _check_email(self):
# if self.customer_rank > 0:
# obj = self.sudo().search([('email', '=', self.email), ('id', '!=', self.id), ('active', '=', True)])
# if obj:
# raise UserError('该邮箱已存在,请重新输入')
# @api.model
# def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
# if self._context.get('is_customer'):
# if self.env.user.has_group('sf_base.group_sale_director'):
# domain = [('customer_rank', '>', 0)]
# elif self.env.user.has_group('sf_base.group_sale_salemanager'):
# customer = self.env['res.partner'].search(
# [('customer_rank', '>', 0), ('user_id', '=', self.env.user.id)])
# if customer:
# ids = [t.id for t in customer]
# domain = [('id', 'in', ids)]
# else:
# domain = [('id', '=', False)]
# return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
# elif self._context.get('is_supplier') or self.env.user.has_group('sf_base.group_purchase_director'):
# if self.env.user.has_group('sf_base.group_purchase_director'):
# domain = [('supplier_rank', '>', 0)]
# elif self.env.user.has_group('sf_base.group_purchase'):
# supplier = self.env['res.partner'].search(
# [('supplier_rank', '>', 0), ('purchase_user_id', '=', self.env.user.id)])
# if supplier:
# ids = [t.id for t in supplier]
# domain = [('id', 'in', ids)]
# else:
# domain = [('id', '=', False)]
# return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
# return super()._name_search(name, args, operator, limit, name_get_uid)
@api.onchange('user_id')
def _get_salesman(self):
if self.customer_rank > 0:
if self.env.user.has_group('sf_base.group_sale_salemanager'):
self.user_id = self.env.user.id
@api.onchange('purchase_user_id')
def _get_purchaseman(self):
if self.supplier_rank > 0:
if self.env.user.has_group('sf_base.group_purchase'):
self.purchase_user_id = self.env.user.id
class ResUserToSale(models.Model):
_inherit = 'res.users'
@api.model
def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
if self._context.get('is_sale'):
if self.env.user.has_group('sf_base.group_sale_director'):
domain = []
elif self.env.user.has_group('sf_base.group_sale_salemanager'):
if self.id != self.env.user.id:
domain = [('id', '=', self.id)]
else:
domain = [('id', '=', self.env.user.id)]
return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
elif self._context.get('supplier_rank'):
if self.env.user.has_group('sf_base.group_purchase_director'):
domain = []
elif self.env.user.has_group('sf_base.group_purchase'):
if self.id != self.env.user.id:
domain = [('id', '=', self.id)]
else:
domain = [('id', '=', self.env.user.id)]
return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
return super()._name_search(name, args, operator, limit, name_get_uid)