性能测试工具 记一次 jmeter 的 rocketmq 插件开发

饶凯 · 2023年08月08日 · 3356 次阅读

负责项目需要进行 rocketmq 压测,原本尝试使用 locust 结合 rocketmq 的 py 库进行编写和执行压测脚本,但实测发现 rocketmq 的 py 库仅支持在 Linux 环境下运行,且实际测试并发量大之后,出现大量未知原因报错,所以考虑使用 jmeter 来做,但是 jmeter 并没有现成的插件支持 rocketmq,之前有了解过 jmeter 可以自己开发第三方插件,由此决定自己编写参照 jmeter 插件写法,打包一个 rocketmq 的 jemter 插件来进行压测

虽然这是用于 rocketmq 的性能压测插件,但是 jmeter 其他 Java 类请求插件都可以套这个模板,自己编写可以灵活性很高,希望对大家类似的需求有帮助!

1.准备

jdk: 采用 1.8,配置环境变量
IDE:idea 2021
依赖管理和打包工具:maven

2.maven 依赖设置

maven 依赖设置如下

<dependencies>
         <!--  jmeter依赖-->
        <dependency>
            <groupId>org.apache.jmeter</groupId>
            <artifactId>ApacheJMeter_core</artifactId>
            <version>5.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.jmeter</groupId>
            <artifactId>ApacheJMeter_java</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!-- rcketmq依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.1.0</version>
        </dependency>


    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

   <!--  设置为连同依赖一起打包为一个jar包-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.5.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.xxg.Main</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <finalName>${project.name}</finalName>

                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>

3.rocketmq 主类编写

因为我们最终目的是把 jar 包作为 jmeter 的插件使用,指定对应参数发起大量请求,所以主要有以下几点注意

  • 1. 请求主类编写:因为需要被 jmeter 识别,所以需要继承 AbstractJavaSamplerClient,这样后面 jmeter 添加采用器的时候,我们才能选到自己的开发的第三方采样器插件和类
    • a. 重写 setupTest 作为请求的前置处理(这里我选择作为 producer 的初始化)
    • b. 重写 teardownTest 作为请求的后置处理(这里我选择销毁 producer)
    • c. 重写 runTest 作为请求的主题处理(这里我选择发起 mq 的生产及响应断言)
    • d. 重写 getDefaultParameters 作为设置请求默认参数(这里我选择填充默认的 mq 参数,如 namesrv、tag)
  • 2. 关于 mq 并发的一些注意事项
    • a. producerName、producerGroup 每次请求需要指定为唯一值,不然并发量起来会大量报错
    • b. mq 的发送方式包含同步发送、异步发送、OneWay(单向)发送等(具体可参考:https://www.cnblogs.com/wzh2010/p/16629876.htmlOneWay 这 2 类方式)),需要选择根据具体场景可选择合适发送方式,或者可以都写上,通过参数化实现切换(可以参见下面代码,我指定了同步发送、
    • c.实际运行注意删除/注释测试代码,尤其是延时,不要问为什么,因为我吃过亏,mq 的本身的发送速度极快,之所以速度提不上来,就是固定延时惹的祸!

代码详见:

import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import sun.awt.windows.WPrinterJob;

import java.util.Random;


public class JmeterMqProducer extends AbstractJavaSamplerClient {
    //这里定义类变量(不能定义为static,会报错)
    private DefaultMQProducer producer;
    private String producerName;
    private String producerGroup;
    private String serverUrl;
    private String topic;
    private String tags;
    private String keys;
    private String body;
    private String delayTime;
    private String timeout;
    private String sendType;
    private long cur_time;
    private byte[] bodyBytes;
    private String orginData;

   //这里定义这里作为请求的前置处理
    @Override
    public void setupTest(JavaSamplerContext context) {
        serverUrl = context.getParameter("serverUrl");
        topic = context.getParameter("topic");
        tags = context.getParameter("tags");
        keys = context.getParameter("keys");
        body = context.getParameter("messageBody");
        producerName = context.getParameter("producerName");
        producerGroup = context.getParameter("producerGroup");
        timeout = context.getParameter("timeout");
        sendType = context.getParameter("sendType");
        delayTime = context.getParameter("delayTime");

        try {
            producer = getProducer(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    //这里自定义了一个Producer的单例方法
    public DefaultMQProducer getProducer(int type) throws InterruptedException {
        if (producer == null){
            producer = new DefaultMQProducer(producerGroup);
            if (type == 1){
                System.out.println("=======init producer == null===========");
                cur_time = System.currentTimeMillis();
            }else {
                System.out.println("=======runtest producer == null===========");
            }
            producer.setNamesrvAddr(serverUrl);
            producer.setInstanceName(producerName);
            producer.setVipChannelEnabled(false);
            // 设置超时时间
            producer.setSendMsgTimeout(Integer.parseInt(timeout));
        }
        try {
            producer.start();
        } catch (MQClientException e) {
            System.out.println("启动init!忽略");
        }
        return producer;
    }

     //这里是一个请求结束的后置处理
    @Override
    public void teardownTest(JavaSamplerContext context) {
        producer.shutdown();
    }

    //这里是一个请求主体执行部分
    @Override
    public SampleResult runTest(JavaSamplerContext context) {
            SampleResult sr = new SampleResult();
            sr.sampleStart();
            // sr.setRequestHeaders("请求原始的msg_body:"+ orginData); 设置请求头内容
            Message msg = new Message(topic,
                    tags,
                    keys,
                    bodyBytes);
           // msg.getProperties().put("traceparent", "xxx");
            try {
                if ("oneWay".equals(sendType)){
                    producer.sendOneway(msg);
                    sr.setResponseData("Oneway发送成功","utf-8");
                }else {
                    SendResult sendResult = producer.send(msg);
                    sr.setResponseData(sendResult.toString(),"utf-8");
                    if(sendResult ==null || sendResult.getSendStatus() != SendStatus.SEND_OK){
                        System.err.println(sendResult);
                        sr.setResponseData("{'code' : 1, 'msg': '失败'}","utf-8");
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
                sr.setResponseData("{'code' : 2, 'msg': '其他失败啊'}","utf-8");
                producer.shutdown();

            }
            sr.setDataType(SampleResult.TEXT);
            sr.setSuccessful(true);
        }catch(Exception e){
            sr.setSuccessful(false);
            e.printStackTrace();
        }
        finally {
            sr.sampleEnd();
        }
        return sr;

    }

   // 给参数填充默认值
    @Override
    public Arguments getDefaultParameters() {
        Arguments params = new Arguments();
        params.addArgument("serverUrl", "http://mq.xxx.com");
        params.addArgument("topic", "test_topic");
        params.addArgument("tags", "test_tag");
        params.addArgument("keys", "test_key");
        params.addArgument("messageBody", "test_body");
        params.addArgument("producerName", "test_producerName");
        params.addArgument("producerGroup", "producerGroup");
        params.addArgument("timeout", "6000");
        params.addArgument("sendType", "oneWay");
        params.addArgument("delayTime", "100");

        return params;
    }
}

4.打包

使用 mvn package 命令打包,获得 jar 包

5.运行

  • 放置到 jmeter 的 \lib\ext 命令下

  • 重启 jemter, 再重新添加一个取样器

即可发现新增了一个 java request 取样器,就是我们编写的 rocketmq 插件了

好了,恭喜你,接下来就可以进行性能测试了,实测使用 oneway 发送单条消息,在压力机和服务器处于同一网络且网络状况良好情况下,单条消息耗时在毫秒级,qps 轻松破万,速度很快

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册