写帖子对我学习 locust 源码也是一种监督,也是对自己学习过程的记录便于之后查看。
希望大家指出我的不足之处,多谢。
github:[https://github.com/locustio/locust]
本身这个文件有 400 多行代码,其中包含了很多 locust 的细节信息。
它是 Locust 执行请求的实现类。
手写汉字注释,加入里面。
def weight_locusts(self, amount, stop_timeout = None):
"""
Distributes the amount of locusts for each WebLocust-class according to it's weight
returns a list "bucket" with the weighted locusts
"""
# 返回值是个数组,装载复制的用例的压力请求
bucket = []
# weight_sum 是用例中的所有weight值的综合,weight代表权重值。
weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
# 可以有多个用例。
for locust in self.locust_classes:
# 一些判断略过
if not locust.task_set:
warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)
continue
if self.host is not None:
locust.host = self.host
if stop_timeout is not None:
locust.stop_timeout = stop_timeout
# create locusts depending on weight
# 在循环中这是一个用例,percent 意味着这个用例在总体权重中的比例。
percent = locust.weight / float(weight_sum)
# 比如是设置了1000个用户,根据权重比例,计算出1000个用户中的多少个用户来执行这个用例。
num_locusts = int(round(amount * percent))
# 复制并添加到结果集中
bucket.extend([locust for x in xrange(0, num_locusts)])
return bucket
weight_locusts 是根据权重计算出要使用的用户数。
虽然 spawn 是卵的意思,但是这个方法中包含了用例的执行。
hatch_rate 的解释:The rate per second in which clients are spawned. Only used together with --no-web
可以看到,无限循环运行之前,计算了停止的时间(sleep_time = 1.0 / self.hatch_rate),也就是说利用了 sleep 来达到每秒运行多少用户的效果。
用例的 run() 方法是在 core.py 中定义的,self.task_set(self).run(),可见就是用例执行。
def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):
if spawn_count is None:
spawn_count = self.num_clients
# 计算后的用户数,实际执行的用户数。
bucket = self.weight_locusts(spawn_count, stop_timeout)
spawn_count = len(bucket)
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.state = STATE_HATCHING
self.num_clients = spawn_count
else:
self.num_clients += spawn_count
logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))
occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])
# 定义执行的方法
def hatch():
sleep_time = 1.0 / self.hatch_rate
while True:
if not bucket:
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
events.hatch_complete.fire(user_count=self.num_clients)
return
# 将用例弹出来
locust = bucket.pop(random.randint(0, len(bucket)-1))
occurence_count[locust.__name__] += 1
# 定义启动的方法,可以看到是执行run()方法
def start_locust(_):
try:
locust().run()
except GreenletExit:
pass
# 协程的执行方法,也是Group()的spawn
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
# 睡眠即等待指定时间。
gevent.sleep(sleep_time)
hatch()
if wait:
self.locusts.join()
logger.info("All locusts dead\n")
kill 掉多少个 locust,即用户/用例。
代码比较简单,干了几件事情:
1.根据权重计算出要干掉多少个用户。
2.被干掉的用户在协程池子中停掉,并从权重池子中弹出。
感觉这部分代码写的冗余了,没必要有三个循环。
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
for g in self.locusts:
for l in bucket:
if l == g.args[0]:
dying.append(g)
bucket.remove(l)
break
for g in dying:
self.locusts.killone(g)
# 收尾工作,主要是提示给页面和打日志
events.hatch_complete.fire(user_count=self.num_clients)
这个就不贴代码了,内在基本都是上面的方法。
主要功能是判断已经启动了多少用户,启动的状态是什么等,来决定是启动(spawn_locusts)还是停止(kill_locusts)多少个用户。
本地的 runner,继承了上面的 LocustRunner 类。
定义了 start_hatching 方法,代码还是比较简单的,主要是定义协程主体。
分布式的 runner,也是继承了 LocustRunner 类,功能更少,主要是设置 socket 链接用的端口号,IP 等。
主节点的 runner,继承了上面的 DistributedLocustRunner 类。
主要的功能就是收集各个分布式节点的状态,让各个分布式节点干事情,收集各个分布式节点的请求内容好汇总。
主要代码节选:
self.clients = SlaveNodesDict()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)
这还是使用协程 greenlet 来触发监听子节点的动作。
可以看到这里是用了 rpc.Server 来进行各个分布式之间的通信的。
rpc 定义来自 zmqrpc.py 文件。
本身是使用的 zmq 来通信的,import zmq.green as zmq。
zmq 被传闻是最快的 MQ,是因为没有中转站,是直接生产者和消费者相连,那肯定很快……具体请自行百度。
这里可以说,作者真是利用了很多已有的三方包啊。
self.num_clients = locust_count
slave_num_clients = locust_count // (num_slaves or 1)
slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)
remaining = locust_count % num_slaves
节选这段代码的意义在于,我想告诉大家,Locust 对每个分布式节点是不存在权重的。
即这里说的很清楚,每个分布式节点 slave 处理的用户数量是一样的,是平均的。
如果你硬件比较好,那么需要多分配一些 slave,这部分的 “权重” 配比是手工操作了。
for client in six.itervalues(self.clients):
data = {
"hatch_rate":slave_hatch_rate,
"num_clients":slave_num_clients,
"host":self.host,
"stop_timeout":None
}
if remaining > 0:
data["num_clients"] += 1
remaining -= 1
self.server.send(Message("hatch", data, None))
six 是兼容 python2 和 3 的桥梁,也就是 Locust 是支持 python2 和 3 的。
这段代码的意义在于,master 触发 slave 的任务,是自己发送命令,如:slave_hatch_rate,你启动吧!
而不是告诉 slave 具体的要做什么事情。
这也解释了,slave 节点为什么一定要有用例文件。
带来的问题是,如果用例文件有变动,每个 slave 节点的都要变。(当然这有方式解决,比如通过挂载盘)。
其它就没什么了,继续看 slave 的类。
slave 节点也是继承了 DistributedLocustRunner,和 master 节点的本源一样。
self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000)).encode('utf-8')).hexdigest()
从这里可以看到,每一个 client_id 是如何唯一化的。
中间一些代码和 master 的类似,略过。
def worker(self):
while True:
msg = self.client.recv()
if msg.type == "hatch":
self.client.send(Message("hatching", None, self.client_id))
job = msg.data
self.hatch_rate = job["hatch_rate"]
#self.num_clients = job["num_clients"]
self.host = job["host"]
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg.type == "stop":
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
self.client.send(Message("client_ready", None, self.client_id))
elif msg.type == "quit":
logger.info("Got quit message from master, shutting down...")
self.stop()
self.greenlet.kill(block=True)
slave 里面定义了一个 worker 方法。
其作用是比较明显的,就是用无限循环,一直在接受 master 的消息。
如果得到的消息是 hatch(干活),就干活,还是用的协程(self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"])))。
如果是 stop 或者 quit,那就停止。
slave 一直通过 zmq 向 master 发送各种状态结果。