性能测试工具 Locust 源码阅读及特点思考 (二)

zyanycall · January 30, 2018 · Last by iTestOps replied at July 16, 2019 · 4841 hits

写帖子对我学习locust源码也是一种监督,也是对自己学习过程的记录便于之后查看。
希望大家指出我的不足之处,多谢。

github:[https://github.com/locustio/locust]

runners.py

本身这个文件有400多行代码,其中包含了很多locust的细节信息。
它是Locust执行请求的实现类。

weight_locusts(self, amount, stop_timeout = None)

手写汉字注释,加入里面。

def weight_locusts(self, amount, stop_timeout = None):
"""
Distributes the amount of locusts for each WebLocust-class according to it's weight
returns a list "
bucket" with the weighted locusts
"""

# 返回值是个数组,装载复制的用例的压力请求
bucket = []
# weight_sum 是用例中的所有weight值的综合,weight代表权重值。
weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
# 可以有多个用例。
for locust in self.locust_classes:
# 一些判断略过
if not locust.task_set:
warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)
continue

if self.host is not None:
locust.host = self.host
if stop_timeout is not None:
locust.stop_timeout = stop_timeout

# create locusts depending on weight
# 在循环中这是一个用例,percent 意味着这个用例在总体权重中的比例。
percent = locust.weight / float(weight_sum)
# 比如是设置了1000个用户,根据权重比例,计算出1000个用户中的多少个用户来执行这个用例。
num_locusts = int(round(amount * percent))
# 复制并添加到结果集中
bucket.extend([locust for x in xrange(0, num_locusts)])
return bucket

weight_locusts 是根据权重计算出要使用的用户数。

spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False)

虽然spawn是卵的意思,但是这个方法中包含了用例的执行。
hatch_rate 的解释:The rate per second in which clients are spawned. Only used together with --no-web
可以看到,无限循环运行之前,计算了停止的时间(sleep_time = 1.0 / self.hatch_rate),也就是说利用了sleep来达到每秒运行多少用户的效果。
用例的run()方法是在core.py中定义的,self.task_set(self).run(),可见就是用例执行。

def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):
if spawn_count is None:
spawn_count = self.num_clients

# 计算后的用户数,实际执行的用户数。
bucket = self.weight_locusts(spawn_count, stop_timeout)
spawn_count = len(bucket)
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.state = STATE_HATCHING
self.num_clients = spawn_count
else:
self.num_clients += spawn_count

logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))
occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])

# 定义执行的方法
def hatch():
sleep_time = 1.0 / self.hatch_rate
while True:
if not bucket:
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
events.hatch_complete.fire(user_count=self.num_clients)
return

# 将用例弹出来
locust = bucket.pop(random.randint(0, len(bucket)-1))
occurence_count[locust.__name__] += 1
# 定义启动的方法,可以看到是执行run()方法
def start_locust(_):
try:
locust().run()
except GreenletExit:
pass

# 协程的执行方法,也是Group()spawn
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
# 睡眠即等待指定时间。
gevent.sleep(sleep_time)

hatch()
if wait:
self.locusts.join()
logger.info("All locusts dead\n")

kill_locusts(self, kill_count)

kill掉多少个locust,即用户/用例。
代码比较简单,干了几件事情:
1.根据权重计算出要干掉多少个用户。
2.被干掉的用户在协程池子中停掉,并从权重池子中弹出。
感觉这部分代码写的冗余了,没必要有三个循环。

bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
for g in self.locusts:
for l in bucket:
if l == g.args[0]:
dying.append(g)
bucket.remove(l)
break
for g in dying:
self.locusts.killone(g)
# 收尾工作,主要是提示给页面和打日志
events.hatch_complete.fire(user_count=self.num_clients)

start_hatching(self, locust_count=None, hatch_rate=None, wait=False)

这个就不贴代码了,内在基本都是上面的方法。
主要功能是判断已经启动了多少用户,启动的状态是什么等,来决定是启动(spawn_locusts) 还是停止(kill_locusts)多少个用户。

class LocalLocustRunner(LocustRunner)

本地的runner,继承了上面的LocustRunner类。
定义了start_hatching方法,代码还是比较简单的,主要是定义协程主体。

DistributedLocustRunner(LocustRunner)

分布式的runner,也是继承了LocustRunner类,功能更少,主要是设置socket链接用的端口号,IP等。

class MasterLocustRunner(DistributedLocustRunner)

主节点的runner,继承了上面的DistributedLocustRunner类。
主要的功能就是收集各个分布式节点的状态,让各个分布式节点干事情,收集各个分布式节点的请求内容好汇总。
主要代码节选:

self.clients = SlaveNodesDict()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

这还是使用协程greenlet来触发监听子节点的动作。
可以看到这里是用了 rpc.Server 来进行各个分布式之间的通信的。
rpc定义来自 zmqrpc.py 文件。
本身是使用的 zmq 来通信的,import zmq.green as zmq。
zmq被传闻是最快的MQ,是因为没有中转站,是直接生产者和消费者相连,那肯定很快……具体请自行百度。
这里可以说,作者真是利用了很多已有的三方包啊。

self.num_clients = locust_count
slave_num_clients = locust_count // (num_slaves or 1)
slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)
remaining = locust_count % num_slaves

节选这段代码的意义在于,我想告诉大家,Locust对每个分布式节点是不存在权重的。
即这里说的很清楚,每个分布式节点slave处理的用户数量是一样的,是平均的。
如果你硬件比较好,那么需要多分配一些slave,这部分的“权重”配比是手工操作了。

