Files
test/sf_plan/models/custom_plan.py
2024-05-11 23:36:18 +08:00

363 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import json
from datetime import datetime, timedelta
import requests
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
# sf排程
class sf_production_plan(models.Model):
_name = 'sf.production.plan'
_description = 'sf_production_plan'
_inherit = ['mail.thread']
# _order = 'state desc, write_date desc'
state = fields.Selection([
('draft', '待排程'),
('done', '已排程'),
('processing', '加工中'),
('finished', '已完成')
], string='工单状态', tracking=True)
state_order = fields.Integer(compute='_compute_state_order', store=True)
@api.depends('state')
def _compute_state_order(self):
order_mapping = {
'draft': 1,
'done': 2,
'processing': 3,
'finished': 4
}
for record in self:
record.state_order = order_mapping.get(record.state, 0)
_order = 'state_order asc, write_date desc'
name = fields.Char(string='工单编号')
active = fields.Boolean(string='已归档', default=True)
# selected = fields.Boolean(default=False)
# order_number = fields.Char(string='订单号')
order_deadline = fields.Datetime(string='订单交期')
production_id = fields.Many2one('mrp.production', '关联制造订单')
product_qty = fields.Float(string='数量', digits='Product Unit of Measure', required=True, default=0.0)
production_line_id = fields.Many2one('sf.production.line', string='生产线')
# date_planned_start = fields.Datetime(string='计划开始时间', required=True, index=True, copy=False,
# default=fields.Datetime.now)
date_planned_start = fields.Datetime(string='计划开始时间')
date_planned_finished = fields.Datetime(string='计划结束时间')
# 排程设置selection(倒排,顺排,默认倒排)
schedule_setting = fields.Selection([
('reverse', '倒排'), ('positive', '顺排')], string='排程设置', default='reverse')
product_id = fields.Many2one('product.product', '关联产品')
origin = fields.Char(string='订单号')
# # 加工时长
# process_time = fields.Float(string='加工时长', digits=(16, 2))
# 实际加工时长、实际开始时间、实际结束时间
actual_process_time = fields.Float(string='实际加工时长(分钟)', digits=(16, 2),
compute='_compute_actual_process_time')
actual_start_time = fields.Datetime(string='实际开始时间')
actual_end_time = fields.Datetime(string='实际结束时间')
shift = fields.Char(string='班次')
# 序号、坯料编号、坯料名称、材质、数量、长度、宽度、厚度、直径、计划开始时间、计划结束时间、状态(已产出与待产出)、操作、创建人、创建时间、
# 客户名称、订单号、行号、长度、宽度、厚度、直径、交货数量、交货日期
# sequence = fields.Integer(string='序号', required=True, copy=False, readonly=True, index=True,
# default=lambda self: self.env['ir.sequence'].sudo().next_by_code('sf.pl.plan'))
sequence = fields.Integer(string='序号', copy=False, readonly=True, index=True)
current_operation_name = fields.Char(string='当前工序名称', size=64, default='生产计划')
# 计算实际加工时长
@api.depends('actual_start_time', 'actual_end_time')
def _compute_actual_process_time(self):
for item in self:
if item.actual_start_time and item.actual_end_time:
item.actual_process_time = (item.actual_end_time - item.actual_start_time).total_seconds() / 60
else:
item.actual_process_time = None
@api.onchange('production_line_id')
def _compute_production_line_id(self):
for item in self:
item.sudo().production_id.production_line_id = item.production_line_id.id
item.sudo().production_id.workorder_ids.filtered(
lambda b: b.routing_type == "装夹预调").workpiece_delivery_ids.write(
{'production_line_id': item.production_line_id.id,
'plan_start_processing_time': item.date_planned_start})
# item.sudo().production_id.plan_start_processing_time = item.date_planned_start
# @api.onchange('state')
# def _onchange_state(self):
# if self.state == 'finished':
# self.production_id.schedule_state = '已完成'
# @api.model
# def _search(self, args, offset=0, limit=None, order=None, count=False, access_rights_uid=None):
# """
# 默认修改筛选
# """
# return super(sf_production_plan, self.with_context(active_test=False))._search(
# args, offset, limit, order, count, access_rights_uid)
def archive(self):
"""
归档
"""
self.write({'active': False})
def unarchive(self):
"""
取消归档
"""
self.write({'active': True})
@api.model
def get_import_templates(self):
"""returns the xlsx import template file"""
return [{
'label': _('导入计划数据'),
'template': '/sf_plan/static/src/xlsx/sf_production_plan.xlsx'
}]
@api.model
def _compute_orderpoint_id(self):
pass
def test_sale_order(self):
company_id = self.env.ref('base.main_company').sudo()
date = datetime.today()
aaa = self.env['sale.order'].with_user(self.env.ref("base.user_admin")).sale_order_create(
company_id, 'delivery_name', 'delivery_telephone', 'delivery_address',
date)
print('aaa', aaa)
# 当不设置计划结束时间时,增加计算计划结束时间的方法,根据采购周期加缓冲期两个值来算就可以了
def action_view_production_schedule(self):
self.ensure_one()
if self.date_planned_start and self.date_planned_finished:
return None
elif self.date_planned_start and not self.date_planned_finished:
# 如果没有给出计划结束时间,则计划结束时间为计划开始时间+采购周期+缓冲期
# 采购周期
purchase_cycle = 3
# 缓冲期
buffer_period = 1
# 计划结束时间 = 计划开始时间 + 采购周期 + 缓冲期
self.date_planned_finished = self.date_planned_start + timedelta(days=purchase_cycle) + timedelta(
days=buffer_period)
self.state = '已排程'
return self.date_planned_finished
else:
return None
def cancel_plan(self):
self.ensure_one()
self.date_planned_finished = None
self.state = 'draft'
def unlink(self):
sequence_to_reorder = self.mapped('sequence')
res = super().unlink()
records_to_reorder = self.search([('sequence', '>', min(sequence_to_reorder))])
for record in records_to_reorder:
record.sequence -= 1
return res
# 生成编码
def _get_pl_no(self):
sf_pl_no = self.env['sf.production.plan'].sudo().search(
[('pl_no', '!=', '')],
limit=1,
order="id desc")
today_date = datetime.today().strftime('%y%m%d')
if not sf_pl_no:
# 如果没有找到先前的坯料编码则今天的第一个坯料编码为PL230520-001
num = 'PL' + today_date + '-001'
else:
# 获取最后一个坯料编码
last_pl_no = sf_pl_no.pl_no
last_date = last_pl_no[2:8]
last_seq = int(last_pl_no[-3:])
if last_date == today_date:
# 如果最后一个坯料编码的日期与今天相同则序号加1
new_seq = last_seq + 1
num = 'PL' + today_date + f'-{new_seq:03}'
else:
# 如果最后一个坯料编码的日期与今天不同则今天的第一个坯料编码为PL230520-001
num = 'PL' + today_date + '-001'
return num
def do_production_schedule(self):
"""
排程方法
"""
for record in self:
if not record.production_line_id:
raise ValidationError("未选择生产线")
else:
workorder_id_list = record.production_id.workorder_ids.ids
if record.production_id.workorder_ids:
for item in record.production_id.workorder_ids:
if item.name == 'CNC加工':
item.date_planned_finished = datetime.now() + timedelta(days=100)
# item.date_planned_start = record.date_planned_start
item.date_planned_start = self.date_planned_start if self.date_planned_start else datetime.now()
record.sudo().production_id.plan_start_processing_time = item.date_planned_start
item.date_planned_finished = item.date_planned_start + timedelta(
minutes=record.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle)
item.duration_expected = record.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle
record.calculate_plan_time_before(item, workorder_id_list)
record.calculate_plan_time_after(item, workorder_id_list)
record.date_planned_start, record.date_planned_finished = \
item.date_planned_start, item.date_planned_finished
record.state = 'done'
# record.production_id.schedule_state = '已排'
record.sudo().production_id.schedule_state = '已排'
record.sudo().production_id.process_state = '待装夹'
# self.env['sale.order'].browse(record.production_id.origin).schedule_status = 'to process'
# sale_obj = self.env['sale.order'].search([('name', '=', record.origin)])
# if 'S' in sale_obj.name:
# sale_obj.schedule_status = 'to process'
mrp_production_ids = record.production_id._get_children().ids
print('mrp_production_ids', mrp_production_ids)
for i in mrp_production_ids:
record.env['mrp.production'].sudo().browse(i).schedule_state = '已排'
# record.production_id.date_planned_start = record.date_planned_start
# record.production_id.date_planned_finished = record.date_planned_finished
record.production_id.production_line_id = record.production_line_id.id
record.production_id.workorder_ids.filtered(
lambda b: b.routing_type == "装夹预调").workpiece_delivery_ids.write(
{'production_line_id': record.production_line_id.id,
'plan_start_processing_time': record.date_planned_start})
else:
raise ValidationError("未找到工单")
# record.date_planned_finished = record.date_planned_start + timedelta(days=3)
# record.state = 'done'
return {
'name': '排程甘特图',
'type': 'ir.actions.act_window',
'res_model': 'sf.production.plan', # 要跳转的模型名称
# 要显示的视图类型,可以是'form', 'tree', 'kanban', 'graph', 'calendar', 'pivot'等
'view_mode': 'gantt,tree,form',
'target': 'current', # 跳转的目标窗口,可以是'current'或'new'
}
def calculate_plan_time_before(self, item, workorder_id_list):
"""
根据CNC工单的时间去计算之前的其他工单的开始结束时间
"""
sequence = workorder_id_list.index(item.id) - 1
# 计算CNC加工之前工单的开始结束时间
for i in range(1 if sequence == 0 else sequence):
current_workorder_id = (item.id - (i + 1))
current_workorder_obj = self.env['mrp.workorder'].sudo().search(
[('id', '=', current_workorder_id)])
old_workorder_obj = self.env['mrp.workorder'].sudo().search(
[('id', '=', (current_workorder_id + 1))])
work_order = self.env['mrp.workorder'].sudo().search(
[('production_id', '=', self.production_id.id), ('id', '=', current_workorder_id)])
work_order.date_planned_finished = datetime.now() + timedelta(days=100)
work_order.date_planned_start = old_workorder_obj.date_planned_start - timedelta(
minutes=self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', current_workorder_obj.name)]).time_cycle)
work_order.date_planned_finished = old_workorder_obj.date_planned_start
work_order.duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', current_workorder_obj.name)]).time_cycle
first_workorder = self.env['mrp.workorder'].sudo().search([('id', '=', workorder_id_list[0])])
second_workorder = self.env['mrp.workorder'].sudo().search([('id', '=', workorder_id_list[1])])
if second_workorder.date_planned_start < first_workorder.date_planned_finished:
item.date_planned_start += timedelta(minutes=60)
item.date_planned_finished += timedelta(minutes=60)
item.duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', 'CNC加工')]).time_cycle
self.calculate_plan_time_before(item, workorder_id_list)
def calculate_plan_time_after(self, item, workorder_id_list):
"""
计算CNC加工之后工单的开始结束时间
"""
sequence = workorder_id_list.index(item.id) - 1
# 计算CNC加工之后工单的开始结束时间
for j in range(len(workorder_id_list) - sequence - 2):
current_workorder_id = (item.id + (j + 1))
current_workorder_obj = self.env['mrp.workorder'].sudo().search(
[('id', '=', current_workorder_id)])
old_workorder_obj = self.env['mrp.workorder'].sudo().search(
[('id', '=', (current_workorder_id - 1))])
work_order = self.env['mrp.workorder'].sudo().search(
[('production_id', '=', self.production_id.id), ('id', '=', current_workorder_id)])
try:
work_order.date_planned_finished = datetime.now() + timedelta(days=100)
work_order.date_planned_start = old_workorder_obj.date_planned_finished
print('work_order.data_start', work_order.date_planned_start)
work_order.date_planned_finished = old_workorder_obj.date_planned_finished + timedelta(
minutes=self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', current_workorder_obj.name)]).time_cycle)
work_order.duration_expected = self.env['mrp.routing.workcenter'].sudo().search(
[('name', '=', current_workorder_obj.name)]).time_cycle
except ValueError as e:
print('时间设置失败,请检查是否为工序分配工作中心,%s' % e)
def cancel_production_schedule(self):
self.date_planned_start = None
self.date_planned_finished = None
self.state = 'draft'
self.production_line_id = None
aa = self.env['mrp.production'].sudo().search([('name', '=', self.name)])
aa.schedule_state = '未排'
# self.env['sale.order'].browse(record.production_id.origin).schedule_status = 'to shedule'
sale_obj = self.env['sale.order'].search([('name', '=', self.origin)])
# if 'S' in sale_obj.name:
# sale_obj.schedule_status = 'to schedule'
return self.date_planned_finished
def liucheng_cs(self):
res = {'order_number': '123', 'delivery_end_date': str(datetime.now()),
'delivery_name': '机企猫', 'delivery_telephone': '18943919239',
'delivery_address': '新时空大厦',
'bfm_process_order_list': []}
aa = self.env['ir.attachment'].search([('id', '=', 631)])
temp = self.env['product.template'].search([('id', '=', 47)])
val = {
'model_long': 3,
'model_width': 1,
'model_height': 1,
'model_volume': 3,
'model_machining_precision': '0.10',
'model_name': aa.name,
'model_data': base64.b64encode(aa.datas).decode('utf-8'),
'model_file': base64.b64encode(temp.model_file).decode('utf-8'),
'texture_code': '001',
'texture_type_code': '001001',
# 'surface_process_code': self.env['jikimo.surface.process']._json_surface_process_code(item),
'process_parameters_code': 'R',
'price': 20,
'number': 2,
'total_amount': 100,
'remark': '这只是测试',
'barcode': 123456789,
}
res['bfm_process_order_list'].append(val)
url = '/api/bfm_process_order/list'
res['bfm_process_order_list'] = json.dumps(res['bfm_process_order_list'])
try:
ret = requests.post(('http://localhost:1069' + url), json={}, data=res)
# aa = json.loads(ret.text)
print(ret)
except Exception as e:
raise UserError(e)
# 机台作业计划
class machine_work_schedule(models.Model):
_name = 'sf.machine.schedule'
_description = '机台作业计划'
name = fields.Char(string='机台名')