FunTester 一文看懂 Gatherer 与 Groovy 集合能力

FunTester · 2026年05月01日 · 42 次阅读

JDK 24 中 Gatherer 的增强,让流管道在更多场景下具备了更灵活的中间处理能力。本文用 Groovy 来演示这些场景,同时对照 Groovy 早已提供的一些集合能力,看看两者在思路和实现上的对应关系。

本文示例使用 Groovy 4.0.26 测试。Gatherer 相关示例运行在 JDK 24.0.0 上,其他示例则分别在 JDK 8JDK 17JDK 24 上验证。

Gatherer 是什么

Java 开发者对 Stream 已经很熟悉了。流本质上是一个可能无限长的值序列,支持惰性计算。一次完整的流管道通常由三部分组成:元素源、零个或多个中间操作,以及一个终端操作。这个模型已经足够强大,但它对中间操作的扩展能力一直比较有限。很多稍复杂一点的场景,如果只靠现有的 mapfilterlimit 之类的操作,写起来会比较别扭。这正是 Gatherer 的价值所在:它允许我们自定义中间操作。有了 Gatherer 之后,流 API 新增了 gather 这个中间操作。它接收一个 Gatherer,输出一个新的流。一个 Gatherer 主要由四部分组成:

  • 初始化器:通常是一个 Supplier,用于创建初始状态。
  • 集成器:负责把当前元素并入状态,并决定是否向下游推送结果。
  • 完成器:在流结束时做最后的收尾处理。
  • 组合器:主要用于并行场景,把多个分支的状态合并起来。

最核心的通常是集成器,它的接口如下:

interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Downstream<? super R> downstream);
}

这里的 state 是状态对象,后面的示例大多用列表来表示;element 是当前处理到的流元素;downstream 则是把结果送往下游阶段的钩子。本文中的大多数例子都依赖输入顺序,因此并不适合并行化,所以组合器不会成为重点。除了自定义 Gatherer,JDK 还内置了一些实用实现,比如 windowFixedwindowSlidingfold

集合切片与窗口基础

在正式进入 Gatherer 之前,先看看 Groovy 在集合切片这件事上有多灵活。Groovy 可以直接通过区间或索引列表选取集合的一部分:

assert (1..8)[0..2] == [1, 2, 3]                   // 闭区间索引
assert (1..8)[3<..<6] == [5, 6]                    // 开区间索引
assert (1..8)[0..2,3..4,5] == [1, 2, 3, 4, 5, 6]   // 多段区间组合
assert (1..8)[0..2,3..-1] == 1..8                  // 混合区间
assert (1..8)[0,2,4,6] == [1, 3, 5, 7]             // 取奇数位
assert (1..8)[1,3,5,7] == [2, 4, 6, 8]             // 取偶数位

如果只需要一个连续窗口,也可以直接用 takedrop

assert (1..8).take(3) == [1, 2, 3]
assert (1..8).drop(2).take(3) == [3, 4, 5]

在流 API 里,对应思路通常是 skip 配合 limit

assert (1..8).stream().limit(3).toList() == [1, 2, 3]
assert (1..8).stream().skip(2).limit(3).toList() == [3, 4, 5]

这一层还比较直接,但再往前走,比如 collatechop 这类更强的集合操作,如果没有 Gatherer,用流就不太顺手了。

固定窗口与滑动窗口

Groovy 的 collate 会把集合拆成固定大小的数据块:

assert (1..8).collate(3) == [[1, 2, 3], [4, 5, 6], [7, 8]]

最后一个块如果不够长,默认也会保留。如果你不想保留残余数据块,可以传入布尔参数:

assert (1..8).collate(3, false) == [[1, 2, 3], [4, 5, 6]]

这种需求非常常见,所以 JDK 直接提供了内置的 Gatherers.windowFixed

