- /
- Skills/
- 后端编程/
- 一个简单策略的限速器/
一个简单策略的限速器
目录
限速器,限制一定时间周期内的运动速度、频率等。网络应用中,后端服务通常使用限速器来保护服务应用程序的稳定,因控制单个服务使用资源难度大,而通过控制单位时间内可同时接受多个服务请求的最大请求数、连接数等来限制系统的并发度。也称为限流器。
当使用客户端向服务器同时发出过多的请求,导致服务器拒绝响应,需要对客户端的服务请求连接限速,并加上等待重试的特性,以免流程中断。
本文将介绍如何实现一个简单的限速器。
原理 #
如果对服务器的限流、与客户端的限速控制进行抽象,那么只要在循环并发过程中,添加一个检查器,当达到容量限制条件时暂停运行则可。按照这个思路,使用 python 实现一个简单策略的限速器。
为了简化限速器的实现,本文没有讨论速度控制策略的选择。
假设: 固定时间周期内,执行数不超过最大值。当时间周期结束后,进入下一个周期,重计执行数。
过程中的状态与操作 #
状态
- 时间周期,不变值。
- 最大执行数,不变值。
- 当前执行数,可变值。
- 最近一个时间周期结束时点,可变值。
- 当前时点,可变值。
- 其它中间值由上述各变量在运行中计算得到,如 剩余时间 = 当前时点 - 结束时点。
操作
- 到达条件检查
- 累计当前执行数
- 重置最近一个周期的结束时点、当前执行数
python 实现 #
类的定义 #
与大多数网络文章或开源库类似,这个类也叫 RateLimiter
# -*- coding: utf-8 -*-
# ratelimiter.py
import time
import threading
class RateLimiter:
def __init__(self, max_calls: int = 10, period: int = 2):
"""
参数
----------
max_calls : int, optional
最多可执行数,大于0的整数. The default is 10.
period : int, optional
一个时间控制周期(秒),大于0的整数. The default is 2.
"""
self.MAX_CALLS = max_calls
self.PERIOD = period
# 初始化控制器状态,状态值修改统一交由 self.reset 负责
self.num_of_calls = 0
self.last_reset = 0
self.reset()
# 为了线程安全创建一个🔒,在限速控制或重置修改内部状态时使用
self.state_lock = threading.RLock()
def clock(self) -> float:
'''
获取当前时钟
'''
pass
def window_remain(self) -> float:
'''
计算当前周期的剩余时间
'''
pass
def reset(self) -> None:
"""
重置限速器。以进入下一个周期。
"""
pass
def limiting(self) -> None:
'''
检查是否可以执行,若不符合执行条件,则抛异常。
'''
pass
与上一节设计中状态、操作对应如下:
分类 | 项 | python |
---|---|---|
状态 | 时间周期 | PERIOD |
状态 | 最大可执行数 | MAX_CALLS |
状态 | 当前执行数 | num_of_calls |
状态 | 最后结束时点 | last_reset |
状态 | 当前时点 | 由函数 clock 来获得当前时间 |
状态 | 剩余时间 | 由函数 window_remain 计算得到 |
操作 | 条件检查 | 在函数 limiting 中实现 |
操作 | 累计当前执行数 | 在函数 limiting 中实现 |
操作 | 重置结束时点、当前执行数归0 | 由函数 reset 负责 |
类方法 - clock #
# ratelimiter.py
...
def clock(self) -> float:
'''
获取当前时钟,用于控制时间周期
ref: https://github.com/tomasbasham/ratelimit
Use monotonic time if available, otherwise fall back to the system clock.
'''
if hasattr(time, 'monotonic'):
return time.monotonic()
return time.time()
...
返回当前时间。并不需要知道是几日几时几分,只需一个唯一表示时点的数字即可,不同时刻它的值不同。两个不同的时点值之差,表示时点之间的秒数。
类方法 - window_remain #
# ratelimiter.py
...
def window_remain(self) -> float:
'''
返回当前限速控制窗口的剩余时间.
'''
span = self.clock() - self.last_reset
return self.PERIOD - span
...
当剩余时间 <= 0 时,表示已经超过当前控制窗口,可以进入下一个周期。
类方法 - reset #
# ratelimiter.py
...
def reset(self) -> None:
"""
重置限速器。
"""
self.num_of_calls = 0
self.last_reset = self.clock()
...
self.num_of_calls
表示当前周期内的执行数,重置后归零。self.last_reset
表示最近一次重置的时点,用于计算控制窗口的剩余时间。它们的值仅在 self.reset()
函数中修改。
类方法 - limiting #
# ratelimiter.py
...
def limiting(self) -> None:
'''
检查是否可以执行,若不符合执行条件,则抛异常。
'''
with self.state_lock:
remaining = self.window_remain()
# 如果时间窗口已过,则重置内部状态
if remaining <= 0:
self.reset()
# 如果在时间窗口内,则检查调用数
# 当大于最大值时,抛异常或 sleep
self.num_of_calls += 1
if self.num_of_calls > self.MAX_CALLS:
raise RateLimitException(
f"限流,超过最大并发可执行数 {self.MAX_CALLS}," \
f"剩余 {remaining} 秒", remaining)
return
...
注意 self.state_lock
锁的作用,它具线程安全,内部状态的更改,都在 state_lock
的作用范围内。
控制函数中,先判断 剩余时间
是否 小于等于0 ?
- 若是,表示需要进入下一个周期,重置限制器状态,记录当前时点,当前执行数归零。
- 若否,则表示还处在当前周期的控制窗口内。执行数累加1,并判断是否超过阀值。若在最大执行数以内,程序正常执行。当超过阀值,则抛出异常。
当然,完全可以将 “raise exception” 改为 time.sleep(remaining)
,就得到了完整功能的限速器。但有悟认为,遇到异常应该抛出,然后在外层调用处捕捉再根据需要选择应对。这样程序更通用化、更具灵活性。
RateLimitException
类定义如下:
# ratelimiter.py
class RateLimitException(Exception):
'''
Rate limit 异常类.
包含一个 remaining 属性,当异常抛出被捕捉到时,可以做更详尽的提示
'''
def __init__(self, message, remaining):
super(RateLimitException, self).__init__(message)
self.remaining = remaining
运行测试 #
# ratelimiter.py
if __name__ == "__main__":
"""
用法1: 普通函数调用,抛异常
"""
def usage_with_raise():
print(f"\nuse case 2: 抛异常")
limiter = RateLimiter(4, 1)
for i in range(11):
limiter.limiting()
print(f"{i}")
usage_with_raise()
在脚本文件最后添加上面的测试代码。代码中显示了限速器的用法,先创建一个限速器对象,示例中的参数表示每秒最多只能循环4次。将在循环中调用控制函数 limiter.limiting()
,当超过限制时,程序会抛出异常并中断。
在命令行运行得到类似于这样的结果:
py/limit/limiter via 🐍 v3.9.15 (env)
❯ python ratelimiter.py
use case 1: 抛异常
0
1
2
3
Traceback (most recent call last):
File "/Users/macbook/projects/py/limit/limiter/ratelimiter.py", line 100, in <module>
usage_with_raise()
File "/Users/macbook/projects/py/limit/limiter/ratelimiter.py", line 97, in usage_with_raise
limiter.limiting()
File "/Users/macbook/projects/py/limit/limiter/ratelimiter.py", line 72, in limiting
raise RateLimitException(
__main__.RateLimitException: 限流,超过最大并发可执行数 4,剩余 0.999968913 秒
改进 #
等待并重试 #
上述实现已经是一个完整的限速器。但在限制条件达到时,只有抛异常。但有时更可能是需要程序等待片断后重试。在上述代码的基础上,为类 RateLimiter
增加一个函数 sleep_and_retry
,它实现捕捉异常,并等待剩余时间消逝后重试。
# ratelimiter.py
...
def sleep_and_retry(self):
'''
检查是否可以执行,若不符合执行条件,则 sleep 等待。
它与 self.limiting 函数属于两种不同的策略。
'''
while True:
try:
self.limiting()
except RateLimitException as e:
print(f"{e}。sleep。")
time.sleep(e.remaining)
else:
break
...
函数中使用到了 RateLimitException
异常中保存的剩余时间 remaining
。 修改 if __name__ == "__main__":
下的测试代码为:
# ratelimiter.py
if __name__ == "__main__":
"""
用法1: 普通函数调用,抛异常
"""
def usage_with_raise():
print(f"\nuse case 1: 抛异常")
limiter = RateLimiter(4, 1)
for i in range(11):
limiter.limiting()
print(f"{i}")
# 捕捉 limiting() 抛异常
try:
usage_with_raise()
except RateLimitException as e:
print(f"出现异常: {e}。退出。")
"""
用法2: 普通函数调用,等待并重试
"""
def usage_with_wait():
print(f"\nuse case 2: 等待并重试")
limiter = RateLimiter(4, 1)
for i in range(9):
limiter.sleep_and_retry()
print(f"{i}")
usage_with_wait()
在用法上, sleep_and_retry
与 limiting
相同。它内部也调用 limiting
,捕捉到它抛出的异常,然后睡眠剩余时间,以让程序可以继续执行。在运行前,修改脚本 捕捉 usage_with_raise
的异常,不然脚本无法往后运行。运行结果:
py/limit/limiter via 🐍 v3.9.15 (env) took 3s
❯ python simple_limiter.py
use case 1: 抛异常
0
1
2
3
出现异常: 限流,超过最大并发可执行数 4,剩余 0.999962436 秒。退出。
use case 2: 等待并重试
0
1
2
3
限流,超过最大并发可执行数 4,剩余 0.999934489 秒。sleep。
4
5
6
7
限流,超过最大并发可执行数 4,剩余 0.999908767 秒。sleep。
8
像装饰器一样使用(pythonic) #
在 python 世界中, @
符号到处可见,它是装饰器(也叫包装器,由英文 decorator 翻译而来)的专属符号。我们同样可以像使用装饰器一样的方式来使用 RateLimiter
。
由于存在 抛异常、等待并重试 两种方式,使用装饰器包装函数时,并没办法指定使用方法 limiting()
还是方法 sleep_and_retry()
?需要定义一个新的输入参数来接收这个选项。
首先,为 RateLimiter
添加一个属性,初始方法对应增加一个输入参数。
# ratelimiter.py
...
def __init__(self, max_calls: int = 10, period: int = 2, retry: bool=False):
"""
Parameters
----------
max_calls : int, optional
最多可执行数,大于0的整数. The default is 10.
period : int, optional
一个时间控制周期(秒),大于0的整数. The default is 2.
retry : bool, optional
是否重试. The default is False.
self.MAX_CALLS = max_calls
self.PERIOD = period
self.RETRY = retry # 只有当装饰器 @RateLimiter 时才使用
所增加的输入参数 retry: bool = False
,默认为 False,则当遇到超限时,抛异常。增加类属性 self.RETRY
,用于保存 retry
。
第二,定义 RateLimiter
的 __call__
魔术方法。
# ratelimiter.py
import functools
...
def __call__(self, func):
'''
python magic 方法,定义了类的 __call__ 方法后,
允许对象当用做函数使用,x(args) => type(x).__call__(args)。
定义 @RateLimiter(...) 为包装器,在执行之前,先执行 self.sleep_and_retry() 或者 self.limiting() 控制函数。
'''
@functools.wraps(func)
def wrapped(*args, **kwargs):
if self.RETRY:
self.sleep_and_retry()
else:
self.limiting()
return func(*args, **kwargs)
return wrapped
...
记得在脚本的前面 import functools
。__call__
函数本身是一个包装器,被它包装的函数在运行前,都会先运行 sleep_and_retry
或者 limiting
函数,这样就达到控制的目的,没有侵入性,人畜无害。
第三,增加测试样例代码,如下:
# ratelimiter.py
...
if __name__ == "__main__":
...
"""
用法3: 装饰器用法,重试
"""
def usage_with_decrator():
print(f"\nuse case 3: 装饰器, 等待并重试")
@RateLimiter(max_calls=5, period=2, retry=True)
def f(i):
print(f"{i}")
[f(i) for i in range(12)]
usage_with_decrator()
样子好帅,显式的 limiter.sleep_and_retry()
、limiter.limiting()
调用不见了。
第四,命令行运行,结果如下:
...
use case 3: 装饰器, 等待并重试
0
1
2
3
4
限流,超过最大并发可执行数 5,剩余 1.9998068769999997 秒。sleep。
5
6
7
8
9
限流,超过最大并发可执行数 5,剩余 1.9998702489999998 秒。sleep。
10
11
在 with 语句中使用(pythonic) #
与装饰器类似,with 语句也很 python。比如,函数 open
打开文件时,使用 with open(文件名) as f:
,它巧妙的将变量 f
作用范围包裹起来,仅可以在 with
下的语句块中有效,并且在跳出 with
块后,f
所指的资源也被释放,无须手动关闭。同样,如本文中例中的 state_lock
线程级别锁,在每次检查与修改内部状态时,都在其作用范围内。
python 为它定义了一个名词,With Statement Context Managers。
既然是上下文,那么就有这些确定的概念,创建上下文、进入上下文、退出上下文。 又有,
- 创建上下文
- 在进入一个上下文时的第一个动作,进行一系列初始化;
- whatever
- 退出上下文最后一个动作(往往是一些资源释放、空间清理相关的动作)
python 通过 with 语法,把上面 4个部分有机关联起来。反过来说,如果我们定义了 进入上下文、退出上下文,就可以像 open
一样,with XXXX() as x:
。
对于本例,限速器来说,可以把一个循环迭代看成是一次进、出上下文的过程。业务函数被包裹在上下文中执行。而限速检查,是进入上下文后的第一个动作。
使用 with
,会调用类对应的 __enter__
、__exit__
方法,只要定义这两个方法即可。
首先,
# ratelimiter.py
...
def __enter__(self):
'''
__enter__、与__exit__ 魔术方法,用在 context management。
定义这两个方法后,就可以用 with 语法。
__enter__ 表示进入作用域时的操作
'''
if self.RETRY:
self.sleep_and_retry()
else:
self.limiting()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
与 __enter__ 成对出现。定义了退出上下文(作用域)前最后的操作。
如清理资源等。
"""
pass
...
__enter__
与 __call__
逻辑相同,__exit__
空逻辑。
第二,增加测试样例,
# ratelimiter.py
...
if __name__ == "__main__":
...
"""
用法4: with 语法
"""
def usage_with_stat():
print(f"\nuse case 4: with 语法")
limiter = RateLimiter(max_calls=6, period=1, retry=True)
for i in range(14):
with limiter:
print(f"{i}")
usage_with_stat()
最后,结果如,
use case 4: with 语法
0
1
2
3
4
5
限流,超过最大并发可执行数 6,剩余 0.9999542940000001 秒。sleep。
6
7
8
9
10
11
限流,超过最大并发可执行数 6,剩余 0.9996002860000006 秒。sleep。
12
13
延伸与阅读 #
至此,我们已经知道如何设计开发一个限速器。上述例子,我们并无考虑周期内执行的均匀分布。有可能执行集中在一个时间窗口的前部,而后部是空闲的,容易出现峰值或脉冲。若是为后端服务设计并实现一个限流器,那么需要更好的限流控制策略,使得服务器限流均匀平滑,这样服务器才能更稳定。
总的来说,限流策略以时间因素、执行次数限额做为参数,背后依据单个服务处理时间、消耗资源、系统的负载吞吐能力、合理的控制窗口周期等综合考虑。因此,对于有性能要求、响应表现敏感的系统,需要精心选择策略或算法。
关于控制策略,有一些延伸阅读材料,
相关资料 #
本例完整脚本 #
在 python 3.9 或 python 3.10 下运行。
下载 👉 ratelimiter.py
引用 #
学习编写限速器时,参考了 python 开源库。