跳到主要内容
  1. Skills/
  2. 后端编程/

一个简单策略的限速器

··字数 4329·9 分钟
python howto

限速器,限制一定时间周期内的运动速度、频率等。网络应用中,后端服务通常使用限速器来保护服务应用程序的稳定,因控制单个服务使用资源难度大,而通过控制单位时间内可同时接受多个服务请求的最大请求数、连接数等来限制系统的并发度。也称为限流器。

当使用客户端向服务器同时发出过多的请求,导致服务器拒绝响应,需要对客户端的服务请求连接限速,并加上等待重试的特性,以免流程中断。

本文将介绍如何实现一个简单的限速器。

原理 #

如果对服务器的限流、与客户端的限速控制进行抽象,那么只要在循环并发过程中,添加一个检查器,当达到容量限制条件时暂停运行则可。按照这个思路,使用 python 实现一个简单策略的限速器。

为了简化限速器的实现,本文没有讨论速度控制策略的选择。

假设: 固定时间周期内,执行数不超过最大值。当时间周期结束后,进入下一个周期,重计执行数。

过程中的状态与操作 #

状态

  1. 时间周期,不变值。
  2. 最大执行数,不变值。
  3. 当前执行数,可变值。
  4. 最近一个时间周期结束时点,可变值。
  5. 当前时点,可变值。
  6. 其它中间值由上述各变量在运行中计算得到,如 剩余时间 = 当前时点 - 结束时点

操作

  1. 到达条件检查
  2. 累计当前执行数
  3. 重置最近一个周期的结束时点、当前执行数

python 实现 #

类的定义 #

与大多数网络文章或开源库类似,这个类也叫 RateLimiter

# -*- coding: utf-8 -*-
# ratelimiter.py

import time
import threading

class RateLimiter:

    def __init__(self, max_calls: int = 10, period: int = 2):
        """
        参数
        ----------
        max_calls : int, optional
                最多可执行数,大于0的整数. The default is 10.
        period : int, optional
                一个时间控制周期(秒),大于0的整数. The default is 2.

        """
        self.MAX_CALLS = max_calls
        self.PERIOD = period

        # 初始化控制器状态,状态值修改统一交由 self.reset 负责
        self.num_of_calls = 0
        self.last_reset = 0
        self.reset()

        # 为了线程安全创建一个🔒,在限速控制或重置修改内部状态时使用
        self.state_lock = threading.RLock()

    def clock(self) -> float:
        '''
        获取当前时钟
        '''
        pass

    def window_remain(self) -> float:
        '''
        计算当前周期的剩余时间
        '''
    	pass
    
    def reset(self) -> None:
        """
        重置限速器。以进入下一个周期。
        """
        pass

    def limiting(self) -> None:
        '''
        检查是否可以执行,若不符合执行条件,则抛异常。
        '''
        pass

与上一节设计中状态、操作对应如下:

分类 python
状态 时间周期 PERIOD
状态 最大可执行数 MAX_CALLS
状态 当前执行数 num_of_calls
状态 最后结束时点 last_reset
状态 当前时点 由函数 clock 来获得当前时间
状态 剩余时间 由函数 window_remain 计算得到
操作 条件检查 在函数 limiting 中实现
操作 累计当前执行数 在函数 limiting 中实现
操作 重置结束时点、当前执行数归0 由函数 reset 负责

类方法 - clock #

# ratelimiter.py
...

    def clock(self) -> float:
        '''
        获取当前时钟,用于控制时间周期

        ref: https://github.com/tomasbasham/ratelimit
        Use monotonic time if available, otherwise fall back to the system clock.
        '''
        if hasattr(time, 'monotonic'):
            return time.monotonic()
        return time.time()

...        

返回当前时间。并不需要知道是几日几时几分,只需一个唯一表示时点的数字即可,不同时刻它的值不同。两个不同的时点值之差,表示时点之间的秒数。

类方法 - window_remain #

# ratelimiter.py
...
    
    def window_remain(self) -> float:
        '''
        返回当前限速控制窗口的剩余时间.

        '''
        span = self.clock() - self.last_reset
        return self.PERIOD - span

