# own develop (self-written SFTP helper)
import os
import posixpath
import shlex
import unittest

import paramiko
import pysftp
class SftpUtils(object):
    """Convenience wrapper around one ``pysftp.Connection`` to a remote host.

    The connection is opened by the constructor and stored in ``self.sftp``;
    call :meth:`close` when done with it.
    """

    def __init__(self, ip, port, user, pwd):
        """Store the connection parameters and open the SFTP connection.

        Args:
            ip: Host name or address of the remote machine.
            port: SSH port of the remote machine (``None`` means 22).
            user: Login user name.
            pwd: Login password.
        """
        self.ip = ip
        self.port = port
        self.user = user
        self.pwd = pwd
        self.sftp = self.sftp_client()

    def _effective_port(self):
        """Return the configured port, falling back to the SSH default 22."""
        return 22 if self.port is None else self.port

    def _exists(self, remote_path):
        """Return True when ``stat`` succeeds for *remote_path*.

        Only ``IOError`` (what a failed remote ``stat`` raises) is treated as
        "does not exist"; other errors propagate instead of being reported
        as a missing path.
        """
        try:
            self.sftp.stat(remote_path)
        except IOError:
            return False
        return True

    def sftp_client(self):
        """Open and return a new ``pysftp.Connection`` to the stored host.

        Every call opens a separate connection; the caller is responsible
        for closing the returned object.
        """
        # https://pysftp.readthedocs.io/en/release_0.2.9/cookbook.html#pysftp-cnopts
        # Key-based login would pass e.g. private_key="/home/app/privatekey_file".
        cnopts = pysftp.CnOpts()
        # NOTE(review): hostkeys = None turns off host-key verification, so
        # the server identity is not checked. Confirm this is acceptable.
        cnopts.hostkeys = None
        return pysftp.Connection(
            host=self.ip,
            username=self.user,
            password=self.pwd,
            port=self._effective_port(),
            cnopts=cnopts,
        )

    def cmd(self, shell_cmd: str):
        """Run *shell_cmd* on the remote host and return its output.

        Returns:
            The output lines returned by ``execute`` joined into one string,
            or ``None`` when the command produced no output.
        """
        lines = self.sftp.execute(shell_cmd)
        if not lines:
            return None
        # execute() may hand back bytes lines; decode them so the result is
        # always text.
        return "".join(
            line.decode("utf-8", errors="replace")
            if isinstance(line, bytes) else line
            for line in lines
        )

    def exists_file(self, filepath):
        """Return True when *filepath* exists on the remote host.

        This is a pure existence check; it does not verify that the path is
        a regular file (use :meth:`is_file` for that).
        """
        return self._exists(filepath)

    def exists_dir(self, dirpath):
        """Return True when *dirpath* exists on the remote host.

        This is a pure existence check; it does not verify that the path is
        a directory (use :meth:`is_dir` for that).
        """
        return self._exists(dirpath)

    def is_dir(self, dirpath):
        """Return True when *dirpath* is a directory on the remote host."""
        return bool(self.sftp.isdir(remotepath=dirpath))

    def is_file(self, filepath):
        """Return True when *filepath* is a file on the remote host."""
        return bool(self.sftp.isfile(remotepath=filepath))

    def remove(self, filepath):
        """Remove a remote file.

        Raises:
            FileNotFoundError: If *filepath* does not exist on the remote.
        """
        if not self.exists_file(filepath):
            raise FileNotFoundError("%s file not find on remote ! " % filepath)
        self.sftp.remove(filepath)

    def rm(self, dirpath):
        """Recursively delete *dirpath* and everything below it.

        ``rmdir`` can only delete empty directories, so the contents are
        removed first.

        Raises:
            FileNotFoundError: If *dirpath* does not exist on the remote.
        """
        if not self.exists_dir(dirpath):
            raise FileNotFoundError("%s dir not find on remote !" % dirpath)
        for entry in self.sftp.listdir(dirpath):
            # Remote paths always use "/", whatever the local OS is.
            child = posixpath.join(dirpath, entry)
            if self.is_dir(child):
                self.rm(child)
            else:
                self.remove(child)
        self.sftp.rmdir(dirpath)

    def mkdir(self, remote_dir):
        """Create *remote_dir*; its parent directory must already exist."""
        self.sftp.mkdir(remotepath=remote_dir)

    def mkdirs(self, remote_dir):
        """Create *remote_dir* including missing parents (like ``mkdir -p``)."""
        self.sftp.makedirs(remotedir=remote_dir)

    def get(self, remote_path, local_path):
        """Download *remote_path* to *local_path*."""
        self.sftp.get(remotepath=remote_path, localpath=local_path, callback=None)

    def put(self, local_path, remote_path):
        """Upload *local_path* to *remote_path*."""
        self.sftp.put(localpath=local_path, remotepath=remote_path)

    def rename(self, remote_path, new_name_path):
        """Rename a remote file or directory."""
        self.sftp.rename(remote_src=remote_path, remote_dest=new_name_path)

    def open(self, remote_filepath, mode):
        """Open a remote file in *mode* and return the file object."""
        return self.sftp.open(remote_file=remote_filepath, mode=mode)

    def getcwd(self):
        """Return the connection's current remote working directory."""
        return self.sftp.getcwd()

    def cwd(self, remote_path):
        """Change the remote working directory to *remote_path*."""
        return self.sftp.cwd(remotepath=remote_path)

    def chmod(self, remote_path, mode: int):
        """Change the permission bits of *remote_path* to *mode*."""
        self.sftp.chmod(remotepath=remote_path, mode=mode)

    def chown(self, remote_path, uid: int, gid: int):
        """Change the owning user id and group id of *remote_path*."""
        self.sftp.chown(remote_path, uid, gid)

    def chdir(self, remote_path):
        """Change the remote working directory (same purpose as :meth:`cwd`)."""
        self.sftp.chdir(remote_path)

    def touch(self, filepath):
        """Create an empty remote file, replacing *filepath* if it exists.

        The command runs over the already open connection, and the path is
        shell-quoted so spaces or shell metacharacters cannot alter it.
        """
        if self.exists_file(filepath):
            self.remove(filepath)
        self.sftp.execute("touch %s" % shlex.quote(filepath))

    def close(self):
        """Close the SFTP connection held in ``self.sftp``."""
        self.sftp.close()

    def opens(self, filepath, mode, file_data=None):
        """Read or write a remote file over a short-lived paramiko transport.

        Args:
            filepath: Remote file path.
            mode: ``"w"``/``"w+"`` to write *file_data*, ``"r"``/``"r+"`` to
                read. Any other mode does nothing.
            file_data: Data to write in a write mode; ``None`` writes nothing
                and leaves an empty file.

        Returns:
            The list of lines for a read mode, otherwise ``None``.
        """
        write_modes = ("w", "w+")
        read_modes = ("r", "r+")
        if mode not in write_modes and mode not in read_modes:
            # Unsupported modes were always a no-op; skip connecting at all.
            return None
        # Transport takes the address as one (host, port) tuple; a second
        # positional argument would be read as the window size, not the port.
        tp = paramiko.Transport((self.ip, self._effective_port()))
        try:
            tp.connect(username=self.user, password=self.pwd)
            pipe = paramiko.sftp_client.SFTPClient.from_transport(tp)
            with pipe.open(filepath, mode) as f:
                if mode in read_modes:
                    return f.readlines()
                if file_data is not None:
                    f.write(file_data)
                return None
        finally:
            # Always release the extra connection, even on errors.
            tp.close()
if __name__ == '__main__':
    # Manual smoke test: connect directly with pysftp and list the remote
    # root directory.
    #
    # Equivalent usage through the wrapper class:
    # cli = SftpUtils(ip="172.16.194.100", port=22, user="hadoop", pwd="hadoop")
    # cli.cmd("ifconfig")
    #
    # NOTE(review): the host and credentials are hard-coded, and
    # hostkeys = None turns off host-key verification. Confirm both are
    # acceptable outside a test environment.
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    with pysftp.Connection('172.16.194.100', username='hadoop', password='hadoop', cnopts=cnopts) as conn:
        entries = conn.listdir('/')
        print(entries)
# Original article: https://www.cnblogs.com/SunshineKimi/p/11837321.html