Files
test/sf_plan/models/custom_plan.py

436 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import json
from datetime import datetime, timedelta
import requests
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
# SF production scheduling plan model.
class sf_production_plan(models.Model):
_name = 'sf.production.plan'
_description = 'sf_production_plan'
_inherit = ['mail.thread']
# _order = 'state desc, write_date desc'
# Scheduling status of the plan; declared with tracking=True.
# Keys: draft (to be scheduled), done (scheduled), processing (in production),
# finished (completed), cancel (cancelled).
state = fields.Selection([
('draft', '待排程'),
('done', '已排程'),
('processing', '加工中'),
('finished', '已完成'),
('cancel', '已取消')
], string='状态', tracking=True)
# Stored integer computed by _compute_state_order from ``state``.
state_order = fields.Integer(compute='_compute_state_order', store=True)
@api.depends('state')
def _compute_state_order(self):
    """Store a numeric rank for ``state`` on every record.

    ``draft`` ranks 1, ``done`` 2, ``processing`` 3 and ``finished`` 4.
    Any other value (including ``cancel`` and an empty state) ranks 0.
    """
    rank_by_state = {'draft': 1, 'done': 2, 'processing': 3, 'finished': 4}
    for plan in self:
        plan.state_order = rank_by_state.get(plan.state, 0)
# Default ordering: by the computed state rank, then most recently written first.
_order = 'state_order asc, write_date desc'
# Label reads "manufacturing order".
name = fields.Char(string='制造订单')
active = fields.Boolean(string='已归档', default=True)
# selected = fields.Boolean(default=False)
# order_number = fields.Char(string='订单号')
# Order delivery deadline.
order_deadline = fields.Datetime(string='订单交期')
# Linked manufacturing order.
production_id = fields.Many2one('mrp.production', '关联制造订单')
product_qty = fields.Float(string='数量', digits='Product Unit of Measure', required=True, default=0.0)
# Production line the plan is scheduled on.
production_line_id = fields.Many2one('sf.production.line', string='生产线')
# date_planned_start = fields.Datetime(string='计划开始时间', required=True, index=True, copy=False,
# default=fields.Datetime.now)
# Planned start / planned end of the plan.
date_planned_start = fields.Datetime(string='计划开始时间')
date_planned_finished = fields.Datetime(string='计划结束时间')
# Scheduling mode selection (backward or forward; backward is the default)
schedule_setting = fields.Selection([
('reverse', '倒排'), ('positive', '顺排')], string='排程设置', default='reverse')
# Linked product.
product_id = fields.Many2one('product.product', '关联产品')
# Label reads "sales order".
origin = fields.Char(string='销售订单')
# # Processing duration
# process_time = fields.Float(string='加工时长', digits=(16, 2))
# Actual processing duration, actual start time, actual end time
actual_process_time = fields.Float(string='实际加工时长(分钟)', digits=(16, 2),
compute='_compute_actual_process_time')
actual_start_time = fields.Datetime(string='实际开始时间')
actual_end_time = fields.Datetime(string='实际结束时间')
# Work shift.
shift = fields.Char(string='班次')
# Sequence no., blank code, blank name, material, quantity, length, width, thickness, diameter,
# planned start time, planned end time, status (produced / to be produced), action, creator, creation time,
# customer name, order number, line number, length, width, thickness, diameter, delivery quantity, delivery date
# sequence = fields.Integer(string='序号', required=True, copy=False, readonly=True, index=True,
# default=lambda self: self.env['ir.sequence'].sudo().next_by_code('sf.pl.plan'))
sequence = fields.Integer(string='序号', copy=False, readonly=True, index=True)
# Name of the current operation.
current_operation_name = fields.Char(string='当前工序名称', size=64, default='生产计划')
@api.onchange('date_planned_start')
def date_planned_start_onchange(self):
    """Set the planned end to one hour after the planned start.

    Does nothing while the planned start is empty.
    """
    planned_start = self.date_planned_start
    if not planned_start:
        return
    self.date_planned_finished = planned_start + timedelta(hours=1)