...

当剩余时间 <= 0 时,表示已经超过当前控制窗口,可以进入下一个周期。

类方法 - reset #

# ratelimiter.py
...

    def reset(self) -> None:
        """
        重置限速器。
        """
        self.num_of_calls = 0
        self.last_reset = self.clock()

...

self.num_of_calls 表示当前周期内的执行数,重置后归零。self.last_reset 表示最近一次重置的时点,用于计算控制窗口的剩余时间。它们的值仅在 self.reset() 函数中修改。

类方法 - limiting #

# ratelimiter.py
...

    def limiting(self) -> None:
        '''
        检查是否可以执行,若不符合执行条件,则抛异常。
        '''
        with self.state_lock:
            remaining = self.window_remain()
            # 如果时间窗口已过,则重置内部状态
            if remaining <= 0:
                self.reset()

            # 如果在时间窗口内,则检查调用数
            # 当大于最大值时,抛异常或 sleep
            self.num_of_calls += 1
            if self.num_of_calls > self.MAX_CALLS:
                raise RateLimitException(
                    f"限流,超过最大并发可执行数 {self.MAX_CALLS}," \
                    f"剩余 {remaining} 秒", remaining)

                return

...

注意 self.state_lock 锁的作用,它具线程安全,内部状态的更改,都在 state_lock 的作用范围内。

控制函数中,先判断 剩余时间 是否 小于等于0

  • 若是,表示需要进入下一个周期,重置限制器状态,记录当前时点,当前执行数归零。
  • 若否,则表示还处在当前周期的控制窗口内。执行数累加1,并判断是否超过阀值。若在最大执行数以内,程序正常执行。当超过阀值,则抛出异常。

当然,完全可以将 “raise exception” 改为 time.sleep(remaining),就得到了完整功能的限速器。但有悟认为,遇到异常应该抛出,然后在外层调用处捕捉再根据需要选择应对。这样程序更通用化、更具灵活性。

RateLimitException 类定义如下:

# ratelimiter.py

class RateLimitException(Exception):
    '''
    Rate limit 异常类.

    包含一个 remaining 属性,当异常抛出被捕捉到时,可以做更详尽的提示
    '''
    def __init__(self, message, remaining):
        super(RateLimitException, self).__init__(message)
        self.remaining = remaining

运行测试 #

# ratelimiter.py

if __name__ == "__main__":

    """
    用法1: 普通函数调用,抛异常
    """
    def usage_with_raise():
        print(f"\nuse case 2: 抛异常")

        limiter = RateLimiter(4, 1)

        for i in range(11):
            limiter.limiting()
            print(f"{i}")

    usage_with_raise()

在脚本文件最后添加上面的测试代码。代码中显示了限速器的用法,先创建一个限速器对象,示例中的参数表示每秒最多只能循环4次。将在循环中调用控制函数 limiter.limiting(),当超过限制时,程序会抛出异常并中断。 在命令行运行得到类似于这样的结果:

py/limit/limiter via 🐍 v3.9.15 (env)
❯ python ratelimiter.py

