"""HTTP service that drives a Windows save dialog and uploads the saved file via FTP.

The single endpoint types the requested file name into the dialog titled
'保存Excel文件', clicks its save button, waits for the file to appear on disk
and then uploads it to the configured FTP directory.
"""

import logging
import os
import posixpath
import re
import time
from ftplib import FTP

import win32con
import win32gui
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class FileUploadRequest(BaseModel):
    """Request body of the report endpoint."""

    # File name typed into the save dialog and used as the remote file name.
    filename: str


# FTP server configuration.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
ftp_host = '47.119.33.43'
ftp_port = 21
ftp_user = 'ftpuser'
ftp_password = 'FTPftp123'
ftp_directory = 'ThreeTest/XT/Before/'

# Socket timeout (seconds) for FTP operations, so an unresponsive server
# cannot block a request forever.
FTP_TIMEOUT_SECONDS = 30

# Folder under which the save dialog writes; the ComboBox selection names
# the sub-folder.
OUTPUT_ROOT = "C:\\RationalDMIS64\\Output"

# Patterns used to clean up the ComboBox text.
_MONTH_PATTERN = re.compile(r'\d+月')
_WORD_PATTERN = re.compile(r'[\w\u4e00-\u9fa5]+')


def _collect_child_windows(parent_hwnd, predicate):
    """Return handles of all child windows of *parent_hwnd* accepted by *predicate*."""
    matches = []

    def _visit(hwnd, _lparam):
        if predicate(hwnd):
            matches.append(hwnd)
        return True  # returning True keeps the enumeration going

    win32gui.EnumChildWindows(parent_hwnd, _visit, None)
    return matches


def find_child_window(parent_hwnd, class_name):
    """Return child window handles whose window class equals *class_name*."""
    return _collect_child_windows(
        parent_hwnd,
        lambda hwnd: win32gui.GetClassName(hwnd) == class_name,
    )


def find_child_window_by_partial_title(parent_hwnd, partial_title):
    """Return child window handles whose title contains *partial_title*."""
    return _collect_child_windows(
        parent_hwnd,
        lambda hwnd: partial_title in win32gui.GetWindowText(hwnd),
    )


def find_child_window_by_title(parent_hwnd, title):
    """Return child window handles whose title equals *title* exactly."""
    return _collect_child_windows(
        parent_hwnd,
        lambda hwnd: win32gui.GetWindowText(hwnd) == title,
    )


def get_combobox_handle(parent_handle):
    """Return the handle of the first "ComboBox" child of *parent_handle*.

    Raises:
        Exception: if no such child window exists.
    """
    combo_handle = win32gui.FindWindowEx(parent_handle, 0, "ComboBox", None)
    if combo_handle == 0:
        raise Exception("ComboBox not found")
    return combo_handle


def _read_combobox_item_bytes(combo_handle, index):
    """Return the raw bytes of ComboBox item *index*, or b'' if it cannot be read.

    CB_GETLBTEXTLEN reports the length in characters, not bytes, and the text
    is written as wide characters plus a terminating NUL, so the buffer must
    hold (length + 1) * 2 bytes.
    """
    length = win32gui.SendMessage(combo_handle, win32con.CB_GETLBTEXTLEN, index, 0)
    if length < 0:  # CB_ERR
        return b''
    buffer = win32gui.PyMakeBuffer((length + 1) * 2)
    win32gui.SendMessage(combo_handle, win32con.CB_GETLBTEXT, index, buffer)
    return buffer.tobytes()


def _decode_item_text(byte_data):
    """Decode ComboBox item bytes, falling back to a hex dump if unreadable."""
    text = None
    # Try several encodings, UTF-16LE first.
    for encoding in ('utf-16le', 'utf-8', 'gbk', 'latin-1'):
        try:
            candidate = byte_data.decode(encoding).rstrip('\x00')
        except UnicodeDecodeError:
            continue
        # Accept the first decoding that yields at least one printable character.
        if any(char.isprintable() for char in candidate):
            text = candidate
            break

    # Nothing decoded, or the result still contains unprintable characters:
    # show the raw bytes as a hex string instead.
    if text is None or not all(char.isprintable() or char.isspace() for char in text):
        return byte_data.hex()
    return text


def get_combobox_items(combo_handle):
    """Return the text of every item in the ComboBox, in list order."""
    count = win32gui.SendMessage(combo_handle, win32con.CB_GETCOUNT, 0, 0)
    return [
        _decode_item_text(_read_combobox_item_bytes(combo_handle, i))
        for i in range(count)
    ]


