import datetime
import base64

from odoo import Command
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError

# Per-state attribute overrides: the field becomes read-only once the order is
# in the 'sale', 'done' or 'cancel' state.
READONLY_FIELD_STATES = {
    state: [('readonly', True)]
    for state in {'sale', 'done', 'cancel'}
}


class ReSaleOrder(models.Model):
    """Extend ``sale.order`` with delivery, settlement, review and contract data."""
    _inherit = 'sale.order'

    mrp_production_count = fields.Integer(
        "Count of MO generated",
        compute='_compute_mrp_production_ids',
        groups='mrp.group_mrp_user,sf_base.group_sale_salemanager,sf_base.group_sale_director')
    # Logistics method: self pick-up / freight collect / online payment.
    logistics_way = fields.Selection(
        [('自提', '自提'), ('到付', '到付'), ('在线支付', '在线支付')],
        string='物流方式')
    state = fields.Selection(
        selection=[
            ('draft', "报价"),
            ('sent', "报价已发送"),
            ('sale', "销售订单"),
            ('done', "已锁定"),
            ('cancel', '已废弃'),
        ],
        string="状态",
        readonly=True, copy=False, index=True,
        tracking=3,
        default='draft')
    # Delivery deadline of the order.
    deadline_of_delivery = fields.Date('订单交期', tracking=True)
    # person_of_delivery = fields.Char('收货人')
    # telephone_of_delivery = fields.Char('电话号码')
    # address_of_delivery = fields.Char('联系地址')
    # Settlement method: cash on delivery / monthly settlement.
    payments_way = fields.Selection(
        [('现结', '现结'), ('月结', '月结')], '结算方式',
        default='现结', tracking=True)
    # Payment channel: bank transfer / WeChat / Alipay.
    pay_way = fields.Selection(
        [('转账', '转账'), ('微信', '微信'), ('支付宝', '支付宝')], '支付方式')
    # Review status of the order.
    check_status = fields.Selection(
        [('pending', '待审核'), ('approved', '已审核'), ('fail', '不通过')], '审核状态')
    # schedule_status = fields.Selection(
    #     [('to schedule', '待排程'), ('to process', '待加工'), ('to deliver', '待发货'), ('to receive', '待收货'),
    #      ('received', '已收货')],
    #     '进度状态')
    payment_term_id = fields.Many2one(
        comodel_name='account.payment.term',
        string="交付条件",
        compute='_compute_payment_term_id',
        store=True, readonly=False, precompute=True, check_company=True, tracking=True,
        domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]")
    remark = fields.Text('备注')
    validity_date = fields.Date(
        string="Expiration",
        compute='_compute_validity_date',
        store=True, readonly=False, copy=False, precompute=True,
        states=READONLY_FIELD_STATES, default=fields.Datetime.now)
    # Timeliness indicator: normal / warning / overdue.
    delivery_warning = fields.Selection(
        [('normal', '正常'), ('warning', '告警'), ('overdue', '逾期')],
        default='normal', string='时效', tracking=True)
    # Order number on the business platform.
    order_code = fields.Char('平台订单号', readonly=True)
    model_display_version = fields.Char('模型展示版本', default="v1")
    contract_code = fields.Char('合同编号')
    contract_document_id = fields.Many2one('documents.document', string='合同文件')
    contract_file = fields.Binary(related='contract_document_id.datas', string='合同文件内容')
    contract_file_name = fields.Char(related='contract_document_id.attachment_id.name', string='文件名')

    # After the business platform assigns a factory, the sales order is
    # created first in the smart factory.
    def sale_order_create(self, company_id, delivery_name, delivery_telephone, delivery_address,
                          deadline_of_delivery, payments_way, pay_way, order_number, state='sale',
                          model_display_version='v1'):
        """Create a sales order for the platform customer and return it.

        The customer is the partner returned by :meth:`get_customer`; the order
        is created with ``sudo()`` and is already marked as approved.

        :param company_id: company record (its ``id`` is stored on the order)
        :param delivery_name: value for ``person_of_delivery``
        :param delivery_telephone: value for ``telephone_of_delivery``
        :param delivery_address: value for ``address_of_delivery``
        :param deadline_of_delivery: delivery deadline; ignored when falsy or
            when it is the literal string ``"False"``
        :param payments_way: settlement method
        :param pay_way: payment channel
        :param order_number: platform order number (``order_code``)
        :param state: initial order state, ``'sale'`` by default
        :param model_display_version: model display version, ``'v1'`` by default
        :return: the created ``sale.order`` record
        """
        now_time = datetime.datetime.now()
        partner = self.get_customer()
        data = {
            'company_id': company_id.id,
            'date_order': now_time,
            'name': self.env['ir.sequence'].next_by_code('sale.order', sequence_date=now_time),
            'partner_id': partner.id,
            'check_status': 'approved',
            'state': state,
            'user_id': partner.user_id.id,
            'person_of_delivery': delivery_name,
            'telephone_of_delivery': delivery_telephone,
            'address_of_delivery': delivery_address,
            'payments_way': payments_way,
            'pay_way': pay_way,
            'order_code': order_number,
            'model_display_version': model_display_version,
        }
        # The deadline may arrive as the string "False"; treat that like an
        # absent value.
        if deadline_of_delivery and deadline_of_delivery != "False":
            data['deadline_of_delivery'] = deadline_of_delivery
        return self.env['sale.order'].sudo().create(data)

    def write(self, vals):
        """Write ``vals``; a sale director submitting for review approves directly.

        When the current user belongs to ``sf_base.group_sale_director`` and
        ``check_status`` is being set to ``'pending'``, the value is replaced
        by ``'approved'`` (``vals`` is modified in place).
        """
        if (vals.get('check_status') == 'pending'
                and self.env.user.has_group('sf_base.group_sale_director')):
            vals['check_status'] = 'approved'
        return super().write(vals)

    # Submit
    def submit(self):
        """Set ``check_status`` to ``'pending'``."""
        self.check_status = 'pending'

    def get_customer(self):
        """Return the '业务平台' partner, creating or completing it if needed.

        The partner tag is looked up by the name '业务平台' first and then by
        '平台客户' (the name used when the tag has to be created), so repeated
        calls reuse the tag instead of creating a duplicate every time.

        :return: a ``res.partner`` record
        """
        tag_model = self.env['res.partner.category'].sudo()
        partner_tag = tag_model.search([('name', '=', '业务平台')], limit=1, order='id asc')
        if not partner_tag:
            # Reuse a tag created by an earlier call before creating a new one.
            partner_tag = tag_model.search([('name', '=', '平台客户')], limit=1, order='id asc')
        if not partner_tag:
            partner_tag = tag_model.create({'name': '平台客户'})

        partner_values = {
            'name': '业务平台',
            'vat': '91430103MA7BRH9K4M',
            'phone': '0731-85115515',
            'email': 'jikimo@jikimo.com',
            'category_id': [Command.set([partner_tag.id])],
        }
        partner_model = self.env['res.partner']
        customer = partner_model.search([('name', '=', '业务平台')], limit=1, order='id asc')
        if not customer:
            return partner_model.create(partner_values)
        if not customer.vat:
            # Existing partner without a VAT number: fill in the standard data.
            customer.write(partner_values)
        return customer

    # When the business platform assigns a factory, the sales order lines are
    # created after the products have been created.
    def sale_order_create_line(self, product, item):
        """Create one order line on this order for ``product``.

        :param product: product record; its model dimensions, material and
            machining precision build the line description
        :param item: dict that must contain ``number``, ``model_url``,
            ``glb_url``, ``model_id`` and ``delivery_end_date``; ``remark``,
            ``embryo_redundancy_id`` and ``manual_quotation`` are optional
        :return: the created ``sale.order.line`` record
        """
        machining_accuracy_name = ''
        if product.model_machining_precision:
            machining_accuracy_name = self.env['sf.machining.accuracy'].sudo().search(
                [('sync_id', '=', product.model_machining_precision)]).name
        description = '%s/%s/%s/%s/%s/%s' % (
            self.format_float(product.model_long),
            self.format_float(product.model_width),
            self.format_float(product.model_height),
            self.format_float(product.model_volume),
            machining_accuracy_name,
            product.materials_id.name)
        embryo_redundancy_id = item.get('embryo_redundancy_id')
        vals = {
            'order_id': self.id,
            'product_id': product.id,
            'name': description,
            'price_unit': product.list_price,
            'product_uom_qty': item['number'],
            # 'model_glb_file': base64.b64decode(item['model_file']) if item['model_file'] else None,
            'model_url': item['model_url'],
            'glb_url': item['glb_url'],
            'remark': item.get('remark'),
            'embryo_redundancy_id': embryo_redundancy_id,
            'is_incoming_material': bool(embryo_redundancy_id),
            'manual_quotation': item.get('manual_quotation'),
            'model_id': item['model_id'],
            'delivery_end_date': item['delivery_end_date'],
        }
        return self.env['sale.order.line'].with_context(skip_procurement=True).create(vals)

    def format_float(self, value):
        """Limit ``value`` to at most two decimal places for display.

        :return: a string formatted with two decimals when ``value`` has more
            than two decimal digits, otherwise ``value`` converted to ``float``
        """
        # Work on the textual form to count the decimal digits.
        value_str = str(value)
        if '.' in value_str:
            decimal_part = value_str.split('.')[1]
            # More than two decimal digits: keep two.
            if len(decimal_part) > 2:
                return "{:.2f}".format(value)
        # Otherwise keep the original precision.
        return float(value_str)

    @api.constrains('order_line')
    def check_order_line(self):
        """Require at least one order line and the mandatory data on each line.

        :raises ValidationError: when an order has no order line
        :raises UserError: when a line lacks product, description, quantity,
            unit of measure, unit price or tax
        """
        for item in self:
            if not item.order_line:
                raise ValidationError('请选择【订单行】中的【产品】')
            for line in item.order_line:
                if not line.product_template_id:
                    raise UserError('请对【订单行】中的【产品】进行选择')
                if not line.name:
                    raise UserError('请对【订单行】中的【说明】进行输入')
                if line.product_qty == 0:
                    raise UserError('请对【订单行】中的【数量】进行输入')
                if not line.product_uom:
                    raise UserError('请对【订单行】中的【计量单位】进行选择')
                if line.price_unit == 0:
                    raise UserError('请对【订单行】中的【单价】进行输入')
                if not line.tax_id:
                    raise UserError('请对【订单行】中的【税】进行选择')

    consignment_purchase_order_count = fields.Integer(
        "Number of consignment Purchase Order Generated",
        compute='_compute_purchase_order_count')

    @api.depends('order_line.purchase_line_ids.order_id')
    def _compute_purchase_order_count(self):
        """Count non-outsourcing and outsourcing purchase orders separately."""
        for order in self:
            order.purchase_order_count = len(order._get_sale_to_purchase('outsourcing'))
            order.consignment_purchase_order_count = len(order._get_sale_to_purchase_1('outsourcing'))

    def action_view_purchase_orders(self):
        """Open the purchase orders of this order, outsourcing ones excluded.

        :return: a window action; a form view when exactly one order matches,
            otherwise a list filtered on the matching ids
        """
        self.ensure_one()
        purchase_order_ids = self._get_sale_to_purchase('outsourcing')
        action = {
            'res_model': 'purchase.order',
            'type': 'ir.actions.act_window',
        }
        if len(purchase_order_ids) == 1:
            action.update({
                'view_mode': 'form',
                'res_id': purchase_order_ids[0],
            })
        else:
            action.update({
                'name': _("从 %s生成采购订单", self.name),
                'domain': [('id', 'in', purchase_order_ids)],
                'view_mode': 'tree,form',
            })
        return action

    def _get_sale_to_purchase(self, purchase_type):
        """Return ids of related purchase orders whose type is NOT ``purchase_type``.

        Combines the orders returned by ``_get_purchase_orders()`` with the
        purchase orders whose ``origin`` equals this order's name.

        :param purchase_type: purchase type to exclude
        :return: list of unique ``purchase.order`` ids
        """
        # Exclude the requested type in both sources; the linked-order filter
        # previously hard-coded 'outsourcing' regardless of the argument.
        linked_ids = self._get_purchase_orders().filtered(
            lambda po: po.purchase_type != purchase_type).ids
        origin_ids = self.env['purchase.order'].sudo().search(
            [('origin', '=', self.name), ('purchase_type', '!=', purchase_type)]).ids
        return list(set(linked_ids) | set(origin_ids))

    def action_view_consignment_purchase_orders(self):
        """Open the outsourced-processing purchase orders of this order.

        :return: a window action; a form view when exactly one order matches,
            otherwise a list filtered on the matching ids
        """
        self.ensure_one()
        outsourcing_purchase_order_ids = self._get_sale_to_purchase_1('outsourcing')
        action = {
            'res_model': 'purchase.order',
            'type': 'ir.actions.act_window',
        }
        if len(outsourcing_purchase_order_ids) == 1:
            action.update({
                'view_mode': 'form',
                'res_id': outsourcing_purchase_order_ids[0],
            })
        else:
            action.update({
                'name': _("从 %s生成委外加工订单", self.name),
                'domain': [('id', 'in', outsourcing_purchase_order_ids)],
                'view_mode': 'tree,form',
            })
        return action

    def _get_sale_to_purchase_1(self, purchase_type):
        """Return ids of related purchase orders whose type IS ``purchase_type``.

        Combines the orders returned by ``_get_purchase_orders()`` with the
        purchase orders whose ``origin`` equals this order's name.

        :param purchase_type: purchase type to select
        :return: list of unique ``purchase.order`` ids
        """
        linked_ids = self._get_purchase_orders().filtered(
            lambda po: po.purchase_type == purchase_type).ids
        origin_ids = self.env['purchase.order'].sudo().search(
            [('origin', '=', self.name), ('purchase_type', '=', purchase_type)]).ids
        return list(set(linked_ids) | set(origin_ids))