use case 1: 抛异常
0
1
2
3
Traceback (most recent call last):
  File "/Users/macbook/projects/py/limit/limiter/ratelimiter.py", line 100, in <module>
    usage_with_raise()
  File "/Users/macbook/projects/py/limit/limiter/ratelimiter.py", line 97, in usage_with_raise
    limiter.limiting()
  File "/Users/macbook/projects/py/limit/limiter/ratelimiter.py", line 72, in limiting
    raise RateLimitException(
__main__.RateLimitException: 限流,超过最大并发可执行数 4,剩余 0.999968913 秒

改进 #

等待并重试 #

上述实现已经是一个完整的限速器。但在限制条件达到时,只有抛异常。但有时更可能是需要程序等待片断后重试。在上述代码的基础上,为类 RateLimiter 增加一个函数 sleep_and_retry,它实现捕捉异常,并等待剩余时间消逝后重试。

# ratelimiter.py
...

    def sleep_and_retry(self):
        '''
        检查是否可以执行,若不符合执行条件,则 sleep 等待。
        它与 self.limiting 函数属于两种不同的策略。
        '''
        while True:
            try:
                self.limiting()
            except RateLimitException as e:
                print(f"{e}。sleep。")
                time.sleep(e.remaining)
            else:
                break

...

函数中使用到了 RateLimitException 异常中保存的剩余时间 remaining。 修改 if __name__ == "__main__": 下的测试代码为:

# ratelimiter.py
if __name__ == "__main__":
    """
    用法1: 普通函数调用,抛异常
    """
    def usage_with_raise():
        print(f"\nuse case 1: 抛异常")

        limiter = RateLimiter(4, 1)

        for i in range(11):
            limiter.limiting()
            print(f"{i}")

    # 捕捉 limiting() 抛异常
    try:
        usage_with_raise()
    except RateLimitException as e:
        print(f"出现异常: {e}。退出。")


    """
    用法2: 普通函数调用,等待并重试
    """
    def usage_with_wait():
        print(f"\nuse case 2: 等待并重试")

        limiter = RateLimiter(4, 1)
        for i in range(9):
            limiter.sleep_and_retry()
            print(f"{i}")

    usage_with_wait()

在用法上, sleep_and_retrylimiting 相同。它内部也调用 limiting,捕捉到它抛出的异常,然后睡眠剩余时间,以让程序可以继续执行。在运行前,修改脚本 捕捉 usage_with_raise 的异常,不然脚本无法往后运行。运行结果:

py/limit/limiter via 🐍 v3.9.15 (env) took 3s
❯ python simple_limiter.py

use case 1: 抛异常
0
1
2
3
出现异常: 限流,超过最大并发可执行数 4,剩余 0.999962436 秒。退出。

use case 2: 等待并重试
0
1
2
3
限流,超过最大并发可执行数 4,剩余 0.999934489 秒。sleep。
4
5
6
7
限流,超过最大并发可执行数 4,剩余 0.999908767 秒。sleep。
8

像装饰器一样使用(pythonic) #

在 python 世界中, @ 符号到处可见,它是装饰器(也叫包装器,由英文 decorator 翻译而来)的专属符号。我们同样可以像使用装饰器一样的方式来使用 RateLimiter

由于存在 抛异常等待并重试 两种方式,使用装饰器包装函数时,并没办法指定使用方法 limiting() 还是方法 sleep_and_retry()?需要定义一个新的输入参数来接收这个选项。

首先,为 RateLimiter 添加一个属性,初始方法对应增加一个输入参数。

# ratelimiter.py
...

    def __init__(self, max_calls: int = 10, period: int = 2, retry: bool=False):
        """

        Parameters
        ----------
        max_calls : int, optional
                最多可执行数,大于0的整数. The default is 10.
        period : int, optional
                一个时间控制周期(秒),大于0的整数. The default is 2.
        retry : bool, optional
                是否重试. The default is False.

        self.MAX_CALLS = max_calls
        self.PERIOD = period
        self.RETRY = retry # 只有当装饰器 @RateLimiter 时才使用

所增加的输入参数 retry: bool = False,默认为 False,则当遇到超限时,抛异常。增加类属性 self.RETRY,用于保存 retry

第二,定义 RateLimiter__call__ 魔术方法。

# ratelimiter.py
import functools
...

    def __call__(self, func):
        '''
        python magic 方法,定义了类的 __call__ 方法后,
        允许对象当用做函数使用,x(args) => type(x).__call__(args)。

        定义 @RateLimiter(...) 为包装器,在执行之前,先执行 self.sleep_and_retry() 或者 self.limiting() 控制函数。
        '''
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            if self.RETRY:
                self.sleep_and_retry()
            else:
                self.limiting()
            return func(*args, **kwargs)

        return wrapped

...

记得在脚本的前面 import functools__call__ 函数本身是一个包装器,被它包装的函数在运行前,都会先运行 sleep_and_retry 或者 limiting 函数,这样就达到控制的目的,没有侵入性,人畜无害。

第三,增加测试样例代码,如下:

# ratelimiter.py
...
if __name__ == "__main__":
	...

    """
    用法3: 装饰器用法,重试
    """
    def usage_with_decrator():
        print(f"\nuse case 3: 装饰器, 等待并重试")

        @RateLimiter(max_calls=5, period=2, retry=True)
        def f(i):
            print(f"{i}")

        [f(i) for i in range(12)]

    usage_with_decrator()

样子好帅,显式的 limiter.sleep_and_retry()limiter.limiting() 调用不见了。

第四,命令行运行,结果如下:

...

use case 3: 装饰器, 等待并重试
0
1
2
3
4
限流,超过最大并发可执行数 5,剩余 1.9998068769999997 秒。sleep。
5
6
7
8
9
限流,超过最大并发可执行数 5,剩余 1.9998702489999998 秒。sleep。
10
11

在 with 语句中使用(pythonic) #

与装饰器类似,with 语句也很 python。比如,函数 open 打开文件时,使用 with open(文件名) as f:,它巧妙的将变量 f作用范围包裹起来,仅可以在 with 下的语句块中有效,并且在跳出 with 块后,f 所指的资源也被释放,无须手动关闭。同样,如本文中例中的 state_lock 线程级别锁,在每次检查与修改内部状态时,都在其作用范围内。

python 为它定义了一个名词,With Statement Context Managers

既然是上下文,那么就有这些确定的概念,创建上下文、进入上下文、退出上下文。 又有,

  1. 创建上下文
  2. 在进入一个上下文时的第一个动作,进行一系列初始化;
  3. whatever
  4. 退出上下文最后一个动作(往往是一些资源释放、空间清理相关的动作)

python 通过 with 语法,把上面 4个部分有机关联起来。反过来说,如果我们定义了 进入上下文退出上下文,就可以像 open 一样,with XXXX() as x:

对于本例,限速器来说,可以把一个循环迭代看成是一次进、出上下文的过程。业务函数被包裹在上下文中执行。而限速检查,是进入上下文后的第一个动作。

使用 with,会调用类对应的 __enter____exit__ 方法,只要定义这两个方法即可。

首先

# ratelimiter.py
...

    def __enter__(self):
        '''
        __enter__、与__exit__ 魔术方法,用在 context management。
        
        定义这两个方法后,就可以用 with 语法。

        __enter__ 表示进入作用域时的操作
        '''
        if self.RETRY:
            self.sleep_and_retry()
        else:
            self.limiting()

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        与 __enter__ 成对出现。定义了退出上下文(作用域)前最后的操作。
        如清理资源等。
        """
        pass

...

__enter____call__ 逻辑相同,__exit__ 空逻辑。

第二,增加测试样例,

# ratelimiter.py
...
if __name__ == "__main__":
	...

    """
    用法4: with 语法
    """
    def usage_with_stat():
        print(f"\nuse case 4: with 语法")
        limiter = RateLimiter(max_calls=6, period=1, retry=True)

        for i in range(14):
            with limiter:
                print(f"{i}")

    usage_with_stat()

最后,结果如,

use case 4: with 语法
0
1
2
3
4
5
限流,超过最大并发可执行数 6,剩余 0.9999542940000001 秒。sleep。
6
7
8
9
10
11
限流,超过最大并发可执行数 6,剩余 0.9996002860000006 秒。sleep。
12
13

延伸与阅读 #

至此,我们已经知道如何设计开发一个限速器。上述例子,我们并无考虑周期内执行的均匀分布。有可能执行集中在一个时间窗口的前部,而后部是空闲的,容易出现峰值或脉冲。若是为后端服务设计并实现一个限流器,那么需要更好的限流控制策略,使得服务器限流均匀平滑,这样服务器才能更稳定。

总的来说,限流策略以时间因素、执行次数限额做为参数,背后依据单个服务处理时间、消耗资源、系统的负载吞吐能力、合理的控制窗口周期等综合考虑。因此,对于有性能要求、响应表现敏感的系统,需要精心选择策略或算法。

关于控制策略,有一些延伸阅读材料,

相关资料 #

本例完整脚本 #

在 python 3.9 或 python 3.10 下运行。

下载 👉 ratelimiter.py

引用 #

学习编写限速器时,参考了 python 开源库。