3.4 多线程执行类

对于线程执行类来讲,最重要的两个功能就是执行测试任务和处理测试数据。其中执行测试任务涉及控制线程执行逻辑,稍显复杂,这里我们先将测试方案简化为执行 N 个并发,每个线程间隔 1 秒启动。如此一来,我们需要将已经创建好的 ThreadTask 类对象间隔提交给线程池执行即可。线程池的选择已经在上一节讲过,由于测试方案中并发数固定,我们只创建与之对应数量的线程池即可。

3.4.1 多线程类基础能力开发

首先我们设计这个类的属性,根据需求多线程执行类需要具备以下能力:

首先我们需要一个创建线程池的方法,该方法被设计成静态的,需要提取到一个多线程工具类中,这里先展示代码:

/**
 * 创建线程池,固定线程池大小
 * @param size 线程池大小
 * @return
 */
public static ThreadPoolExecutor createPool(int size) {
    return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), // 创建线程池
            new ThreadFactory() { // 创建一个线程工厂
                AtomicInteger index = new AtomicInteger(); // 线程安全的线程编号

                @Override
                public Thread newThread(Runnable r) { // 重写创建线程方法
                    Thread thread = new Thread(r); // 创建线程
                    thread.setName("线程-" + index.incrementAndGet()); // 设置线程名称
                    return thread; // 返回创建的线程
                }
            }); // 创建线程池
}

然后根据需求分解,设计这个类的类属性:

/**
 * 线程池
 */
ThreadPoolExecutor poolExecutor;

/**
 * 任务数量,即并发数量
 */
public int TaskNum;

/**
 * 总执行次数
 */
public AtomicInteger executeNumStatistic;

/**
 * 执行错误次数
 */
public AtomicInteger errorNumStatistic;

/**
 * 用于记录所有请求时间
 */
public Vector<Integer> costTimeStatistic;

/**
 * 任务描述
 */
public String taskDesc;

/**
 * 开始时间,单位毫秒
 */
public long startTimestamp;

/**
 * 结束时间,单位毫秒
 */
public long endTimestamp;

上文提到我们有一个启动方法,也就是在创建执行类对象时,并不会直接把测试任务都提交到线程池,所以我们需要一个存放测试任务的 List 属性。

/**
 * 多线程任务集合
 */
public List<ThreadTask> tasks;

下面我们来设计构造方法,由于 tasks 属性的引入,所以任务数量即并发数量 TaskNum 并不是必须的,这里可以省略掉。其他属性中,只有 taskDesc 需要从外部获取了。那么,执行类的构造方法可以如下设计:

/**
 * 构造方法
 * @param taskDesc 任务描述
 * @param tasks 多线程任务集合
 */
public TaskExecutor(List<ThreadTask> tasks, String taskDesc) {
    this.tasks = tasks; // 初始化多线程任务集合
    this.taskDesc = taskDesc; // 初始化任务描述
    this.executeNumStatistic = new AtomicInteger(0); // 初始化执行次数
    this.errorNumStatistic = new AtomicInteger(0); // 初始化执行错误次数
    this.costTimeStatistic = new Vector<>(); // 初始化请求时间集合
    this.poolExecutor = ThreadTool.createPool(tasks.size()); // 初始化线程池
}

接下来,我们处理启动方法,启动方法中就是将 tasks 中的待执行任务,间隔 1 秒提交给线程池执行。代码如下:

/**
 * 开始执行任务
 */
public void start() {
    this.startTimestamp = System.currentTimeMillis(); // 记录开始时间
    for (ThreadTask task : tasks) { // 遍历多线程任务集合,提交线程池执行
        poolExecutor.execute(task); // 提交线程池执行
        ThreadTool.sleep(1000); // 休眠1秒,间隔提交多线程任务
    }
}

与之对应的主动结束测试任务的方法:

/**
 * 停止执行任务
 */
public void stop() {
    for (int i = 0; i < tasks.size(); i++) {
        tasks.get(i).needStop = true;
    }
}

3.4.2 任务类协同

除了执行类独有的功能,还有不少需要与任务类协同完成的功能。这里面主要两类:一是逻辑关联,包括启动、停止;另一类是数据汇总。

首先我们要解决的核心功能就是确认任务结束的时间点,且同步给多线程任务和多线程执行类。从理论上来说,每个多线程任务结束之后,在 after() 方法中把数据上报(请注意这里是线程安全的)。对于执行类来讲,需要所有任务线程结束之后,才会进行后续的数据处理。

经过这番描述,不知道读者会不会感觉到熟悉,这非常符合我们前面讲过的多线程同步类 java.util.concurrent.CountDownLatch 最佳使用场景,固定线程数,一次性的线程同步。

首先我们要给执行类增加一个 CountDownLatch 类实例属性:

/**
 * 用于停止任务的计数器
 */
public CountDownLatch stopCountDownLatch;

我们再给多线程任务类也添加一个同样的 CountDownLatch 类实例属性,代码与执行类相同,不在赘述。

然后在构造方法中完成初始化:

/**
 * 构造方法
 * @param taskDesc 任务描述
 * @param tasks 多线程任务集合
 */