class ResaleOrderLine(models.Model):
    """Extend ``sale.order.line`` with part, model-file and quotation data."""
    _inherit = 'sale.order.line'

    # part_number = fields.Char('零件图号', related='product_id.part_number', readonly=True)
    part_name = fields.Char('零件名称', related='product_id.part_name', readonly=True)
    model_glb_file = fields.Binary('模型的glb文件', compute='_compute_model_glb_file', store=True)
    glb_url = fields.Char('glb文件地址', compute='_compute_model_glb_file', store=True)
    # product_template_id = fields.Many2one(
    #     string="产品",
    #     comodel_name='product.template',
    #     compute='_compute_product_template_id',
    #     readonly=False,
    #     search='_search_product_template_id',
    #     # previously related='product_id.product_tmpl_id'
    #     # not anymore since the field must be considered editable for product configurator logic
    #     # without modifying the related product_id when updated.
    #     domain=[('sale_ok', '=', True), ('categ_type', '=', '成品')])
    check_status = fields.Selection(related='order_id.check_status')
    remark = fields.Char('备注')
    # Customer-supplied material flag, derived from embryo_redundancy_id.
    is_incoming_material = fields.Boolean('客供料', compute='_compute_is_incoming_material', store=True)
    embryo_redundancy_id = fields.Many2one('sf.embryo.redundancy', '坯料冗余')
    manual_quotation = fields.Boolean('人工编程', default=False)
    model_url = fields.Char('模型文件地址')
    model_id = fields.Char('模型ID')
    delivery_end_date = fields.Date('交货截止日期')

    @api.depends('embryo_redundancy_id')
    def _compute_is_incoming_material(self):
        """Flag the line as customer-supplied when a blank redundancy is set."""
        for line in self:
            line.is_incoming_material = bool(line.embryo_redundancy_id)

    @api.depends('product_template_id')
    def _compute_model_glb_file(self):
        """Fill empty model file, glb URL and unit price from the product template.

        Values already present on the line are left untouched.
        """
        for line in self:
            if not line.product_template_id:
                continue
            template = line.product_id.product_tmpl_id
            if not line.model_glb_file:
                line.model_glb_file = template.model_file
            if not line.glb_url:
                line.glb_url = template.glb_url
            if not line.price_unit:
                line.price_unit = template.list_price


