负责项目需要进行 rocketmq 压测,原本尝试使用 locust 结合 rocketmq 的 py 库进行编写和执行压测脚本,但实测发现 rocketmq 的 py 库仅支持在 Linux 环境下运行,且实际测试并发量大之后,出现大量未知原因报错,所以考虑使用 jmeter 来做,但是 jmeter 并没有现成的插件支持 rocketmq,之前有了解过 jmeter 可以自己开发第三方插件,由此决定自己编写参照 jmeter 插件写法,打包一个 rocketmq 的 jemter 插件来进行压测
虽然这是用于 rocketmq 的性能压测插件,但是 jmeter 其他 Java 类请求插件都可以套这个模板,自己编写可以灵活性很高,希望对大家类似的需求有帮助!
jdk: 采用 1.8,配置环境变量
IDE:idea 2021
依赖管理和打包工具:maven
maven 依赖设置如下
<dependencies>
<!-- jmeter依赖-->
<dependency>
<groupId>org.apache.jmeter</groupId>
<artifactId>ApacheJMeter_core</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.jmeter</groupId>
<artifactId>ApacheJMeter_java</artifactId>
<version>5.4.3</version>
</dependency>
<!-- rcketmq依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<!-- 设置为连同依赖一起打包为一个jar包-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.xxg.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>${project.name}</finalName>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
因为我们最终目的是把 jar 包作为 jmeter 的插件使用,指定对应参数发起大量请求,所以主要有以下几点注意
代码详见:
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import sun.awt.windows.WPrinterJob;
import java.util.Random;
public class JmeterMqProducer extends AbstractJavaSamplerClient {
//这里定义类变量(不能定义为static,会报错)
private DefaultMQProducer producer;
private String producerName;
private String producerGroup;
private String serverUrl;
private String topic;
private String tags;
private String keys;
private String body;
private String delayTime;
private String timeout;
private String sendType;
private long cur_time;
private byte[] bodyBytes;
private String orginData;
//这里定义这里作为请求的前置处理
@Override
public void setupTest(JavaSamplerContext context) {
serverUrl = context.getParameter("serverUrl");
topic = context.getParameter("topic");
tags = context.getParameter("tags");
keys = context.getParameter("keys");
body = context.getParameter("messageBody");
producerName = context.getParameter("producerName");
producerGroup = context.getParameter("producerGroup");
timeout = context.getParameter("timeout");
sendType = context.getParameter("sendType");
delayTime = context.getParameter("delayTime");
try {
producer = getProducer(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//这里自定义了一个Producer的单例方法
public DefaultMQProducer getProducer(int type) throws InterruptedException {
if (producer == null){
producer = new DefaultMQProducer(producerGroup);
if (type == 1){
System.out.println("=======init producer == null===========");
cur_time = System.currentTimeMillis();
}else {
System.out.println("=======runtest producer == null===========");
}
producer.setNamesrvAddr(serverUrl);
producer.setInstanceName(producerName);
producer.setVipChannelEnabled(false);
// 设置超时时间
producer.setSendMsgTimeout(Integer.parseInt(timeout));
}
try {
producer.start();
} catch (MQClientException e) {
System.out.println("启动init!忽略");
}
return producer;
}
//这里是一个请求结束的后置处理
@Override
public void teardownTest(JavaSamplerContext context) {
producer.shutdown();
}
//这里是一个请求主体执行部分
@Override
public SampleResult runTest(JavaSamplerContext context) {
SampleResult sr = new SampleResult();
sr.sampleStart();
// sr.setRequestHeaders("请求原始的msg_body:"+ orginData); 设置请求头内容
Message msg = new Message(topic,
tags,
keys,
bodyBytes);
// msg.getProperties().put("traceparent", "xxx");
try {
if ("oneWay".equals(sendType)){
producer.sendOneway(msg);
sr.setResponseData("Oneway发送成功","utf-8");
}else {
SendResult sendResult = producer.send(msg);
sr.setResponseData(sendResult.toString(),"utf-8");
if(sendResult ==null || sendResult.getSendStatus() != SendStatus.SEND_OK){
System.err.println(sendResult);
sr.setResponseData("{'code' : 1, 'msg': '失败'}","utf-8");
}
}
}catch (Exception e){
e.printStackTrace();
sr.setResponseData("{'code' : 2, 'msg': '其他失败啊'}","utf-8");
producer.shutdown();
}
sr.setDataType(SampleResult.TEXT);
sr.setSuccessful(true);
}catch(Exception e){
sr.setSuccessful(false);
e.printStackTrace();
}
finally {
sr.sampleEnd();
}
return sr;
}
// 给参数填充默认值
@Override
public Arguments getDefaultParameters() {
Arguments params = new Arguments();
params.addArgument("serverUrl", "http://mq.xxx.com");
params.addArgument("topic", "test_topic");
params.addArgument("tags", "test_tag");
params.addArgument("keys", "test_key");
params.addArgument("messageBody", "test_body");
params.addArgument("producerName", "test_producerName");
params.addArgument("producerGroup", "producerGroup");
params.addArgument("timeout", "6000");
params.addArgument("sendType", "oneWay");
params.addArgument("delayTime", "100");
return params;
}
}
使用 mvn package 命令打包,获得 jar 包
放置到 jmeter 的 \lib\ext 命令下
重启 jemter, 再重新添加一个取样器
即可发现新增了一个 java request 取样器,就是我们编写的 rocketmq 插件了
好了,恭喜你,接下来就可以进行性能测试了,实测使用 oneway 发送单条消息,在压力机和服务器处于同一网络且网络状况良好情况下,单条消息耗时在毫秒级,qps 轻松破万,速度很快