FunTester 自定义限速功能实践——Caffeine

FunTester · 2024年03月25日 · 2068 次阅读

之前使用了 JDK 自带的 Map 实现了自定义限速的简单需求。在当时的实现当中,有一个被隐藏的小设计,就是如果是用使用异步线程,用来根据配置给请求次数数据重置。如此这样,校验方法会非常简单方便。

对于普通 Java 项目来说,如果使用异步线程处理,除了 deamon 进程以外,其他实现的确有点麻烦。即使 deamon 线程也很难做到完全的实用性,所以才使用了上篇文章的实现方案。

之前提到过一个非常有趣的高性能本地缓存 Caffeine 刚好能解决这个问题,可以通过缓存过期或者定时刷新功能来实现定时刷新的需求。这里我选择了定时刷新功能,这种选择会限制限流配置的种类,无法进行 2/3s , 10/2s 配置,我最终选择 TPS 进行配置,全部使用 1s 为限制周期。

代码

主要思路如下:

  1. 数据结构选择:使用了两种数据结构来实现限流功能:使用了一个 Map 来存储每个请求的限流配置,以请求的标识符作为键,以该请求的每秒事务数(TPS)作为值;使用了 Caffeine 缓存来存储每个请求的计数器,其中键为请求的标识符,值为一个原子整数,用于记录请求的处理数量。
  2. 限流判断逻辑:具体逻辑如下:首先从缓存中获取对应请求的计数器;判断当前计数器的值是否大于等于该请求的配置的每秒事务数(TPS);如果超过了配置的值,则表示需要限流,返回 true。否则,递增计数器并返回 false,表示不需要限流。
  3. 动态配置:动态添加请求的限流配置,将请求的标识符和对应的每秒事务数(TPS)添加到配置中,实现了动态配置的功能。
  4. 使用 Caffeine 缓存:使用了 Caffeine 缓存来存储请求的计数器,可以配置缓存的过期时间(1 秒),当缓存过期时会自动刷新。这样可以确保计数器在一定时间内有效,避免长时间未使用的请求占用内存。

代码如下:


import com.github.benmanes.caffeine.cache.Caffeine  
import com.github.benmanes.caffeine.cache.LoadingCache  

import java.util.concurrent.TimeUnit  
import java.util.concurrent.atomic.AtomicInteger  
/**  
 * 限流工具,基于Caffeine实现,支持动态配置,根据TPS限流  
 */  
class TpsLimit {  

    Map<String, Integer> qpsConfig = [:]  

    LoadingCache<Object, AtomicInteger> build = Caffeine.newBuilder().refreshAfterWrite(1, TimeUnit.SECONDS).build((key) -> {  
        return new AtomicInteger()  
    })  

    /**  
     * 是否限流  
     * @param key  
     * @return  
     */  
    boolean isLimit(String key) {  
        AtomicInteger atomicInteger = build.get(key)  
        if (atomicInteger.get() >= qpsConfig.get(key, 1)) {  
            return true  
        }  
        atomicInteger.incrementAndGet()  
        return false  
    }  

    /**  
     * 添加限流配置  
     * @param key  
     * @param qps  
     * @return  
     */  
    def addConfig(String key, Integer qps) {  
        qpsConfig.put(key, qps)  
    }  

}

测试

测试脚本如下,与前一篇文章大同小异:

import com.funtester.httpclient.FunHttp  
import com.funtester.utils.TpsLimit  

class Routine extends FunHttp {  

    static void main(String[] args) {  
        def limit = new TpsLimit()  
        limit.addConfig("test", 1)  
        1000.times {  
            sleep(0.1)  
            fun {  
                def limit1 = limit.isLimit("t4est")  
                if (!limit1) {  
                    output("未限流")  
                }  
            }  
        }    }  
}

控制台输出:

22:19:20:545 F-1  未限流
22:19:20:644 F-2  未限流
22:19:22:094 F-8  未限流
22:19:22:195 F-1  未限流
22:19:24:048 F-3  未限流
22:19:24:150 F-4  未限流
22:19:25 uptime:6 s
22:19:25 finished: 49 task

可以看出,按照默认配置 1 TPS 的配置实现。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册