class ProductTemplate(models.Model):
    """Extend ``product.template`` with the manual-quotation flag and part name."""
    _inherit = 'product.template'

    manual_quotation = fields.Boolean('人工编程', default=False)
    part_name = fields.Char(string='零件名称', readonly=True)


class RePurchaseOrder(models.Model):
    """Extend ``purchase.order`` with purchase type, contract and due-date data."""
    _inherit = 'purchase.order'

    mrp_production_count = fields.Integer(
        "Count of MO Source",
        compute='_compute_mrp_production_count',
        groups='mrp.group_mrp_user,sf_base.group_purchase,sf_base.group_purchase_director')
    remark = fields.Text('备注')
    user_id = fields.Many2one(
        'res.users', string='买家', index=True, tracking=True,
        compute='_compute_user_id',
        store=True)
    purchase_type = fields.Selection(
        [('standard', '标准采购'), ('consignment', '工序外协'), ('outsourcing', '委外加工'), ('outside', '外购订单')],
        string='采购类型', default='standard', store=True, compute='_compute_purchase_type')
    # Contract number
    contract_number = fields.Char(string='合同编号', size=20)
    # Contract summary
    contract_summary = fields.Text(string='合同概况')
    # Whether this is an urgent purchase
    urgent_purchase = fields.Selection([('no', '否'), ('yes', '是')], string='紧急采购', default='yes')

    @api.depends('origin')
    def _compute_purchase_type(self):
        """Derive ``purchase_type`` from the originating sale order and products.

        Orders without an ``origin`` are skipped. The first sale order whose
        name appears in the comma-separated ``origin`` is inspected: the first
        of its lines whose product is also purchased here and whose supply
        method is ``'purchase'`` / ``'outsourcing'`` sets the type to
        ``'outside'`` / ``'outsourcing'``. A first purchase line whose product
        category is '坯料' with material gain way '外协' forces
        ``'outsourcing'``.
        """
        for purchase in self:
            if not purchase.origin:
                continue
            origin_names = [name.replace(' ', '') for name in purchase.origin.split(',')]
            sale_orders = self.env['sale.order'].sudo().search([('name', 'in', origin_names)])
            if sale_orders:
                purchased_product_ids = {line.product_id.id for line in purchase.order_line}
                for sale_line in sale_orders[0].order_line:
                    if sale_line.product_id.id not in purchased_product_ids:
                        continue
                    if sale_line.supply_method == 'purchase':
                        purchase.purchase_type = 'outside'
                        break
                    if sale_line.supply_method == 'outsourcing':
                        purchase.purchase_type = 'outsourcing'
                        break
            # NOTE(review): the nesting of this check could not be recovered
            # from the flattened source; it is applied whether or not a sale
            # order was found - confirm against the intended behaviour.
            # Slicing (instead of indexing) keeps an order without lines from
            # raising IndexError.
            first_product = purchase.order_line[:1].product_id
            if (first_product.categ_id.name == '坯料'
                    and first_product.materials_type_id.gain_way == '外协'):
                purchase.purchase_type = 'outsourcing'
            # request_lines = self.order_line.mapped('purchase_request_lines')
            # # check whether a line with is_subcontract set to True exists
            # if any(line.is_subcontract for line in request_lines):
            #     purchase.purchase_type = 'consignment'

    # Due-date status: normal / warning / overdue.
    delivery_warning = fields.Selection(
        [('normal', '正常'), ('warning', '预警'), ('overdue', '已逾期')],
        string='交期状态', default='normal', tracking=True)

    @api.depends('partner_id')
    def _compute_user_id(self):
        """Default the buyer from the vendor's purchaser, else the current user.

        An already assigned buyer is kept.
        """
        for item in self:
            if item.user_id:
                continue
            if item.partner_id:
                item.user_id = item.partner_id.purchase_user_id.id
                # self.state = 'purchase'
            else:
                item.user_id = item.env.user.id

    @api.constrains('order_line')
    def check_order_line(self):
        """Require at least one line and the mandatory data on each line.

        :raises UserError: when the order has no line, or a line lacks
            product, description, quantity, unit of measure, unit price or tax
        """
        for item in self:
            if not item.order_line:
                raise UserError('该询价单未添加【产品】,请进行添加')
            for line in item.order_line:
                if not line.product_id:
                    raise UserError('【产品】未添加,请进行添加')
                if not line.name:
                    raise UserError('请对【产品】中的【说明】进行输入')
                if line.product_qty == 0:
                    raise UserError('请对【产品】中的【数量】进行输入')
                if not line.product_uom:
                    raise UserError('请对【产品】中的【计量单位】进行选择')
                if line.price_unit == 0:
                    raise UserError('请对【产品】中的【单价】进行输入')
                if not line.taxes_id:
                    raise UserError('请对【产品】中的【税】进行选择')

    # def get_purchase_request(self, consecutive_process_parameters, production):
    #     result = []
    #     for pp in consecutive_process_parameters:
    #         server_template = self.env['product.template'].search(
    #             [('server_product_process_parameters_id', '=', pp.surface_technics_parameters_id.id),
    #              ('detailed_type', '=', 'service')])
    #         # route_ids
    #         result.append({
    #             "product_id": server_template.product_variant_id.id,
    #             'related_product': production.product_id.id,
    #             "name": production.procurement_group_id.name,
    #             "date_required": fields.Datetime.now(),
    #             "product_uom_id":server_template.uom_id.id,
    #             "product_qty": production.product_qty,
    #             "request_id": False,
    #             "move_dest_ids": False,
    #             "orderpoint_id": False,
    #             'is_subcontract':True,
    #             'group_id':production.procurement_group_id.id,
    #             'production_name':pp.production_id.name,
    #         })
    #     return result

    def get_purchase_order(self, consecutive_process_parameters, production, product_id_to_production_names):
        """Link each process parameter to a purchase order, creating one if needed.

        For every item of ``consecutive_process_parameters`` the order returned
        by ``_get_surface_technics_purchase_ids()`` is reused; otherwise a
        draft ``'consignment'`` purchase order is created for the service
        product matching the surface-technics parameters, with ``production``
        as origin.

        :param consecutive_process_parameters: iterable of process-parameter
            records
        :param production: manufacturing order supplying quantity, product and
            origin name
        :param product_id_to_production_names: not used by this method
            (NOTE(review): kept for caller compatibility)
        """
        for pp in consecutive_process_parameters:
            purchase_order = pp._get_surface_technics_purchase_ids()
            if purchase_order:
                # purchase_order.write({'state': 'draft'})
                pp.purchase_id = [(6, 0, [purchase_order.id])]
                continue

            server_template = self.env['product.template'].search(
                [('server_product_process_parameters_id', '=', pp.surface_technics_parameters_id.id),
                 ('detailed_type', '=', 'service')])
            server_product_process = [(0, 0, {
                'product_id': server_template.product_variant_id.id,
                'product_qty': production.product_uom_qty,
                'product_uom': server_template.uom_id.id,
                'related_product': production.product_id.id,
                'manual_part_number': pp.part_number,
                'manual_part_name': pp.part_name,
            })]
            sellers = server_template.seller_ids
            # Purchaser of the LAST supplier of the service product.
            # NOTE(review): the vendor below is the FIRST supplier - confirm
            # that this mismatch is intended.
            purchase_user_id = sellers[-1].partner_id.purchase_user_id if sellers else False
            purchase_order = self.env['purchase.order'].sudo().create({
                'partner_id': sellers[0].partner_id.id if sellers else False,
                'origin': production.name,
                'state': 'draft',
                'purchase_type': 'consignment',
                'order_line': server_product_process,
                'user_id': purchase_user_id.id if purchase_user_id else False,
            })
            purchase_order.order_line._compute_part_number()
            pp.purchase_id = [(6, 0, [purchase_order.id])]
            # self.env.cr.commit()

    # @api.onchange('order_line')
    # def _onchange_order_line(self):
    #     for order in self:
    #         if order.order_line:
    #             line = order.order_line
    #             product = line.product_id
    #             product_id = product.ids
    #             if len(product_id) != len(line):
    #                 raise ValidationError('【%s】已存在,请勿重复添加' % product[-1].name)

    def button_confirm(self):
        """Confirm the orders, then release the matching waiting work orders.

        For each line whose product category type is '表面工艺', the
        manufacturing orders named in ``origin`` are searched and their
        '表面工艺' work orders that are ``'waiting'`` and share the product's
        process parameters are set to ``'ready'``.

        :return: the result of the parent ``button_confirm``
        """
        result = super(RePurchaseOrder, self).button_confirm()
        for item in self:
            for line in item.order_line:
                if line.product_id.categ_type != '表面工艺' or not item.origin:
                    continue
                parameters = line.product_id.server_product_process_parameters_id
                for production_name in item.origin.split(','):
                    production = self.env['mrp.production'].search(
                        [('name', '=', production_name.strip())])
                    waiting_workorders = production.workorder_ids.filtered(
                        lambda wd: wd.routing_type == '表面工艺'
                        and wd.state == 'waiting'
                        and parameters == wd.surface_technics_parameters_id)
                    for workorder in waiting_workorders:
                        workorder.state = 'ready'
        return result

    # Purchase order overdue warning / overdue status
    def _overdue_or_warning_func(self):
        """Refresh ``delivery_warning`` on confirmed purchase orders.

        Orders in state ``'purchase'`` with a planned date and a receipt
        status of ``'partial'`` or ``'pending'`` become ``'overdue'`` once the
        planned date has passed, or ``'warning'`` when it is less than 48
        hours away. Fully received orders still flagged are reset to
        ``'normal'``.

        :return: tuple ``(last_overdue_order, last_warning_order)``; each item
            is ``None`` when no order got that status
        """
        purchase_order = self.sudo().search(
            [('state', 'in', ['purchase']), ('date_planned', '!=', False),
             ('receipt_status', 'in', ('partial', 'pending'))])
        last_overdue_order = None
        last_warning_order = None
        # One reference time for the whole run.
        current_time = datetime.datetime.now()
        for item in purchase_order:
            if item.date_planned <= current_time:  # overdue
                item.delivery_warning = 'overdue'
                last_overdue_order = item
            elif (item.date_planned - current_time).total_seconds() < 48 * 3600:  # warning
                item.delivery_warning = 'warning'
                last_warning_order = item
        purchase_order_done = self.sudo().search(
            [('state', 'in', ['purchase']), ('receipt_status', '=', 'full')])
        purchase_order_overdue = purchase_order_done.filtered(
            lambda x: x.delivery_warning in ['overdue', 'warning'])
        if purchase_order_overdue:
            purchase_order_overdue.write({'delivery_warning': 'normal'})
        return last_overdue_order, last_warning_order


