TestOps
Place.Mat 如何拿到的。
class Place(object):
Ndarray = np.ndarray
GpuMat = None
UMat = cv2.UMat
对文中开头所讲的,有些个人的理解,如下:
工具也好、自动化也好,开展之前的价值度量,这是在动手前需要明确的,如果只是讲提升效率,我感觉这不属于价值度量,因为工具、自动化本身自带效率提升的 buff,更多的关注的是效率提升背后的东西,它的持续性,同时效率提升转化成了什么。
价值度量之后,是围绕着某个问题在某些场景下的解决方案设计,最终的工具、自动化只是一种解决问题的对外表现形式,真正的难点是解决方案设计是否真的解决痛点,甚至是类似场景下的痛点,以便推广是更具普适性。如果一上来就着手工具开发,容易忽略工具、自动化功能实现背后的真正痛点。工具、自动化本身最大的挑战不是实现,而是它的持续性,一个东西做出来,持续的时间越长,收益越高。然而往往有些工具的使用成本、维护成本太高,导致它很 “短寿”,自然也很难体现出它的价值。
最后,摸索出适合本地化的工程效率组与业务组的合作模式,度量双方的价值产出,明确双方职责。在此之前基建很重要(基础的自动化、工具链平台建设),否则会发现没有一定基建的基础上很多东西,难以落地或者开展成本很高。
最后技术栈尽量统一,工具、自动化开发设计、代码尽量规范。
看测试脚本,先说明下此集合点只会在每个用户(协程)启动时,集合一次,执行过程是没有集合动作的。如下是 locust 源码,通过locust().run(runner=self)
起对应协程执行各用户任务。
# runners.py
def hatch():
sleep_time = 1.0 / self.hatch_rate
while True:
if not bucket:
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
events.hatch_complete.fire(user_count=self.num_clients)
return
locust = bucket.pop(random.randint(0, len(bucket)-1))
occurence_count[locust.__name__] += 1
def start_locust(_):
try:
locust().run(runner=self)
except GreenletExit:
pass
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
gevent.sleep(sleep_time)
再看run()
,每个用户只会进行一次 on_start() 初始化准备工作,剩下的时间在while (True)
中循环执行。同时也可以看到 locust 内部是没有集合点的,通过gevent.sleep(sleep_time)
控制每秒加载用户,在此之前加载的用户已经在各自协程中执行任务了。
# core.py
def run(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
try:
if hasattr(self, "on_start"):
self.on_start()
except InterruptTaskSet as e:
if e.reschedule:
six.reraise(RescheduleTaskImmediately, RescheduleTaskImmediately(e.reschedule), sys.exc_info()[2])
else:
six.reraise(RescheduleTask, RescheduleTask(e.reschedule), sys.exc_info()[2])
while (True):
try:
if self.locust.stop_timeout is not None and time() - self._time_start > self.locust.stop_timeout:
return
if not self._task_queue:
self.schedule_task(self.get_next_task())
try:
self.execute_next_task()
except RescheduleTaskImmediately:
pass
except RescheduleTask:
self.wait()
else:
self.wait()
楼主的集合点放到了on_start()
中,只会集合一次,就是初始化的时候。另外再看 locust 用户任务执行。
locust 每个协程(用户)运行任务时,具体任务执行有两种方式:@task
修饰的权重随机任务(TaskSet),和@seq_task
优先级顺序任务(TaskSequence),楼主采用的是@task方式
# core.py
class TaskSetMeta(type):
"""
Meta class for the main Locust class. It's used to allow Locust classes to specify task execution
ratio using an {task:int} dict, or a [(task0,int), ..., (taskN,int)] list.
"""
def __new__(mcs, classname, bases, classDict):
new_tasks = []
for base in bases:
if hasattr(base, "tasks") and base.tasks:
new_tasks += base.tasks
if "tasks" in classDict and classDict["tasks"] is not None:
tasks = classDict["tasks"]
if isinstance(tasks, dict):
tasks = six.iteritems(tasks)
for task in tasks:
if isinstance(task, tuple):
task, count = task
for i in xrange(0, count):
new_tasks.append(task)
else:
new_tasks.append(task)
for item in six.itervalues(classDict):
if hasattr(item, "locust_task_weight"):
for i in xrange(0, item.locust_task_weight):
new_tasks.append(item)
classDict["tasks"] = new_tasks
return type.__new__(mcs, classname, bases, classDict)
源码如上可以看到,上下文源码不贴了。
以楼主代码为例 每个用户的任务集为[cms_queryUserList,cms_queryUserList,cms_login]
,这是根据@task
权重生成的,旨在达到权重越大被执行的概率越高的效果,每次执行随机从这个列表中取一个执行,如下源码,所以出现了楼主所说请求数量不一致。
def get_next_task(self):
return random.choice(self.tasks)
可以尝试使用@seq_task修饰器,确定执行顺序,然后顺序执行,如下:
coding=utf-8
import requests
from locust import HttpLocust,TaskSet,task
import os
from locust import events
from gevent._semaphore import Semaphore
all_locusts_spawned = Semaphore()
all_locusts_spawned.acquire()
def on_hatch_complete(**kwargs):
all_locusts_spawned.release() #创建钩子方法
events.hatch_complete += on_hatch_complete #挂载到locust钩子函数(所有的Locust实例产生完成时触发)
class Cms(TaskSequence):
# 访问cms后台首页
@seq_task(1)
def cms_login(self):
# 定义请求头
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36"
}
req = self.client.get("/cms/manage/loginJump.do?userAccount=admin&loginPwd=123456", headers=header, verify=False)
if req.status_code == 200:
print("success")
else:
print("fails")
@seq_task(2)
def cms_queryUserList(self):
header = {
"User-Agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36"
}
queryUserList_data = {
"startCreateDate": "",
"endCreateDate": "",
"searchValue": "",
"page": "1"
}
req = self.client.post("/cms/manage/queryUserList.do",data=queryUserList_data)
queryUserList_url = 'http://192.168.23.132:8080/cms/manage/queryUserList.do'
if req.status_code == 200: #u'查询用户成功!'
print("success")
else:
print("fails")
def on_start(self):
""" on_start is called when a Locust start before any task is scheduled """
# self.cms_login()
# self.cms_queryUserList()
all_locusts_spawned.wait() #限制在所有用户准备完成前处于等待状态
class websitUser(HttpLocust):
task_set = Cms
min_wait = 3000 # 单位为毫秒
max_wait = 6000 # 单位为毫秒
if __name__ == "__main__":
import os
os.system("locust -f lesson8.py --host=http://192.168.23.134:8080")
'''
semaphore是一个内置的计数器:
每当调用acquire()时,内置计数器-1
每当调用release()时,内置计数器+1
计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
'''
这里的代码驱动的意思应该是关键字驱动吧,一般关键字驱动和数据驱动是一起使用的。
去年也是基于 Python Requests 开发请求发送、响应校验的通用自定义关键字以及上下游自定义关键字,比如 Oralce、Mysql、Redis、Mongo 数据库操作关键字,基本满足目前的接口自动化需要,同时使用 Python Locust(采用 Request 库作为客户端)实现了单接口性能的自动化关键字开发,快速构建微服务接口的功能、性能自动化测试用例。
不过基于 RF 关键字驱动的自动化测试,用例本身的维护性需要着重考虑,所以关键字设计时需要着重对可维护性需要把关,比如,代码设计复杂度、异常易于定位,输出指向明确、用例组织简洁。目前基本控制在了 4、5 行覆盖一条接口用例(包含了数据库验证)。
高压力测试 分布式 都是需要堆硬件的。
locust 默认了 1s 等待时间,min_wait=0,max_wait=0 试试看。
locust 默认了 1s 等待时间,min_wait=0,max_wait=0 试试看。
技术能力做的是将业务测试能力水平复制。
TestOps