# Back-fill plans that are past the "to be scheduled" state but have no planned end time
def deal_no_date_planned_finished(self):
    """Give every processing/done/finished plan without a planned end a one-hour slot.

    Plans that also lack a planned start are left untouched.
    """
    missing_end_domain = [
        ('date_planned_finished', '=', False),
        ('state', 'in', ['processing', 'done', 'finished']),
    ]
    for plan in self.env['sf.production.plan'].search(missing_end_domain):
        if not plan.date_planned_start:
            continue
        plan.date_planned_finished = plan.date_planned_start + timedelta(hours=1)
# Back-fill plans whose order deadline is empty
def deal_no_order_deadline(self):
    """Set a missing order deadline to seven days after the planned start.

    The search runs with ``sudo()``. Plans without a planned start are
    left untouched.
    """
    plan_model = self.env['sf.production.plan'].sudo()
    for plan in plan_model.search([('order_deadline', '=', False)]):
        if not plan.date_planned_start:
            continue
        plan.order_deadline = plan.date_planned_start + timedelta(days=7)
@api.model
def search_read(self, domain=None, fields=None, offset=0, limit=None, order=None):
    """Delegate to the parent ``search_read`` and return its result unchanged."""
    return super(sf_production_plan, self).search_read(domain, fields, offset, limit, order)
# Compute the actual processing duration
@api.depends('actual_start_time', 'actual_end_time')
def _compute_actual_process_time(self):
    """Set ``actual_process_time`` to the minutes between actual start and end.

    When either timestamp is missing the field is assigned ``None``.
    """
    for plan in self:
        started, ended = plan.actual_start_time, plan.actual_end_time
        if not (started and ended):
            plan.actual_process_time = None
            continue
        plan.actual_process_time = (ended - started).total_seconds() / 60
@api.onchange('production_line_id')
def _compute_production_line_id(self):
    """Propagate the chosen production line to the linked manufacturing order.

    With ``sudo()``, the line is written on ``production_id``. The line and
    the planned start are then written on the workpiece deliveries of the
    work orders whose ``routing_type`` is "装夹预调".
    """
    for plan in self:
        plan.sudo().production_id.production_line_id = plan.production_line_id.id
        clamping_orders = plan.sudo().production_id.workorder_ids.filtered(
            lambda workorder: workorder.routing_type == "装夹预调")
        delivery_values = {
            'production_line_id': plan.production_line_id.id,
            'plan_start_processing_time': plan.date_planned_start,
        }
        clamping_orders.workpiece_delivery_ids.write(delivery_values)
# item.sudo().production_id.plan_start_processing_time = item.date_planned_start
# @api.onchange('state')
# def _onchange_state(self):
# if self.state == 'finished':
# self.production_id.schedule_state = '已完成'
# @api.model
# def _search(self, args, offset=0, limit=None, order=None, count=False, access_rights_uid=None):
# """
# 默认修改筛选
# """
# return super(sf_production_plan, self.with_context(active_test=False))._search(
# args, offset, limit, order, count, access_rights_uid)
# def archive(self):
# """
# 归档
# """
# self.write({'active': False})
#
# def unarchive(self):
# """
# 取消归档
# """
# self.write({'active': True})
@api.model
def get_import_templates(self):
    """Return the descriptor list for the xlsx import template file."""
    plan_template = {
        'label': _('导入计划数据'),
        'template': '/sf_plan/static/src/xlsx/sf_production_plan.xlsx',
    }
    return [plan_template]
@api.model
def _compute_orderpoint_id(self):
    """No-op: the method body does nothing and returns ``None``."""
    return None
def test_sale_order(self):
    """Call ``sale.order.sale_order_create`` as the admin user with placeholder data.

    The main company, three placeholder delivery strings and the current
    date are passed in. The result is printed to stdout.
    """
    main_company = self.env.ref('base.main_company').sudo()
    today = datetime.today()
    admin_user = self.env.ref("base.user_admin")
    sale_order_model = self.env['sale.order'].with_user(admin_user)
    created = sale_order_model.sale_order_create(
        main_company, 'delivery_name', 'delivery_telephone', 'delivery_address', today)
    print('aaa', created)