class PurchaseOrderLine(models.Model):
    """Extend ``purchase.order.line`` with the related part name."""
    _inherit = 'purchase.order.line'

    part_name = fields.Char('零件名称', related='product_id.part_name', readonly=True)
    # part_number = fields.Char('零件图号',related='product_id.part_number', readonly=True)


class ResPartnerToSale(models.Model):
    """Extend ``res.partner`` with a purchaser and name / VAT uniqueness checks."""
    _inherit = 'res.partner'

    # Purchaser (buyer) responsible for this partner.
    purchase_user_id = fields.Many2one('res.users', '采购员')

    @api.constrains('name')
    def _check_name(self):
        """Reject a name already used by another active partner.

        Partners without a name are skipped: comparing against an empty value
        would match every other unnamed partner.

        :raises UserError: when another active partner has the same name
        """
        for partner in self:
            if not partner.name:
                continue
            duplicate = self.sudo().search(
                [('name', '=', partner.name), ('id', '!=', partner.id), ('active', '=', True)],
                limit=1)
            if duplicate:
                raise UserError('该名称已存在,请重新输入')

    @api.constrains('vat')
    def _check_vat(self):
        """Reject a tax ID already used by another active partner.

        Partners without a tax ID are skipped: comparing against an empty
        value would match every other partner that has none.

        :raises UserError: when another active partner has the same tax ID
        """
        for partner in self:
            if not partner.vat:
                continue
            duplicate = self.sudo().search(
                [('vat', '=', partner.vat), ('id', '!=', partner.id), ('active', '=', True)],
                limit=1)
            if duplicate:
                raise UserError('该税ID已存在,请重新输入')

    # @api.constrains('email')
    # def _check_email(self):
    #     if self.customer_rank > 0:
    #         obj = self.sudo().search([('email', '=', self.email), ('id', '!=', self.id), ('active', '=', True)])
    #         if obj:
    #             raise UserError('该邮箱已存在,请重新输入')

    # @api.model
    # def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
    #     if self._context.get('is_customer'):
    #         if self.env.user.has_group('sf_base.group_sale_director'):
    #             domain = [('customer_rank', '>', 0)]
    #         elif self.env.user.has_group('sf_base.group_sale_salemanager'):
    #             customer = self.env['res.partner'].search(
    #                 [('customer_rank', '>', 0), ('user_id', '=', self.env.user.id)])
    #             if customer:
    #                 ids = [t.id for t in customer]
    #                 domain = [('id', 'in', ids)]
    #             else:
    #                 domain = [('id', '=', False)]
    #         return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
    #     elif self._context.get('is_supplier') or self.env.user.has_group('sf_base.group_purchase_director'):
    #         if self.env.user.has_group('sf_base.group_purchase_director'):
    #             domain = [('supplier_rank', '>', 0)]
    #         elif self.env.user.has_group('sf_base.group_purchase'):
    #             supplier = self.env['res.partner'].search(
    #                 [('supplier_rank', '>', 0), ('purchase_user_id', '=', self.env.user.id)])
    #             if supplier:
    #                 ids = [t.id for t in supplier]
    #                 domain = [('id', 'in', ids)]
    #             else:
    #                 domain = [('id', '=', False)]
    #         return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
    #     return super()._name_search(name, args, operator, limit, name_get_uid)

    @api.onchange('user_id')
    def _get_salesman(self):
        """Force the salesperson to the current user for sales managers.

        Applies only to partners with a positive ``customer_rank``.
        """
        if self.customer_rank > 0 and self.env.user.has_group('sf_base.group_sale_salemanager'):
            self.user_id = self.env.user.id

    @api.onchange('purchase_user_id')
    def _get_purchaseman(self):
        """Force the purchaser to the current user for purchase users.

        Applies only to partners with a positive ``supplier_rank``.
        """
        if self.supplier_rank > 0 and self.env.user.has_group('sf_base.group_purchase'):
            self.purchase_user_id = self.env.user.id


