使用pip安装即可(当前版本2.2.1,需要python版本3.6以上)
pip3 install locust
查看版本locust -V
文件名locustfile.py(文件命名需要为此,或者放在locustfiles文件夹下)
import time
from locust import HttpUser, task, between
# 这里为所有虚拟用户定义了一个继承自HttpUser的类,每个虚拟用户都提供了一个client属性
# 该属性是HttpSession的实例,可以用于向我们需要测试的目标发起http请求
class QuickStartUser(HttpUser):
wait_time = between(1, 5) # 模拟用户在每个任务执行后等待1-5秒
@task # task 任务,对于每个正在运行的用户,locust都会创建一个greenlet(协程)
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
@task(3) # 这是第二个task,后面的3表示权重,运行QuickStartUser时,会从多个task任务中随机选择一个,权重增加了他的选择几率
def view_items(self):
for item_id in range(10):
self.client.get(f"/item?id={item_id}", name="/item") # 统计信息是按照URL来分组,这里是为了将这些链接都归于item组内
time.sleep(1)
def on_start(self): # 每个用户启动都会调用此方法 on_stop则是每个用户停止时运行
self.client.post("/login", json={"username": "foo", "password": "bar"})
当测试开始之后,locust会为每个创建一个HttpSession的实例,每个用户都运行在自己的green gevent thread下
web ui方式执行
在命令行中输入locust
即可
然后在本地浏览器打开http://localhost:8089/
,就可以设置用户数,启动数,host了。
命令行方式执行
locust --headless --users 10 --spawn-rate 1 -H http://your-server.com
当使用CI服务执行时
locust -f locust_files/my_locust_file.py --headless -u 1000 -r 100
# -f 指定文件
# -u 指定生成的用户数
# -r 指定生成的速率(即每秒启动的用户数)在测试运行时,可以手动更改用户计数,即使在加速完成后也是如此。按 w 添加 1 个用户或按 W 添加 10。按 s 删除 1 或按 S 删除 10。
# --run-time 指定测试的运行时间 例如--run-time 1h30m 这里时间到立马关闭,可能还有部分请求没有完成
# --stop-timeout <seconds> 可以配合上一个使用,等待最后一个任务在指定时间内完成迭代
# --expect-workers 分布式启动来指定节点
设置退出码
例如如果满足以下任何条件,则将退出代码设置为非零:
import logging
from locust import events
@events.quitting.add_listener
def _(environment, **kw):
if environment.stats.total.fail_ratio > 0.01:
logging.error("Test failed due to failure ratio > 1%")
environment.process_exit_code = 1
elif environment.stats.total.avg_response_time > 200:
logging.error("Test failed due to average response time ratio > 200 ms")
environment.process_exit_code = 1
elif environment.stats.total.get_response_time_percentile(0.95) > 800:
logging.error("Test failed due to 95th percentile response time > 800 ms")
environment.process_exit_code = 1
else:
environment.process_exit_code = 0
等待时间属性
使用wait_time
方法,可以很简单的引入延迟,如果没有此方法则下一个任务立即执行,这一点有点类似loadrunner中的think_time
constant
固定时间内between
在最大和最小值之间的随机时间constant_throughput
确保任务每秒运行(最多)多少次的自适应时间constant_pacing
确保任务每多少秒运行一次的自适应时间例如我想每秒运行500次任务迭代,可以使用wait_time = constant_throughput(1)
和500的用户数
等待时间适用于任务,而不是请求,例如我指定wait_time = constant_throughput(2)
,并且每个任务里面有两个请求,那么每秒请求数(RPS)则为4
权重属性
如果文件中存在多个用户类,默认会生成相同数量的每个用户类。可以通过将类名作为参数传递来指定使用哪些类,例如
locust -f locust_file.py WebUser MobileUser
对每个类指定权重
class WebUser(User):
weight = 3
...
class MobileUser(User):
weight = 1
...
主机属性
主机属性就是要加载的服务器URL的前缀(类似"http://www.baidu.com"),一般在web ui上的host指定或者命令行中指定--host
。
如果在user类中声明了一个host属性,那么当没有--host
或者在web ui上的host指定时生效
环境属性
environment
是对用户正在运行的环境的引用,可以与当前环境进行互动,或者是runner,例如停止运行
self.environment.runner.quit()
如果是独立的locust实例,则此时整个停止运行。如果是在Node上运行,则会停止这个node
验证响应
如果 HTTP 响应代码正常(<400),则认为请求成功,但对响应进行一些额外验证通常很有用。
可以使用catch_response参数、with语句和对response.failure()的调用将请求标记为失败
with self.client.get("/", catch_response=True) as response:
if response.text != "Success":
response.failure("Got wrong response")
elif response.elapsed.total_seconds() > 0.5:
response.failure("Request took too long")
还可以将请求标记为成功,即使响应代码是错误的
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
response.success()
甚至可以通过抛出异常然后在with块之外捕获它来完全避免记录请求。或者可以抛出一个locust异常,就像下面的例子一样,让Locust捕获它
from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
raise RescheduleTask()
初始化
在每个用户之前初始化是on_start
函数,在每个用户之后初始化是on_stop
函数。
在每个进程之前初始化是init事件,方便在分布式模式中使用。
from locust import events
from locust.runners import MasterRunner
@events.init.add_listener
def on_locust_init(environment, **kwargs):
if isinstance(environment.runner, MasterRunner):
print("I‘m on master node")
else:
print("I‘m on a worker or standalone node")
在整个负载测试开始或者停止时运行可以使用test_start
和test_stop
事件
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print("A new test is starting")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("A new test is ending")
@tag装饰器
对于@tag装饰的任务,可以使用--tags
和--exclude-tags
参数对在测试期间执行的任务进行排除,例如
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag(‘tag1‘)
@task
def task1(self):
pass
@tag(‘tag1‘, ‘tag2‘)
@task
def task2(self):
pass
@tag(‘tag3‘)
@task
def task3(self):
pass
@task
def task4(self):
pass
执行task2和task3为--tags tag2 tag3
排除执行task3为--exclude-tags tag3
原文:https://www.cnblogs.com/xuh-bky/p/15259402.html