# When no planned end time is set, derive it from the purchase cycle plus the buffer period
def action_view_production_schedule(self):
    """Fill in a missing planned end time and mark the plan as scheduled.

    The planned end becomes planned start + purchase cycle (3 days)
    + buffer period (1 day).

    :return: the new planned end time, or ``None`` when the plan has no
        planned start or already has a planned end
    """
    self.ensure_one()
    if not self.date_planned_start or self.date_planned_finished:
        return None
    purchase_cycle = 3  # days
    buffer_period = 1  # days
    self.date_planned_finished = self.date_planned_start + timedelta(
        days=purchase_cycle + buffer_period)
    # The selection stores keys, not labels: 'done' is the key whose label
    # is "已排程" (scheduled). Writing the label itself is rejected.
    self.state = 'done'
    return self.date_planned_finished
def cancel_plan(self):
    """Clear the planned end time of a single plan and put it back to ``draft``."""
    self.ensure_one()
    self.update({
        'date_planned_finished': None,
        'state': 'draft',
    })
def unlink(self):
    """Delete the plans, then close the gaps they leave in ``sequence``.

    Every remaining plan is shifted down by the number of deleted plans
    that had a smaller sequence. Deleting several plans at once therefore
    keeps the numbering contiguous.

    :return: the result of the parent ``unlink``
    """
    removed_sequences = self.mapped('sequence')
    res = super().unlink()
    if not removed_sequences:
        # Empty recordset: nothing was deleted, and min() of an empty
        # list would raise ValueError.
        return res
    lowest_removed = min(removed_sequences)
    for record in self.search([('sequence', '>', lowest_removed)]):
        shift = sum(1 for removed in removed_sequences if removed < record.sequence)
        record.sequence -= shift
    return res
# Generate the code
def _get_pl_no(self):
    """Return the next blank code in the form ``PL<yymmdd>-<seq>``.

    The sequence restarts at ``001`` each day. If the most recent code was
    issued today, its sequence is incremented. The sequence is read from
    everything after the last ``-``, so it keeps counting past ``999``
    instead of wrapping.

    NOTE(review): ``pl_no`` is not declared among the fields visible in
    this part of the file; confirm it is defined elsewhere.

    :return: the new code, e.g. ``PL230520-001``
    """
    today_date = datetime.today().strftime('%y%m%d')
    last_plan = self.env['sf.production.plan'].sudo().search(
        [('pl_no', '!=', '')],
        limit=1,
        order="id desc")
    next_seq = 1
    if last_plan:
        last_pl_no = last_plan.pl_no
        if last_pl_no[2:8] == today_date:
            # Same day: continue after the last issued sequence number.
            next_seq = int(last_pl_no.rpartition('-')[2]) + 1
    return f'PL{today_date}-{next_seq:03}'