class ResUserToSale(models.Model):
    """Extend ``res.users`` to restrict user lookups by sales / purchase role."""
    _inherit = 'res.users'

    @api.model
    def _name_search(self, name, args=None, operator='ilike', limit=100, name_get_uid=None):
        """Restrict the selectable users when a role context key is set.

        With ``is_sale`` (resp. ``supplier_rank``) in the context, sale (resp.
        purchase) directors get an empty domain, while sales managers (resp.
        purchase users) get a domain on a single user id. Without either key
        the standard search is used.

        NOTE(review): in the two context branches ``name`` and ``args`` are
        not applied to the search. As this is an ``@api.model`` method,
        ``self`` is presumably an empty recordset, making ``self.id`` falsy and
        the resulting domain ``[('id', '=', False)]`` - confirm the intent.
        """
        domain = []
        if self._context.get('is_sale'):
            if self.env.user.has_group('sf_base.group_sale_director'):
                pass
            elif self.env.user.has_group('sf_base.group_sale_salemanager'):
                if self.id != self.env.user.id:
                    domain = [('id', '=', self.id)]
                else:
                    domain = [('id', '=', self.env.user.id)]
            return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
        elif self._context.get('supplier_rank'):
            if self.env.user.has_group('sf_base.group_purchase_director'):
                pass
            elif self.env.user.has_group('sf_base.group_purchase'):
                if self.id != self.env.user.id:
                    domain = [('id', '=', self.id)]
                else:
                    domain = [('id', '=', self.env.user.id)]
            return self._search(domain, limit=limit, access_rights_uid=name_get_uid)
        return super()._name_search(name, args, operator, limit, name_get_uid)