通用技术 业界谈到的限流算法 你弄懂了吗?

七牛测试开发 · 2022年09月03日 · 4485 次阅读

在高并发系统的场景下,经常会听到几个概念:限流, 限速, 限并发。该如何理解它们呢?本文是以小白视角去理解学习其中的思想。

笔者认为服务限制可分为资源限制和频率限制两种。资源同时提供 n 个,用完后再申请就只能等释放/归还,如数据库连接池等。并发就是指这种限制,即某个 uid 只能同时有 n 个下载/上传的连接,或者 serverA 对 serviceB 只能同时有 n 个请求的连接;资源在周期内只能被访问 n 次,没有归还的概念,例如 qps, tps。

限速和限流指就是指这种限制, 一般是指某个 uid 对某个 api 只能在每秒访问 n 次以内,某个 uid 下载只能在每秒 n 字节以内。


实际生活中,也有很多场景,如双 11,618,各种秒杀活动等。通常限流思路应该是这样

与用户打交道的服务经常面临的场景


那么除了限流,还有哪些方案呢?系统保护的三大利器:限流、降级、熔断。服务降级是在服务器压力陡增的情况下,利用现有的资源,关闭一些服务接口或者页面,以此来释放服务器资源保证核心任务的正常运行。是降低了部分服务的处理能力,增加另一部分服务处理能力,访问量不变。

而限流器有四种实现思路:计数器、滑动窗口、漏桶、令牌桶。后两者在 nginx 等开源项目中很常用。

1. 固定窗口

思路

  1. 将时间划分为固定的窗口大小,例如 10s
  2. 在窗口时间段内,每来一个请求,对计数器加 1。
  3. 当计数器达到设定限制后,触发限流策略该窗口时间内的之后的请求都被丢弃处理。
  4. 该窗口时间结束后,计数器清零,从新开始计数。

缺点:可能会存在临界问题,双倍突发请求避免不了,流量能突破限制


关键代码

type limit struct {
    beginTime time.Time
    counter   int
    mu        sync.Mutex
}

func (limit *limit) apiLimit() bool {
    limit.mu.Lock()
    defer limit.mu.Unlock()

    nowTime := time.Now()

    if nowTime.Sub(limit.beginTime) >= WINDOWTIME {
        limit.beginTime = nowTime
        limit.counter = 0
    }

    if limit.counter > MAXREQUEST {
        return false
    }

    limit.counter++
    fmt.Println("counter: ", limit.counter)
    return true
}

2. 滑动窗口

顾名思义,滑动窗口就是时间窗口在随着时间推移不停地移动。

思路

  1. 时间划分为细粒度的区间,每个区间维持一个计数器,每进入一个请求则将计数器加一。
  2. 个区间组成一个时间窗口,每流逝一个区间时间后,则抛弃最老的一个区间,纳入新区间。如图中示例的窗口 T1 变为 窗口 T2
  3. 当前窗口的区间计数器总和超过设定的限制数量,则本窗口内的后续请求都被丢弃。

滑动窗口是固定窗口的优化,将统计周期进一步细分为 N 个小周期,限流统计时将所有小周期内统计时加和比较;滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
缺点:

  1. 要更多的存储空间,且小窗口越多,耗内存越多
  2. 丢请求

关键代码

type SlidingWindow struct {
   smallWindowTime int64         // 小窗口时间大小
   smallWindowNum  int64         // 小窗口总数
   smallWindowCap  int           // 小窗口请求容量
   counters        map[int64]int // 小窗口计数器
   mu              sync.Mutex    // 锁
}

func NewSlidingWindow(smallWindowTime time.Duration) (*SlidingWindow, error) {
   num := int64(WINDOWTIME / smallWindowTime) // 小窗口总数
   return &SlidingWindow{
      smallWindowTime: int64(smallWindowTime),
      smallWindowNum:  num,
      smallWindowCap:  MAXREQUEST / int(num),
      counters:        make(map[int64]int),
   }, nil
}

func (sw *SlidingWindow) ReqLimit() bool {
   sw.mu.Lock()
   defer sw.mu.Unlock()

   // 获取当前小格子窗口所在的时间值
   curSmallWindowTime := time.Now().Unix()
   // 计算当前小格子窗口起始时间
   beginTime := curSmallWindowTime - sw.smallWindowTime*(sw.smallWindowNum-1)

   // 计算当前小格子窗口请求总数
   var count int
   for sWindowTime, counter := range sw.counters { // 遍历计数器
      if sWindowTime < beginTime { // 判断不是当前小格子
         delete(sw.counters, sWindowTime)
      } else {
         count += counter // 当前小格子窗口计数器累加
      }
   }

   // 当前小格子请求到达请求限制,请求失败,返回 false
   if count >= sw.smallWindowCap {
      return false
   }

   // 没有到达请求上限,当前小格子窗口计数器+1,请求成功
   sw.counters[curSmallWindowTime]++
   return true
}


3. 漏桶

思路

  1. 每个请求看作 “水滴” 一样,放入漏桶中暂存。
  2. 漏桶以固定的速率,漏出请求来执行;如果漏桶空了,则停止漏水。
  3. 如果漏桶满了,则新来的请求会被丢弃。

很明显,漏桶算法在实现的数据结构会选择有容量限制的队列,请求执行者定期定点从队列取出请求来执行,新来的请求会被暂存在队列中或者被丢弃。漏桶算法的缺点是,不论当前系统的负载压力如何,所有请求都得进行排队,即便此时服务器负载处于低位。


