• Place.Mat 如何拿到的。

    class Place(object):
        Ndarray = np.ndarray
        GpuMat = None
        UMat = cv2.UMat
    
    
  • 如何做音频算法测试 at 2020年12月22日
  • 对文中开头所讲的,有些个人的理解,如下:

    • 工具也好、自动化也好,开展之前的价值度量,这是在动手前需要明确的,如果只是讲提升效率,我感觉这不属于价值度量,因为工具、自动化本身自带效率提升的 buff,更多的关注的是效率提升背后的东西,它的持续性,同时效率提升转化成了什么。

    • 价值度量之后,是围绕着某个问题在某些场景下的解决方案设计,最终的工具、自动化只是一种解决问题的对外表现形式,真正的难点是解决方案设计是否真的解决痛点,甚至是类似场景下的痛点,以便推广是更具普适性。如果一上来就着手工具开发,容易忽略工具、自动化功能实现背后的真正痛点。工具、自动化本身最大的挑战不是实现,而是它的持续性,一个东西做出来,持续的时间越长,收益越高。然而往往有些工具的使用成本、维护成本太高,导致它很 “短寿”,自然也很难体现出它的价值。

    • 最后,摸索出适合本地化的工程效率组与业务组的合作模式,度量双方的价值产出,明确双方职责。在此之前基建很重要(基础的自动化、工具链平台建设),否则会发现没有一定基建的基础上很多东西,难以落地或者开展成本很高。

    • 最后技术栈尽量统一,工具、自动化开发设计、代码尽量规范。

  • 求助一个算法问题 at 2019年07月20日

  • 集合点

    看测试脚本,先说明下此集合点只会在每个用户(协程)启动时,集合一次,执行过程是没有集合动作的。如下是 locust 源码,通过locust().run(runner=self)起对应协程执行各用户任务。

    # runners.py 
            def hatch():
                sleep_time = 1.0 / self.hatch_rate
                while True:
                    if not bucket:
                        logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
                        events.hatch_complete.fire(user_count=self.num_clients)
                        return
    
                    locust = bucket.pop(random.randint(0, len(bucket)-1))
                    occurence_count[locust.__name__] += 1
                    def start_locust(_):
                        try:
                            locust().run(runner=self)
                        except GreenletExit:
                            pass
                    new_locust = self.locusts.spawn(start_locust, locust)
                    if len(self.locusts) % 10 == 0:
                        logger.debug("%i locusts hatched" % len(self.locusts))
                    gevent.sleep(sleep_time)
    

    再看run(),每个用户只会进行一次 on_start() 初始化准备工作,剩下的时间在while (True)中循环执行。同时也可以看到 locust 内部是没有集合点的,通过gevent.sleep(sleep_time)控制每秒加载用户,在此之前加载的用户已经在各自协程中执行任务了。

    # core.py
    def run(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
    
        try:
            if hasattr(self, "on_start"):
                self.on_start()
        except InterruptTaskSet as e:
            if e.reschedule:
                six.reraise(RescheduleTaskImmediately, RescheduleTaskImmediately(e.reschedule), sys.exc_info()[2])
            else:
                six.reraise(RescheduleTask, RescheduleTask(e.reschedule), sys.exc_info()[2])
    
        while (True):
            try:
                if self.locust.stop_timeout is not None and time() - self._time_start > self.locust.stop_timeout:
                    return
    
                if not self._task_queue:
                    self.schedule_task(self.get_next_task())
    
                try:
                    self.execute_next_task()
                except RescheduleTaskImmediately:
                    pass
                except RescheduleTask:
                    self.wait()
                else:
                    self.wait()
    

    楼主的集合点放到了on_start()中,只会集合一次,就是初始化的时候。另外再看 locust 用户任务执行。

    请求数量

    locust 每个协程(用户)运行任务时,具体任务执行有两种方式:@task修饰的权重随机任务(TaskSet),和@seq_task优先级顺序任务(TaskSequence),楼主采用的是@task方式

    # core.py
    
    class TaskSetMeta(type):
        """
        Meta class for the main Locust class. It's used to allow Locust classes to specify task execution 
        ratio using an {task:int} dict, or a [(task0,int), ..., (taskN,int)] list.
        """
    
        def __new__(mcs, classname, bases, classDict):
            new_tasks = []
            for base in bases:
                if hasattr(base, "tasks") and base.tasks:
                    new_tasks += base.tasks
    
            if "tasks" in classDict and classDict["tasks"] is not None:
                tasks = classDict["tasks"]
                if isinstance(tasks, dict):
                    tasks = six.iteritems(tasks)
    
                for task in tasks:
                    if isinstance(task, tuple):
                        task, count = task
                        for i in xrange(0, count):
                            new_tasks.append(task)
                    else:
                        new_tasks.append(task)
    
            for item in six.itervalues(classDict):
                if hasattr(item, "locust_task_weight"):
                    for i in xrange(0, item.locust_task_weight):
                        new_tasks.append(item)
    
            classDict["tasks"] = new_tasks
    
            return type.__new__(mcs, classname, bases, classDict)
    

    源码如上可以看到,上下文源码不贴了。

    以楼主代码为例 每个用户的任务集为[cms_queryUserList,cms_queryUserList,cms_login],这是根据@task权重生成的,旨在达到权重越大被执行的概率越高的效果,每次执行随机从这个列表中取一个执行,如下源码,所以出现了楼主所说请求数量不一致。

    def get_next_task(self):
        return random.choice(self.tasks)
    

    如何解决

    可以尝试使用@seq_task修饰器,确定执行顺序,然后顺序执行,如下:

    coding=utf-8
    import requests
    from locust import HttpLocust,TaskSet,task
    import os
    from locust import events
    from gevent._semaphore import Semaphore
    all_locusts_spawned = Semaphore()
    all_locusts_spawned.acquire()
    def on_hatch_complete(**kwargs):
        all_locusts_spawned.release()  #创建钩子方法
    
    events.hatch_complete += on_hatch_complete    #挂载到locust钩子函数(所有的Locust实例产生完成时触发)
    
    class Cms(TaskSequence):
        # 访问cms后台首页
        @seq_task(1)
        def cms_login(self):
            # 定义请求头
            header = {
                "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36"
            }
    
            req = self.client.get("/cms/manage/loginJump.do?userAccount=admin&loginPwd=123456",  headers=header, verify=False)
            if req.status_code == 200:
                print("success")
            else:
                print("fails")
    
        @seq_task(2)
        def cms_queryUserList(self):
            header = {
                "User-Agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36"
            }
            queryUserList_data = {
                "startCreateDate": "",
                "endCreateDate": "",
                "searchValue": "",
                "page": "1"
            }
            req = self.client.post("/cms/manage/queryUserList.do",data=queryUserList_data)
            queryUserList_url = 'http://192.168.23.132:8080/cms/manage/queryUserList.do'
            if req.status_code == 200:    #u'查询用户成功!'
                print("success")
            else:
                print("fails")
    
        def on_start(self):
            """ on_start is called when a Locust start before any task is scheduled """
            # self.cms_login()
            # self.cms_queryUserList()
            all_locusts_spawned.wait()    #限制在所有用户准备完成前处于等待状态
    
    class websitUser(HttpLocust):
        task_set = Cms
        min_wait = 3000  # 单位为毫秒
        max_wait = 6000  # 单位为毫秒
    
    if __name__ == "__main__":
        import os
        os.system("locust -f lesson8.py --host=http://192.168.23.134:8080")
    
    
    '''
    semaphore是一个内置的计数器:
    每当调用acquire()时,内置计数器-1
    每当调用release()时,内置计数器+1
    计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
    '''
    
  • 4 年 半的个人测试经历 at 2019年07月18日

    这里的代码驱动的意思应该是关键字驱动吧,一般关键字驱动和数据驱动是一起使用的。

    去年也是基于 Python Requests 开发请求发送、响应校验的通用自定义关键字以及上下游自定义关键字,比如 Oralce、Mysql、Redis、Mongo 数据库操作关键字,基本满足目前的接口自动化需要,同时使用 Python Locust(采用 Request 库作为客户端)实现了单接口性能的自动化关键字开发,快速构建微服务接口的功能、性能自动化测试用例。

    不过基于 RF 关键字驱动的自动化测试,用例本身的维护性需要着重考虑,所以关键字设计时需要着重对可维护性需要把关,比如,代码设计复杂度、异常易于定位,输出指向明确、用例组织简洁。目前基本控制在了 4、5 行覆盖一条接口用例(包含了数据库验证)。

  • 高压力测试 分布式 都是需要堆硬件的。

  • locust 默认了 1s 等待时间,min_wait=0,max_wait=0 试试看。

  • locust 默认了 1s 等待时间,min_wait=0,max_wait=0 试试看。

  • 技术能力做的是将业务测试能力水平复制。

  • 目前做的大概就是这里说的基于算法模型的测试、大致分图像检测、图像检索(识别)两大类吧,结合实践说说自己的理解:

    算法测试的核心

    基于算法模型的两个核心:一是测试数据、二是评估指标 ,大致划分为三个测试类型功能测试、性能测试、效果测试(模型评估指标)。

    测试数据

    • 特征的覆盖:分析显性特征、隐形特征,进行特征标注。
    • 数据量的覆盖:在测试图像检测算法时,根据特征收敛曲线来确定数据量,结合经验来看 3000 张左右基本收敛,但同时考虑真实生产环境特征分布,我们一般采用 1~2 万张;图像检索算法,没有什么比较好的选取依据,一般采用 1000 组,每组 20 张(Top20)。

    为此,开发了一个算法数仓平台,主要用于算法测试数据的框图、特征标注、存储、查询、对接算法微服务自动化平台。

    算法测试数据尽量采用真实生产环境下的数据,尽可能的了解真实环境的特征分布,同时需要的注意的是,算法测试数据肯定是持续更新的,更新频率、方式需要结合真实数据源情况,比如之前系统接入都是办公室摄像头数据,现在接入的是路边摄像头数据,那么算法模型的效果肯定是不同的,因此我们需要感知真实环境下的数据变化。

    评估指标

    • 明确不同算法评估指标的差异性,不能一概而论 :比如图像检测,基于混淆矩阵 (精确率、召回率、虚警率、F1 等)、ROC(AUG)、PR(AP/MAP);目标检索评估指标不同于此,还需要采用其他指标评估,比如 TAR,FAR,FRR,ERR、前 N 连续正确率等。

    算法测试覆盖的测试类型

    功能测试

    我们的算法模型通过微服务接口对外提供服务,因此功能性验证是基于接口功能的验证,与普通微服务接口测试策略一致,我们采用 Python Request 基于关键字驱动、数据驱动实现了微服务接口的功能性测试,测试数据从上面介绍的算法数仓平台获取。

    性能测试

    大多数算法模型本身难以支持并发,比如人脸检索,数据库数据量 10 亿以上,虽然通过某些算法减少了计算量,但加载到内存的数据仍然几十、上百 GB。对于算法模型大多数做的是稳定性测试和版本性能波动感知(版本间的性能差异感知)。

    使用 Python Locust 基于关键字驱动、数据驱动,实现了算法微服务的性能测试自动化,测试数据从上面介绍的算法数仓平台获取。并且每次测试的执行策略(多少用户并发、每秒加载几个,运行多久),以及执行结果,如平均响应时间,*% 响应时间,TPS 等指标都会记录入数据库,每次运行均可以看到历史结果,并进行横向比较。

    效果测试

    这就是指的算法模型评估指标的计算了,使用 Python 封装各个指标计算方法作为自定义关键字,上传到自动化平台,根据不同算法模型指标评估需要组装对应指标计算方法即可,测试数据从上面介绍的算法数仓平台获取。每次执行记录各个指标,版本,入库存储,输出历史该接口的指标计算结果,进行横向比较。

    同时效果测试还包含竞品效果的比对测试。

    除了给出基本的指标计算结果,我们还尝试通过算法模型运行结果结合数据标注信息,分析出哪些特征对算法效果有直接影响,结合实际生产环境特征分布,来评估影响程度等。

    因此通过自研的算法数仓平台 + 自动化测试平台,基本实现了算法模型微服务的功能、性能、指标评估自动化测试,不仅关注每次执行结果,更关注版本间的差异性,感知持续优化程度。

    转载请说明出处,谢谢。

  • Setup teardown。

  • 关于测试开发的思考 at 2019年06月27日

    测试开发做的是水平赋能,如果仅仅把目标或者职责定位为工具、自动化开发,视角就略显局限了,也很难把工具、自动化做好,注意这里说的是做好,而不是做完,同时,团队多了重复造轮子的也就多了。
    对于测试开发而言,其本质不是需要掌握开发能力,更需要的是测试解决方案能力,这是要建立在测试理解、测试策略设计能力之上的,并不是仅仅会某一种开发语言。测试开发提供的是测试解决方案的技术实现的设计、开发服务,寻求质量和效率的平衡。同时解决方案绝不是解决局部问题。
    对于上述,虽然看起来 “要求太高”,但并不是不具备可行性。
    发展总是需要时间,由测试到测试开发的过程中各阶段的策略也是不同,比如从纯手工测试到测试能力分层到测试开发。

  • python 自动化测试框架选择 at 2019年06月11日

    Robot 已支持 Python3