FunTester 【连载 11】Phaser 类

FunTester · 2025年01月13日 · 876 次阅读

2.6 Phaser

Phaser 是上一节提到的更高级的线程同步工具。Phaser 的包路径是 java.util.concurrent.Phaser,属于 Java 多线程编程的核心功能。Phaser 类的主要功能是控制多个线程在特定的同步时间点同步执行。从文字介绍上看,它似乎没有特别之处,但其实际功能相比 CountDownLatch 增强了不止一星半点。Phaser 可以说是 Java 多线程同步的终极解决方案。

Phaser 类支持多阶段线程同步、动态的注册和注销、指定同步阶段、子同步功能,可以在到达集合点后不阻塞继续执行下一阶段,还可以中断等待的阶段、全局管理等。

终究是 Phaser 类功能太强大了,而作为性能测试工具,它有些高攀不起。所以在性能测试中使用到的还是 Phaser 类的基础功能。总结起来有两点原因:一是性能测试需要的场景复杂程度相对 Phaser 类来讲,还是小儿科了;二是使用 Java 进行性能测试时,尽量避免使用逻辑复杂的解决方案。还是那句话,如果遇到过于复杂的场景,则抛开 Phaser,寻求更加简单、可靠的解决方案。

相比 CountDownLatch,Phaser 在实战中典型的使用场景是处理不定数量的并发任务同步问题。CountDownLatch 需要提前确定同步数量,但 Phaser 不需要。在使用当中,通常的使用流程如下:

  1. 创建 Phaser 对象,同步数量为 1。
  2. 指定多线程任务,每个任务开始前使用 Phaser 对象注册,完成之后注销。
  3. 等待同步线程使用 Phaser 对象进行等待,直到全部注册任务都完成。

2.6.1 基础方法

在 Java 进行性能测试中,Phaser 类常用的构造方法只有 1 个:

public Phaser(int parties) {
    this(null, parties);
}

这个方法只有一个 int 数据类型的参数,表示同步数量,这一点跟 CountDownLatch 类一样。该方法对应 Phaser 工作流程的第一步。

Phaser 工作流程第二步,注册:

public int register() {
    return doRegister(1);
}

这个方法没有参数,含义是当前线程申请同步数量加一,返回 int 类型数据,含义是当前同步阶段。该方法存在失败的可能,若尝试注册数量超过阈值,则会抛出 IllegalStateException 异常。

注销:

public int arriveAndDeregister() {
    return doArrive(ONE_DEREGISTER);
}

这个方法同样没有参数,含义是当前线程到达集合点并申请同步数量减一,返回 int 类型数据,含义是当前同步阶段。该方法存在失败可能,若是注册人数或者未到达人数会因为该注销行为变成负值,则会抛出 IllegalStateException 异常。

如果多线程任务到达集合点,期望等待其他线程都到达,并且继续参与下一个阶段的同步,可以使用下面这个方法:

public int arriveAndAwaitAdvance() {
   // 方法体内容过多,省去。
}

如果多线程任务不想等其他线程,直接进入下一阶段同步,可以使用下面这个方法:

public int arrive() {
    return doArrive(ONE_ARRIVAL);
}

Phaser 工作流程第三步,同步线程的等待方法与多线程任务到达集合点方法重合,即使用 arriveAndAwaitAdvance() 方法。若是多阶段同步的话,还可以指定需要等待的同步阶段,通过调用下面的方法实现:

public int awaitAdvance(int phase) {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase)
        return root.internalAwaitAdvance(phase, null);
    return p;
}

这里建议尽量避免使用多阶段同步,尽量不在同步线程外调用该方法。因为这样会使得代码逻辑复杂程度数量级上升,容易造成无限等待。

通常我们会统计异步任务完成的数量,此时还会用到另外一个方法,获取同步计数:

public int getArrivedParties() {
    return arrivedOf(reconcileState());
}

该方法没有参数,返回 int 数据类型,含义是已经注册且已经到达集合点的数量。在 Java 性能测试中,通常用来统计多线程任务的完成进度。这里请注意,统计数量不包含那些已经注销的任务,如果要统计所有完成的任务,请在到达集合点时使用 arrive() 方法而不是 arriveAndDeregister()

2.6.2 最佳实战

下面通过一个小例子演示 Phaser 使用方法。

package org.funtester.performance.books.chapter02.section5;

import java.util.concurrent.Phaser;

/**
 * Phaser 演示类
 */
