在高并发系统的场景下,经常会听到几个概念:限流, 限速, 限并发。该如何理解它们呢?本文是以小白视角去理解学习其中的思想。
笔者认为服务限制可分为资源限制和频率限制两种。资源同时提供 n 个,用完后再申请就只能等释放/归还,如数据库连接池等。并发就是指这种限制,即某个 uid 只能同时有 n 个下载/上传的连接,或者 serverA 对 serviceB 只能同时有 n 个请求的连接;资源在周期内只能被访问 n 次,没有归还的概念,例如 qps, tps。
限速和限流指就是指这种限制, 一般是指某个 uid 对某个 api 只能在每秒访问 n 次以内,某个 uid 下载只能在每秒 n 字节以内。
实际生活中,也有很多场景,如双 11,618,各种秒杀活动等。通常限流思路应该是这样
与用户打交道的服务经常面临的场景
那么除了限流,还有哪些方案呢?系统保护的三大利器:限流、降级、熔断。服务降级是在服务器压力陡增的情况下,利用现有的资源,关闭一些服务接口或者页面,以此来释放服务器资源保证核心任务的正常运行。是降低了部分服务的处理能力,增加另一部分服务处理能力,访问量不变。
而限流器有四种实现思路:计数器、滑动窗口、漏桶、令牌桶。后两者在 nginx 等开源项目中很常用。
思路
缺点:可能会存在临界问题,双倍突发请求避免不了,流量能突破限制
关键代码
type limit struct {
beginTime time.Time
counter int
mu sync.Mutex
}
func (limit *limit) apiLimit() bool {
limit.mu.Lock()
defer limit.mu.Unlock()
nowTime := time.Now()
if nowTime.Sub(limit.beginTime) >= WINDOWTIME {
limit.beginTime = nowTime
limit.counter = 0
}
if limit.counter > MAXREQUEST {
return false
}
limit.counter++
fmt.Println("counter: ", limit.counter)
return true
}
顾名思义,滑动窗口就是时间窗口在随着时间推移不停地移动。
思路
滑动窗口是固定窗口的优化,将统计周期进一步细分为 N 个小周期,限流统计时将所有小周期内统计时加和比较;滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
缺点:
关键代码
type SlidingWindow struct {
smallWindowTime int64 // 小窗口时间大小
smallWindowNum int64 // 小窗口总数
smallWindowCap int // 小窗口请求容量
counters map[int64]int // 小窗口计数器
mu sync.Mutex // 锁
}
func NewSlidingWindow(smallWindowTime time.Duration) (*SlidingWindow, error) {
num := int64(WINDOWTIME / smallWindowTime) // 小窗口总数
return &SlidingWindow{
smallWindowTime: int64(smallWindowTime),
smallWindowNum: num,
smallWindowCap: MAXREQUEST / int(num),
counters: make(map[int64]int),
}, nil
}
func (sw *SlidingWindow) ReqLimit() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
// 获取当前小格子窗口所在的时间值
curSmallWindowTime := time.Now().Unix()
// 计算当前小格子窗口起始时间
beginTime := curSmallWindowTime - sw.smallWindowTime*(sw.smallWindowNum-1)
// 计算当前小格子窗口请求总数
var count int
for sWindowTime, counter := range sw.counters { // 遍历计数器
if sWindowTime < beginTime { // 判断不是当前小格子
delete(sw.counters, sWindowTime)
} else {
count += counter // 当前小格子窗口计数器累加
}
}
// 当前小格子请求到达请求限制,请求失败,返回 false
if count >= sw.smallWindowCap {
return false
}
// 没有到达请求上限,当前小格子窗口计数器+1,请求成功
sw.counters[curSmallWindowTime]++
return true
}
思路
很明显,漏桶算法在实现的数据结构会选择有容量限制的队列,请求执行者定期定点从队列取出请求来执行,新来的请求会被暂存在队列中或者被丢弃。漏桶算法的缺点是,不论当前系统的负载压力如何,所有请求都得进行排队,即便此时服务器负载处于低位。
关键代码
type LeakyBucket struct {
capacity int // 桶的容量 - 最高水位
currentReqNum int // 桶中当前请求数量 - 当前水位
lastTime time.Time // 桶中上次请求时间 - 上次放水时间
rate int // 桶中流出请求的速率,每秒流出多少请求,水流速度/秒
mu sync.Mutex // 锁
}
func NewLeakyBucket(rate int) *LeakyBucket {
return &LeakyBucket{
capacity: MAXREQUEST, //容量
lastTime: time.Now(),
rate: rate,
}
}
func (lb *LeakyBucket) ReqLimit() bool {
lb.mu.Lock()
defer lb.mu.Unlock()
now := time.Now()
// 计算距离上次放水时间间隔
gap := now.Sub(lb.lastTime)
fmt.Println("gap:", gap)
fmt.Println("lb.currentReqNum is:",lb.currentReqNum)
if gap >= time.Second {
// gap 这段时间流出的请求数=gap时间 * 每秒流出速率
out := int(gap/time.Second) * lb.rate
// 计算当前桶中请求数
lb.currentReqNum = maxInt(0, lb.currentReqNum-out)
lb.lastTime = now
}
// 桶中的当前请求数大于桶容量,请求失败
if lb.currentReqNum >= lb.capacity {
return false
}
// 若没超过桶容量,桶中请求量+1,返回true
lb.currentReqNum++
fmt.Println("curReqNum:", lb.currentReqNum)
return true
}
思路
和漏桶算法相比,令牌桶算法最大的优势在于应对突增的流量。漏桶算法只能匀速地消费,突增的流量只能抛弃,而令牌桶只要桶中的令牌足够即可,且令牌生产的速度本身可控可调。
关键代码
func NewTokenBucket(rate int) *TokenBucket {
return &TokenBucket{
capacity: MAXREQUEST,
lastTime: time.Now(),
rate: rate,
}
}
func (tb *TokenBucket) ReqLimit() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
// 距离上次发放令牌间隔时间
gap := now.Sub(tb.lastTime)
fmt.Println("gap: ", gap)
if gap >= time.Second {
// 这段时间发放的token数 = gap时间(秒为单位) * 每秒发放token速率
in := int(gap/time.Second) * tb.rate
// 这段时间桶中的token总量
// tb.currentTokenNum+in : 当前已有的token总量+发放的token树
// 当前桶中的token数量当然不能超过桶的最大容量
tb.currentTokenNum = minInt(tb.capacity, tb.currentTokenNum+in)
tb.lastTime = now
}
// 没有token,请求失败
if tb.currentTokenNum <= 0 {
return false
}
// 如果有token,当前 token 数量-1,也就是发放token,请求成功
tb.currentTokenNum--
fmt.Println("curTokenNum:", tb.currentTokenNum)
return true
}
其中,Golang 自带限流器(golang.org/x/time/rate)就是基于令牌桶算法来实现的。
主要包含以下几种方法:
func main() {
go server()
time.Sleep(time.Second)
// 请求持续5秒
e := time.Now().UnixNano() + 5*time.Second.Nanoseconds()
for {
if time.Now().UnixNano() > e {
break
}
do()
}
}
func server() {
// 每1毫秒投放一次令牌,桶容量大小为 10
r := rate.Every(1 * time.Millisecond)
limit := rate.NewLimiter(r, 10)
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
if limit.Allow() {
fmt.Printf("请求成功,当前时间:%s\n", time.Now().Format("2006-01-02 15:04:05"))
} else {
fmt.Printf("被限流了。。。\n")
}
})
err := http.ListenAndServe(":38888", nil)
if err != nil {
panic(err)
}
}
func do() {
api := "http://localhost:38888/"
res, err := http.Get(api)
if err != nil {
panic(err)
}
defer res.Body.Close()
if res.StatusCode == http.StatusOK {
fmt.Printf("200 ")
}
}
综上讲解,希望对大家理解限流有些许帮助。当然,限流需要与多种技术结合使用,才能更好提供用户体验。
https://blog.csdn.net/weixin_41846320/article/details/95941361
https://blog.csdn.net/weixin_41846320/article/details/95941361