性能测试工具 性能测试工具 Locust 源码浅析

上帝De助手 · 2019年07月10日 · 最后由 萝卜 回复于 2019年10月14日 · 3684 次阅读

近期由于有项目需要做性能评测,于是半道出家的我便从选择性能测试工具,开始了我的性能之旅。

为什么要做工具评测

作为性能测试的老司机们而言,要么对各大性能测试工具的特性都了然于心了,要么已经使用 “惯” 了手头上的工具;他们是不会没事做个性能评测的,只有新手们才会认认真真的、按部就班的从第一步走起。

而对于性能测试而言,首要的任务自然是选择工具了。所以就有了性能测试工具评测这一趴!

为什么要解析 Locust 源码

由于 Python 是我的主语言,所以在选择性能工具评测的时候,自然是会多 “关照” 下 Locust 了。因为对评测的结果不是很满意,所以就乘着兴致顺便看了下源码。而本文就是对 Locust 源码解析的简述。

Locust 的执行流程

首先,来看下 Locust 的执行命令如下:

locust -f performan.py --host http://www.testqa.cn --no-web -c 10 -r 5 -t 50s
# 执行performan.py进行性能测试,并发数为10,每秒启动5个并发,执行时间为50秒

那么执行了这一条语句后,Locust 究竟在后台做了哪些事情呢?请看下面的程序执行流程。

|--Python进程
    |--主线程
        |--参数解析(-f、--host等)
        |--性能场景脚本(-f参数后的文件名)加载、分析VUser数量
        |--协程1(local、master、slave)
            |--计算各VUser的并发数占比(按VUser的权重)
            |--生成VUser启动列表
            |--启动VUser协程组
            |        |--子协程1(对应一个VUser)
            |        |--...
            |        |--子协程N
            |               |--获取VUser任务集
            |               |--循环执行任务(顺序、按权重)
            |               |       |--嵌套执行子任务
            |               |--执行指定时间后停止(需设定)
            |
            |--协程组阻塞等待

接下来,我们一个个的来过一下。首先启动进程和主线程这个不用讲,所有的程序都是一样的。然后再是参数解析,这个也是大多数程序都会提供的常规逻辑。

在解析-f 参数成功之后(没有指定-f 参数则不会启动成功),会去自动的导入该脚本模块;再通过 python 的自省能力来检查脚本中的 VUser 类,主要检查继承自 Locust 且带有 task_set 属性的子类;一个子类相当于一个 VUser。通过-l 参数则可以直接列出脚本中所有的 VUser 名称且不会执行脚本。

当 VUser 类都检查完毕之后,会把这些 VUser 类收集到一个列表中去;之后就会根据指定的启动模式(local、no-web、master、slave)来启动一个协程,并且会把 VUser 列表和解析后的命令行参数内容都作为参数传递过去。

在该协程中会先计算各 VUser 的权重,这会影响 VUser 被执行的次数。具体的实现代码如下:

def weight_locusts(self, amount, stop_timeout = None):
    """
    Distributes the amount of locusts for each WebLocust-class according to it's weight
    returns a list "bucket" with the weighted locusts
    """
    bucket = []
    weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
    for locust in self.locust_classes:
        ... # 部分代码省略
        # create locusts depending on weight
        percent = locust.weight / float(weight_sum)
        num_locusts = int(round(amount * percent))
        bucket.extend([locust for x in xrange(0, num_locusts)])
    return bucket

代码的主要实现逻辑是,先把所有的权重数都加起来求总和,再计算每个 VUser 的权重百分比;接着用总的 VUser 数乘以这个百分比后取整,就得到了该 VUser 需要启动数量,最后把指定数量的 VUser 都填充到队列中再返回。举个例子:

VUser1.weight = 1
VUser2.weight = 2
VUser3.weight = 3
假设现在需要启动12个VUser,那么各VUser的数量是:
VUser1  => 1/(1+2+3)*12=2
VUser2  => 2/(1+2+3)*12=4
VUser3  => 3/(1+2+3)*12=6
经过这个函数处理之后,会得到一个如下的列表:
[VUser1, VUser1, VUser2, VUser2, VUser2, VUser2, VUser3, VUser3, VUser3, VUser3, VUser3, VUser3]

拿到这个 VUser 启动列表之后,会依次随机 pop 一个 VUser 类,然后新起一个协程来实例化它,实例之后调用它的 run 方法开始执行该 VUser 的任务内容,直到所有 VUser 都实例化完成。

from gevent.pool import Group
...
self.locusts = Group()
...
locust = bucket.pop(random.randint(0, len(bucket)-1))
occurence_count[locust.__name__] += 1
def start_locust(_):
    try:
        locust().run(runner=self)
    except GreenletExit:
        pass
new_locust = self.locusts.spawn(start_locust, locust)
...
if wait:
    self.locusts.join()

实例化 VUser 的协程会在一个协程组内,该协程组会根据外部参数确定是否阻塞主线程。

VUser 的执行流程

上面介绍了 Locust 从启动后,开始执行性能测试的整体流程。而在这个整体流程内其实还包含另外一个子流程,就是 VUser 执行任务的流程。在介绍具体的流程之前,可以先看下 Locust 的脚本文件样例:

from locust import HttpLocust, TaskSet, task

class WebsiteTasks(TaskSet):
    @task
    def index(self):
        self.client.get("/")

class WebsiteUser(HttpLocust):
    task_set = WebsiteTasks
    host = "https://www.baidu.com"
    min_wait = 5000
    max_wait = 15000

这是一个最简答的 Locust 的性能脚本文件,其中 WebsiteUser 就代表了 VUser,它具备成为 VUser 的 2 个充要条件:Locust 的子类、具有 task_set 属性且为真。(HttpLocust 是 Locust 的子类)

