设置变量:
from collections import namedtuple
‘‘‘
利用namedtuple 设置变量
‘‘‘
default=[None,None]
ConfigItem=namedtuple("ConfigItem",["cmd_name","cmd"])
#获取电脑上链接的设备id
deviceId=ConfigItem(cmd_name="device_Id",cmd="adb devices | sed -n ‘2p‘ | awk ‘{print $1}‘")
#获取app的内存
meminfo=ConfigItem(cmd_name="meminfo",cmd="adb shell dumpsys meminfo packageName | sed -n ‘2p‘ |awk -F \" \" ‘{print $2}‘")
#获取app的CPU
cpuinfo=ConfigItem(cmd_name="cpuinfo",cmd="adb shell dumpsys cpuinfo | grep packageName |awk -F \" \" ‘{print $1}‘| awk ‘{SUM+=$1}‘END‘{print SUM}‘")
#获取app(设备)的FPS数据,这个计算的是加载时长,获得FPS需要计算: 1000/(fpsinfo/1000/100)
fpsinfo=ConfigItem(cmd_name="fpsinfo",cmd="adb shell dumpsys SurfaceFlinger --latency SurfaceView[packageName]#0 | sed -n ‘1p‘ |awk -F \" \" ‘{print $1}‘")
#获取页面加载时长,计算页面渲染时间
#用户界面发送请求的时间 + 网络传输时间 + 服务端处理时间 (包括数据层的处理时间)+ 网络传输时间 + 用户端展示返回结果的时间
使用Thread的Timer启动定时任务,抓取性能数据:
import threading
import os
import sys
import subprocess
import time
import datetime
from threading import Event
‘‘‘
启动定时器,每隔一定的时间刷新adb数据
gflag:控制子线程结束时间,不然会子线程不会结束
result:返回结果列表
‘‘‘
class cmdTimer():
instance=None
def __new__(cls, *args, **kwargs):
if cls.instance is None:
cls.instance = super().__new__(cls)
return cls.instance
‘‘‘
interval:定时任务触发间隔
duration:执行时长
‘‘‘
def __init__(self,interval,duration):
self.interval=interval
self.duration=duration
self.gflag=True
self.result=[]
def run(self,cmd):
P=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,encoding="GBK")
tmp=P.stdout.readline().strip()
#通过adb获取的值计算内存的值mem
if "meminfo" in cmd:
tmp="{:.2f}".format(int(tmp)/1024/1024)
#通过adb获取的值计算FPS的值
if "SurfaceFlinger" in cmd:
tmp="{:.2f}".format(1000/(int(tmp)/1000/1000))
if self.gflag:
timer=threading.Timer(self.interval,self.run,[cmd])
timer.start()
self.result.append(tmp)
def runCmd(self,cmd):
timer=threading.Timer(self.interval,self.run,[cmd])
timer.start()
time.sleep(self.duration)
self.gflag=False
timer.cancel()
#print("The Timer running result is {}".format(str(self.result)))
return self.result
#测试
if __name__==‘__main__‘:
command="adb devices | sed -n ‘2p‘ | awk ‘{print $1}‘"
print(cmdTimer(1,10).runCmd(command))
利用Process,同时抓取CPU/Mem等性能数据。
import random
import setting
import multiprocessing
import CmdTimer
‘‘‘
在测试案例运行中获取安卓包下的性能数据并收集
关键点一: 多进程是并行执行,
相当于分别独立得处理各个请求。
关键点二: 多线程,虽然不能并行运行,
但是可以通过避开阻塞切换线程
来实现并发的效果,并且不浪费cpu
‘‘‘
#不同Process 使用相同的队列
q = multiprocessing.Queue()
jobs = []
def getResult(cmd,q):
t=CmdTimer.cmdTimer(1,20).runCmd(cmd)
#print(t)
q.put(t)
def data_collect():
package="XXXXX"
#deviceId_cmd=setting.deviceId.cmd
cpu_cmd=setting.cpuinfo.cmd.replace("packageName",package)
fps_cmd=setting.fpsinfo.cmd.replace("packageName",package)
mem_cmd=setting.meminfo.cmd.replace("packageName",package)
‘‘‘
这里args必须是tuple
target书写的时候不能添加()
‘‘‘
p1 = multiprocessing.Process(target=getResult,args=(cpu_cmd,q))
p2 = multiprocessing.Process(target=getResult,args=(fps_cmd,q))
p3 = multiprocessing.Process(target=getResult,args=(mem_cmd,q))
# jobs.append(p1)
# jobs.append(p2)
# jobs.append(p3)
#
# for p in jobs:
# p.start()
#
# for p in jobs:
# p.join()
#
# results = [q.get() for j in jobs]
# return results
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
cpuResult=q.get()
fpsResult=q.get()
memResult=q.get()
return cpuResult,fpsResult,memResult
if __name__==‘__main__‘:
print(data_collect())
原文:https://www.cnblogs.com/judith0719/p/14868077.html