性能测试工具 Locust 源码阅读及特点思考 (二)

zyanycall · 2018年01月30日 · 最后由 织梦 回复于 2019年07月16日 · 4307 次阅读

写帖子对我学习 locust 源码也是一种监督,也是对自己学习过程的记录便于之后查看。
希望大家指出我的不足之处,多谢。

github:[https://github.com/locustio/locust]

runners.py

本身这个文件有 400 多行代码,其中包含了很多 locust 的细节信息。
它是 Locust 执行请求的实现类。

weight_locusts(self, amount, stop_timeout = None)

手写汉字注释,加入里面。

def weight_locusts(self, amount, stop_timeout = None):
    """
    Distributes the amount of locusts for each WebLocust-class according to it's weight
    returns a list "bucket" with the weighted locusts
    """
        # 返回值是个数组,装载复制的用例的压力请求
    bucket = []
        # weight_sum 是用例中的所有weight值的综合,weight代表权重值。
    weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
        # 可以有多个用例。
    for locust in self.locust_classes:
                # 一些判断略过
        if not locust.task_set:
            warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)
            continue

        if self.host is not None:
            locust.host = self.host
        if stop_timeout is not None:
            locust.stop_timeout = stop_timeout

        # create locusts depending on weight
                # 在循环中这是一个用例,percent 意味着这个用例在总体权重中的比例。
        percent = locust.weight / float(weight_sum)
                # 比如是设置了1000个用户,根据权重比例,计算出1000个用户中的多少个用户来执行这个用例。
        num_locusts = int(round(amount * percent))
                # 复制并添加到结果集中
        bucket.extend([locust for x in xrange(0, num_locusts)])
    return bucket

weight_locusts 是根据权重计算出要使用的用户数。

spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False)

虽然 spawn 是卵的意思,但是这个方法中包含了用例的执行。
hatch_rate 的解释:The rate per second in which clients are spawned. Only used together with --no-web
可以看到,无限循环运行之前,计算了停止的时间(sleep_time = 1.0 / self.hatch_rate),也就是说利用了 sleep 来达到每秒运行多少用户的效果。
用例的 run() 方法是在 core.py 中定义的,self.task_set(self).run(),可见就是用例执行。

def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):
    if spawn_count is None:
        spawn_count = self.num_clients

    # 计算后的用户数,实际执行的用户数。
    bucket = self.weight_locusts(spawn_count, stop_timeout)
    spawn_count = len(bucket)
    if self.state == STATE_INIT or self.state == STATE_STOPPED:
        self.state = STATE_HATCHING
        self.num_clients = spawn_count
    else:
        self.num_clients += spawn_count

    logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))
    occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])

     # 定义执行的方法
    def hatch():
        sleep_time = 1.0 / self.hatch_rate
        while True:
            if not bucket:
                logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
                events.hatch_complete.fire(user_count=self.num_clients)
                return

                    # 将用例弹出来
            locust = bucket.pop(random.randint(0, len(bucket)-1))
            occurence_count[locust.__name__] += 1
                    # 定义启动的方法,可以看到是执行run()方法
            def start_locust(_):
                try:
                    locust().run()
                except GreenletExit:
                    pass

                    # 协程的执行方法,也是Group()的spawn
            new_locust = self.locusts.spawn(start_locust, locust)
            if len(self.locusts) % 10 == 0:
                logger.debug("%i locusts hatched" % len(self.locusts))
                    # 睡眠即等待指定时间。
            gevent.sleep(sleep_time)

    hatch()
    if wait:
        self.locusts.join()
        logger.info("All locusts dead\n")

kill_locusts(self, kill_count)

kill 掉多少个 locust,即用户/用例。
代码比较简单,干了几件事情:
1.根据权重计算出要干掉多少个用户。
2.被干掉的用户在协程池子中停掉,并从权重池子中弹出。
感觉这部分代码写的冗余了,没必要有三个循环。

bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
for g in self.locusts:
    for l in bucket:
        if l == g.args[0]:
            dying.append(g)
            bucket.remove(l)
            break
