封装ADB Pipe 模块
import os
import subprocess
import sys
from subprocess import *
import threading
import setting
class commandPipe():
def __init__(self,command,func,exitFunc,readyFunc=None,shell=True,stdin=subprocess.PIPE,
stdout=subprocess.PIPE,stderr=subprocess.PIPE,code="GBK"):
‘‘‘
:param command:命令
:param func:正常的输出函数
:param exitFunc:异常反馈函数
:param readyFunc:当管道创建完毕后调用
‘‘‘
self._thread = threading.Thread(target=self.__run,args=(command,shell,stdin,stdout,stderr,readyFunc))
self._code=code
self._func=func
self._exitFunc=exitFunc
self._readyFunc=readyFunc
def __run(self,command,shell,stdin,stdout,stderr,readyFunc):
‘‘‘私有函数‘‘‘
global tmp
try:
self._process=subprocess.Popen(
command,
shell=shell,
stdin=stdin,
stdout=stdout,
stderr=stderr,)
except OSError as e:
self._exitFunc(e)
fun=self._process.stdout.readline
if readyFunc!=None:
threading.Thread(target=readyFunc).start()
while True:
line=fun()
if not line:
break
try:
tmp=line.decode(self._code)
except Exception as e:
print(e)
self._func(tmp)
self._process.stdout.close()
def start(self):
self._thread.start()
def close(self):
#self._process.stdout.close()
self._thread.join()
del self
# def run_cmd(self,command):
# ret=subprocess.Popen(command,shell=True,stdout=subprocess.PIPE).stdout.read()
# ret=ret.decode(‘gbk‘)
# if ret.strip()=="":
# print("error:{}".format(str(ret)))
# else:
# print("Success:{}".format(str(ret)))
# return ret
if __name__==‘__main__‘:
#cmd="adb devices | sed -n ‘2p‘ | awk ‘{print $1}‘"
cmd=setting.deviceId.cmd
def exitFunc(exception):
print("Exception is {}".format(exception))
def readyFunc():
print("Start running the command: {}".format(setting.deviceId.cmd_name))
def func(line):
print("The standard output for {} is {}".format(setting.deviceId.cmd_name,line))
# E=commandPipe(cmd,func,exitFunc,readyFunc)
# E.start()
# E.close()
调用
import setting
from commandPipe import commandPipe
class runCmd():
instance=None
def __new__(cls, *args, **kwargs):
if cls.instance is None:
cls.instance = super().__new__(cls)
return cls.instance
def __init__(self,name):
self.name=name
def exitFunc(self,exception):
print("Exception is {}".format(exception))
def readyFunc(self):
print("Start running the command: {}".format(self.name))
def func(self,line):
self.line=line
print("The standard output for {} is {}".format(self.name,line))
def run(self):
#tmp=self.name
s="setting.{}.cmd".format(self.name)
#cmd=(setting.{}.cmd)
E= commandPipe(eval(s),self.func,self.exitFunc,self.readyFunc)
E.start()
E.close()
return self.line
if __name__==‘__main__‘:
test=runCmd("deviceId").run()
原文:https://www.cnblogs.com/judith0719/p/14848617.html