说起 限速
,想必各位不会陌生。通常在一个服务程序当中,限速指的是对同一类请求进行速率的限制,用来防止服务端某些资源被过度消耗,从而保障服务的稳定性。
限速的好处有以下几点:
通常在业务服务研发当中,我们会借助成熟的框架来实现限流功能,例如下面所列举的:
虽然这些框架的功能都非常强大,但是在简单场景当中,我们并不需要非常复杂的功能,只是对接口进行简单限流,不涉及负载问题、也不存在分布式需求。所以我打算继续发挥能亲自动手的就先试试的精神,自己实现一个限速的功能。
配置管理:使用了一个Map<String, LimitConfig>
来存储每个限流 key 对应的限流配置。这些配置包括了最大次数和限流时间窗口持续时间。提供了添加限流配置的相关方法,可以为每个限流 key 设置不同的最大次数和时间窗口。在是否被限流判断方法中,会检查配置中是否包含指定的限流 key,如果不包含则添加默认配置,以确保每个限流 key 都有相应的配置。
限流状态管理:使用了三个Map<String, Integer>
来分别记录每个限流 key 的最后一次请求时间、请求次数以及用于同步的锁对象。判断是否限流的方法里面,会根据当前时间与最后一次请求时间的间隔以及请求次数来判断是否需要限流。如果超过了限流时间窗口,则重新计数请求次数;如果请求次数超过了最大次数,则需要限流。
线程安全性:使用了ReentrantLock
来保证对限流配置和状态的线程安全访问。例如,在添加配置项方法中使用了一个全局的写锁,以确保添加限流配置时的线程安全性。在判断是否限流的方法中,对于每一个配置项也增加一个 ReentrantLock
保障修改操作的线程安全。同时在统计单个接口请求次数的类也用上了 java.util.concurrent.atomic.AtomicInteger
。
根据粉丝建议,我加了一些注释,方便理解和使用。
import com.funtester.frame.SourceCode
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
/**
* 限流工具,支持N/M限流
*/
class RateLimit {
/**
* 总限流配置
*/
Map<String, LimitConfig> config = [:]
/**
* 最后一次请求时间
*/
Map<String, Integer> lastTime = [:]
/**
* 请求次数
*/
Map<String, AtomicInteger> requestTimes = [:]
/**
* 所有key的锁
*/
Map<String, ReentrantLock> allLock = [:]
/**
* 写锁
*/
ReentrantLock writeLock = new ReentrantLock()
/**
* 是否限流
* @param key 限流key
* @return
*/
boolean isLimit(String key) {
if (!config.containsKey(key)) {
addConfig(key, 2, 2) //默认配置
return isLimit(key) //递归,初始化配置
}
def mark = SourceCode.getMark()
if (mark - lastTime[key] >= config[key].duration) {//进入下一个限流周期
if (allLock[key].tryLock(1, TimeUnit.SECONDS)) {
if (mark - lastTime[key] >= config[key].duration) {
lastTime[key] = mark
requestTimes[key] = new AtomicInteger(1)
allLock[key].unlock()
return false
} else {
return true
}
}
}
if (requestTimes[key].get() >= config[key].maxTimes) //超过最大次数
return true
requestTimes[key].getAndIncrement() //增加次数
return false
}
/**
* 添加限流配置
* @param key 限流key
* @param maxTimes 最大次数
* @param duration 限流时间,单位秒
* @return
*/
def addConfig(String key, int maxTimes, int duration) {
if (writeLock.tryLock(1, TimeUnit.SECONDS)) {
try {
if (!config.containsKey(key)) {
config[key] = new LimitConfig(maxTimes: maxTimes, duration: duration)
allLock[key] = new ReentrantLock()
lastTime[key] = SourceCode.getMark()
requestTimes[key] = new AtomicInteger(0)
}
} catch (e) {
} finally {
writeLock.unlock()
}
}
}
/**
* 限流配置
*/
static class LimitConfig {
/**
* 最大次数
*/
int maxTimes
/**
* 限流时间计算持续时间,单位秒
*/
int duration
}
}
测试的脚本如下:
import com.funtester.httpclient.FunHttp
import com.funtester.utils.RateLimit
class Routine extends FunHttp {
static void main(String[] args) {
def limit = new RateLimit()
limit.addConfig("test", 1, 1)
1000.times {
sleep(0.1)
fun {
def limit1 = limit.isLimit("t4est")
if (!limit1) {
output("未限流")
}
}
} }
}
控制台打印:
22:19:20:545 F-1 未限流
22:19:20:644 F-2 未限流
22:19:22:094 F-8 未限流
22:19:22:195 F-1 未限流
22:19:24:048 F-3 未限流
22:19:24:150 F-4 未限流
22:19:25 uptime:6 s
22:19:25 finished: 49 task
可以看到按照 2/2s 的默认配置生效了。