Files
test/sf_machine_connect/models/ftp_operate.py
2023-02-27 09:52:17 +08:00

128 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
from ftplib import FTP
class FTP_P(FTP):
"""
重写FTP类重写dirs方法
"""
def dirs(self, *args):
"""List a directory in long form.
By default list current directory to stdout.
Optional last argument is callback function; all
non-empty arguments before it are concatenated to the
LIST command. (This *should* only be used for a pathname.)"""
cmd = 'LIST'
templist = []
tempdic = {}
func = None
if args[-1:] and type(args[-1]) != type(''):
args, func = args[:-1], args[-1]
for arg in args:
if arg:
cmd = cmd + (' ' + arg)
self.retrlines(cmd, templist.append)
# 处理返回结果,只需要目录名称
r_files = [file.split(" ")[-1] for file in templist]
tempdic['name'] = [file for file in r_files if file != "." and file != ".."]
# 去除. ..
return tempdic
# return [file for file in r_files if file != "." and file != ".."]
# FTP接口类
class FtpController:
"""
这是ftp接口类在类初始化的时候就连接了ftp服务器能否成功连接有反馈。
类中定义了两个接口:上传接口和删除接口
"""
# 三菱机床连接
def __init__(self, host="192.168.2.158", port=8080, username="MITSUBISHI", password="CNC"):
self.host = host
self.port = port
self.username = username
self.password = password
# 测试
print("==============================================")
print(self.username, self.port, self.host, self.password)
ftp = FTP_P()
# self.ftp.set_debuglevel(2) #打开调试级别2显示详细信息
ftp.set_pasv(0) # 0主动模式 1 #被动模式
try:
ftp.connect(self.host, self.port)
ftp.login(self.username, self.password)
print("连接成功")
self.ftp = ftp
except Exception as e:
print("连接失败" + str(e))
# 试验接口
def prin(self):
print("这是试验接口")
# 三菱代码下发
def upload_file(self, remotepath='/(192,168,199,2)/DS/Z4.5.NC', localpath='D:/ftp/up/Z4.5.NC'):
"""
第一个是要上传到ftp服务器路径下的文件第二个是本地要上传的的路径文件
:param remotepath: 上传和下载都需要设置工作目录,注意只能使用文件名,不能有路径中的冒号
:param localpath:
:return:
"""
bufsize = 8192
# fp = open(localpath, 'rb')
# self.ftp.storbinary('STOR ' + remotepath, fp, bufsize)
# fp.close()
with open(localpath, mode='rb') as file:
self.ftp.storbinary('STOR ' + remotepath, file, bufsize)
# 关闭连接
def close_ftp(self):
"""
下发完成后关闭ftp连接减少资源损耗
"""
self.ftp.close()
def delAllfile(self, ftppath):
"""
删除ftp服务器端全部文件
:param ftppath:
:return:
"""
dir_res = []
try:
print(ftppath)
try:
self.ftp.cwd(ftppath)
except Exception as e:
print("进入ftp目录失败" + str(e))
self.ftp.dir('.', dir_res.append) # 对当前目录进行dir(),将结果放入列表
print(dir_res)
# for i in dir_res:
# if i.startswith("d"):
# dirName = i.split(" ")[-1]
# print("开始删除" + dirName + "文件夹")
# delAllfile(ftp, ftp.pwd() + "/" + dirName)
# ftp.cwd('..')
# print(ftppath + "/" + dirName)
# ftp.rmd(ftppath + '/' + dirName)
# else:
# filelist = ftp.getfiles(ftppath)
# for f in filelist:
# print("删除FTP目录" + ftppath + "下存在文件:" + f)
# ftp.delete(f)
except Exception as e:
print("删除失败" + str(e))
# 出现550 not found file是路径不对
def del_file(self, delpath='/(192,168,199,2)/DS/Z4.5.NC'):
"""
删除ftp服务器端指定文件
:param delpath:
:return:
"""
self.ftp.delete(delpath)