def do_production_schedule(self, count=1):
"""
Schedule the plans in ``self`` and return the window action for the Gantt view.

:param count: forwarded to ``deal_processing_schedule``
:raises ValidationError: when a plan has no production line, or when the
schedulability check returns a falsy value
"""
for record in self:
if not record.production_line_id:
raise ValidationError("未选择生产线")
else:
# NOTE(review): the check is invoked on ``self`` (every plan in the recordset),
# not on ``record`` alone; confirm this is intended.
is_schedule = self.deal_processing_schedule(record.date_planned_start, count)
if not is_schedule:
raise ValidationError("排程失败")
workorder_id_list = record.production_id.workorder_ids.ids
if record.production_id:
if record.production_id.workorder_ids:
for item in record.production_id.workorder_ids:
# The work order named 'CNC加工' anchors the timing of the other work orders.
if item.name == 'CNC加工':
# The end is first pushed 100 days ahead and then overwritten below
# (presumably so the new start is never later than the stored end; TODO confirm).
item.date_planned_finished = datetime.now() + timedelta(days=100)
# NOTE(review): reads ``self.date_planned_start`` while looping over ``record``;
# ``self`` is the whole recordset here, so confirm ``record`` was not intended.
item.date_planned_start = self.date_planned_start if self.date_planned_start else datetime.now()
record.sudo().production_id.plan_start_processing_time = item.date_planned_start
# Planned end = planned start + ``time_cycle`` (minutes) of the routing operation with the same name.
item.date_planned_finished = item.date_planned_start + timedelta(
minutes=record.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle)
item.duration_expected = record.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle
# Derive the dates of the work orders before and after the anchor.
record.calculate_plan_time_before(item, workorder_id_list)
record.calculate_plan_time_after(item, workorder_id_list)
# The plan takes over the anchor work order's planned window.
record.date_planned_start, record.date_planned_finished = \
item.date_planned_start, item.date_planned_finished
record.state = 'done'
# Fall back to a 60-minute window when no planned end was derived.
record.date_planned_finished = record.date_planned_start + timedelta(
minutes=60) if not record.date_planned_finished else record.date_planned_finished
# record.production_id.schedule_state = '已排'
record.sudo().production_id.schedule_state = '已排'
record.sudo().production_id.process_state = '待装夹'
# self.env['sale.order'].browse(record.production_id.origin).schedule_status = 'to process'
# sale_obj = self.env['sale.order'].search([('name', '=', record.origin)])
# if 'S' in sale_obj.name:
# sale_obj.schedule_status = 'to process'
# Set the same schedule state on the manufacturing orders returned by ``_get_children()``.
mrp_production_ids = record.production_id._get_children().ids
# NOTE(review): debug output written to stdout.
print('mrp_production_ids', mrp_production_ids)
for i in mrp_production_ids:
record.env['mrp.production'].sudo().browse(i).schedule_state = '已排'
# record.production_id.date_planned_start = record.date_planned_start
# record.production_id.date_planned_finished = record.date_planned_finished
record.sudo().production_id.production_line_id = record.production_line_id.id
if record.production_id.workorder_ids:
# Write the line and planned start on the workpiece deliveries of the "装夹预调" work orders.
record.sudo().production_id.workorder_ids.filtered(
lambda b: b.routing_type == "装夹预调").workpiece_delivery_ids.write(
{'production_line_id': record.production_line_id.id,
'plan_start_processing_time': record.date_planned_start})
# record.date_planned_finished = record.date_planned_start + timedelta(days=3)
# record.state = 'done'
return {
'name': '排程甘特图',
'type': 'ir.actions.act_window',
'res_model': 'sf.production.plan', # model to open
# view types to show; may be 'form', 'tree', 'kanban', 'graph', 'calendar', 'pivot', etc.
'view_mode': 'gantt,tree,form',
'target': 'current', # target window, either 'current' or 'new'
}
# Check whether the plans can be scheduled
def deal_processing_schedule(self, date_planned_start, count):
    """Validate that every plan in ``self`` can be scheduled at the requested start.

    :param date_planned_start: requested planned start, compared against
        ``datetime.now()``
    :param count: forwarded to ``deal_available_single_machine_capacity``
    :return: ``True`` when every check passes
    :raises UserError: when the production line has no work centers or no
        automatic production line, the start time is missing or in the
        past, the start falls outside every work-center calendar, or the
        line has no capacity left
    """
    for record in self:
        workcenter_ids = record.production_line_id.mrp_workcenter_ids
        if not workcenter_ids:
            raise UserError('生产线没有配置工作中心')
        production_lines = workcenter_ids.filtered(lambda b: "自动生产线" in b.name)
        if not production_lines:  # no automatic production line configured
            raise UserError('生产线没有配置自动生产线')
        if not date_planned_start:
            # An unset Datetime field reads as False, and comparing False
            # with a datetime raises TypeError. Report it as a user error.
            raise UserError('请先设置计划开始时间')
        if date_planned_start < datetime.now():  # start must not be in the past
            raise UserError('计划开始时间不能小于当前时间')
        # The start must fall inside the calendar of at least one work center.
        if all(not production_line.deal_with_workcenter_calendar(date_planned_start)
               for production_line in production_lines):
            raise UserError('当前计划开始时间不能预约排程,请在工作时间内排程')
        if not production_lines.deal_available_default_capacity(date_planned_start):
            raise UserError('当前计划开始时间不能预约排程,生产线今日没有可排程的资源')
        if not production_lines.deal_available_single_machine_capacity(date_planned_start, count):
            raise UserError('当前计划开始时间不能预约排程,生产线该时间段没有可排程的资源')
    return True
