之前吹过的牛逼终于实现了,之前分享过一个简化的版本动态模型之增压暂停【FunTester 测试框架】。今天给大家分享一下动态增减压力的实现的简化版本,总的来说就是在压测过程中随时调整(增加或者减少)压力(线程数)。

思路

首先要抛弃原有的模型结构,将每个多线程任务都当做一个可管理对象,需要有一个中断方法,然后有一个全局的运行状态的管理类,包含一些基础添加,删除,终止单个多线程任务的能力。

通过一个外部因子触发不同的管理类方法:比如增加用例,然后从任务池中随机(后期会选择某一任务)克隆一个任务,重新放到任务池中。

本地版本的 FunTester 测试框架将键盘输入当做外部因子,分布式服务化 FunTester 测试框架将接口请求当做外部因子,本次演示本地版本。

运行过程如下:

改造

多线程任务类

首先对多线程任务基础类进行改造,我重新写了一个com.funtester.base.constaint.ThreadBase的子类com.funtester.base.constaint.FunThread,专门用于创建动态模型任务。这里改造分两种:1.是增加终止属性com.funtester.base.constaint.FunThread#BREAK_KEY和对应方法com.funtester.base.constaint.FunThread#interrupt;2.是简化了com.funtester.base.constaint.FunThread#run方法,改造成为一个会一直运行的方法,避免克隆中出现终止现象。

package com.funtester.base.constaint;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Vector;

public abstract class FunThread<F> extends ThreadBase {

    private static final long serialVersionUID = 7878297575504772944L;

    private static final Logger logger = LogManager.getLogger();

    /**
     * 统一管理所有存活线程
     */
    private static Vector<FunThread> threads = new Vector<>();

    /**
     * 单线程中断开关,用于动态调整并发压力,默认值false
     */
    private boolean BREAK_KEY = false;

    public FunThread(F f, String name) {
        this.isTimesMode = true;
        this.threadName = name;
        this.limit = Integer.MAX_VALUE;
        this.f = f;
    }

    protected FunThread() {
        super();
    }


    @Override
    public void run() {
        before();
        while (!BREAK_KEY) {
            try {
                doing();
            } catch (Exception e) {
                logger.warn("执行任务失败!", e);
            }
        }
    }

    /**
     * 运行待测方法的之前的准备
     */
    public void before() {
    }

    /**
     * 动态模型正常不会结束
     */
    protected void after() {
    }


    private static synchronized boolean checkName(String name) {
        for (FunThread thread : threads) {
            String threadName = thread.threadName;
            if (StringUtils.isAnyBlank(threadName, name) || threadName.equalsIgnoreCase(name)) {
                return false;
            }
        }
        return true;
    }

    /**
     * 拷贝对象方法,用于统计单一对象多线程调用时候的请求数和成功数,对于<T>的复杂情况,需要将T类型也重写clone方法
     *
     * @return
     */
    @Override
    public abstract FunThread clone();

    /**
     * 线程终止,用于动态调节并发压力
     */
    public void interrupt() {
        BREAK_KEY = true;
    }


}

管理功能

管理功能我目前写在了com.funtester.base.constaint.FunThread类中,通过一个java.util.Vector集合存放所有的运行任务,当做一个任务池,增加了添加、删除、查询、终止、克隆的方法。

/**
 * 用于在某些情况下提前终止测试
 */
public static synchronized void stop() {
    threads.forEach(f -> f.interrupt());
    threads.clear();
}

public static synchronized boolean addThread(FunThread base) {
    if (!checkName(base.threadName)) return false;
    return threads.add(base);
}

/**
 * 删除某个任务,或者停止
 *
 * @param base
 */
public static synchronized void remoreThread(FunThread base) {
    base.interrupt();
    threads.remove(base);
}

public static synchronized FunThread find(String name) {
    for (int i = 0; i < threads.size(); i++) {
        FunThread funThread = threads.get(i);
        if (StringUtils.isNoneBlank(funThread.threadName, name) && funThread.threadName.equalsIgnoreCase(name)) {
            return funThread;
        }
    }
    return null;
}

public static synchronized void remoreThread(String name) {
    FunThread funThread = find(name);
    if (funThread == null) remoreThread(funThread);
}

public static synchronized FunThread getRandom() {
    return random(threads);
}

public static synchronized int aliveSize() {
    return threads.size();
}


动态执行类

执行类由于不需要统计数据了,只需要进行任务池管理即可,所以显得非常简单。

package com.funtester.frame.execute;

import com.funtester.base.constaint.FunThread;
import com.funtester.base.interfaces.IFunController;
import com.funtester.config.HttpClientConstant;
import com.funtester.frame.SourceCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
 * 动态压测模型的启动类
 */
public class FunConcurrent extends SourceCode {

    private static Logger logger = LogManager.getLogger(FunConcurrent.class);

    /**
     * 任务集
     */
    public List<FunThread> threads = new ArrayList<>();

    /**
     * 线程池
     */
    public static ExecutorService executorService;

