之前吹过的牛逼终于实现了,之前分享过一个简化的版本动态模型之增压暂停【FunTester 测试框架】。今天给大家分享一下动态增减压力的实现的简化版本,总的来说就是在压测过程中随时调整(增加或者减少)压力(线程数)。
首先要抛弃原有的模型结构,将每个多线程任务都当做一个可管理对象,需要有一个中断方法,然后有一个全局的运行状态的管理类,包含一些基础添加,删除,终止单个多线程任务的能力。
通过一个外部因子触发不同的管理类方法:比如增加用例,然后从任务池中随机(后期会选择某一任务)克隆一个任务,重新放到任务池中。
本地版本的 FunTester 测试框架将键盘输入当做外部因子,分布式服务化 FunTester 测试框架将接口请求当做外部因子,本次演示本地版本。
运行过程如下:
首先对多线程任务基础类进行改造,我重新写了一个com.funtester.base.constaint.ThreadBase
的子类com.funtester.base.constaint.FunThread
,专门用于创建动态模型任务。这里改造分两种:1.是增加终止属性com.funtester.base.constaint.FunThread#BREAK_KEY
和对应方法com.funtester.base.constaint.FunThread#interrupt
;2.是简化了com.funtester.base.constaint.FunThread#run
方法,改造成为一个会一直运行的方法,避免克隆中出现终止现象。
package com.funtester.base.constaint;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Vector;
public abstract class FunThread<F> extends ThreadBase {
private static final long serialVersionUID = 7878297575504772944L;
private static final Logger logger = LogManager.getLogger();
/**
* 统一管理所有存活线程
*/
private static Vector<FunThread> threads = new Vector<>();
/**
* 单线程中断开关,用于动态调整并发压力,默认值false
*/
private boolean BREAK_KEY = false;
public FunThread(F f, String name) {
this.isTimesMode = true;
this.threadName = name;
this.limit = Integer.MAX_VALUE;
this.f = f;
}
protected FunThread() {
super();
}
@Override
public void run() {
before();
while (!BREAK_KEY) {
try {
doing();
} catch (Exception e) {
logger.warn("执行任务失败!", e);
}
}
}
/**
* 运行待测方法的之前的准备
*/
public void before() {
}
/**
* 动态模型正常不会结束
*/
protected void after() {
}
private static synchronized boolean checkName(String name) {
for (FunThread thread : threads) {
String threadName = thread.threadName;
if (StringUtils.isAnyBlank(threadName, name) || threadName.equalsIgnoreCase(name)) {
return false;
}
}
return true;
}
/**
* 拷贝对象方法,用于统计单一对象多线程调用时候的请求数和成功数,对于<T>的复杂情况,需要将T类型也重写clone方法
*
* @return
*/
@Override
public abstract FunThread clone();
/**
* 线程终止,用于动态调节并发压力
*/
public void interrupt() {
BREAK_KEY = true;
}
}
管理功能我目前写在了com.funtester.base.constaint.FunThread
类中,通过一个java.util.Vector
集合存放所有的运行任务,当做一个任务池,增加了添加、删除、查询、终止、克隆的方法。
/**
* 用于在某些情况下提前终止测试
*/
public static synchronized void stop() {
threads.forEach(f -> f.interrupt());
threads.clear();
}
public static synchronized boolean addThread(FunThread base) {
if (!checkName(base.threadName)) return false;
return threads.add(base);
}
/**
* 删除某个任务,或者停止
*
* @param base
*/
public static synchronized void remoreThread(FunThread base) {
base.interrupt();
threads.remove(base);
}
public static synchronized FunThread find(String name) {
for (int i = 0; i < threads.size(); i++) {
FunThread funThread = threads.get(i);
if (StringUtils.isNoneBlank(funThread.threadName, name) && funThread.threadName.equalsIgnoreCase(name)) {
return funThread;
}
}
return null;
}
public static synchronized void remoreThread(String name) {
FunThread funThread = find(name);
if (funThread == null) remoreThread(funThread);
}
public static synchronized FunThread getRandom() {
return random(threads);
}
public static synchronized int aliveSize() {
return threads.size();
}
执行类由于不需要统计数据了,只需要进行任务池管理即可,所以显得非常简单。
package com.funtester.frame.execute;
import com.funtester.base.constaint.FunThread;
import com.funtester.base.interfaces.IFunController;
import com.funtester.config.HttpClientConstant;
import com.funtester.frame.SourceCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* 动态压测模型的启动类
*/
public class FunConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(FunConcurrent.class);
/**
* 任务集
*/
public List<FunThread> threads = new ArrayList<>();
/**
* 线程池
*/
public static ExecutorService executorService;
public static IFunController controller;
/**
* @param threads 线程组
*/
public FunConcurrent(List<FunThread> threads) {
this.threads = threads;
executorService = ThreadPoolUtil.createCachePool(HttpClientConstant.THREADPOOL_MAX);
}
private FunConcurrent() {
}
/**
* 执行多线程任务
* 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
*/
public void start() {
if (controller == null) controller = new FunTester();
new Thread(controller,"接收器").start();
threads.forEach(f -> addTask(f));
}
public static void addTask(FunThread thread) {
boolean b = FunThread.addThread(thread);
logger.info("任务{}添加{}", thread.threadName, b ? "成功" : "失败");
if (b) executorService.execute(thread);
}
public static void addTask() {
FunThread thread = FunThread.getRandom();
addTask(thread.clone());
}
public static void removeTask(FunThread thread) {
logger.info("任务{}被终止", thread.threadName);
FunThread.remoreThread(thread);
}
public static void removeTask(String name) {
logger.info("任务{}被终止", name);
FunThread.remoreThread(name);
}
public static void removeTask() {
FunThread thread = FunThread.getRandom();
removeTask(thread);
}
}
这里我实现的比较简单,只实现了加一和减一以及终止,后续会增加批量增减的功能,以及动态从 Groovy 脚本中引入压测任务,当然这依赖于更精细化的任务池管理。
private static class FunTester implements IFunController {
boolean key = true;
@Override
public void run() {
while (key) {
String input = getInput();
switch (input) {
case "+":
add();
break;
case "-":
reduce();
break;
case "*":
over();
key = false;
break;
default:
break;
}
}
}
@Override
public void add() {
addTask();
}
@Override
public void reduce() {
removeTask();
}
@Override
public void over() {
logger.info("动态结束任务!");
FunThread.stop();
}
}
基本功能已经实现,下面让我们来测试一下吧。
我用了两个任务当做基础任务,然后执行压测,通过键盘输出控制用例增减。视频演示版本在最后面,或者去 B 站以及视频号关注我,都叫 FunTester。
package com.funtest.funthead;
import com.funtester.base.constaint.FunThread;
import com.funtester.frame.SourceCode;
import com.funtester.frame.execute.FunConcurrent;
import java.util.Arrays;
public class Ft extends SourceCode {
public static void main(String[] args) {
FunTester e2 = new FunTester("task A");
FunTester e22 = new FunTester("task B");
new FunConcurrent(Arrays.asList(e2, e22)).start();
}
private static class FunTester extends FunThread {
public FunTester(String name) {
super(null, name);
}
@Override
protected void doing() throws Exception {
sleep(3.0 + getRandomDouble());
output(threadName + TAB + "任务正在运行!");
}
@Override
public FunThread clone() {
return new FunTester(this.threadName + "克隆体");
}
}
}
以下只展示一些必要的信息,用例必需手动终止或者被外界触发终止。克隆失败的原因是任务名称重复,计划以任务名称作为任务的标志进行管理,所以不能重复。避免重复可以在com.funtest.funthead.Ft.FunTester#clone
实现名称赋值随机和唯一性。
INFO-> main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
INFO-> main 任务task A添加成功
INFO-> main 任务task B添加成功
INFO-> FT-2 task B 任务正在运行!
INFO-> FT-1 task A 任务正在运行!
+
INFO-> 接收器 输入内容:+
INFO-> 接收器 任务task A克隆体添加成功
INFO-> FT-2 task B 任务正在运行!
INFO-> FT-1 task A 任务正在运行!
INFO-> FT-3 task A克隆体 任务正在运行!
+
INFO-> 接收器 输入内容:+
INFO-> 接收器 任务task A克隆体添加失败
INFO-> FT-2 task B 任务正在运行!
INFO-> FT-1 task A 任务正在运行!
INFO-> FT-3 task A克隆体 任务正在运行!
+
INFO-> 接收器 输入内容:+
INFO-> 接收器 任务task B克隆体添加成功
INFO-> FT-3 task A克隆体 任务正在运行!
INFO-> FT-2 task B 任务正在运行!
INFO-> FT-4 task B克隆体 任务正在运行!
INFO-> FT-1 task A 任务正在运行!
_
INFO-> 接收器 输入内容:_
INFO-> FT-3 task A克隆体 任务正在运行!
-
INFO-> 接收器 输入内容:-
INFO-> 接收器 任务task B克隆体被终止
INFO-> FT-4 task B克隆体 任务正在运行!
INFO-> FT-2 task B 任务正在运行!
INFO-> FT-1 task A 任务正在运行!
Process finished with exit code 130 (interrupted by signal 2: SIGINT)