写帖子对我学习 locust 源码也是一种监督,也是对自己学习过程的记录便于之后查看。
希望大家指出我的不足之处,多谢。

github:[https://github.com/locustio/locust]

runners.py

本身这个文件有 400 多行代码,其中包含了很多 locust 的细节信息。
它是 Locust 执行请求的实现类。

weight_locusts(self, amount, stop_timeout = None)

手写汉字注释,加入里面。

def weight_locusts(self, amount, stop_timeout = None):
    """
    Distributes the amount of locusts for each WebLocust-class according to it's weight
    returns a list "bucket" with the weighted locusts
    """
        # 返回值是个数组,装载复制的用例的压力请求
    bucket = []
        # weight_sum 是用例中的所有weight值的综合,weight代表权重值。
    weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
        # 可以有多个用例。
    for locust in self.locust_classes:
                # 一些判断略过
        if not locust.task_set:
            warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)
            continue

        if self.host is not None:
            locust.host = self.host
        if stop_timeout is not None:
            locust.stop_timeout = stop_timeout

        # create locusts depending on weight
                # 在循环中这是一个用例,percent 意味着这个用例在总体权重中的比例。
        percent = locust.weight / float(weight_sum)
                # 比如是设置了1000个用户,根据权重比例,计算出1000个用户中的多少个用户来执行这个用例。
        num_locusts = int(round(amount * percent))
                # 复制并添加到结果集中
        bucket.extend([locust for x in xrange(0, num_locusts)])
    return bucket

weight_locusts 是根据权重计算出要使用的用户数。

spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False)

虽然 spawn 是卵的意思,但是这个方法中包含了用例的执行。
hatch_rate 的解释:The rate per second in which clients are spawned. Only used together with --no-web
可以看到,无限循环运行之前,计算了停止的时间(sleep_time = 1.0 / self.hatch_rate),也就是说利用了 sleep 来达到每秒运行多少用户的效果。
用例的 run() 方法是在 core.py 中定义的,self.task_set(self).run(),可见就是用例执行。

def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):
    if spawn_count is None:
        spawn_count = self.num_clients

    # 计算后的用户数,实际执行的用户数。
    bucket = self.weight_locusts(spawn_count, stop_timeout)
    spawn_count = len(bucket)
    if self.state == STATE_INIT or self.state == STATE_STOPPED:
        self.state = STATE_HATCHING
        self.num_clients = spawn_count
    else:
        self.num_clients += spawn_count

    logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))
    occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])

     # 定义执行的方法
    def hatch():
        sleep_time = 1.0 / self.hatch_rate
        while True:
            if not bucket:
                logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
                events.hatch_complete.fire(user_count=self.num_clients)
                return

                    # 将用例弹出来
            locust = bucket.pop(random.randint(0, len(bucket)-1))
            occurence_count[locust.__name__] += 1
                    # 定义启动的方法,可以看到是执行run()方法
            def start_locust(_):
                try:
                    locust().run()
                except GreenletExit:
                    pass

                    # 协程的执行方法,也是Group()的spawn
            new_locust = self.locusts.spawn(start_locust, locust)
            if len(self.locusts) % 10 == 0:
                logger.debug("%i locusts hatched" % len(self.locusts))
                    # 睡眠即等待指定时间。
            gevent.sleep(sleep_time)

    hatch()
    if wait:
        self.locusts.join()
        logger.info("All locusts dead\n")

kill_locusts(self, kill_count)

kill 掉多少个 locust,即用户/用例。
代码比较简单,干了几件事情:
1.根据权重计算出要干掉多少个用户。
2.被干掉的用户在协程池子中停掉,并从权重池子中弹出。
感觉这部分代码写的冗余了,没必要有三个循环。

bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
for g in self.locusts:
    for l in bucket:
        if l == g.args[0]:
            dying.append(g)
            bucket.remove(l)
            break
for g in dying:
    self.locusts.killone(g)
# 收尾工作,主要是提示给页面和打日志
events.hatch_complete.fire(user_count=self.num_clients)

start_hatching(self, locust_count=None, hatch_rate=None, wait=False)