    public static IFunController controller;

    /**
     * @param threads 线程组
     */
    public FunConcurrent(List<FunThread> threads) {
        this.threads = threads;
        executorService = ThreadPoolUtil.createCachePool(HttpClientConstant.THREADPOOL_MAX);
    }

    private FunConcurrent() {

    }

    /**
     * 执行多线程任务
     * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
     */
    public void start() {
        if (controller == null) controller = new FunTester();
        new Thread(controller,"接收器").start();
        threads.forEach(f -> addTask(f));
    }

    public static void addTask(FunThread thread) {
        boolean b = FunThread.addThread(thread);
        logger.info("任务{}添加{}", thread.threadName, b ? "成功" : "失败");
        if (b) executorService.execute(thread);
    }

    public static void addTask() {
        FunThread thread = FunThread.getRandom();
        addTask(thread.clone());
    }

    public static void removeTask(FunThread thread) {
        logger.info("任务{}被终止", thread.threadName);
        FunThread.remoreThread(thread);
    }

    public static void removeTask(String name) {
        logger.info("任务{}被终止", name);
        FunThread.remoreThread(name);
    }

    public static void removeTask() {
        FunThread thread = FunThread.getRandom();
        removeTask(thread);
    }


}

处理外部因子多线程类

这里我实现的比较简单,只实现了加一和减一以及终止,后续会增加批量增减的功能,以及动态从 Groovy 脚本中引入压测任务,当然这依赖于更精细化的任务池管理。

private static class FunTester implements IFunController {

    boolean key = true;

    @Override
    public void run() {
        while (key) {
            String input = getInput();
            switch (input) {
                case "+":
                    add();
                    break;
                case "-":
                    reduce();
                    break;
                case "*":
                    over();
                    key = false;
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void add() {
        addTask();
    }

    @Override
    public void reduce() {
        removeTask();
    }

    @Override
    public void over() {
        logger.info("动态结束任务!");
        FunThread.stop();
    }

}

基本功能已经实现,下面让我们来测试一下吧。

测试

测试脚本

我用了两个任务当做基础任务,然后执行压测,通过键盘输出控制用例增减。视频演示版本在最后面,或者去 B 站以及视频号关注我,都叫 FunTester。

package com.funtest.funthead;

import com.funtester.base.constaint.FunThread;
import com.funtester.frame.SourceCode;
import com.funtester.frame.execute.FunConcurrent;

import java.util.Arrays;

public class Ft extends SourceCode {

    public static void main(String[] args) {
        FunTester e2 = new FunTester("task A");
        FunTester e22 = new FunTester("task B");
        new FunConcurrent(Arrays.asList(e2, e22)).start();
    }

    private static class FunTester extends FunThread {

        public FunTester(String name) {
            super(null, name);
        }

        @Override
        protected void doing() throws Exception {
            sleep(3.0 + getRandomDouble());
            output(threadName + TAB + "任务正在运行!");
        }

        @Override
        public FunThread clone() {
            return new FunTester(this.threadName + "克隆体");
        }

    }

}

控制台输出

以下只展示一些必要的信息,用例必需手动终止或者被外界触发终止。克隆失败的原因是任务名称重复,计划以任务名称作为任务的标志进行管理,所以不能重复。避免重复可以在com.funtest.funthead.Ft.FunTester#clone实现名称赋值随机和唯一性。

INFO-> main 当前用户oker工作目录/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
INFO-> main 任务task A添加成功
INFO-> main 任务task B添加成功
INFO-> FT-2   task B    任务正在运行!
INFO-> FT-1   task A    任务正在运行!
+
INFO-> 接收器 输入内容+
INFO-> 接收器 任务task A克隆体添加成功
INFO-> FT-2   task B    任务正在运行!
INFO-> FT-1   task A    任务正在运行!
INFO-> FT-3   task A克隆体   任务正在运行!
+
INFO-> 接收器 输入内容+
INFO-> 接收器 任务task A克隆体添加失败
INFO-> FT-2   task B    任务正在运行!
INFO-> FT-1   task A    任务正在运行!
INFO-> FT-3   task A克隆体   任务正在运行!
+
INFO-> 接收器 输入内容+
INFO-> 接收器 任务task B克隆体添加成功
INFO-> FT-3   task A克隆体   任务正在运行!
INFO-> FT-2   task B    任务正在运行!
INFO-> FT-4   task B克隆体   任务正在运行!
INFO-> FT-1   task A    任务正在运行!
_
INFO-> 接收器 输入内容_
INFO-> FT-3   task A克隆体   任务正在运行!
-
INFO-> 接收器 输入内容-
INFO-> 接收器 任务task B克隆体被终止
INFO-> FT-4   task B克隆体   任务正在运行!
INFO-> FT-2   task B    任务正在运行!
INFO-> FT-1   task A    任务正在运行!

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

欢迎关注 FunTester,Have Fun ~ Tester !


↙↙↙阅读原文可查看相关链接,并与作者交流