def get_combobox_selected(combo_handle):
    """Return a cleaned-up text of the currently selected ComboBox item.

    The first "<digits>月" fragment is preferred; otherwise the first run of
    word characters is returned. Returns None when nothing is selected or no
    usable text is found.
    """
    index = win32gui.SendMessage(combo_handle, win32con.CB_GETCURSEL, 0, 0)
    if index < 0:  # CB_ERR: no current selection
        return None

    byte_data = _read_combobox_item_bytes(combo_handle, index)

    # errors='ignore' means decoding never raises; an encoding is skipped
    # only when its decoded text contains nothing usable.
    for encoding in ('utf-16le', 'utf-8', 'latin-1'):
        raw_text = byte_data.decode(encoding, errors='ignore').rstrip('\x00')

        match = _MONTH_PATTERN.search(raw_text)
        if match:
            return match.group()

        # Keep only valid characters (CJK, digits, letters, underscore).
        tokens = _WORD_PATTERN.findall(raw_text)
        if tokens:
            return tokens[0].strip()

    return None


def set_combobox_value(combo_handle, value):
    """Select the ComboBox item whose text equals *value*.

    Raises:
        Exception: if *value* is not one of the ComboBox items.
    """
    items = get_combobox_items(combo_handle)
    try:
        index = items.index(value)
    except ValueError as err:
        raise Exception("Value not found in ComboBox") from err
    win32gui.SendMessage(combo_handle, win32con.CB_SETCURSEL, index, 0)


def set_path_and_save(filename):
    """Type *filename* into the save dialog, click save, and return the expected path.

    The returned path is OUTPUT_ROOT\\<ComboBox selection>\\<filename>.

    Raises:
        HTTPException: 404 if the dialog, a usable ComboBox selection, the
            edit box or the save button cannot be found.
        Exception: if the dialog has no ComboBox child.
    """
    parent_hwnd = win32gui.FindWindow(None, '保存Excel文件')
    if parent_hwnd == 0:
        raise HTTPException(status_code=404, detail="没有找到保存报告的窗口,请检查!")

    combo_handle = get_combobox_handle(parent_hwnd)
    # Query the selection once; it is None when nothing usable is selected,
    # which previously crashed the string concatenation below.
    selected = get_combobox_selected(combo_handle)
    logging.info(f"Current Selected: {selected}")
    if selected is None:
        raise HTTPException(status_code=404, detail="未找到当前选中的目录")

    local_file_path = OUTPUT_ROOT + "\\" + selected + "\\" + filename
    logging.info(f"设置路径: {local_file_path}")

    path_hwnds = find_child_window(parent_hwnd, 'Edit')
    if not path_hwnds:
        raise HTTPException(status_code=404, detail="未找到路径框")
    # Only the bare file name is typed into the edit box.
    win32gui.SendMessage(path_hwnds[0], win32con.WM_SETTEXT, 0, filename)

    button_hwnds = find_child_window_by_title(parent_hwnd, '保存(&S)')
    if not button_hwnds:
        raise HTTPException(status_code=404, detail="未找到保存按钮")
    # PostMessage returns immediately; the caller polls for the file afterwards.
    win32gui.PostMessage(button_hwnds[0], win32con.BM_CLICK, 0, 0)

    return local_file_path


def wait_for_file_to_save(filepath, timeout=30):
    """Poll until *filepath* exists as a file; return False after *timeout* seconds."""
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.isfile(filepath):
            return True
        time.sleep(0.1)
    return False


def upload_file_to_ftp(local_file):
    """Upload *local_file* into ``ftp_directory`` on the configured FTP server.

    Returns:
        True on success, False if any step of the transfer failed.

    Raises:
        HTTPException: 404 if *local_file* does not exist.
    """
    if not os.path.isfile(local_file):
        raise HTTPException(status_code=404, detail="文件未找到")

    try:
        # The context manager only sends QUIT when a connection exists, so a
        # failed connect no longer raises from the cleanup path.
        with FTP() as ftp:
            ftp.connect(ftp_host, ftp_port, timeout=FTP_TIMEOUT_SECONDS)
            ftp.login(ftp_user, ftp_password)
            ftp.cwd(ftp_directory)
            with open(local_file, 'rb') as file:
                ftp.storbinary(f'STOR {os.path.basename(local_file)}', file)
        return True
    except Exception as e:
        logging.exception(f"文件上传失败: {e}")
        return False


@app.post("/get/check/report")
def upload_file(request: FileUploadRequest):
    """Save the report through the dialog, upload it, and return its FTP path.

    Declared as a plain ``def`` so FastAPI runs it in its threadpool: the body
    sleeps while polling and performs blocking FTP I/O, which would stall the
    event loop inside an ``async def``.

    Raises:
        HTTPException: 500 if saving times out or the upload fails; 404 from
            the dialog lookup steps.
    """
    # Fill in the dialog's edit box and click save.
    local_file_path = set_path_and_save(request.filename)
    logging.info(f"文件上传请求: {request.filename}")
    logging.info(f"文件保存路径: {local_file_path}")

    # Wait for the file to be written.
    if not wait_for_file_to_save(local_file_path):
        raise HTTPException(status_code=500, detail="文件保存超时")

    # Upload the file to FTP.
    if not upload_file_to_ftp(local_file_path):
        raise HTTPException(status_code=500, detail="文件上传失败")

    # FTP paths always use forward slashes, regardless of the host OS.
    ftp_file_path = posixpath.join(ftp_directory, request.filename)
    return {"ftp_file_path": ftp_file_path}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8999)