assert (1..8).stream().gather(windowFixed(3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

如果你的需求是丢掉最后不足一组的残余块,JDK 没有现成实现,但自己写一个并不难:

<T> Gatherer<T, ?, List<T>> windowFixedTruncating(int windowSize) {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { window, element, downstream ->
            window << element
            if (window.size() < windowSize) return true

            var result = List.copyOf(window)  // 满窗后先复制,再清空状态
            window.clear()
            downstream.push(result)
        }
    )
}

这段代码的思路很直接:初始化时创建一个空列表作为窗口,集成器不断把元素放进去;当窗口达到指定大小时,复制当前结果、推给下游,再把窗口清空。它本质上就是一个简化版的 windowFixed。因为这里故意忽略了残余块,所以连完成器都不需要。这里有几个细节值得注意:

  • 这是典型的顺序算法,所以用 ofSequential
  • 我们不会提前停止处理,因此使用 ofGreedy 更合适。
  • 示例没有加入参数校验。真要放到生产代码里,至少要处理 windowSize < 1 这种非法输入。

使用方式如下:

assert (1..8).stream().gather(windowFixedTruncating(3)).toList() ==
    [[1, 2, 3], [4, 5, 6]]

collate 还有带步长的重载版本。比如下面这个例子,每次向前滑动 1 个元素,并丢弃尾部不足一窗的部分:

assert (1..5).collate(3, 1, false) == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

这个场景和内置的 windowSliding 正好对应:

assert (1..5).stream().gather(windowSliding(3)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

如果步长不是 1,或者你还想精细控制是否保留残余块,那就需要自己实现一个 Gatherer 了。先看几组 Groovy 侧的参考结果:

assert (1..5).collate(3, 1) == [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).collate(3, 2) == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).collate(3, 2, false) == [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).collate(3, 4, false) == [[1, 2, 3], [5, 6, 7]]
assert (1..8).collate(3, 3) == [[1, 2, 3], [4, 5, 6], [7, 8]]

对应的 Gatherer 可以这样写:

<T> Gatherer<T, ?, List<T>> windowSlidingByStep(
    int windowSize,
    int stepSize,
    boolean keepRemaining = true
) {
    int skip = 0

    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { window, element, downstream ->
            if (skip) {
                skip--
                return true
            }

            window << element
            if (window.size() < windowSize) return true

            var result = List.copyOf(window)
            skip = stepSize > windowSize ? stepSize - windowSize : 0
            [stepSize, windowSize].min().times { window.removeFirst() }  // 为下一窗移动窗口
            downstream.push(result)
        },
        (window, downstream) -> {
            if (keepRemaining) {
                while (window.size() > stepSize) {
                    downstream.push(List.copyOf(window))
                    stepSize.times { window.removeFirst() }
                }
                downstream.push(List.copyOf(window))
            }
        }
    )
}

这个实现有两个值得留意的点:

  • 如果 stepSize > windowSize,中间那段不会进入任何窗口的元素,可以直接跳过,没必要先存起来再丢掉。
  • 之所以需要完成器,是因为启用 keepRemaining 时,流结束后还可能有尚未输出的尾部窗口。

使用方式如下:

assert (1..5).stream().gather(windowSlidingByStep(3, 1)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2, false)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 4, false)).toList() ==
    [[1, 2, 3], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

在结束这一节之前,再看两个有意思的 Groovy GQL 例子。它们可以视为 takedrop 和滑动窗口的另一种表达方式:

assert GQL {
    from n in 1..8
    limit 3
    select n
} == [1, 2, 3]

assert GQL {
    from n in 1..8
    limit 2, 3
    select n
} == [3, 4, 5]

assert GQL {
    from ns in (
        from n in 1..8
        select n, (lead(n) over(orderby n)), (lead(n, 2) over(orderby n))
    )
    limit 3
    select ns
}*.toList() == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

按指定大小分块

Groovy 里还有一个和 collate 很像的扩展方法 chop。区别在于,collate 为所有数据块使用同一个固定大小,而 chop 允许你为每一块指定不同的大小。参数中的每个数字都只消费一次,特殊值 -1 表示把剩余所有元素都放到最后一块里。

assert (1..8).chop(3) == [[1, 2, 3]]
assert (1..8).chop(3, 2, 1) == [[1, 2, 3], [4, 5], [6]]
assert (1..8).chop(3, -1) == [[1, 2, 3], [4, 5, 6, 7, 8]]

JDK 目前没有现成的流操作或内置 Gatherer 来直接实现这个能力,不过我们可以自己写一个:

<T> Gatherer<T, ?, List<T>> windowMultiple(int... steps) {
    var remaining = steps.toList()

    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.of { window, element, downstream ->
            if (!remaining) {
                return false  // 分块计划用尽后,后续元素不再处理
            }

            window << element
            if (remaining[0] != -1) remaining[0]--
            if (remaining[0]) return true

            remaining.removeFirst()
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        },
        (window, downstream) -> {
            if (window) {
                downstream.push(List.copyOf(window))
            }
        }
    )
}

这段实现和前面的 collate 思路类似,但窗口大小不再固定,而是随着 steps 列表逐步变化。因为当最后一个计划块消费完之后,我们不希望继续处理后续元素,所以这里没有用贪婪集成器。它的使用方式如下:

assert (1..8).stream().gather(windowMultiple(3)).toList() ==
    [[1, 2, 3]]
assert (1..8).stream().gather(windowMultiple(3, 2, 1)).toList() ==
    [[1, 2, 3], [4, 5], [6]]
assert (1..8).stream().gather(windowMultiple(3, -1)).toList() ==
    [[1, 2, 3], [4, 5, 6, 7, 8]]