public class PhaserDemo {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(1); // 创建 Phaser 对象,将参与的线程数初始化为 1
        for (int i = 0; i < 3; i++) { // 创建并启动 3 个线程
            phaser.register(); // 每创建并启动一个线程,注册一次
            new Thread(() -> { // 创建异步线程
                phaser.arrive(); // 每个线程执行完任务后,通知 phaser,当前线程任务完成
                System.out.println(System.currentTimeMillis() + "  完成完成 " + Thread.currentThread().getName()); // 打印当前线程完成任务的时间和线程名称
            }).start(); // 启动异步线程
        }
        System.out.println(System.currentTimeMillis() + "  完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数
        Thread.sleep(10); // 等待 10 毫秒
        System.out.println(System.currentTimeMillis() + "  完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数
        phaser.arriveAndAwaitAdvance(); // 通知 phaser,当前线程任务完成,并等待其他线程完成任务
        System.out.println(System.currentTimeMillis() + "  完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数
    }
}

上面这个例子中,首先创建了 Phaser 对象,并设置同步数量等于 1。其次创建 3 个异步线程,分别在创建线程之前将同步对象注册一次,每个线程执行逻辑为:到达集合点,不阻塞立即打印日志。main 线程立即打印任务总数日志,然后休眠 10 毫秒,再打印任务总数日志,到达同步点并且阻塞等待所有线程到达同步点,最后再打印一次任务总数日志。

1698560341651  完成完成 Thread-0
1698560341651  完成任务总数: 2
1698560341651  完成完成 Thread-1
1698560341652  完成完成 Thread-2
1698560341662  完成任务总数: 3
1698560341662  完成任务总数: 0

可以看出,第一次打印任务总数时,只有 2 个线程完成了任务。当 main 线程休眠完成之后,所有线程完成,所以第二次打印任务总数就是 3 了。当 main 线程到达同步点后,再打印日志任务总数就是 0 了,原因是因为所有线程到达集合点之后,已经进行了第二阶段的同步,所以打印出来的是第二个阶段到达集合点的线程数,即为 0。

2.6.4 使用场景

对于大多数线程同步场景来说,动用 Phaser 的确大材小用,所以实际使用场景也不是很多。上面提到过的多线程处理批量任务,例如我需要把 1 万个用户个人资料都添加上收货地址,然后再用这 1 万账号进行商品下单的操作。那么需要这么设计用例:

  1. 前置阶段初始化用户的收货地址。
  2. 待所有任务完成后,进行下单的性能测试。
  3. 待压测结束后,重置用户数据,恢复测试用户的元状态。

这其中步骤 2 和 3 均涉及到了多线程同步,Phaser 是最好的选择。此外,具有阶段性的多线程任务非常适合 Phaser 大展拳脚,例如:要先从注册账号开始,其次将注册成功的账号进行用户信息初始化,然后再执行性能测试,最后清理数据。

2.6.5 自定义同步类

虽然 java.util.concurrent.Phaser 功能强大,但毕竟不是为了性能测试开发的功能类,在实践中也会遇到一些水土不服的情况,总结为下面两种:

  1. 注册同步数量有上限,对应代码 private static final int MAX_PARTIES = 0xffff,约 6 万多。
  2. 极限性能不理想。在 Phaser 功能设计中,涉及多处锁的操作,在高并发情况下性能表现不佳。

基于这样的情况,如果我们有需求,就可以自己设计一款功能简化之后的同步类。这个同步类需要实现以下功能:

  1. 线程安全计数,统计未完成的注册任务数量。
  2. 线程安全计数,统计已完成任务数量。
  3. 提供注册和完成方法。
  4. 提供返回注册数量和完成数量的方法。

线程安全技术类,我们就选择 java.util.concurrent.atomic.AtomicInteger,可以规避掉 Phaser 上限较低的问题,如果还觉得不够,可以用 java.util.concurrent.atomic.AtomicLong 替代。其他方法就比较容易,笔者将这个自定义的同步类叫做 FunPhaser,以下是代码实践内容:

package org.funtester.performance.books.chapter02.section6;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义多线程同步类
 */
public class FunPhaser {

    /**
     * 任务总数索引, 用于标记任务完成状态
     * 注册增加, 任务完成减少
     */
    AtomicInteger index;

    /**
     * 任务总数, 用于记录任务完成数量
     */
    AtomicInteger taskNum;

    public FunPhaser() {
        this.index = new AtomicInteger(); // 初始化
        this.taskNum = new AtomicInteger(); // 初始化
    }

    /**
     * 注册任务, 并返回当前注册数量
     * @return
     */
    public int register() {
        return this.index.incrementAndGet();
    }

    /**
     * 任务完成
     * @return
     */
    public void done() {
        this.index.getAndDecrement();
        this.taskNum.getAndIncrement();
    }

    /**
     * 等待所有任务完成
     * @return
     */
    public void await() throws InterruptedException {
        long start = System.currentTimeMillis();
        while (index.get() > 0) {
            if (System.currentTimeMillis() - start > 100000) { // 默认超时时间 100 秒
                System.out.println(System.currentTimeMillis() - start);
                break;
            }
            Thread.sleep(100);
        }
    }

    /**
     * 等待所有任务完成
     * @param timeout 超时时间, 单位毫秒
     * @return
     */
    public void await(int timeout) throws InterruptedException {
        long start = System.currentTimeMillis();
        while (index.get() > 0) {
            if (System.currentTimeMillis() - start >= timeout) {
                break;
            }
            Thread.sleep(100);
        }
    }

    /**
     * 获取注册总数
     * @return
     */
    public int queryRegisterNum() {
        return index.get();
    }

    /**
     * 获取任务完成总数
     * @return
     */
    public int queryTaskNum() {
        return taskNum.get();
    }
}

下面写个用例进行测试。思路如下:启动 10 个线程,每个线程注册一次,休眠模拟业务执行,最后完成任务。代码如下:

FunPhaser phaser = new FunPhaser(); // 创建 Phaser
for (int i = 0; i < 10; i++) { // 创建 10 个线程
    new Thread(() -> { // 创建线程
        for (int j = 0; j < 10; j++) { // 每个线程执行 10 次任务
            phaser.register();
            try {
                Thread.sleep(10); // 模拟任务执行时间
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                phaser.done(); // 任务完成
            }
        }
    }).start();
}
try {
    phaser.await(); // 等待所有任务完成
} catch (InterruptedException e) {
    throw new RuntimeException(e);
}
System.out.println("任务注册数 " + phaser.queryRegisterNum() + " 个");
System.out.println("任务完成总数 " + phaser.queryTaskNum() + " 个");

控制台打印如下:

任务注册数 0 个
任务完成总数 100 个

可以看出,我们最初的预想已经完美实现。

书的名字:从 Java 开始做性能测试

如果本书内容对你有所帮助,希望各位不吝赞赏,让我可以贴补家用。赞赏两位数可以提前阅读未公开章节。我也会尝试制作本书的视频教程,包括必要的答疑。

FunTester 原创精华

【连载】从 Java 开始性能测试

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册