首页 > 数据库技术 > 详细

测试ADB Pipe的封装

时间:2021-06-04 12:03:06      阅读:9      评论:0      收藏:0      [点我收藏+]

封装ADB Pipe 模块

import os
import subprocess
import sys
from subprocess import *
import threading
import setting

class commandPipe():

    def __init__(self,command,func,exitFunc,readyFunc=None,shell=True,stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE,stderr=subprocess.PIPE,code="GBK"):
        ‘‘‘
        :param command:命令
        :param func:正常的输出函数
        :param exitFunc:异常反馈函数
        :param readyFunc:当管道创建完毕后调用
        ‘‘‘

        self._thread = threading.Thread(target=self.__run,args=(command,shell,stdin,stdout,stderr,readyFunc))
        self._code=code
        self._func=func
        self._exitFunc=exitFunc
        self._readyFunc=readyFunc


    def __run(self,command,shell,stdin,stdout,stderr,readyFunc):
        ‘‘‘私有函数‘‘‘
        global tmp
        try:
            self._process=subprocess.Popen(
                command,
                shell=shell,
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,)
        except OSError as e:
            self._exitFunc(e)

        fun=self._process.stdout.readline
        if readyFunc!=None:
            threading.Thread(target=readyFunc).start()
        while True:
            line=fun()
            if not line:
                break
            try:
                tmp=line.decode(self._code)
            except Exception as e:
                print(e)
        self._func(tmp)
        self._process.stdout.close()

    def start(self):
        self._thread.start()

    def close(self):
        #self._process.stdout.close()
        self._thread.join()
        del self

    # def run_cmd(self,command):
    #     ret=subprocess.Popen(command,shell=True,stdout=subprocess.PIPE).stdout.read()
    #     ret=ret.decode(‘gbk‘)
    #     if ret.strip()=="":
    #         print("error:{}".format(str(ret)))
    #     else:
    #         print("Success:{}".format(str(ret)))
    #     return ret

if __name__==‘__main__‘:
    #cmd="adb devices | sed -n ‘2p‘ | awk ‘{print $1}‘"
    cmd=setting.deviceId.cmd


    def exitFunc(exception):
        print("Exception is  {}".format(exception))

    def readyFunc():
        print("Start running the command: {}".format(setting.deviceId.cmd_name))

    def func(line):
        print("The standard output for {} is {}".format(setting.deviceId.cmd_name,line))

    # E=commandPipe(cmd,func,exitFunc,readyFunc)
    # E.start()
    # E.close()

 调用

import setting
from commandPipe import commandPipe

class runCmd():
    instance=None

    def __new__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    def __init__(self,name):
        self.name=name

    def exitFunc(self,exception):
        print("Exception is  {}".format(exception))

    def readyFunc(self):
        print("Start running the command: {}".format(self.name))

    def func(self,line):
        self.line=line
        print("The standard output for {} is {}".format(self.name,line))


    def run(self):
        #tmp=self.name
        s="setting.{}.cmd".format(self.name)
        #cmd=(setting.{}.cmd)
        E= commandPipe(eval(s),self.func,self.exitFunc,self.readyFunc)
        E.start()
        E.close()
        return self.line


if __name__==‘__main__‘:
    test=runCmd("deviceId").run()

 

测试ADB Pipe的封装

原文:https://www.cnblogs.com/judith0719/p/14848617.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!