for client in six.itervalues(self.clients):
data = {
"hatch_rate":slave_hatch_rate,
"num_clients":slave_num_clients,
"host":self.host,
"stop_timeout":None
}

if remaining > 0:
data["num_clients"] += 1
remaining -= 1

self.server.send(Message("hatch", data, None))

six是兼容python2 和 3 的桥梁,也就是Locust是支持python2 和 3的。
这段代码的意义在于,master触发slave的任务,是自己发送命令,如:slave_hatch_rate,你启动吧!
而不是告诉slave具体的要做什么事情。
这也解释了,slave节点为什么一定要有用例文件。
带来的问题是,如果用例文件有变动,每个slave节点的都要变。(当然这有方式解决,比如通过挂载盘)。

其它就没什么了,继续看slave的类。

SlaveLocustRunner(DistributedLocustRunner)

slave节点也是继承了DistributedLocustRunner,和master节点的本源一样。

self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000)).encode('utf-8')).hexdigest()

从这里可以看到,每一个client_id 是如何唯一化的。
中间一些代码和master的类似,略过。

def worker(self):
while True:
msg = self.client.recv()
if msg.type == "hatch":
self.client.send(Message("hatching", None, self.client_id))
job = msg.data
self.hatch_rate = job["hatch_rate"]
#self.num_clients = job["num_clients"]
self.host = job["host"]
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg.type == "stop":
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
self.client.send(Message("client_ready", None, self.client_id))
elif msg.type == "quit":
logger.info("Got quit message from master, shutting down...")
self.stop()
self.greenlet.kill(block=True)

slave里面定义了一个worker方法。
其作用是比较明显的,就是用无限循环,一直在接受master的消息。
如果得到的消息是hatch(干活),就干活,还是用的协程(self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"])))。
如果是stop或者quit,那就停止。

slave一直通过zmq向master发送各种状态结果。

对Locust的思考为避免篇幅太长,后续展开。

Locust的源码思考链接:

Locust 源码阅读及特点思考 (三)

共收到 11 条回复 时间 点赞
zyanycall Locust 源码阅读及特点思考 (一) 中提及了此贴 30 Jan 15:19

楼主可否对比一下jmeter和locust的优劣呢? 可以给新人一个参考

孙高飞 回复

BlazeMeter做过一个比较,个人觉得还算中肯😃 :
url: https://www.blazemeter.com/blog/jmeter-vs-locust-which-one-should-you-choose

孙高飞 回复

我会稍微介绍一下的。

华全 回复

这篇文章粗略看了一下,说实话,他讲的和没讲一样,他基本没说性能测试的痛点,是每个都夸了一下。
我怀疑他是否真正做过性能测试。
如果我分析的话,会接地气一点儿吧。

你好,有个问题一直搞不明白,想请教下。

events.py文件中locust_start_hatching是在哪里设置handle的?一直没有看到。

想请教一下楼主,协程没有了多线程的上下文切换,从理论上来说运行效率应该是高的,我在我本机做了一个实验,往应用服务器发送10000次http请求并收到请求,10个线程并发用了8秒左右的时间,用单线程协程,采用异步通讯,用了6秒左右的时间,locust的效率为何还不如jmeter呢?

lovetest007 回复

1.主要是python语言和Java语言(解释型和编译型)自身的性能差距吧。还有一些其它的优化原因,主要有JVM的垃圾回收算法比python的先进,JVM的JIT即时编译。
2.还有关于你的测试数据,10个线程这种体量的并发,太弱了,你可以试试单线程搞10000次,可能都比10个线程的好。我们讲压力测试都是几百几千甚至集群的上万的线程这种压力。
3.还有真正的压测,脚本逻辑都是比较复杂的,比如创造数据,数据关联等等,逻辑和处理都会比较复杂,这时候python语言和Java语言的自身性能差异就会更明显了。换句话说,你的测试数据,对于真正的压测场景,代表性还有待商榷。
4.关于协程和线程这个还不好说的,协程相当于把用户端来处理多线程(一个线程达到多线程切换的效果),多线程是操作系统来处理多线程(用户端程序不管这个事情直接调用操作系统的接口来实现多线程)。Java没有协程是有原因的,因为用户端来处理多任务的切换逻辑,毕竟是在操作系统上层搞的事情,一是需要对操作系统CPU命令等非常了解,二是要适配多种环境多种操作系统多种CPU,成本是很高的。Java是讲究跨平台和高性能(接近于C的性能)的,协程Java觉得不行。总之,就是不要太迷恋协程的性能,python不存在多线程的,它是只能用协程。建议还是具体问题具体环境具体分析。python语言的优势在书写和语法糖,不在性能。
5.说了这么多,其实压力端对性能的要求很高的,一般语言不能胜任。如果你有兴趣,可以搞搞GO语言的压力端,GO语言是支持协程的。locust的作者自己都说高压力的压测要求locust不支持,咱们就别纠结这个了。

请问,locust作者有在哪里介绍说高压力的压测,locust不支持么?可以发我看一下么

chen 回复


作者说的很清楚,如果要用locust做高压力测试就堆硬件,毕竟硬件比人工便宜。如果硬件资源有限制,则推荐Jmeter。

lovetest007 回复

locust 默认了1s等待时间,min_wait=0,max_wait=0试试看。

zyanycall 回复

高压力测试 分布式 都是需要堆硬件的。

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up