161 lines
4.6 KiB
Python
161 lines
4.6 KiB
Python
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
import os
|
|
from ftplib import FTP
|
|
import win32gui
|
|
import win32con
|
|
import logging
|
|
import time
|
|
|
|
# Logging setup: records at INFO level and above are written to service.log,
# each line carrying a timestamp, the level name and the message.
logging.basicConfig(
    level=logging.INFO,
    filename='service.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Application object that the route decorator further down registers on.
app = FastAPI()
|
|
|
|
|
|
class FileUploadRequest(BaseModel):
    """Request body of the report-upload endpoint."""

    # Name under which the report is saved locally and then uploaded.
    filename: str
|
|
|
|
|
|
# FTP server settings.
#
# Each value can be overridden through an environment variable so that the
# endpoint and credentials do not have to live in source control. The literal
# defaults are the previous hard-coded values and are kept only for backward
# compatibility.
# NOTE(review): the default password should be removed from the source once
# every deployment sets FTP_PASSWORD.
ftp_host = os.environ.get('FTP_HOST', '110.52.114.162')
ftp_port = int(os.environ.get('FTP_PORT', '10021'))
ftp_user = os.environ.get('FTP_USER', 'ftpuser')
ftp_password = os.environ.get('FTP_PASSWORD', '123456')
ftp_directory = os.environ.get(
    'FTP_DIRECTORY', '/home/ftp/ftp_root/ThreeTest/XT/Before/'
)
|
|
|
|
|
|
def find_child_window(parent_hwnd, class_name):
    """Collect the child windows of *parent_hwnd* that have a given class.

    Args:
        parent_hwnd: Handle of the window whose children are enumerated.
        class_name: Window class name a child must match exactly.

    Returns:
        A list of matching child window handles, in enumeration order;
        empty when no child matches.
    """
    matches = []

    def _collect(hwnd, wanted_class):
        # ``wanted_class`` is the value handed to EnumChildWindows below.
        if win32gui.GetClassName(hwnd) == wanted_class:
            matches.append(hwnd)
        # Always return True so the enumeration visits every child.
        return True

    win32gui.EnumChildWindows(parent_hwnd, _collect, class_name)
    return matches
|
|
|
|
|
|
def find_child_window_by_partial_title(parent_hwnd, partial_title):
    """Collect the child windows whose title contains *partial_title*.

    Args:
        parent_hwnd: Handle of the window whose children are enumerated.
        partial_title: Substring that must occur in a child's window text.

    Returns:
        A list of matching child window handles, in enumeration order;
        empty when no child matches.
    """
    matches = []

    def _collect(hwnd, _unused):
        # The needle comes from the enclosing scope, so the extra callback
        # argument is ignored.
        window_text = win32gui.GetWindowText(hwnd)
        if partial_title in window_text:
            matches.append(hwnd)
        # Always return True so the enumeration visits every child.
        return True

    win32gui.EnumChildWindows(parent_hwnd, _collect, None)
    return matches
|
|
|
|
|
|
def find_child_window_by_title(parent_hwnd, title):
    """Collect the child windows whose title equals *title* exactly.

    Args:
        parent_hwnd: Handle of the window whose children are enumerated.
        title: Window text a child must match exactly.

    Returns:
        A list of matching child window handles, in enumeration order;
        empty when no child matches.
    """
    matches = []

    def _collect(hwnd, wanted_title):
        # ``wanted_title`` is the value handed to EnumChildWindows below.
        window_text = win32gui.GetWindowText(hwnd)
        if window_text == wanted_title:
            matches.append(hwnd)
        # Always return True so the enumeration visits every child.
        return True

    win32gui.EnumChildWindows(parent_hwnd, _collect, title)
    return matches
|
|
|
|
|
|
def set_path_and_save(filename):
    """Type *filename* into the open "另存为" dialog and click its save button.

    The target directory is read from the child window whose text contains
    "地址:"; everything after the first space of that text is taken as the
    directory.

    Args:
        filename: File name to enter into the dialog's edit box.

    Returns:
        The directory read from the address control joined with *filename*.

    Raises:
        HTTPException: 404 when the dialog, the address control, the edit box
            or the save button cannot be found; 500 when no directory can be
            extracted from the address control's text.
    """
    dialog_hwnd = win32gui.FindWindow(None, '另存为')
    if dialog_hwnd == 0:
        raise HTTPException(status_code=404, detail="没有找到保存报告的窗口,请检查!")

    # The first child whose text contains "地址:" is assumed to be the
    # address bar.
    address_hwnds = find_child_window_by_partial_title(dialog_hwnd, "地址:")
    if not address_hwnds:
        raise HTTPException(status_code=404, detail="未找到地址框,请联系管理员!")

    address_text = win32gui.GetWindowText(address_hwnds[0])
    logging.info(f"找到地址框地址: {address_text}")

    # Split on the FIRST space only: the directory itself may contain spaces
    # (e.g. "C:\Program Files\..."), and a text without any space must give a
    # clear error instead of an IndexError.
    _label, _sep, directory = address_text.partition(' ')
    directory = directory.strip()
    if not directory:
        raise HTTPException(status_code=500, detail="无法从地址框解析保存目录")

    local_file_path = os.path.join(directory, filename)
    logging.info(f"设置路径: {local_file_path}")

    # The first 'Edit' child is assumed to be the file-name box.
    path_hwnds = find_child_window(dialog_hwnd, 'Edit')
    if not path_hwnds:
        raise HTTPException(status_code=404, detail="未找到路径框")

    # Only the bare name is written; presumably the dialog resolves it against
    # the folder shown in the address bar -- TODO confirm.
    win32gui.SendMessage(path_hwnds[0], win32con.WM_SETTEXT, 0, filename)

    button_hwnds = find_child_window_by_title(dialog_hwnd, '保存(&S)')
    if not button_hwnds:
        raise HTTPException(status_code=404, detail="未找到保存按钮")

    # The click is posted, not sent, so this function returns without waiting
    # for the dialog to finish writing the file.
    win32gui.PostMessage(button_hwnds[0], win32con.BM_CLICK, 0, 0)

    return local_file_path
|
|
|
|
|
|
def wait_for_file_to_save(filepath, timeout=30, poll_interval=0.1):
    """Poll until *filepath* exists as a regular file or *timeout* elapses.

    Args:
        filepath: Path of the file to wait for.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between two existence checks.

    Returns:
        True as soon as the file exists, False if it still does not exist
        when the timeout has elapsed.
    """
    # A monotonic clock keeps the deadline unaffected by system clock changes.
    deadline = time.monotonic() + timeout

    while True:
        # Check before looking at the deadline so the file is tested at least
        # once, even with a zero or negative timeout.
        if os.path.isfile(filepath):
            return True

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False

        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
|
|
|
|
|
|
def upload_file_to_ftp(local_file):
    """Upload *local_file* to the configured FTP directory.

    The file is stored on the server under its base name inside
    ``ftp_directory``.

    Args:
        local_file: Path of the local file to upload.

    Returns:
        True when the upload succeeded, False when any step of the FTP
        session failed (the failure is logged with its traceback).

    Raises:
        HTTPException: 404 when *local_file* does not exist.
    """
    if not os.path.isfile(local_file):
        # 404 rather than 204: 204 means "success, no content" and cannot
        # carry an error detail.
        raise HTTPException(status_code=404, detail="文件未找到")

    ftp = FTP()
    try:
        ftp.connect(ftp_host, ftp_port)
        ftp.login(ftp_user, ftp_password)
        ftp.cwd(ftp_directory)
        with open(local_file, 'rb') as file:
            ftp.storbinary(f'STOR {os.path.basename(local_file)}', file)
        return True
    except Exception:
        # Boundary of the FTP session: record the failure and report it to
        # the caller through the return value.
        logging.exception("文件上传失败")
        return False
    finally:
        # quit() itself raises when the connection was never established or
        # has already dropped; that must not replace the result above.
        try:
            ftp.quit()
        except Exception:
            ftp.close()
|
|
|
|
|
|
@app.post("/get/check/report")
async def upload_file(request: FileUploadRequest):
    """Save the report through the "Save As" dialog and upload it to FTP.

    Args:
        request: Request body carrying the target file name.

    Returns:
        A dict with key ``ftp_file_path`` holding the path of the uploaded
        file on the FTP server.

    Raises:
        HTTPException: 400 when the file name is empty or contains directory
            components; 500 when the file does not appear in time or the FTP
            upload fails; any HTTPException raised by the helpers is
            propagated unchanged.
    """
    # Imported locally so this block does not depend on a file-level import.
    from fastapi.concurrency import run_in_threadpool

    filename = request.filename
    logging.info(f"文件上传请求: {filename}")

    # The name comes from the client: only accept a bare file name, so a
    # request cannot point the save/upload at an arbitrary local path.
    if not filename or os.path.basename(filename) != filename:
        raise HTTPException(status_code=400, detail="文件名不合法")

    # The helpers below block (window messages, a polling sleep of up to
    # 30 s, FTP I/O). Running them in the thread pool keeps the event loop
    # free to serve other requests.
    local_file_path = await run_in_threadpool(set_path_and_save, filename)
    logging.info(f"文件保存路径: {local_file_path}")

    # Wait for the saved file to appear on disk.
    if not await run_in_threadpool(wait_for_file_to_save, local_file_path):
        raise HTTPException(status_code=500, detail="文件保存超时")

    # Upload the file to FTP.
    if not await run_in_threadpool(upload_file_to_ftp, local_file_path):
        raise HTTPException(status_code=500, detail="文件上传失败")

    # Build the remote path with '/' explicitly (os.path.join would use the
    # local platform's rules) and from the same base name the upload used.
    remote_name = os.path.basename(local_file_path)
    ftp_file_path = f"{ftp_directory.rstrip('/')}/{remote_name}"
    return {"ftp_file_path": ftp_file_path}
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here, so uvicorn is only needed when the file is run as a
    # script.
    import uvicorn

    # Serve the app on all network interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")
|