import paramiko, re, time
class Linux(object):
def init(self, info):
self.ip = info[‘ip‘]
self.user = info[‘user‘]
self.passw = info[‘passw‘]
def linuxConnect(self):
# 实例化一个transport对象
try:
self.transport = paramiko.Transport((self.ip, 22))
# 建立连接
self.transport.connect(username=self.user, password=self.passw)
# 建立ssh对象
self.ssh = paramiko.SSHClient()
# 绑定transport到ssh对象
self.ssh._transport = self.transport
stdin, stdout, stderr = self.ssh.exec_command(‘ls‘)
if stdout.read() != ‘‘:
print(‘连接成功‘)
return self.ssh
else:
self.linuxConnect()
except:
print(‘连接失败‘)
return ‘fail‘
def linuxSend(self, info):
stdin, stdout, stderr = self.ssh.exec_command(info)
if re.search(‘docker exec‘, info):
return
result = stdout.read().decode()
return result
def linuxReceive(self, info):
stdin, stdout, stderr = self.ssh.exec_command(info)
result = stdout.readlines()
return result
# 关闭连接
def linuxClose(self):
self.transport.close()
if name == ‘main‘:
linux = Linux({"ip": "10.20.70.11", "user": "root", "passw": "1"})
linux.linuxConnect()
rev = linux.linuxReceive(‘LANG=en_US.UTF-8;sar -u 1 1‘)
print(rev[-1])
print(float(re.findall(‘(\S+)‘, rev[-1])[2]))
v_CPU = 100 - float(re.findall(‘(\S+)‘, rev[-1])[-1]) # 获取cpu使用率
print(v_CPU)
打印:
连接成功
Average: all 0.00 0.00 0.00 0.00 0.00 100.00
0.0
0.0
原文:https://www.cnblogs.com/liuzhanghao/p/12974559.html