injectfoldscan

Groovy 的 inject 和流 API 里的 reduce 有相似之处,但两者并不完全等价。reduce 的签名对输入输出类型限制更强,而 inject 可以更灵活地累积不同类型的结果。比如下面这个例子,把数字序列拼成字符串:

assert (1..5).inject('') { string, number -> string + number } == '12345'

内置的 fold 可以写出对应的流处理方式:

assert (1..5).stream()
             .gather(fold(() -> '', (string, number) -> string + number))
             .findFirst()
             .get() == '12345'

再看一个更有代表性的例子:累积和。对序列 [1, 2, 3, 4] 来说,累积和结果是 [1, 3, 6, 10]。Groovy 用 inject 写起来很自然:

assert (1..5).inject([]) { acc, next ->
    acc + [acc ? acc.last() + next : next]  // 取上一次结果继续累加
} == [1, 3, 6, 10, 15]

Groovy 还有别的实现方式,例如:

assert (1..5).inits().grep().reverse()*.sum() == [1, 3, 6, 10, 15]

在 Java 侧,这类操作足够常见,以至于数组 API 里直接提供了 parallelPrefix

Integer[] nums = 1..5
Arrays.parallelPrefix(nums, Integer::sum)
assert nums == [1, 3, 6, 10, 15]

如果想在流管道里做这件事,scan 就很合适:

assert (1..5).stream()
             .gather(scan(() -> 0, Integer::sum))
             .toList() == [1, 3, 6, 10, 15]

initstails 判断子序列

最后看一个更偏函数式的小例子:判断一个列表是否是另一个列表的子序列。先准备原始单词列表和待查找片段:

var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

其实 JDK 集合工具已经提供了现成答案:

assert Collections.indexOfSubList(words, search) != -1

不过,如果你接触过 Haskell,可能对 initstails 这两个函数并不陌生。Groovy 里同样有它们:

assert (1..6).inits() == [[1, 2, 3, 4, 5, 6],
                          [1, 2, 3, 4, 5],
                          [1, 2, 3, 4],
                          [1, 2, 3],
                          [1, 2],
                          [1],
                          []]

assert (1..6).tails() == [[1, 2, 3, 4, 5, 6],
                          [2, 3, 4, 5, 6],
                          [3, 4, 5, 6],
                          [4, 5, 6],
                          [5, 6],
                          [6],
                          []]

于是,Groovy 里可以写出一个很对称的解法:

var found = words.tails().any { subseq -> subseq.inits().contains(search) }
assert found

这未必是效率最高的方式,但它很好地展示了集合 API 的表达力。如果我们想从 tails 开始做一个 Gatherer,可以这样写:

<T> Gatherer<T, ?, List<T>> tails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails().each(downstream::push)  // 统一在收尾阶段输出所有后缀
        }
    )
}

这个实现可以工作,但严格来说并没有真正发挥流式处理的优势,因为几乎所有逻辑都堆在完成器里了。验证方式如下:

assert search.stream().gather(tails()).toList() ==
    [['brown', 'fox'], ['fox'], []]

照这个思路,我们还可以继续写一个 initsOfTails

<T> Gatherer<T, ?, List<T>> initsOfTails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails()*.inits().sum().each(downstream::push)
        }
    )
}

它依旧能工作:

assert words.stream().gather(initsOfTails()).anyMatch { it == search }

但这种实现仍然把大量工作推迟到了最后。真要追求效率,很多时候还不如先把流收成列表,再直接调用 Groovy 已有的 initstails。当然,也不是完全没有更流式一点的写法。如果我们接受 inits 的输出顺序和传统定义相反,就可以在处理中逐步向下游推送结果:

<T> Gatherer<T, ?, List<T>> inits() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            downstream.push(List.copyOf(state))  // 先输出当前前缀
            state << element
            return true
        },
        (state, downstream) -> {
            downstream.push(state)
        }
    )
}

它的效果如下:

assert search.stream().gather(inits()).toList() ==
    [[], ['brown'], ['brown', 'fox']]

这个例子很说明问题:并不是所有算法都天然适合流式处理,但有了 Gatherer 之后,我们至少可以更从容地权衡表达力、性能和实现复杂度。

什么时候该用 Gatherer

Groovy 的集合 API 一直很强,这一点不需要 Gatherer 来证明。但 Gatherer 的出现,让 Java 流在中间操作层面的可扩展性补上了一块关键拼图。两者放在一起看,你会更清楚地理解一件事:不是所有问题都该用流来解,但当流确实适合时,Gatherer 能把原本难以表达的处理过程写得更自然、更贴近问题本身。


FunTester 原创精华
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暫無回覆。
需要 登录 後方可回應,如果你還沒有帳號按這裡 注册