这个就不贴代码了,内在基本都是上面的方法。
主要功能是判断已经启动了多少用户,启动的状态是什么等,来决定是启动(spawn_locusts)还是停止(kill_locusts)多少个用户。

class LocalLocustRunner(LocustRunner)

本地的 runner,继承了上面的 LocustRunner 类。
定义了 start_hatching 方法,代码还是比较简单的,主要是定义协程主体。

DistributedLocustRunner(LocustRunner)

分布式的 runner,也是继承了 LocustRunner 类,功能更少,主要是设置 socket 链接用的端口号,IP 等。

class MasterLocustRunner(DistributedLocustRunner)

主节点的 runner,继承了上面的 DistributedLocustRunner 类。
主要的功能就是收集各个分布式节点的状态,让各个分布式节点干事情,收集各个分布式节点的请求内容好汇总。
主要代码节选:

self.clients = SlaveNodesDict()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

这还是使用协程 greenlet 来触发监听子节点的动作。
可以看到这里是用了 rpc.Server 来进行各个分布式之间的通信的。
rpc 定义来自 zmqrpc.py 文件。
本身是使用的 zmq 来通信的,import zmq.green as zmq。
zmq 被传闻是最快的 MQ,是因为没有中转站,是直接生产者和消费者相连,那肯定很快……具体请自行百度。
这里可以说,作者真是利用了很多已有的三方包啊。

self.num_clients = locust_count
slave_num_clients = locust_count // (num_slaves or 1)
slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)
remaining = locust_count % num_slaves

节选这段代码的意义在于,我想告诉大家,Locust 对每个分布式节点是不存在权重的。
即这里说的很清楚,每个分布式节点 slave 处理的用户数量是一样的,是平均的。
如果你硬件比较好,那么需要多分配一些 slave,这部分的 “权重” 配比是手工操作了。

for client in six.itervalues(self.clients):
    data = {
        "hatch_rate":slave_hatch_rate,
        "num_clients":slave_num_clients,
        "host":self.host,
        "stop_timeout":None
    }

    if remaining > 0:
        data["num_clients"] += 1
        remaining -= 1

    self.server.send(Message("hatch", data, None))

six 是兼容 python2 和 3 的桥梁,也就是 Locust 是支持 python2 和 3 的。
这段代码的意义在于,master 触发 slave 的任务,是自己发送命令,如:slave_hatch_rate,你启动吧!
而不是告诉 slave 具体的要做什么事情。
这也解释了,slave 节点为什么一定要有用例文件。
带来的问题是,如果用例文件有变动,每个 slave 节点的都要变。(当然这有方式解决,比如通过挂载盘)。

其它就没什么了,继续看 slave 的类。

SlaveLocustRunner(DistributedLocustRunner)

slave 节点也是继承了 DistributedLocustRunner,和 master 节点的本源一样。

self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000)).encode('utf-8')).hexdigest()

从这里可以看到,每一个 client_id 是如何唯一化的。
中间一些代码和 master 的类似,略过。

def worker(self):
    while True:
        msg = self.client.recv()
        if msg.type == "hatch":
            self.client.send(Message("hatching", None, self.client_id))
            job = msg.data
            self.hatch_rate = job["hatch_rate"]
            #self.num_clients = job["num_clients"]
            self.host = job["host"]
            self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
        elif msg.type == "stop":
            self.stop()
            self.client.send(Message("client_stopped", None, self.client_id))
            self.client.send(Message("client_ready", None, self.client_id))
        elif msg.type == "quit":
            logger.info("Got quit message from master, shutting down...")
            self.stop()
            self.greenlet.kill(block=True)

slave 里面定义了一个 worker 方法。
其作用是比较明显的,就是用无限循环,一直在接受 master 的消息。
如果得到的消息是 hatch(干活),就干活,还是用的协程(self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"])))。
如果是 stop 或者 quit,那就停止。

slave 一直通过 zmq 向 master 发送各种状态结果。

对 Locust 的思考为避免篇幅太长,后续展开。

Locust 的源码思考链接:

Locust 源码阅读及特点思考 (三)


↙↙↙阅读原文可查看相关链接,并与作者交流