性能测试工具 【优分享】JMeter 源码解析之结果收集器

优测云服务平台 · 2023年02月27日 · 2624 次阅读

本文作者 优测性能测试专家高源。
简介:本文以最新的 JMeter 5.5 版本源代码为例详细介绍了单机模式和分布式模式下结果收集器的工作原理。通篇干货,还不快来了解一下!

一、JMeter 结果收集器概述
JMeter 是在压力领域中最常见的性能测试工具,由于其开源的特点,受到广大测试和开发同学的青睐。但是,在实际应用过程中,JMeter 存在的一些性能瓶颈也凸显出来,经常会遇到大并发下压不上去的情况。笔者通过深入分析其源码实现,找到 JMeter 存在的瓶颈问题及根本原因,为以后更好地使用工具提供一些思路。
结果收集器:在 JMeter 中担任报告数据收集的重任,无论是单机模式还是 master-slave 模式,每一个请求的结果都是通过相应的结果收集器进行数据采集的。在单机模式下用 Result Collector 这个监听器去采集,在分布式(master-slave)场景下通过配 RemoteSampleListenerWrapper 下的指定 sender 进行收集,具体配置 jmeter.property 文件的 mode 属性和队列长度实现。下面我们以当前最新的 JMeter 5.5 版本的源代码为例详细介绍下单机模式和分布式模式下结果收集器的工作原理。
二、单机模式
1、初始化
在命令行模式下,JMeter 会根据用户的 logfile 配置选择是否添加 Result Collector,一般在实际测试的时候,我们都是需要有详细统计报告生成的,所以都会添加 Result Collector,收集器放在了整个 hashtree 的第一个节点,代码如下:
void runNonGui(String testFile, String logFile, boolean remoteStart, String remoteHostsString, boolean generateReportDashboard){
....
ResultCollector resultCollector = null;
if (logFile != null) {
resultCollector = new ResultCollector(summariser);
resultCollector.setFilename(logFile);
clonedTree.add(clonedTree.getArray()[0], resultCollector);
}
else {
// only add Summariser if it can not be shared with the ResultCollector
if (summariser != null) {
clonedTree.add(clonedTree.getArray()[0], summariser);
}
}
....

}
2、加载流程
添加完结果收集器后,执行脚本过程中,JMeter 会根据 jmx 的编排,按照如下的执行顺序进行调用:

