静态数据时序库移位后的相应配置调整
This commit is contained in:
160
tool_service/装夹自动保存检测文件服务/app.py
Normal file
160
tool_service/装夹自动保存检测文件服务/app.py
Normal file
@@ -0,0 +1,160 @@
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
import os
|
||||
from ftplib import FTP
|
||||
import win32gui
|
||||
import win32con
|
||||
import logging
|
||||
import time
|
||||
|
||||
# 配置日志记录
|
||||
logging.basicConfig(filename='service.log', level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class FileUploadRequest(BaseModel):
|
||||
filename: str
|
||||
|
||||
|
||||
# FTP 服务器配置信息
|
||||
ftp_host = '110.52.114.162'
|
||||
ftp_port = 10021
|
||||
ftp_user = 'ftpuser'
|
||||
ftp_password = '123456'
|
||||
ftp_directory = '/home/ftp/ftp_root/ThreeTest/XT/Before/'
|
||||
|
||||
|
||||
def find_child_window(parent_hwnd, class_name):
|
||||
def enum_child_windows(hwnd, lparam):
|
||||
class_name = win32gui.GetClassName(hwnd)
|
||||
if class_name == lparam:
|
||||
child_hwnds.append(hwnd)
|
||||
return True
|
||||
|
||||
child_hwnds = []
|
||||
win32gui.EnumChildWindows(parent_hwnd, enum_child_windows, class_name)
|
||||
return child_hwnds
|
||||
|
||||
|
||||
def find_child_window_by_partial_title(parent_hwnd, partial_title):
|
||||
def enum_child_windows(hwnd, lparam):
|
||||
# 获取窗口的标题
|
||||
title = win32gui.GetWindowText(hwnd)
|
||||
# 检查标题是否包含指定的部分标题
|
||||
if partial_title in title:
|
||||
child_hwnds.append(hwnd)
|
||||
return True
|
||||
|
||||
child_hwnds = []
|
||||
win32gui.EnumChildWindows(parent_hwnd, enum_child_windows, None)
|
||||
return child_hwnds
|
||||
|
||||
|
||||
def find_child_window_by_title(parent_hwnd, title):
|
||||
def enum_child_windows(hwnd, lparam):
|
||||
if win32gui.GetWindowText(hwnd) == lparam:
|
||||
child_hwnds.append(hwnd)
|
||||
return True
|
||||
|
||||
child_hwnds = []
|
||||
win32gui.EnumChildWindows(parent_hwnd, enum_child_windows, title)
|
||||
return child_hwnds
|
||||
|
||||
|
||||
def set_path_and_save(filename):
|
||||
parent_hwnd = win32gui.FindWindow(None, '另存为')
|
||||
|
||||
if parent_hwnd == 0:
|
||||
raise HTTPException(status_code=404, detail="没有找到保存报告的窗口,请检查!")
|
||||
|
||||
# 这里假设“地址:”是你需要的部分标题
|
||||
address_hwnds = find_child_window_by_partial_title(parent_hwnd, "地址:")
|
||||
|
||||
# 确保找到的窗口句柄有效
|
||||
if not address_hwnds:
|
||||
raise HTTPException(status_code=404, detail="未找到地址框,请联系管理员!")
|
||||
|
||||
# 假设找到的第一个窗口是目标组件
|
||||
address_hwnd = address_hwnds[0]
|
||||
logging.info(f"找到地址框地址: {win32gui.GetWindowText(address_hwnd)}")
|
||||
|
||||
# 设置路径
|
||||
local_file_path = os.path.join(win32gui.GetWindowText(address_hwnd).split(' ')[1], filename)
|
||||
logging.info(f"设置路径: {local_file_path}")
|
||||
|
||||
path_hwnds = find_child_window(parent_hwnd, 'Edit')
|
||||
|
||||
if not path_hwnds:
|
||||
raise HTTPException(status_code=404, detail="未找到路径框")
|
||||
|
||||
path_hwnd = path_hwnds[0]
|
||||
win32gui.SendMessage(path_hwnd, win32con.WM_SETTEXT, 0, filename)
|
||||
|
||||
button_hwnds = find_child_window_by_title(parent_hwnd, '保存(&S)')
|
||||
|
||||
if not button_hwnds:
|
||||
raise HTTPException(status_code=404, detail="未找到保存按钮")
|
||||
|
||||
save_button_hwnd = button_hwnds[0]
|
||||
win32gui.PostMessage(save_button_hwnd, win32con.BM_CLICK, 0, 0)
|
||||
|
||||
return local_file_path
|
||||
|
||||
|
||||
def wait_for_file_to_save(filepath, timeout=30):
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
if os.path.isfile(filepath):
|
||||
return True
|
||||
time.sleep(0.1)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def upload_file_to_ftp(local_file):
|
||||
|
||||
if not os.path.isfile(local_file):
|
||||
raise HTTPException(status_code=204, detail="文件未找到")
|
||||
|
||||
ftp = FTP()
|
||||
try:
|
||||
ftp.connect(ftp_host, ftp_port)
|
||||
ftp.login(ftp_user, ftp_password)
|
||||
ftp.cwd(ftp_directory)
|
||||
with open(local_file, 'rb') as file:
|
||||
ftp.storbinary(f'STOR {os.path.basename(local_file)}', file)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"文件上传失败: {e}")
|
||||
return False
|
||||
finally:
|
||||
ftp.quit()
|
||||
|
||||
|
||||
@app.post("/get/check/report")
|
||||
async def upload_file(request: FileUploadRequest):
|
||||
# 设置路径框并点击保存
|
||||
local_file_path = set_path_and_save(request.filename)
|
||||
logging.info(f"文件上传请求: {request.filename}")
|
||||
logging.info(f"文件保存路径: {local_file_path}")
|
||||
|
||||
# 等待文件保存完成
|
||||
if not wait_for_file_to_save(local_file_path):
|
||||
raise HTTPException(status_code=500, detail="文件保存超时")
|
||||
|
||||
# 上传文件到 FTP
|
||||
success = upload_file_to_ftp(local_file_path)
|
||||
if success:
|
||||
ftp_file_path = os.path.join(ftp_directory, request.filename)
|
||||
return {"ftp_file_path": ftp_file_path}
|
||||
else:
|
||||
raise HTTPException(status_code=500, detail="文件上传失败")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
5
tool_service/装夹自动保存检测文件服务/readme.md
Normal file
5
tool_service/装夹自动保存检测文件服务/readme.md
Normal file
@@ -0,0 +1,5 @@
|
||||
app.py是主程序文件,主要功能是监听装夹自动保存文件,并将其上传到FTP服务器。
|
||||
|
||||
需要讲app.py做成一个exe文件,可以用pyinstaller工具。
|
||||
|
||||
然后在windows的计划任务中执行此exe文件即可。
|
||||
BIN
tool_service/装夹自动保存检测文件服务/装夹检测文件自动保存操作流程.docx
Normal file
BIN
tool_service/装夹自动保存检测文件服务/装夹检测文件自动保存操作流程.docx
Normal file
Binary file not shown.
Reference in New Issue
Block a user