for g in dying:
    self.locusts.killone(g)
# 收尾工作,主要是提示给页面和打日志
events.hatch_complete.fire(user_count=self.num_clients)

start_hatching(self, locust_count=None, hatch_rate=None, wait=False)

这个就不贴代码了,内在基本都是上面的方法。
主要功能是判断已经启动了多少用户,启动的状态是什么等,来决定是启动(spawn_locusts)还是停止(kill_locusts)多少个用户。

class LocalLocustRunner(LocustRunner)

本地的 runner,继承了上面的 LocustRunner 类。
定义了 start_hatching 方法,代码还是比较简单的,主要是定义协程主体。

DistributedLocustRunner(LocustRunner)

分布式的 runner,也是继承了 LocustRunner 类,功能更少,主要是设置 socket 链接用的端口号,IP 等。

class MasterLocustRunner(DistributedLocustRunner)

主节点的 runner,继承了上面的 DistributedLocustRunner 类。
主要的功能就是收集各个分布式节点的状态,让各个分布式节点干事情,收集各个分布式节点的请求内容好汇总。
主要代码节选:

self.clients = SlaveNodesDict()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

这还是使用协程 greenlet 来触发监听子节点的动作。
可以看到这里是用了 rpc.Server 来进行各个分布式之间的通信的。
rpc 定义来自 zmqrpc.py 文件。
本身是使用的 zmq 来通信的,import zmq.green as zmq。
zmq 被传闻是最快的 MQ,是因为没有中转站,是直接生产者和消费者相连,那肯定很快……具体请自行百度。
这里可以说,作者真是利用了很多已有的三方包啊。

self.num_clients = locust_count
slave_num_clients = locust_count // (num_slaves or 1)
slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)
remaining = locust_count % num_slaves

节选这段代码的意义在于,我想告诉大家,Locust 对每个分布式节点是不存在权重的。
即这里说的很清楚,每个分布式节点 slave 处理的用户数量是一样的,是平均的。
如果你硬件比较好,那么需要多分配一些 slave,这部分的 “权重” 配比是手工操作了。

for client in six.itervalues(self.clients):
    data = {
        "hatch_rate":slave_hatch_rate,
        "num_clients":slave_num_clients,
        "host":self.host,
        "stop_timeout":None
    }

    if remaining > 0:
        data["num_clients"] += 1
        remaining -= 1

    self.server.send(Message("hatch", data, None))

six 是兼容 python2 和 3 的桥梁,也就是 Locust 是支持 python2 和 3 的。
这段代码的意义在于,master 触发 slave 的任务,是自己发送命令,如:slave_hatch_rate,你启动吧!
而不是告诉 slave 具体的要做什么事情。
这也解释了,slave 节点为什么一定要有用例文件。
带来的问题是,如果用例文件有变动,每个 slave 节点的都要变。(当然这有方式解决,比如通过挂载盘)。

其它就没什么了,继续看 slave 的类。

SlaveLocustRunner(DistributedLocustRunner)

slave 节点也是继承了 DistributedLocustRunner,和 master 节点的本源一样。

self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000)).encode('utf-8')).hexdigest()

从这里可以看到,每一个 client_id 是如何唯一化的。
中间一些代码和 master 的类似,略过。

def worker(self):
    while True:
        msg = self.client.recv()
        if msg.type == "hatch":
            self.client.send(Message("hatching", None, self.client_id))
            job = msg.data
            self.hatch_rate = job["hatch_rate"]
            #self.num_clients = job["num_clients"]
            self.host = job["host"]
            self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
        elif msg.type == "stop":
            self.stop()
            self.client.send(Message("client_stopped", None, self.client_id))
            self.client.send(Message("client_ready", None, self.client_id))
        elif msg.type == "quit":
            logger.info("Got quit message from master, shutting down...")
            self.stop()
            self.greenlet.kill(block=True)

