说起 限速 ,想必各位不会陌生。通常在一个服务程序当中,限速指的是对同一类请求进行速率的限制,用来防止服务端某些资源被过度消耗,从而保障服务的稳定性。

限速的好处有以下几点:

  1. 保护系统稳定性: 限速可以避免系统因过多请求而过载,导致性能下降甚至崩溃。通过限制请求速率,可以平滑地处理请求,保持系统的稳定运行状态。
  2. 防止滥用和恶意攻击: 限速可以有效地防止恶意用户或自动化工具对系统进行滥用、DoS(拒绝服务)攻击或暴力破解等行为。通过限制请求速率,可以降低系统遭受攻击的风险。
  3. 控制资源消耗: 一些服务或资源可能具有有限的容量或成本,限速可以帮助控制资源的消耗,确保资源被合理分配和利用。例如,限速可以避免数据库或存储系统被过度查询,保护数据库服务器的稳定性和性能。
  4. 提高服务质量: 通过限速可以减少请求的排队和等待时间,提高系统对正常用户的响应速度和服务质量。合理的限速策略可以平衡不同用户和请求之间的竞争,使系统能够更公平地分配资源。

通常在业务服务研发当中,我们会借助成熟的框架来实现限流功能,例如下面所列举的:

  1. Guava RateLimiter: Guava 是 Google 开发的 Java 核心库,其中包含了一个名为 RateLimiter 的限流工具类。它基于令牌桶算法实现了简单的限流功能,可以轻松地控制代码的执行速率。
  2. Resilience4j: Resilience4j 是一个用于构建弹性和容错性应用的 Java 库,其中包含了限流器(Rate Limiter)功能。它提供了多种限流算法和配置选项,可以灵活地应用于各种场景。
  3. Sentinel: Sentinel 是阿里巴巴开源的流量控制框架,提供了流量控制、熔断降级、系统负载保护等功能。它支持基于 QPS、线程数、并发数等多种限流策略,并提供了实时监控和动态配置功能。
  4. Hystrix: Hystrix 是 Netflix 开源的容错框架,提供了限流、熔断、降级等功能。虽然 Hystrix 已经进入维护模式,但仍然被许多项目广泛使用。
  5. Bucket4j: Bucket4j 是一个基于令牌桶算法的 Java 限流库,具有简单易用和高性能的特点。它支持在内存、Redis、Hazelcast 等存储后端进行限流。

虽然这些框架的功能都非常强大,但是在简单场景当中,我们并不需要非常复杂的功能,只是对接口进行简单限流,不涉及负载问题、也不存在分布式需求。所以我打算继续发挥能亲自动手的就先试试的精神,自己实现一个限速的功能。

思路

配置管理:使用了一个Map<String, LimitConfig>来存储每个限流 key 对应的限流配置。这些配置包括了最大次数和限流时间窗口持续时间。提供了添加限流配置的相关方法,可以为每个限流 key 设置不同的最大次数和时间窗口。在是否被限流判断方法中,会检查配置中是否包含指定的限流 key,如果不包含则添加默认配置,以确保每个限流 key 都有相应的配置。

限流状态管理:使用了三个Map<String, Integer>来分别记录每个限流 key 的最后一次请求时间、请求次数以及用于同步的锁对象。判断是否限流的方法里面,会根据当前时间与最后一次请求时间的间隔以及请求次数来判断是否需要限流。如果超过了限流时间窗口,则重新计数请求次数;如果请求次数超过了最大次数,则需要限流。

线程安全性:使用了ReentrantLock来保证对限流配置和状态的线程安全访问。例如,在添加配置项方法中使用了一个全局的写锁,以确保添加限流配置时的线程安全性。在判断是否限流的方法中,对于每一个配置项也增加一个 ReentrantLock 保障修改操作的线程安全。同时在统计单个接口请求次数的类也用上了 java.util.concurrent.atomic.AtomicInteger

代码

根据粉丝建议,我加了一些注释,方便理解和使用。


import com.funtester.frame.SourceCode  

import java.util.concurrent.TimeUnit  
import java.util.concurrent.atomic.AtomicInteger  
import java.util.concurrent.locks.ReentrantLock  

/**  
 * 限流工具,支持N/M限流  
 */  
class RateLimit {  

    /**  
     * 总限流配置  
     */  
    Map<String, LimitConfig> config = [:]  

    /**  
     * 最后一次请求时间  
     */  
    Map<String, Integer> lastTime = [:]  

    /**  
     * 请求次数  
     */  
    Map<String, AtomicInteger> requestTimes = [:]  

    /**  
     * 所有key的锁  
     */  
    Map<String, ReentrantLock> allLock = [:]  

    /**  
     * 写锁  
     */  
    ReentrantLock writeLock = new ReentrantLock()  

    /**  
     * 是否限流  
     * @param key 限流key  
     * @return  
     */  
    boolean isLimit(String key) {  
        if (!config.containsKey(key)) {  
            addConfig(key, 2, 2) //默认配置  
            return isLimit(key) //递归,初始化配置  
        }  
        def mark = SourceCode.getMark()  
        if (mark - lastTime[key] >= config[key].duration) {//进入下一个限流周期  
            if (allLock[key].tryLock(1, TimeUnit.SECONDS)) {  
                if (mark - lastTime[key] >= config[key].duration) {  
                    lastTime[key] = mark  
                    requestTimes[key] = new AtomicInteger(1)  
                    allLock[key].unlock()  
                    return false  
                } else {  
                    return true  
                }  
            }  
        }  
        if (requestTimes[key].get() >= config[key].maxTimes) //超过最大次数  
            return true  
        requestTimes[key].getAndIncrement() //增加次数  
        return false  
    }  

    /**  
     * 添加限流配置  
     * @param key 限流key  
     * @param maxTimes 最大次数  
     * @param duration 限流时间,单位秒  
     * @return  
     */  
    def addConfig(String key, int maxTimes, int duration) {  
        if (writeLock.tryLock(1, TimeUnit.SECONDS)) {  
            try {  
                if (!config.containsKey(key)) {  
                    config[key] = new LimitConfig(maxTimes: maxTimes, duration: duration)  
                    allLock[key] = new ReentrantLock()  
                    lastTime[key] = SourceCode.getMark()  
                    requestTimes[key] = new AtomicInteger(0)  
                }  
            } catch (e) {  

            } finally {  
                writeLock.unlock()  
            }  
        }  
    }  

    /**  
     * 限流配置  
     */  
    static class LimitConfig {  

        /**  
         * 最大次数  
         */  
        int maxTimes  

        /**  
         * 限流时间计算持续时间,单位秒  
         */  
        int duration  

    }  
}

测试

测试的脚本如下:

import com.funtester.httpclient.FunHttp  
import com.funtester.utils.RateLimit  

class Routine extends FunHttp {  

    static void main(String[] args) {  
        def limit = new RateLimit()  
        limit.addConfig("test", 1, 1)  
        1000.times {  
            sleep(0.1)  
            fun {  
                def limit1 = limit.isLimit("t4est")  
                if (!limit1) {  
                    output("未限流")  
                }  
            }  
        }    }  
}

控制台打印:

22:19:20:545 F-1  未限流
22:19:20:644 F-2  未限流
22:19:22:094 F-8  未限流
22:19:22:195 F-1  未限流
22:19:24:048 F-3  未限流
22:19:24:150 F-4  未限流
22:19:25 uptime:6 s
22:19:25 finished: 49 task

可以看到按照 2/2s 的默认配置生效了。


↙↙↙阅读原文可查看相关链接,并与作者交流