task_set 就是该 VUser 要执行的请求任务集合,这个集合里面可以有 1 或 N 个任务,还可以包含子任务集;子任务集还可以包含任务和子子任务集,所以任务集是可以嵌套的。

而 VUser 在实例化之后,通过调用 run 方法就会开始执行真正的请求任务。整体的示意流程如下:

|--VUser
    |--思考时间(默认1秒)
    |--host
    |--client(可自定义)
    |--任务集
        |--子任务集
        |--普通任务
            |--request(requests.session)
            |   |--get
                |--post
            |--check
            |--result(response.success)

首先会存储相关的执行属性,比如:思考时间,host 等。这里需要注意的是,Locust 默认会把思考时间设置为 1 秒,所以如果你不期望有思考时间,那么你最好显式的把 min_wait 和 max_wait 都设置为 0。

与此同时还会实例化真正的请求客户端,以便于在后面直接可以用来发送请求,而默认 Locust 发送请求的客户端其实就是 requests。具体源码如下:

import requests
...
class HttpSession(requests.Session):
...
class HttpLocust(Locust):
    client = None
    def __init__(self):
        super(HttpLocust, self).__init__()
        if self.host is None:
            raise LocustError("You must specify the base host. Either in the host attribute in the Locust class, or on the command line using the --host option.")

        self.client = HttpSession(base_url=self.host)   # 实例化发送请求的client

这些准备工作都完成之后,就开始实例化 task_set 变量对应的类,也就是样例代码中的 WebsiteTasks 类(TaskSet 的子类);接着会调用 TaskSet 实例的 run 方法来执行所有的任务。

在 TaskSet.run 方法内,会先检查是否有 on_start 方法,如果有会执行它;然后会进入一个 while 死循环,循环内每次会获取一个要执行的任务并执行完成,直到执行时间结束或者主动中断。

在获取执行任务的逻辑中会分 2 种情况:一种是随机,另一种是按顺序。这主要取决于你在标注任务方法时,使用的是@task装饰器,还是@seq_task装饰器。除此之外,task 也是有权重的概念,通常权重越高的 task 被执行的概率就越高。

...
    for item in six.itervalues(classDict):
        if hasattr(item, "locust_task_weight"):
            for i in xrange(0, item.locust_task_weight):
                new_tasks.append(item)
    classDict["tasks"] = new_tasks  
...
def get_next_task(self):
    return random.choice(self.tasks)

这个是随机获取任务的实现片段,首先在生成 tasks 列表的时候,会根据任务的 locust_task_weight 属性值来添加同等数量的任务;之后在获取任务的时候,直接使用随机函数从 tasks 列表中获取即可。因为权重越高在 tasks 列表中出现的次数就越多,所以被随机选到的概率就越高。(随机权重轮询算法)

class TaskSequence(TaskSet):
    def __init__(self, parent):
        super(TaskSequence, self).__init__(parent)
        self._index = 0
        self.tasks.sort(key=lambda t: t.locust_task_order if hasattr(t, 'locust_task_order') else 1)

    def get_next_task(self):
        task = self.tasks[self._index]
        self._index = (self._index + 1) % len(self.tasks)
        return task

这个是顺序获取任务的实现,首先在实例化时就根据 locust_task_order 属性对任务进行排序,之后获取任务的 get_next_task 方法内会按照索引依次获取任务,并且支持无限循环的获取方式。

最后则是执行具体任务的逻辑,任务分 3 种:TaskSet 实例的成员方法;子任务集;普通函数。根据任务类型的不同,会执行相应的调用操作:

  • 如果是 TaskSet 成员方法,会直接调用
  • 如果是子任务集,会递归调用子任务集的 run 方法
  • 如果是普通函数,会直接调用并把 Locust 实例作为第一参数
def execute_task(self, task, *args, **kwargs):
    # check if the function is a method bound to the current locust, and if so, don't pass self as first argument
    if hasattr(task, "__self__") and task.__self__ == self:
        # task is a bound method on self
        task(*args, **kwargs)
    elif hasattr(task, "tasks") and issubclass(task, TaskSet):
        # task is another (nested) TaskSet class
        task(self).run(*args, **kwargs)
    else:
        # task is a function
        task(self, *args, **kwargs)

需要注意的是:TaskSet 中的任务集在实例初始化时都被组装到了 tasks 成员列表内,而 tasks 列表中即可能包含普通任务,也可能包含子任务集。

小结

分析到这里其实会发现 Locust 的逻辑还是蛮清晰的,这些主要逻辑只包含在 2 个文件中。而通过源码分析也解答了我的一个疑惑,就是虽然各 VUser 之间是并发执行的,但是 VUser 内的请求确实顺序执行的。

而这与浏览器行为是有所差异的,现代浏览器通常可以支持同时 6-8 个并发请求。正是因为想解开这个迷惑,所以才有查看 Locust 代码的想法;显然它和 Jmeter 是一样的,VUser 内的请求是顺序的。

PS:除了源码分析,还对 Locust 进行了性能评测及优化实验,感觉兴趣的同学可以关注公众号,后期会分享相关文章哦!!

获取更多关于 Python 和自动化测试的文章,请扫描如下二维码!

共收到 5 条回复 时间 点赞

👍之前有过计划写 locust 的源码解析,一直搁置了。看了这篇我就不用写啦😀

debugtalk 回复

这篇文章只是把流程梳理了出来,还有很多其它的细节方面都没有覆盖到。_^

感觉还厉害啊 差距。。。

赞,我还是比较喜欢用 locust 的,远没有楼主透彻

上帝De助手 Locust 性能评测及优化详解 中提及了此贴 07月22日 12:52

这个东西官方也无法解释与实际直不符的现象,还是用一些其它的工具,比如解析 jmeter 源码

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册