slave 里面定义了一个 worker 方法。
其作用是比较明显的,就是用无限循环,一直在接受 master 的消息。
如果得到的消息是 hatch(干活),就干活,还是用的协程(self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"])))。
如果是 stop 或者 quit,那就停止。

slave 一直通过 zmq 向 master 发送各种状态结果。

对 Locust 的思考为避免篇幅太长,后续展开。

Locust 的源码思考链接:

Locust 源码阅读及特点思考 (三)

共收到 11 条回复 时间 点赞
zyanycall Locust 源码阅读及特点思考 (一) 中提及了此贴 01月30日 15:19

楼主可否对比一下 jmeter 和 locust 的优劣呢? 可以给新人一个参考

孙高飞 回复

BlazeMeter 做过一个比较,个人觉得还算中肯😃 :
url: https://www.blazemeter.com/blog/jmeter-vs-locust-which-one-should-you-choose

孙高飞 回复

我会稍微介绍一下的。

华全 回复

这篇文章粗略看了一下,说实话,他讲的和没讲一样,他基本没说性能测试的痛点,是每个都夸了一下。
我怀疑他是否真正做过性能测试。
如果我分析的话,会接地气一点儿吧。

你好,有个问题一直搞不明白,想请教下。

events.py 文件中 locust_start_hatching 是在哪里设置 handle 的?一直没有看到。

想请教一下楼主,协程没有了多线程的上下文切换,从理论上来说运行效率应该是高的,我在我本机做了一个实验,往应用服务器发送 10000 次 http 请求并收到请求,10 个线程并发用了 8 秒左右的时间,用单线程协程,采用异步通讯,用了 6 秒左右的时间,locust 的效率为何还不如 jmeter 呢?

lovetest007 回复

1.主要是 python 语言和 Java 语言(解释型和编译型)自身的性能差距吧。还有一些其它的优化原因,主要有 JVM 的垃圾回收算法比 python 的先进,JVM 的 JIT 即时编译。
2.还有关于你的测试数据,10 个线程这种体量的并发,太弱了,你可以试试单线程搞 10000 次,可能都比 10 个线程的好。我们讲压力测试都是几百几千甚至集群的上万的线程这种压力。
3.还有真正的压测,脚本逻辑都是比较复杂的,比如创造数据,数据关联等等,逻辑和处理都会比较复杂,这时候 python 语言和 Java 语言的自身性能差异就会更明显了。换句话说,你的测试数据,对于真正的压测场景,代表性还有待商榷。
4.关于协程和线程这个还不好说的,协程相当于把用户端来处理多线程(一个线程达到多线程切换的效果),多线程是操作系统来处理多线程(用户端程序不管这个事情直接调用操作系统的接口来实现多线程)。Java 没有协程是有原因的,因为用户端来处理多任务的切换逻辑,毕竟是在操作系统上层搞的事情,一是需要对操作系统 CPU 命令等非常了解,二是要适配多种环境多种操作系统多种 CPU,成本是很高的。Java 是讲究跨平台和高性能(接近于 C 的性能)的,协程 Java 觉得不行。总之,就是不要太迷恋协程的性能,python 不存在多线程的,它是只能用协程。建议还是具体问题具体环境具体分析。python 语言的优势在书写和语法糖,不在性能。
5.说了这么多,其实压力端对性能的要求很高的,一般语言不能胜任。如果你有兴趣,可以搞搞 GO 语言的压力端,GO 语言是支持协程的。locust 的作者自己都说高压力的压测要求 locust 不支持,咱们就别纠结这个了。

匿名 #9 · 2019年01月26日

请问,locust 作者有在哪里介绍说高压力的压测,locust 不支持么?可以发我看一下么


作者说的很清楚,如果要用 locust 做高压力测试就堆硬件,毕竟硬件比人工便宜。如果硬件资源有限制,则推荐 Jmeter。

lovetest007 回复

locust 默认了 1s 等待时间,min_wait=0,max_wait=0 试试看。

zyanycall 回复

高压力测试 分布式 都是需要堆硬件的。

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册