关键代码

  type LeakyBucket struct {
   capacity      int        // 桶的容量 - 最高水位
   currentReqNum int        // 桶中当前请求数量 - 当前水位
   lastTime      time.Time  // 桶中上次请求时间 - 上次放水时间
   rate          int        // 桶中流出请求的速率,每秒流出多少请求,水流速度/秒
   mu            sync.Mutex // 锁
}

func NewLeakyBucket(rate int) *LeakyBucket {
   return &LeakyBucket{
      capacity: MAXREQUEST, //容量
      lastTime: time.Now(),
      rate:     rate,
   }
}

func (lb *LeakyBucket) ReqLimit() bool {
   lb.mu.Lock()
   defer lb.mu.Unlock()

   now := time.Now()
   // 计算距离上次放水时间间隔
   gap := now.Sub(lb.lastTime)
   fmt.Println("gap:", gap)
   fmt.Println("lb.currentReqNum  is:",lb.currentReqNum)
   if gap >= time.Second {
      // gap 这段时间流出的请求数=gap时间 * 每秒流出速率
      out := int(gap/time.Second) * lb.rate

      // 计算当前桶中请求数

      lb.currentReqNum = maxInt(0, lb.currentReqNum-out)
      lb.lastTime = now
   }

   // 桶中的当前请求数大于桶容量,请求失败
   if lb.currentReqNum >= lb.capacity {
      return false
   }

   // 若没超过桶容量,桶中请求量+1,返回true
   lb.currentReqNum++
   fmt.Println("curReqNum:", lb.currentReqNum)
   return true
}

4. 令牌桶

思路

  1. 令牌按固定速率发放,生成的令牌放入令牌桶中
  2. 令牌桶有容量限制,当桶满时,新生成的令牌会被丢弃。
  3. 请求到来时,先从令牌桶中获取令牌,如果取得,则执行请求;如果令牌桶为空,则丢弃该请求。

和漏桶算法相比,令牌桶算法最大的优势在于应对突增的流量。漏桶算法只能匀速地消费,突增的流量只能抛弃,而令牌桶只要桶中的令牌足够即可,且令牌生产的速度本身可控可调。


关键代码

func NewTokenBucket(rate int) *TokenBucket {
   return &TokenBucket{
      capacity: MAXREQUEST,
      lastTime: time.Now(),
      rate:     rate,
   }
}

func (tb *TokenBucket) ReqLimit() bool {
   tb.mu.Lock()
   defer tb.mu.Unlock()

   now := time.Now()
   // 距离上次发放令牌间隔时间
   gap := now.Sub(tb.lastTime)
   fmt.Println("gap: ", gap)
   if gap >= time.Second {
      // 这段时间发放的token数 = gap时间(秒为单位) * 每秒发放token速率
      in := int(gap/time.Second) * tb.rate
      // 这段时间桶中的token总量
      // tb.currentTokenNum+in : 当前已有的token总量+发放的token树
      // 当前桶中的token数量当然不能超过桶的最大容量
      tb.currentTokenNum = minInt(tb.capacity, tb.currentTokenNum+in)
      tb.lastTime = now
   }

   // 没有token,请求失败
   if tb.currentTokenNum <= 0 {
      return false
   }

   // 如果有token,当前 token 数量-1,也就是发放token,请求成功
   tb.currentTokenNum--
   fmt.Println("curTokenNum:", tb.currentTokenNum)
   return true
}

其中,Golang 自带限流器(golang.org/x/time/rate)就是基于令牌桶算法来实现的。
主要包含以下几种方法:

  1. 限流器构造 func NewLimiter(r Limit, b int) *Limiter
  2. Wait/WaitN func WaitN(c context.Context, n int) (err error)
  3. Allow/AllowN func AllowN(now time.Time, n int) bool
  4. Reserve/ReserveN func ReserveN(n time.Time, n int) *Reservation

实验: 起服务,模拟 5s 内不断请求 API,超限则拒绝!
func main() {
   go server()
   time.Sleep(time.Second)

   // 请求持续5秒
   e := time.Now().UnixNano() + 5*time.Second.Nanoseconds()
   for {
      if time.Now().UnixNano() > e {
         break
      }
      do()
   }
}

func server() {
   // 每1毫秒投放一次令牌,桶容量大小为 10
   r := rate.Every(1 * time.Millisecond)
   limit := rate.NewLimiter(r, 10)
   http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
      if limit.Allow() {
         fmt.Printf("请求成功,当前时间:%s\n", time.Now().Format("2006-01-02 15:04:05"))
      } else {
         fmt.Printf("被限流了。。。\n")
      }
   })

   err := http.ListenAndServe(":38888", nil)
   if err != nil {
      panic(err)
   }
}

func do() {
   api := "http://localhost:38888/"
   res, err := http.Get(api)
   if err != nil {
      panic(err)
   }
   defer res.Body.Close()

   if res.StatusCode == http.StatusOK {
      fmt.Printf("200 ")
   }
}

综上讲解,希望对大家理解限流有些许帮助。当然,限流需要与多种技术结合使用,才能更好提供用户体验。

引用

https://blog.csdn.net/weixin_41846320/article/details/95941361

https://blog.csdn.net/weixin_41846320/article/details/95941361

https://zhuanlan.zhihu.com/p/90206074

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册