Files
jikimo_sf/sf_machine_connect/models/ftp_operate.py
2025-04-22 15:23:02 +08:00

270 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import logging
import os
import re
from ftplib import FTP, error_perm
_logger = logging.getLogger(__name__)
class FTP_P(FTP):
"""
重写FTP类重写dirs方法
"""
def dirs(self, *args):
"""List a directory in long form.
By default list current directory to stdout.
Optional last argument is callback function; all
non-empty arguments before it are concatenated to the
LIST command. (This *should* only be used for a pathname.)"""
cmd = 'LIST'
templist = []
tempdic = {}
if args[-1:] and type(args[-1]) != type(''):
args, func = args[:-1], args[-1]
for arg in args:
if arg:
cmd = cmd + (' ' + arg)
self.retrlines(cmd, templist.append)
# 处理返回结果,只需要目录名称
r_files = [file.split(" ")[-1] for file in templist]
tempdic['name'] = [file for file in r_files if file != "." and file != ".."]
# 去除. ..
return tempdic
# return [file for file in r_files if file != "." and file != ".."]
# FTP接口类
class FtpController:
"""
这是ftp接口类在类初始化的时候就连接了ftp服务器能否成功连接有反馈。
类中定义了两个接口:上传接口和删除接口
"""
# 三菱机床连接
def __init__(self, host="192.168.2.158", port=8080, username="MITSUBISHI", password="CNC"):
_logger.info("===================ftppppp==================")
self.host = host
self.port = port
self.username = username
self.password = password
# 测试
print("==============================================")
print(self.username, self.port, self.host, self.password)
ftp = FTP_P()
_logger.info("===================connect==================")
# ftp.set_debuglevel(2) #打开调试级别2显示详细信息
# ftp.set_pasv(1) # 0主动模式 1 #被动模式
try:
ftp.connect(self.host, self.port)
ftp.login(self.username, self.password)
_logger.info("=================连接成功==================")
print("连接成功")
self.ftp = ftp
except Exception as e:
print("连接失败" + str(e))
# 试验接口
def prin(self):
print("这是试验接口")
# 三菱代码下发
def upload_file(self, remotepath='/(192,168,199,2)/DS/Z4.5.NC', localpath='D:/ftp/up/Z4.5.NC'):
"""
第一个是要上传到ftp服务器路径下的文件第二个是本地要上传的的路径文件
:param remotepath: 上传和下载都需要设置工作目录,注意只能使用文件名,不能有路径中的冒号
:param localpath:
:return:
"""
bufsize = 8192
# fp = open(localpath, 'rb')
# self.ftp.storbinary('STOR ' + remotepath, fp, bufsize)
# fp.close()
with open(localpath, mode='rb') as file:
self.ftp.storbinary('STOR ' + remotepath, file, bufsize)
# 关闭连接
def close_ftp(self):
"""
下发完成后关闭ftp连接减少资源损耗
"""
self.ftp.close()
def delAllfile(self, ftppath):
"""
删除ftp服务器端全部文件
:param ftppath:
:return:
"""
dir_res = []
try:
print(ftppath)
try:
self.ftp.cwd(ftppath)
except Exception as e:
print("进入ftp目录失败" + str(e))
self.ftp.dir('.', dir_res.append) # 对当前目录进行dir(),将结果放入列表
print(dir_res)
# for i in dir_res:
# if i.startswith("d"):
# dirName = i.split(" ")[-1]
# print("开始删除" + dirName + "文件夹")
# delAllfile(ftp, ftp.pwd() + "/" + dirName)
# ftp.cwd('..')
# print(ftppath + "/" + dirName)
# ftp.rmd(ftppath + '/' + dirName)
# else:
# filelist = ftp.getfiles(ftppath)
# for f in filelist:
# print("删除FTP目录" + ftppath + "下存在文件:" + f)
# ftp.delete(f)
except Exception as e:
print("删除失败" + str(e))
# 出现550 not found file是路径不对
def del_file(self, delpath='/(192,168,199,2)/DS/Z4.5.NC'):
"""
删除ftp服务器端指定文件
:param delpath:
:return:
"""
self.ftp.delete(delpath)
def transfer_nc_files(
source_ftp_info,
target_ftp_info,
source_dir,
target_dir,
end_with=None,
match_str=None,
keep_dir=False):
"""
从源FTP服务器下载所有{end_with}结尾的文件并上传到目标FTP服务器,保持目录结构
Args:
source_ftp_info: dict, 源FTP连接信息 {host, port, username, password}
target_ftp_info: dict, 目标FTP连接信息 {host, port, username, password}
source_dir: str, 源FTP上的起始目录
target_dir: str, 目标FTP上的目标目录
keep_dir: bool, 是否保持目录结构,默认False
"""
trans_status = [False]
try:
# 连接源FTP
source_ftp = FtpController(
source_ftp_info['host'],
source_ftp_info['port'],
source_ftp_info['username'],
source_ftp_info['password']
)
source_ftp.ftp.set_pasv(1)
# 连接目标FTP
target_ftp = FtpController(
target_ftp_info['host'],
target_ftp_info['port'],
target_ftp_info['username'],
target_ftp_info['password']
)
source_ftp.ftp.set_pasv(1)
# 递归遍历源目录
def traverse_dir(current_dir, relative_path=''):
source_ftp.ftp.cwd(current_dir)
file_list = source_ftp.ftp.nlst()
for item in file_list:
try:
# 尝试进入目录
source_ftp.ftp.cwd(f"{current_dir}/{item}")
# 如果成功则是目录
new_relative_path = os.path.join(relative_path, item)
# 在目标FTP创建对应目录
try:
if keep_dir:
target_ftp.ftp.mkd(f"{target_dir}/{new_relative_path}")
except:
pass # 目录可能已存在
# 递归遍历子目录
traverse_dir(f"{current_dir}/{item}", new_relative_path)
source_ftp.ftp.cwd('..')
except:
matched = False
# 文件名匹配字符串BT30-(两个字符)-all.nc, 例6667_20250422-BT30-ZM-all.nc
if match_str and re.match(match_str, item):
matched = True
elif end_with and item.lower().endswith(end_with):
matched = True
if matched:
# 下载到临时文件
temp_path = f"/tmp/{item}"
with open(temp_path, 'wb') as f:
source_ftp.ftp.retrbinary(f'RETR {item}', f.write)
# 上传到目标FTP对应目录
if keep_dir:
target_path = f"{target_dir}/{relative_path}/{item}"
else:
target_path = f"{target_dir}/{item}"
with open(temp_path, 'rb') as f:
target_ftp.ftp.storbinary(f'STOR {target_path}', f)
trans_status[0] = True
# 删除临时文件
os.remove(temp_path)
logging.info(f"已传输文件: {item}")
# 清空目标目录下的所有内容
try:
target_ftp.ftp.cwd(target_dir)
files = target_ftp.ftp.nlst()
for f in files:
try:
# 尝试删除文件
target_ftp.ftp.delete(f)
except:
try:
# 如果删除失败,可能是目录,递归删除目录
def remove_dir(path):
target_ftp.ftp.cwd(path)
sub_files = target_ftp.ftp.nlst()
for sf in sub_files:
try:
target_ftp.ftp.delete(sf)
except:
remove_dir(f"{path}/{sf}")
target_ftp.ftp.cwd('..')
target_ftp.ftp.rmd(path)
remove_dir(f"{target_dir}/{f}")
except:
logging.error(f"无法删除 {f}")
pass
logging.info(f"已清空目标目录 {target_dir}")
except Exception as e:
logging.error(f"清空目标目录失败: {str(e)}")
raise Exception(f"清空目标目录失败: {str(e)}")
# 开始遍历
traverse_dir(source_dir)
logging.info("所有文件传输完成")
return trans_status[0]
except Exception as e:
logging.error(f"传输过程出错: {str(e)}")
raise e
finally:
# 关闭FTP连接
try:
source_ftp.ftp.quit()
target_ftp.ftp.quit()
except:
pass