每一个线程都是按照以上的顺序循环反复执行,直到压测停止。具体代码如下(相应的关键点已增加注释):
private void executeSamplePackage(Sampler current,
TransactionSampler transactionSampler,
SamplePackage transactionPack,
JMeterContext threadContext) {
threadContext.setCurrentSampler(current);
// Get the sampler ready to sample
SamplePackage pack = compiler.configureSampler(current);
runPreProcessors(pack.getPreProcessors());//运行前置处理器
// Hack: save the package for any transaction controllers
threadVars.putObject(PACKAGE_OBJECT, pack);
delay(pack.getTimers());//定时器 timer
SampleResult result = null;
if (running) {
Sampler sampler = pack.getSampler();
result = doSampling(threadContext, sampler);
}
// If we got any results, then perform processing on the result
if (result != null) {
if (! result.isIgnore()) {
...

runPostProcessors(pack.getPostProcessors());//运行后置处理器
checkAssertions(pack.getAssertions(), result, threadContext);//运行断言处理器
// PostProcessors can call setIgnore, so reevaluate here
if (! result.isIgnore()) {
// Do not send subsamples to listeners which receive the transaction sample
List sampleListeners = getSampleListeners(pack, transactionPack, transactionSampler);
notifyListeners(sampleListeners, result);//执行监听器,此处为执行报告收集器的 sampleOccurred 方法
}
compiler.done(pack);
...
}

收集器 Result Collector 执行的具体代码:
@Override
public void sampleOccurred(SampleEvent event) {
SampleResult result = event.getResult();
if (isSampleWanted(result.isSuccessful())) {
sendToVisualizer(result);
if (out != null && ! isResultMarked(result) && ! this.isStats) {
SampleSaveConfiguration config = getSaveConfig();
result.setSaveConfig(config);
try {
if (config.saveAsXml()) {
SaveService.saveSampleResult(event, out);
} else { // ! saveAsXml
CSVSaveService.saveSampleResult(event, out);
}
} catch (Exception err) {
log.error("Error trying to record a sample", err); // should throw exception back to caller
}
}
}
if(summariser != null) {
summariser.sampleOccurred(event);
}
}

以上主要实现了将每个请求的结果数据存储到日志文件中(CSV /XML),为后续的报告生成提供数据文件。
3、性能瓶颈分析
从以上的流程不难看出,由于每个线程的每个请求后都会频繁调用 Result Collector 的 sample Occurred 方法,即会频繁读写文件,有可能导致 IO 瓶颈。一旦存储的速度下降,必然导致线程循环发包的速度下降,从而导致压不上去的情况出现。所以单机模式下不建议设置超过 200 以上的并发,若非必须,尽量关闭日志采集和 html 报告生成,以免报告置信度存在问题。

三、分布式模式
为了应对单机的各种瓶颈问题,JMeter 采用了分布式(master-slave)模式。加载执行流程与单机基本一致,不再赘述,区别在于监听器换成了 Remote Sample ListenerImpl 收集器。
1、发送模式指定方法
下面我们重点看下 Remote Sample ListenerImpl 监听器的代码:
@Override
public void processBatch(List samples) {
if (samples != null && sampleListener != null) {
for (SampleEvent e : samples) {
sampleListener.sampleOccurred(e);
}
}
}
@Override
public void sampleOccurred(SampleEvent e) {
if (sampleListener != null) {
sampleListener.sampleOccurred(e);
}
}

从以上代码可以看出,这个监听器里又调用了 sample Listener 的 sample Occurred 方法,而 sample Listener 是通过用户在 jmeter.property 文件中指定的。

2、AsynchSampleSender 源码解析
下面我们以 Asynch Sample Sender 为例进行源码详细介绍:
public class AsynchSampleSender extends AbstractSampleSender implements Serializable {
protected Object readResolve() throws ObjectStreamException{
int capacity = getCapacity();
log.info("Using batch queue size (asynch.batch.queue.size): {}", capacity); // server log file
queue = new ArrayBlockingQueue<>(capacity);
Worker worker = new Worker(queue, listener);
worker.setDaemon(true);
worker.start();
return this;
}
@Override
public void testEnded(String host)
log.debug("Test Ended on {}", host);
try {
listener.testEnded(host);
queue.put(FINAL_EVENT);
} catch (Exception ex) {
log.warn("testEnded(host)", ex);
}
if (queueWaits > 0) {
log.info("QueueWaits: {}; QueueWaitTime: {} (nanoseconds)", queueWaits, queueWaitTime);
}
}
@Override
public void sampleOccurred(SampleEvent e)
try {
if (! queue.offer(e)){ // we failed to add the element first time
queueWaits++;
long t1 = System.nanoTime();
queue.put(e);
long t2 = System.nanoTime();
queueWaitTime += t2-t1;
}
} catch (Exception err) {
log.error("sampleOccurred; failed to queue the sample", err);
}
}
private static class Worker extends Thread {
@Override
public void run()
try {
boolean eof = false;
while (! eof) {
List l = new ArrayList<>();
SampleEvent e = queue.take();
// try to process as many as possible
// The == comparison is not an error
while (!(eof = e == FINAL_EVENT) && e != null) {
l.add(e);
e = queue.poll(); // returns null if nothing on queue currently
}
int size = l.size();
if (size > 0) {
try {
listener.processBatch(l);
} catch (RemoteException err) {
if (err.getCause() instanceof java.net.ConnectException){
throw new JMeterError("Could not return sample",err);
}
log.error("Failed to return sample", err);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.debug("Worker ended");
}
}
}

从以上代码可以看出,Asynch SampleSender 的 sample Occurred 方法里只进行入列的操作,而采集上报工作是启动了一个 work 线程实现的,相当于异步处理所有请求数据。这样设计不会阻塞发包的流程,性能上要优于单机模式。但是,在一定情况下,也是会出现性能瓶颈的。
这个队列采用的是 Array Blocking Queue(阻塞队列),这个队列有如下特点:
·Array Blocking Queue 是有界的初始化必须指定大小,队列满了后,无法入列。
·Array Blocking Queue 实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个 Reenter Lock 锁。
3、性能瓶颈分析
瓶颈点一:队列大小问题
当我们实际压测过程中,如果队列大小(asynch.batch.queue.size)设置过小,入列速度大于出列速度,就会导致队列满而阻塞整个发压流程,而如果队列设置过大,一旦请求的包体比较大,很容易造成内存溢出。
瓶颈点二:单一锁问题
在压测过程中,入列出列是非常频繁的,而同一个 Reenter Lock 锁也可能造成入列和出列过程中,因无法获得锁而入列或者出列延迟,继而影响发压效率。
四、总结
JMeter 因其完善的社区和开源特点,在日常压测中可广泛使用。JMeter 适合进行小规模的压测。但是在大规模的压测过程中,受本地机器性能、带宽等限制,不宜进行单机压测,可以使用 JMeter 的 master-slave 的方式进行分布式压测。但是需提前设置好结果收集器和队列的大小,并进行预先演练评估出上限 qps,防止出现压不上去的情况。此外,master-slave 通信方式是远程 RMI 的双向通信方式,连接数过多也会造成 master 的瓶颈出现,需要做好量级的提前评估。

* 版权声明:本文作者 优测性能测试专家高源。

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册