# -*- coding: utf-8 -*- import logging import os from ftplib import FTP, error_perm _logger = logging.getLogger(__name__) class FTP_P(FTP): """ 重写FTP类,重写dirs方法 """ def dirs(self, *args): """List a directory in long form. By default list current directory to stdout. Optional last argument is callback function; all non-empty arguments before it are concatenated to the LIST command. (This *should* only be used for a pathname.)""" cmd = 'LIST' templist = [] tempdic = {} if args[-1:] and type(args[-1]) != type(''): args, func = args[:-1], args[-1] for arg in args: if arg: cmd = cmd + (' ' + arg) self.retrlines(cmd, templist.append) # 处理返回结果,只需要目录名称 r_files = [file.split(" ")[-1] for file in templist] tempdic['name'] = [file for file in r_files if file != "." and file != ".."] # 去除. .. return tempdic # return [file for file in r_files if file != "." and file != ".."] # FTP接口类 class FtpController: """ 这是ftp接口类,在类初始化的时候就连接了ftp服务器,能否成功连接有反馈。 类中定义了两个接口:上传接口和删除接口 """ # 三菱机床连接 def __init__(self, host="192.168.2.158", port=8080, username="MITSUBISHI", password="CNC"): _logger.info("===================ftppppp==================") self.host = host self.port = port self.username = username self.password = password # 测试 print("==============================================") print(self.username, self.port, self.host, self.password) ftp = FTP_P() _logger.info("===================connect==================") # ftp.set_debuglevel(2) #打开调试级别2,显示详细信息 # ftp.set_pasv(1) # 0主动模式 1 #被动模式 try: ftp.connect(self.host, self.port) ftp.login(self.username, self.password) _logger.info("=================连接成功==================") print("连接成功") self.ftp = ftp except Exception as e: print("连接失败" + str(e)) # 试验接口 def prin(self): print("这是试验接口") # 三菱代码下发 def upload_file(self, remotepath='/(192,168,199,2)/DS/Z4.5.NC', localpath='D:/ftp/up/Z4.5.NC'): """ 第一个是要上传到ftp服务器路径下的文件,第二个是本地要上传的的路径文件 :param remotepath: 上传和下载都需要设置工作目录,注意只能使用文件名,不能有路径中的冒号 :param localpath: :return: """ bufsize = 8192 # fp = open(localpath, 'rb') # self.ftp.storbinary('STOR ' + remotepath, fp, bufsize) # fp.close() with open(localpath, mode='rb') as file: self.ftp.storbinary('STOR ' + remotepath, file, bufsize) # 关闭连接 def close_ftp(self): """ 下发完成后关闭ftp连接,减少资源损耗 """ self.ftp.close() def delAllfile(self, ftppath): """ 删除ftp服务器端全部文件 :param ftppath: :return: """ dir_res = [] try: print(ftppath) try: self.ftp.cwd(ftppath) except Exception as e: print("进入ftp目录失败" + str(e)) self.ftp.dir('.', dir_res.append) # 对当前目录进行dir(),将结果放入列表 print(dir_res) # for i in dir_res: # if i.startswith("d"): # dirName = i.split(" ")[-1] # print("开始删除" + dirName + "文件夹") # delAllfile(ftp, ftp.pwd() + "/" + dirName) # ftp.cwd('..') # print(ftppath + "/" + dirName) # ftp.rmd(ftppath + '/' + dirName) # else: # filelist = ftp.getfiles(ftppath) # for f in filelist: # print("删除FTP目录:" + ftppath + "下存在文件:" + f) # ftp.delete(f) except Exception as e: print("删除失败" + str(e)) # 出现550 not found file是路径不对 def del_file(self, delpath='/(192,168,199,2)/DS/Z4.5.NC'): """ 删除ftp服务器端指定文件 :param delpath: :return: """ self.ftp.delete(delpath) def transfer_nc_files(source_ftp_info, target_ftp_info, source_dir, target_dir, keep_dir=False): """ 从源FTP服务器下载所有.nc文件并上传到目标FTP服务器,保持目录结构 Args: source_ftp_info: dict, 源FTP连接信息 {host, port, username, password} target_ftp_info: dict, 目标FTP连接信息 {host, port, username, password} source_dir: str, 源FTP上的起始目录 target_dir: str, 目标FTP上的目标目录 keep_dir: bool, 是否保持目录结构,默认False """ try: # 连接源FTP source_ftp = FtpController( source_ftp_info['host'], source_ftp_info['port'], source_ftp_info['username'], source_ftp_info['password'] ) source_ftp.ftp.set_pasv(1) # 连接目标FTP target_ftp = FtpController( target_ftp_info['host'], target_ftp_info['port'], target_ftp_info['username'], target_ftp_info['password'] ) source_ftp.ftp.set_pasv(1) # 递归遍历源目录 def traverse_dir(current_dir, relative_path=''): source_ftp.ftp.cwd(current_dir) file_list = source_ftp.ftp.nlst() for item in file_list: try: # 尝试进入目录 source_ftp.ftp.cwd(f"{current_dir}/{item}") # 如果成功则是目录 new_relative_path = os.path.join(relative_path, item) # 在目标FTP创建对应目录 try: if keep_dir: target_ftp.ftp.mkd(f"{target_dir}/{new_relative_path}") except: pass # 目录可能已存在 # 递归遍历子目录 traverse_dir(f"{current_dir}/{item}", new_relative_path) source_ftp.ftp.cwd('..') except: # 如果是.nc文件则传输 if item.lower().endswith('.nc'): # 下载到临时文件 temp_path = f"/tmp/{item}" with open(temp_path, 'wb') as f: source_ftp.ftp.retrbinary(f'RETR {item}', f.write) # 上传到目标FTP对应目录 if keep_dir: target_path = f"{target_dir}/{relative_path}/{item}" else: target_path = f"{target_dir}/{item}" with open(temp_path, 'rb') as f: target_ftp.ftp.storbinary(f'STOR {target_path}', f) # 删除临时文件 os.remove(temp_path) logging.info(f"已传输文件: {item}") # 清空目标目录下的所有内容 try: target_ftp.ftp.cwd(target_dir) files = target_ftp.ftp.nlst() for f in files: try: # 尝试删除文件 target_ftp.ftp.delete(f) except: try: # 如果删除失败,可能是目录,递归删除目录 def remove_dir(path): target_ftp.ftp.cwd(path) sub_files = target_ftp.ftp.nlst() for sf in sub_files: try: target_ftp.ftp.delete(sf) except: remove_dir(f"{path}/{sf}") target_ftp.ftp.cwd('..') target_ftp.ftp.rmd(path) remove_dir(f"{target_dir}/{f}") except: logging.error(f"无法删除 {f}") pass logging.info(f"已清空目标目录 {target_dir}") except Exception as e: logging.error(f"清空目标目录失败: {str(e)}") return False # 开始遍历 traverse_dir(source_dir) logging.info("所有.nc文件传输完成") return True except Exception as e: logging.error(f"传输过程出错: {str(e)}") return False finally: # 关闭FTP连接 try: source_ftp.ftp.quit() target_ftp.ftp.quit() except: pass