363 lines
14 KiB
Python
363 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
||
import logging
|
||
import os
|
||
import re
|
||
from ftplib import FTP, error_perm
|
||
|
||
_logger = logging.getLogger(__name__)
|
||
|
||
|
||
class FTP_P(FTP):
|
||
"""
|
||
重写FTP类,重写dirs方法,增加编码处理
|
||
"""
|
||
def __init__(self, host='', user='', passwd='', acct='', timeout=None, encoding='gbk'):
|
||
"""初始化时指定编码方式"""
|
||
super().__init__(host, user, passwd, acct, timeout)
|
||
self.encoding = encoding
|
||
|
||
def dirs(self, *args):
|
||
"""List a directory in long form.
|
||
By default list current directory to stdout.
|
||
Optional last argument is callback function; all
|
||
non-empty arguments before it are concatenated to the
|
||
LIST command. (This *should* only be used for a pathname.)"""
|
||
cmd = 'LIST'
|
||
templist = []
|
||
tempdic = {}
|
||
if args[-1:] and type(args[-1]) != type(''):
|
||
args, func = args[:-1], args[-1]
|
||
for arg in args:
|
||
if arg:
|
||
cmd = cmd + (' ' + arg)
|
||
self.retrlines(cmd, templist.append)
|
||
# 处理返回结果,只需要目录名称
|
||
r_files = [file.split(" ")[-1] for file in templist]
|
||
tempdic['name'] = [file for file in r_files if file != "." and file != ".."]
|
||
# 去除. ..
|
||
return tempdic
|
||
|
||
def nlst(self, *args):
|
||
"""Get a list of files in a directory."""
|
||
files = []
|
||
def append(line):
|
||
try:
|
||
if isinstance(line, bytes):
|
||
files.append(line.decode(self.encoding))
|
||
else:
|
||
files.append(line)
|
||
except UnicodeDecodeError:
|
||
files.append(line.decode('utf-8', errors='replace'))
|
||
cmd = 'NLST'
|
||
if args:
|
||
cmd = cmd + ' ' + args[0]
|
||
self.retrlines(cmd, append)
|
||
return files
|
||
|
||
def cwd(self, dirname):
|
||
"""Change to a directory."""
|
||
try:
|
||
if isinstance(dirname, bytes):
|
||
dirname = dirname.decode(self.encoding)
|
||
return super().cwd(dirname)
|
||
except UnicodeEncodeError:
|
||
return super().cwd(dirname.encode(self.encoding).decode('utf-8'))
|
||
|
||
def storbinary(self, cmd, fp, blocksize=8192, callback=None, rest=None):
|
||
"""Store a file in binary mode."""
|
||
try:
|
||
if isinstance(cmd, bytes):
|
||
cmd = cmd.decode(self.encoding)
|
||
return super().storbinary(cmd, fp, blocksize, callback, rest)
|
||
except UnicodeEncodeError:
|
||
return super().storbinary(cmd.encode(self.encoding).decode('utf-8'), fp, blocksize, callback, rest)
|
||
|
||
def retrbinary(self, cmd, callback, blocksize=8192, rest=None):
|
||
"""Retrieve a file in binary mode."""
|
||
try:
|
||
if isinstance(cmd, bytes):
|
||
cmd = cmd.decode(self.encoding)
|
||
return super().retrbinary(cmd, callback, blocksize, rest)
|
||
except UnicodeEncodeError:
|
||
return super().retrbinary(cmd.encode(self.encoding).decode('utf-8'), callback, blocksize, rest)
|
||
|
||
|
||
# FTP接口类
|
||
class FtpController:
|
||
"""
|
||
这是ftp接口类,在类初始化的时候就连接了ftp服务器,能否成功连接有反馈。
|
||
类中定义了两个接口:上传接口和删除接口
|
||
"""
|
||
|
||
# 三菱机床连接
|
||
def __init__(self, host="192.168.2.158", port=8080, username="MITSUBISHI", password="CNC"):
|
||
_logger.info("===================ftppppp==================")
|
||
self.host = host
|
||
self.port = port
|
||
self.username = username
|
||
self.password = password
|
||
# 测试
|
||
print("==============================================")
|
||
print(self.username, self.port, self.host, self.password)
|
||
ftp = FTP_P()
|
||
_logger.info("===================connect==================")
|
||
# ftp.set_debuglevel(2) #打开调试级别2,显示详细信息
|
||
# ftp.set_pasv(1) # 0主动模式 1 #被动模式
|
||
try:
|
||
ftp.connect(self.host, self.port)
|
||
ftp.login(self.username, self.password)
|
||
_logger.info("=================连接成功==================")
|
||
print("连接成功")
|
||
self.ftp = ftp
|
||
except Exception as e:
|
||
print("连接失败" + str(e))
|
||
|
||
# 试验接口
|
||
def prin(self):
|
||
print("这是试验接口")
|
||
|
||
# 三菱代码下发
|
||
def upload_file(self, remotepath='/(192,168,199,2)/DS/Z4.5.NC', localpath='D:/ftp/up/Z4.5.NC'):
|
||
"""
|
||
第一个是要上传到ftp服务器路径下的文件,第二个是本地要上传的的路径文件
|
||
:param remotepath: 上传和下载都需要设置工作目录,注意只能使用文件名,不能有路径中的冒号
|
||
:param localpath:
|
||
:return:
|
||
"""
|
||
bufsize = 8192
|
||
# fp = open(localpath, 'rb')
|
||
# self.ftp.storbinary('STOR ' + remotepath, fp, bufsize)
|
||
# fp.close()
|
||
with open(localpath, mode='rb') as file:
|
||
self.ftp.storbinary('STOR ' + remotepath, file, bufsize)
|
||
|
||
# 关闭连接
|
||
def close_ftp(self):
|
||
"""
|
||
下发完成后关闭ftp连接,减少资源损耗
|
||
"""
|
||
self.ftp.close()
|
||
|
||
def delAllfile(self, ftppath):
|
||
"""
|
||
删除ftp服务器端全部文件
|
||
:param ftppath:
|
||
:return:
|
||
"""
|
||
dir_res = []
|
||
try:
|
||
print(ftppath)
|
||
try:
|
||
self.ftp.cwd(ftppath)
|
||
except Exception as e:
|
||
print("进入ftp目录失败" + str(e))
|
||
self.ftp.dir('.', dir_res.append) # 对当前目录进行dir(),将结果放入列表
|
||
print(dir_res)
|
||
# for i in dir_res:
|
||
# if i.startswith("d"):
|
||
# dirName = i.split(" ")[-1]
|
||
# print("开始删除" + dirName + "文件夹")
|
||
# delAllfile(ftp, ftp.pwd() + "/" + dirName)
|
||
# ftp.cwd('..')
|
||
# print(ftppath + "/" + dirName)
|
||
# ftp.rmd(ftppath + '/' + dirName)
|
||
# else:
|
||
# filelist = ftp.getfiles(ftppath)
|
||
# for f in filelist:
|
||
# print("删除FTP目录:" + ftppath + "下存在文件:" + f)
|
||
# ftp.delete(f)
|
||
except Exception as e:
|
||
print("删除失败" + str(e))
|
||
|
||
# 出现550 not found file是路径不对
|
||
def del_file(self, delpath='/(192,168,199,2)/DS/Z4.5.NC'):
|
||
"""
|
||
删除ftp服务器端指定文件
|
||
:param delpath:
|
||
:return:
|
||
"""
|
||
self.ftp.delete(delpath)
|
||
|
||
|
||
|
||
def transfer_files(
|
||
source_ftp_info,
|
||
target_ftp_info,
|
||
source_dir,
|
||
target_dir,
|
||
end_with=None,
|
||
match_str=None,
|
||
keep_dir=False):
|
||
"""
|
||
从源FTP服务器下载所有{end_with}结尾的文件并上传到目标FTP服务器,保持目录结构
|
||
|
||
Args:
|
||
source_ftp_info: dict, 源FTP连接信息 {host, port, username, password}
|
||
target_ftp_info: dict, 目标FTP连接信息 {host, port, username, password}
|
||
source_dir: str, 源FTP上的起始目录
|
||
target_dir: str, 目标FTP上的目标目录
|
||
keep_dir: bool, 是否保持目录结构,默认False
|
||
"""
|
||
transfered_file_list = []
|
||
try:
|
||
# 连接源FTP
|
||
source_ftp = FtpController(
|
||
source_ftp_info['host'],
|
||
source_ftp_info['port'],
|
||
source_ftp_info['username'],
|
||
source_ftp_info['password']
|
||
)
|
||
if not source_ftp.ftp:
|
||
raise Exception("编程文件FTP连接失败")
|
||
source_ftp.ftp.set_pasv(1)
|
||
|
||
# 连接目标FTP
|
||
target_ftp = FtpController(
|
||
target_ftp_info['host'],
|
||
target_ftp_info['port'],
|
||
target_ftp_info['username'],
|
||
target_ftp_info['password']
|
||
)
|
||
if not source_ftp.ftp:
|
||
raise Exception("机床FTP连接失败")
|
||
source_ftp.ftp.set_pasv(1)
|
||
|
||
# 递归遍历源目录
|
||
def traverse_dir(current_dir, relative_path=''):
|
||
source_ftp.ftp.cwd(current_dir)
|
||
file_list = source_ftp.ftp.nlst()
|
||
|
||
for item in file_list:
|
||
try:
|
||
# 尝试进入目录
|
||
source_ftp.ftp.cwd(f"{current_dir}/{item}")
|
||
# 如果成功则是目录
|
||
new_relative_path = os.path.join(relative_path, item)
|
||
# 在目标FTP创建对应目录
|
||
try:
|
||
if keep_dir:
|
||
target_ftp.ftp.mkd(f"{target_dir}/{new_relative_path}")
|
||
except:
|
||
pass # 目录可能已存在
|
||
# 递归遍历子目录
|
||
traverse_dir(f"{current_dir}/{item}", new_relative_path)
|
||
source_ftp.ftp.cwd('..')
|
||
except:
|
||
matched = False
|
||
# 文件名匹配字符串BT30-(两个字符)-all.nc, 例6667_20250422-BT30-ZM-all.nc
|
||
if match_str and re.match(match_str, item):
|
||
matched = True
|
||
elif end_with and item.lower().endswith(end_with):
|
||
matched = True
|
||
|
||
if matched:
|
||
# 下载到临时文件
|
||
temp_path = f"/tmp/{item}"
|
||
with open(temp_path, 'wb') as f:
|
||
source_ftp.ftp.retrbinary(f'RETR {item}', f.write)
|
||
|
||
# 上传到目标FTP对应目录
|
||
if keep_dir:
|
||
target_path = f"{target_dir}/{relative_path}/{item}"
|
||
else:
|
||
target_path = f"{target_dir}/{item}"
|
||
|
||
# 规范化路径
|
||
target_path = target_path.replace('\\', '/').strip('/')
|
||
|
||
# 确保目标目录存在
|
||
target_dir_path = '/'.join(target_path.split('/')[:-1])
|
||
try:
|
||
target_ftp.ftp.cwd('/') # 回到根目录
|
||
for dir_part in target_dir_path.split('/'):
|
||
if dir_part:
|
||
try:
|
||
target_ftp.ftp.cwd(dir_part)
|
||
except:
|
||
try:
|
||
target_ftp.ftp.mkd(dir_part)
|
||
target_ftp.ftp.cwd(dir_part)
|
||
except Exception as e:
|
||
logging.error(f"创建目录失败 {dir_part}: {str(e)}")
|
||
raise
|
||
except Exception as e:
|
||
logging.error(f"处理目标目录失败: {str(e)}")
|
||
raise
|
||
|
||
# 检查FTP连接状态
|
||
try:
|
||
target_ftp.ftp.voidcmd('NOOP')
|
||
except:
|
||
logging.error("FTP连接已断开,尝试重新连接")
|
||
target_ftp.ftp.connect(target_ftp_info['host'], target_ftp_info['port'])
|
||
target_ftp.ftp.login(target_ftp_info['username'], target_ftp_info['password'])
|
||
|
||
# 上传文件
|
||
try:
|
||
with open(temp_path, 'rb') as f:
|
||
# 检查文件是否可读
|
||
content = f.read()
|
||
if not content:
|
||
raise Exception("临时文件为空")
|
||
f.seek(0) # 重置文件指针
|
||
target_ftp.ftp.storbinary(f'STOR {target_path}', f)
|
||
except Exception as e:
|
||
logging.error(f"上传文件失败: {str(e)}")
|
||
logging.error(f"目标路径: {target_path}")
|
||
raise
|
||
|
||
transfered_file_list.append(item)
|
||
# 删除临时文件
|
||
os.remove(temp_path)
|
||
logging.info(f"已传输文件: {item}")
|
||
|
||
# 清空目标目录下的所有内容
|
||
# try:
|
||
# target_ftp.ftp.cwd(target_dir)
|
||
# files = target_ftp.ftp.nlst()
|
||
|
||
# for f in files:
|
||
# try:
|
||
# # 尝试删除文件
|
||
# target_ftp.ftp.delete(f)
|
||
# except:
|
||
# try:
|
||
# # 如果删除失败,可能是目录,递归删除目录
|
||
# def remove_dir(path):
|
||
# target_ftp.ftp.cwd(path)
|
||
# sub_files = target_ftp.ftp.nlst()
|
||
# for sf in sub_files:
|
||
# try:
|
||
# target_ftp.ftp.delete(sf)
|
||
# except:
|
||
# remove_dir(f"{path}/{sf}")
|
||
# target_ftp.ftp.cwd('..')
|
||
# target_ftp.ftp.rmd(path)
|
||
|
||
# remove_dir(f"{target_dir}/{f}")
|
||
# except:
|
||
# logging.error(f"无法删除 {f}")
|
||
# pass
|
||
|
||
# logging.info(f"已清空目标目录 {target_dir}")
|
||
# except Exception as e:
|
||
# logging.error(f"清空目标目录失败: {str(e)}")
|
||
# raise Exception(f"清空目标目录失败: {str(e)}")
|
||
|
||
# 开始遍历
|
||
traverse_dir(source_dir)
|
||
|
||
logging.info("所有文件传输完成")
|
||
return transfered_file_list
|
||
|
||
except Exception as e:
|
||
logging.error(f"传输过程出错: {str(e)}")
|
||
raise e
|
||
|
||
finally:
|
||
# 关闭FTP连接
|
||
try:
|
||
source_ftp.ftp.quit()
|
||
target_ftp.ftp.quit()
|
||
except:
|
||
pass |