def calculate_plan_time_before(self, item, workorder_id_list):
"""
Derive the planned start/end of the work orders that precede the CNC work order
from the CNC work order's own planned window.

:param item: the anchor work order (the caller passes the CNC machining work order)
:param workorder_id_list: ids the caller reads from ``production_id.workorder_ids.ids``
"""
sequence = workorder_id_list.index(item.id) - 1
# Compute the start/end times of the work orders before CNC machining
# The loop runs at least once: ``sequence == 0`` is mapped to a single iteration.
for i in range(1 if sequence == 0 else sequence):
# NOTE(review): the target id is derived by subtracting from ``item.id``, which assumes
# the preceding work orders have consecutive database ids directly below the anchor's;
# TODO confirm.
current_workorder_id = (item.id - (i + 1))
current_workorder_obj = self.env['mrp.workorder'].sudo().search(
[('id', '=', current_workorder_id)])
# ``old_workorder_obj`` is the work order with the next higher id (no production filter applied).
old_workorder_obj = self.env['mrp.workorder'].sudo().search(
[('id', '=', (current_workorder_id + 1))])
# Only a work order belonging to this plan's manufacturing order is written to.
work_order = self.env['mrp.workorder'].sudo().search(
[('production_id', '=', self.production_id.id), ('id', '=', current_workorder_id)])
# The end is first pushed 100 days ahead and then overwritten below
# (presumably so the new start is never later than the stored end; TODO confirm).
work_order.date_planned_finished = datetime.now() + timedelta(days=100)
# Start = the following work order's start minus this operation's ``time_cycle`` minutes.
work_order.date_planned_start = old_workorder_obj.date_planned_start - timedelta(
minutes=self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', current_workorder_obj.name)]).time_cycle)
work_order.date_planned_finished = old_workorder_obj.date_planned_start
work_order.duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', current_workorder_obj.name)]).time_cycle
# NOTE(review): ``workorder_id_list[1]`` raises IndexError when the list has fewer than two ids.
first_workorder = self.env['mrp.workorder'].sudo().search([('id', '=', workorder_id_list[0])])
second_workorder = self.env['mrp.workorder'].sudo().search([('id', '=', workorder_id_list[1])])
# When the second work order would start before the first one ends, push the anchor
# one hour later and recompute (presumably the recursive call sits inside this ``if``;
# the original indentation is not visible here, TODO confirm).
if second_workorder.date_planned_start < first_workorder.date_planned_finished:
item.date_planned_start += timedelta(minutes=60)
item.date_planned_finished += timedelta(minutes=60)
item.duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle
self.calculate_plan_time_before(item, workorder_id_list)
def calculate_plan_time_after(self, item, workorder_id_list):
    """Chain the work orders that follow the CNC work order back to back.

    Each following work order starts when its predecessor ends. Its
    duration is the ``time_cycle`` (minutes) of the routing operation
    carrying the same name.

    NOTE(review): target ids are derived by adding to ``item.id``, which
    assumes the following work orders have consecutive database ids;
    TODO confirm.

    :param item: the anchor work order (the caller passes the CNC machining
        work order)
    :param workorder_id_list: ids the caller reads from
        ``production_id.workorder_ids.ids``
    """
    workorder_model = self.env['mrp.workorder'].sudo()
    routing_model = self.env['mrp.routing.workcenter'].sudo()
    # Number of entries located after the anchor in the id list.
    following_count = len(workorder_id_list) - workorder_id_list.index(item.id) - 1
    for offset in range(1, following_count + 1):
        target_id = item.id + offset
        named_workorder = workorder_model.search([('id', '=', target_id)])
        predecessor = workorder_model.search([('id', '=', target_id - 1)])
        # Only a work order of this plan's manufacturing order is written to.
        work_order = workorder_model.search(
            [('production_id', '=', self.production_id.id), ('id', '=', target_id)])
        try:
            # The end is first pushed 100 days ahead, then overwritten below.
            work_order.date_planned_finished = datetime.now() + timedelta(days=100)
            work_order.date_planned_start = predecessor.date_planned_finished
            print('work_order.data_start', work_order.date_planned_start)
            operation_domain = [('name', '=', named_workorder.name)]
            work_order.date_planned_finished = predecessor.date_planned_finished + timedelta(
                minutes=routing_model.search(operation_domain).time_cycle)
            work_order.duration_expected = routing_model.search(operation_domain).time_cycle
        except ValueError as e:
            print('时间设置失败,请检查是否为工序分配工作中心,%s' % e)
def cancel_production_schedule(self):
    """Undo the scheduling of the plan.

    Clears the planned start, planned end and production line, and puts
    the plan back to ``draft``. The manufacturing order whose name equals
    the plan's name gets its ``schedule_state`` set to '未排'.

    :return: the planned end time after it has been cleared
    """
    self.date_planned_start = None
    self.date_planned_finished = None
    self.state = 'draft'
    self.production_line_id = None
    production = self.env['mrp.production'].sudo().search([('name', '=', self.name)])
    production.schedule_state = '未排'
    # The former sale.order lookup was never used. It cost a query and could
    # fail for users without sales access, so it has been removed.
    return self.date_planned_finished
def liucheng_cs(self):
    """Post a hard-coded sample process order to the local test endpoint.

    The payload is built from attachment id 631 and product template
    id 47. It is sent as form data to
    ``http://localhost:1069/api/bfm_process_order/list``, and the response
    object is printed to stdout.

    :raises UserError: when the HTTP request fails or times out
    """
    attachment = self.env['ir.attachment'].search([('id', '=', 631)])
    template = self.env['product.template'].search([('id', '=', 47)])
    order_line = {
        'model_long': 3,
        'model_width': 1,
        'model_height': 1,
        'model_volume': 3,
        'model_machining_precision': '0.10',
        'model_name': attachment.name,
        'model_data': base64.b64encode(attachment.datas).decode('utf-8'),
        'model_file': base64.b64encode(template.model_file).decode('utf-8'),
        'texture_code': '001',
        'texture_type_code': '001001',
        # 'surface_process_code': self.env['jikimo.surface.process']._json_surface_process_code(item),
        'process_parameters_code': 'R',
        'price': 20,
        'number': 2,
        'total_amount': 100,
        'remark': '这只是测试',
        'barcode': 123456789,
    }
    payload = {
        'order_number': '123',
        'delivery_end_date': str(datetime.now()),
        'delivery_name': '机企猫',
        'delivery_telephone': '18943919239',
        'delivery_address': '新时空大厦',
        # The order lines travel as one JSON-encoded form field.
        'bfm_process_order_list': json.dumps([order_line]),
    }
    endpoint = 'http://localhost:1069' + '/api/bfm_process_order/list'
    try:
        # Without a timeout, an unresponsive endpoint would block the
        # worker indefinitely.
        ret = requests.post(endpoint, json={}, data=payload, timeout=10)
    except requests.RequestException as e:
        raise UserError(str(e)) from e
    print(ret)
# Machine work schedule model.
class machine_work_schedule(models.Model):
_name = 'sf.machine.schedule'
_description = '机台作业计划'
# Machine name.
name = fields.Char(string='机台名')
class MrpProductionInheritForPlan(models.Model):
    """Keep ``sf.production.plan`` records in step with their manufacturing order."""
    _inherit = 'mrp.production'

    def button_cancel(self):
        """Cancel the manufacturing orders and mark their plans as cancelled.

        :return: the result of the parent ``button_cancel``
        """
        # Run the parent cancel operation first.
        res = super(MrpProductionInheritForPlan, self).button_cancel()
        # ``self.ids`` instead of ``self.id`` so that cancelling several
        # manufacturing orders at once does not raise a singleton error.
        self.env['sf.production.plan'].search(
            [('production_id', 'in', self.ids)]).write({'state': 'cancel'})
        return res

    def toggle_active(self):
        """Toggle ``active`` on the manufacturing orders and mirror it on their plans.

        :return: the result of the parent ``toggle_active``
        """
        # Run the parent method to switch the active flag.
        res = super(MrpProductionInheritForPlan, self).toggle_active()
        plan_model = self.env['sf.production.plan']
        # One order at a time: each order may end up with a different
        # ``active`` value, and ``self.active`` fails on several records.
        for production in self:
            # The explicit active conditions make the search return both
            # archived and non-archived plans.
            plan_model.search(
                [('production_id', '=', production.id),
                 '|', ('active', '=', False), ('active', '=', True)]).write(
                {'active': production.active})
        return res