public TaskExecutor(List<ThreadTask> tasks, String taskDesc) {
    this.tasks = tasks; // 初始化多线程任务集合
    this.taskDesc = taskDesc; // 初始化任务描述
    this.executeNumStatistic = new AtomicInteger(0); // 初始化执行次数
    this.errorNumStatistic = new AtomicInteger(0); // 初始化执行错误次数
    this.costTimeStatistic = new Vector<>(); // 初始化请求时间集合
    this.poolExecutor = ThreadTool.createPool(tasks.size()); // 初始化线程池
    this.stopCountDownLatch = new CountDownLatch(tasks.size()); // 初始化停止任务计数器
    for (int i = 0; i < tasks.size(); i++) { // 初始化多线程任务的CountDownLatch同步类属性
        tasks.get(i).stopCountDownLatch = stopCountDownLatch;
    }
}

在多线程任务类中,我们修改 run() 方法内容,添加 CountDownLatch 类计数功能。这里我们遵守最佳实战中的展示方法:

/**
 * 多线程任务执行方法
 */
@Override
public void run() {
    try {
        before(); // 前置处理
        while (true) {
            if (ABORT.get() || needStop || executeNum >= totalNum) { // 判断是否终止测试任务
                break;
            }
            try {
                executeNum++; // 记录执行次数
                long start = System.currentTimeMillis(); // 记录开始时间
                test(); // 测试方法
                long end = System.currentTimeMillis(); // 记录结束时间
                costTime.add((int) (end - start));
            } catch (Exception e) {
                errorNum++; // 记录错误次数
                e.printStackTrace();
            }
        }
        after(); // 后置处理
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        stopCountDownLatch.countDown(); // 计数器减一
    }
}

最后我们要改造执行类的 start() 方法,在等待所有多线程任务都结束之后,执行后续操作。

/**
 * 开始执行任务
 */
public void start() {
    this.startTimestamp = System.currentTimeMillis(); // 记录开始时间
    for (ThreadTask task : tasks) { // 遍历多线程任务集合,提交线程池执行
        poolExecutor.execute(task); // 提交线程池执行
        ThreadTool.sleep(1000); // 休眠1秒,间隔提交多线程任务
    }
    try {
        stopCountDownLatch.await(); // 等待停止任务计数器为0
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    this.endTimestamp = System.currentTimeMillis(); // 记录结束时间
    this.poolExecutor.shutdown(); // 关闭线程池
    System.out.println(System.currentTimeMillis() + "  性能测试任务执行完毕!"); // 打印任务执行完毕
}

万事俱备,下面即将迎来激动人心的时刻,笔者将通过一个演示案例见证我们设计的压测引擎第一次启动运行:

package org.funtester.performance.books.chapter03.section4;

import org.funtester.performance.books.chapter03.common.ThreadTool;
import org.funtester.performance.books.chapter03.section3.ThreadTask;

import java.util.ArrayList;
import java.util.List;

public class TestEngineDemo {

    public static void main(String[] args) {
        int tasksNum = 2; // 任务数,即并发数量
        int totalNum = 3; // 单个任务的总执行次数
        List<ThreadTask> tasks = new ArrayList<>(); // 任务集合
        for (int i = 0; i < tasksNum; i++) {
            ThreadTask threadTask = new ThreadTask() { // 创建任务对象
                @Override
                public void before() { // 重写前置处理方法
                    System.out.println(System.currentTimeMillis() + "  before testing !  " + Thread.currentThread().getName()); // 打印前置处理日志
                }

                @Override
                public void test() {
                    ThreadTool.sleep(500); // 模拟业务操作
                    System.out.println(System.currentTimeMillis() + "  testing !  " + Thread.currentThread().getName()); // 打印业务操作日志
                }

                @Override
                public void after() {
                    System.out.println(System.currentTimeMillis() + "  after testing !  " + Thread.currentThread().getName()); // 打印后置处理日志
                }

            };
            threadTask.totalNum = totalNum; // 设置任务的总执行次数
            threadTask.costTime = new ArrayList<>(totalNum); // 设置任务的执行时间集合,设置容量,避免频繁扩容
            tasks.add(threadTask); // 将任务添加到任务集合
        }
        new TaskExecutor(tasks, "性能测试引擎演示").start(); // 创建并发任务执行器并启动
    }
}

在上面这个例子中,实现了一个 2 个并发,单个多线程任务执行 3 次的性能测试用例。用例每个线程任务中 before()test()after() 方法都做了日志处理。控制台打印的内容如下:

1700142013891  线程-1  before testing !
1700142014393  线程-1  testing !
1700142014895  线程-2  before testing !
1700142014896  线程-1  testing !
1700142015400  线程-2  testing !
1700142015400  线程-1  testing !
1700142015400  线程-1  after testing !
1700142015904  线程-2  testing !
1700142016409  线程-2  testing !
1700142016409  线程-2  after testing !
1700142016409  性能测试任务执行完毕!

从打印信息可以看出,线程任务均按照我们预期的方式执行,当所有任务执行完,执行类关闭线程池,打印结束日志。

书的名字:从 Java 开始做性能测试

如果本书内容对你有所帮助,希望各位不吝赞赏,让我可以贴补家用。赞赏两位数可以提前阅读未公开章节。我也会尝试制作本书的视频教程,包括必要的答疑。

FunTester 原创精华

【连载】从 Java 开始性能测试


↙↙↙阅读原文可查看相关链接,并与作者交流