距离上次对 FunTester 测试框架功能规划之后,已经很久没有更新过功能规划了,主要因素是 FunTester 测试框架目前支持的功能已经完全满足工作需求。无论是分布式性能测试框架,还是全链路性能测试支持,以及量化模拟线上流量,基本技术验证都完成了,余下的都是在技术方案的上进行调整以更适应现在工作需求,不存在技术障碍。
虽然 FunTester 测试框架还在不断更新,但很久没进行过功能更新了。最近在设想为了可能用到的测试场景中,动态压力是目前最有可能在工作中应用的。
终极目标
最终想要实现的就是在不断压测过程中,灵活增加、减少或者保持测试压力,这样既可以一次执行用例的过程中,实现多个测试场景的覆盖,也可以避免从 0 开始加压导致测试时间的增加。
以往的性能测试用例,点击开始之后,除了主动结束,中间很少进行压力调节,动态压力就是为了在压测过程中干预压力,实现测试过程中对发压端进行调节。这个需求可以通过容器化和分布式等技术加持下实现。我的想法就是直接对单个 JVM 线程池运行的任务进行调节,适用于非集群压测和对压力需求比较灵活的场景,当然如果应用到日常性能巡检,也是可以提升效能的。
主要实现功能:
- 性能测试执行中,动态增减性能测试压力值。
- 性能测试执行中,动态注入新的流量模型任务。
这样的好处:
- 动态增减执行任务,达到一次试执行测试多种压力目的。
- 动态增减不同模型的任务,达到动态修改压测流量的目的。
万里长征第一步:增加暂停功能实现。
需求来源
在工作中,经常会遇到一类场景:预计系统提供的 QPS 在 1 万,线程数 100,我们设计了 0 线程为起始,间隔 30s 增加 10 的压力,最大值 120。(当然也可以使用 QPS 递增模式,下面的演示 Demo 会使用线程递增)。
可实际情况是 80 线程已经到达性能瓶颈,然后需求是在 80 线程的压力下保持一阵子,方便收集数据和获取现场。
思路
基本实现
因为使用场景是在压力递增的过程中,终止递增并保持压力,所以使用了性能测试软启动初探中的软启动方案。使用for
循环来递增压力。然后在某个时刻触发终止递增。
触发条件
预期是在性能测试服务化中实践和本地脚本测试中使用,目前只实现了本地脚本测试中功能,思路是通过一个线程安全变量来标记执行状态,是否是持续递增压力还是需要终止压力,这里选择了另起一个线程来完成这件任务,通过从控制台输入内容来控制这个开关。我目前只是做的 Demo 是一个没有回头路的方案,只有终止压力没有暂停后继续增加或者结束。
多线程类任务如下:
private static class FunTester implements Runnable {
@Override
public void run() {
waitForKey(INTPUT_KEY);
HOLD.set(1);
output("压力暂停");
}
}
这里从控制台等待输入,如果等于com.funtester.config.Constant#INTPUT_KEY
那么就设置com.funtester.frame.execute.HoldConcurrent#HOLD
设置为 1,从而标记测试任务已经进入了终止增压阶段。
多线程同步
这里参考Java 线程同步三剑客内容,我使用了java.util.concurrent.Phaser
,原因是java.util.concurrent.CountDownLatch
不灵活,无法满足需求,而java.util.concurrent.CyclicBarrier
虽然灵活也能满足多线程同步需求,但无法灵活增减需要同步的线程数,故而也放弃了。
导致问题
这种动态压力模型是在加压阶段进行的,也就是线程并不是固定,进而带来了一些问题,目前尚未解决。
- 本地数据统计失真。
- 暂不支持多任务,暂无法服务化。
实现 Demo
多线程任务类
首先是com.funtester.base.constaint.HoldThread
类,实现了com.funtester.base.constaint.ThreadBase
的部分方法,具体与其他实现区别主要是在com.funtester.base.constaint.HoldThread#before
和com.funtester.base.constaint.HoldThread#after
方法中对于标记多线程同步对象com.funtester.frame.execute.HoldConcurrent#phaser
的操作。
代码如下:
package com.funtester.base.constaint;
import com.funtester.frame.execute.HoldConcurrent;
import com.funtester.httpclient.GCThread;
import com.funtester.utils.Time;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* 动态压力模型,增压暂停
*/
public abstract class HoldThread<F> extends ThreadBase<F> {
private static final long serialVersionUID = -4617192188292407063L;
private static final Logger logger = LogManager.getLogger(HoldThread.class);
public HoldThread(F f, int limit, boolean isTimesMode) {
this.isTimesMode = isTimesMode;
this.limit = limit;
this.f = f;
}
protected HoldThread() {
super();
}
/**
*
*/
@Override
public void run() {
try {
before();
long ss = Time.getTimeStamp();
while (true) {
try {
executeNum++;
long s = Time.getTimeStamp();
doing();
long et = Time.getTimeStamp();
short diff = (short) (et - s);
costs.add(diff);
} catch (Exception e) {
logger.warn("执行任务失败!", e);
errorNum++;
} finally {
if ((isTimesMode ? executeNum >= limit : (Time.getTimeStamp() - ss) >= limit) || ThreadBase.needAbort() || status())
break;
}
}
long ee = Time.getTimeStamp();
if ((ee - ss) / 1000 > RUNUP_TIME + 3)
logger.info("线程:{},执行次数:{},错误次数: {},总耗时:{} s", threadName, executeNum, errorNum, (ee - ss) / 1000.0);
HoldConcurrent.allTimes.addAll(costs);
HoldConcurrent.requestMark.addAll(marks);
} catch (Exception e) {
logger.warn("执行任务失败!", e);
} finally {
after();
}
}
@Override
public void before() {
super.before();
HoldConcurrent.phaser.register();
}
@Override
protected void after() {
super.after();
GCThread.stop();
HoldConcurrent.phaser.arriveAndDeregister();
}
}
执行类
执行类com.funtester.frame.execute.HoldConcurrent
与其他模型区别在于:1.没有固定稳定压力,增压结束即稳定压力;2.多线程任务接受控制台输出内容控制任务阶段;3.当用户一直不输入com.funtester.config.Constant#INTPUT_KEY
,存在无法自然停止隐患。
代码如下:
package com.funtester.frame.execute;
import com.funtester.base.bean.PerformanceResultBean;
import com.funtester.base.constaint.ThreadBase;
import com.funtester.config.Constant;
import com.funtester.frame.Save;
import com.funtester.frame.SourceCode;
import com.funtester.utils.RWUtil;
import com.funtester.utils.Time;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.stream.Collectors.toList;
/**
* 并发类,用于启动压力脚本
*/
public class HoldConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(HoldConcurrent.class);
/**
* 用来标记状态
*/
public static AtomicInteger HOLD = new AtomicInteger(0);
/**
* 开始时间
*/
private long startTime;
/**
* 结束时间
*/
private long endTime;
/**
* 任务描述
*/
public String desc;
/**
* 任务集
*/
public List<ThreadBase> threads = new ArrayList<>();
/**
* 线程数
*/
public int threadNum;
/**
* 执行失败总数
*/
private int errorTotal;
/**
* 任务执行失败总数
*/
private int failTotal;
/**
* 执行总数
*/
private int executeTotal;
/**
* 用于记录所有请求时间
*/
public static Vector<Short> allTimes = new Vector<>();
/**
* 记录所有markrequest的信息
*/
public static Vector<String> requestMark = new Vector<>();
/**
* 线程池
*/
ExecutorService executorService;
/**
* 多线程多阶段同步类,用于多线程任务阶段管理
*/
public static Phaser phaser;
/**
* @param thread 线程任务
* @param threadNum 线程数
* @param desc 任务描述
*/
public HoldConcurrent(ThreadBase thread, int threadNum, String desc) {
this(threadNum, desc);
range(threadNum).forEach(x -> threads.add(thread.clone()));
}
/**
* @param threads 线程组
* @param desc 任务描述
*/
public HoldConcurrent(List<ThreadBase> threads, String desc) {
this(threads.size(), desc);
this.threads = threads;
}
private HoldConcurrent(int threadNum, String desc) {
this.threadNum = threadNum;
this.desc = StatisticsUtil.getFileName(desc);
phaser = new Phaser(1);
executorService = ThreadPoolUtil.createFixedPool(threadNum);
}
private HoldConcurrent() {
}
/**
* 执行多线程任务
* 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
*/
public PerformanceResultBean start() {
Thread funtester = new Thread(new FunTester());
funtester.start();
ThreadBase.progress = new Progress(threads, StatisticsUtil.getTrueName(desc));
ThreadBase.progress.threadNum = 0;
new Thread(ThreadBase.progress).start();
startTime = Time.getTimeStamp();
for (int i = 0; i < threadNum; i++) {
if (HOLD.get() == 1) {
threadNum = i;
break;
}
ThreadBase thread = threads.get(i);
if (StringUtils.isBlank(thread.threadName)) thread.threadName = StatisticsUtil.getTrueName(desc) + i;
sleep(RUNUP_TIME / threadNum);
executorService.execute(thread);
ThreadBase.progress.threadNum = i + 1;
logger.info("已经启动了 {} 个线程!", i + 1);
}
phaser.arriveAndAwaitAdvance();
executorService.shutdown();
ThreadBase.progress.stop();
threads.forEach(x -> {
if (x.status()) failTotal++;
errorTotal += x.errorNum;
executeTotal += x.executeNum;
});
endTime = Time.getTimeStamp();
HOLD.set(0);
logger.info("总计{}个线程,共用时:{} s,执行总数:{},错误数:{},失败数:{}", threadNum, Time.getTimeDiffer(startTime, endTime), formatLong(executeTotal), errorTotal, failTotal);
return over();
}
private static class FunTester implements Runnable {
@Override
public void run() {
waitForKey(INTPUT_KEY);
HOLD.set(1);
output("压力暂停");
}
}
private PerformanceResultBean over() {
Save.saveIntegerList(allTimes, DATA_Path.replace(LONG_Path, EMPTY) + StatisticsUtil.getFileName(threadNum, desc));
Save.saveStringListSync(HoldConcurrent.requestMark, MARK_Path.replace(LONG_Path, EMPTY) + desc);
allTimes = new Vector<>();
requestMark = new Vector<>();
return countQPS(threadNum, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
}
/**
* 计算结果
* <p>此结果仅供参考</p>
* 此处因为start和end的不准确问题,所以采用改计算方法,与fixQPS有区别
*
* @param name 线程数
*/
public PerformanceResultBean countQPS(int name, String desc, String start, String end) {
List<String> strings = RWUtil.readTxtFileByLine(Constant.DATA_Path + StatisticsUtil.getFileName(name, desc));
int size = strings.size() == 0 ? 1 : strings.size();
List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList());
int sum = data.stream().mapToInt(x -> x).sum();
String statistics = StatisticsUtil.statistics(data, desc, threadNum);
int rt = sum / size;
double qps = 1000.0 * name / (rt == 0 ? 1 : rt);
double qps2 = (executeTotal + errorTotal) * 1000.0 / (endTime - startTime);
return new PerformanceResultBean(desc, start, end, name, size, rt, qps, qps2, getPercent(executeTotal, errorTotal), getPercent(threadNum, failTotal), executeTotal, statistics);
}
/**
* 用于做后期的计算
*
* @param name
* @param desc
* @return
*/
public PerformanceResultBean countQPS(int name, String desc) {
return countQPS(name, desc, Time.getDate(), Time.getDate());
}
}
测试
测试脚本
这里用了简单的多线程sleep
进行测试,不再使用本地 HTTP 接口了。脚本内容如下:
package com.funtest.groovytest
import com.funtester.base.constaint.HoldThread
import com.funtester.base.constaint.ThreadBase
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.HoldConcurrent
/**
* 加压暂停
*/
class HoldTest extends SourceCode {
public static void main(String[] args) {
def tester = new FunTester()
new HoldConcurrent(tester,10,"终止压力递增保持压力测试").start()
}
private static class FunTester extends HoldThread {
FunTester() {
super(null, 100, true)
}
@Override
protected void doing() throws Exception {
sleep(0.1)
}
@Override
ThreadBase clone() {
return new FunTester()
}
}
}
控制台输出
下面是完整的控制台输出:
INFO-> 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/src/test/groovy/com/funtest/groovytest/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
WARN-> 请输入“FunTester”继续运行!
INFO-> 已经启动了 1 个线程!
INFO-> 终止压力递增保持压力测试进度:▍ 2% ,当前QPS: 0
INFO-> 已经启动了 2 个线程!
INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍ 31% ,当前QPS: 20
INFO-> 已经启动了 3 个线程!
INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍ 60% ,当前QPS: 30
FunTester
INFO-> 本次共等待:9.535秒!
INFO-> 压力暂停
INFO-> 已经启动了 4 个线程!
INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍ 89% ,当前QPS: 39
INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍ 100%
INFO-> 总计4个线程,共用时:13.439 s,执行总数:227,错误数:0,失败数:0
INFO-> 数据保存成功!文件名:/Users/oker/IdeaProjects/funtester/src/test/groovy/com/funtest/groovytest/long/data/终止压力递增保持压力测试231809_4
INFO->
~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~ JSON ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~
> {
> ① . "rt":103,
> ① . "failRate":0.0,
> ① . "threads":4,
> ① . "deviation":"56.51%",
> ① . "errorRate":0.0,
> ① . "executeTotal":227,
> ① . "qps2":16.891137733462312,
> ① . "total":227,
> ① . "qps":38.83495145631068,
> ① . "startTime":"2021-09-23 18:09:02",
> ① . "endTime":"2021-09-23 18:09:16",
> ① . "mark":"终止压力递增保持压力测试231809",
> ① . "table":"eJwBLwDQ/+aVsOaNrumHj+WkquWwkSzml6Dms5Xnu5jlm74hIOW6lOW9k+Wkp+S6jiAxMDI0/eodgA=="
> }
~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~ JSON ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~
INFO-> 数据量太少,无法绘图! 应当大于 1024
Process finished with exit code 0
可以看出QPS
和QPS2
相差一倍以上,这个比较容易理解,等差数列既视感。下一步会努力实现本地灵活增减压力值和全局任务管理功能,敬请期待!!!