Token Bucket Rate Limiter を Python で作ってみた
3 min
backendpython
最近読み始めたシステム設計の面接試験 に Rate Limiter の章が出てきたので、紹介されていたアルゴリズム Token Bucket を Python で書いて動作をみてみました。
Token Bucket
Token Bucket は、リクエストをトークンとして扱うレート制限アルゴリズムの一種です。
バケツの中にトークンが溜まっていて、1 リクエストにつき 1 個消費し、バケツは一定レートで自動的に補充されるが、容量 (capacity) を超えては溜まりません。
┌─────────────────────────────┐
│ capacity = 5 │ ← バケツの最大容量 (= バーストの上限)
│ ●●●●● │
│ │ ← 1 秒あたり refill_rate 個のペースで補充
│ tokens 残量 │
└─────────────────────────────┘
│
▼ try_acquire()
tokens >= 1 なら消費して通す
足りなければ拒否
性質の特徴としては以下の2つがあります。
- バーストを許す: しばらくアイドルだった後の連続アクセスは捌ける
- 長期平均は補充レートに収まる: 持続的な過剰トラフィックは抑え込まれる
実装
状態は tokens (現在の残量) と last_refill (前回補充した時刻) の 2 つだけを持たせます。
実装を簡単にするため、補充は別スレッドで回さずに try_acquire() が呼ばれた瞬間に前回からの経過時間分を一気に補充する遅延補充にしています。
import time
from typing import Callable
class TokenBucket:
def __init__(
self,
capacity: float,
refill_rate: float,
now: Callable[[], float] = time.monotonic,
) -> None:
# capacity: バケツの最大容量 (= 一度に許す最大バースト)
# refill_rate: 1 秒あたりに補充されるトークン数 (= 長期平均レートの上限)
# now: 現在時刻を返す関数。テストで FakeClock を差し込めるよう依存注入にする
self.capacity = float(capacity)
self.refill_rate = float(refill_rate)
self._now = now
# 初期はバケツを満タンにしておく。最初の capacity 個までは連続で通る
self.tokens = float(capacity)
self.last_refill = now()
def _refill(self) -> None:
# 前回補充時刻からの経過秒数だけ、レートに比例してトークンを増やす。
# ただし capacity を超えて溜まらないように上限でクランプする。
current = self._now()
elapsed = current - self.last_refill
if elapsed <= 0:
return
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = current
def try_acquire(self, n: float = 1.0) -> bool:
# 1. まず現在時刻に応じた補充を反映 (遅延補充)
# 2. 残量が足りていれば消費して True、足りなければ消費せず False
# 拒否時に消費しないのが重要 — 減らすと「拒否地獄」で復帰が遅れる
self._refill()
if self.tokens >= n:
self.tokens -= n
return True
return False
動かしてみる
capacity=5, refill_rate=2.0/s のバケツに、0.3 秒間隔で 15 回リクエストを投げてみます。
import time
from token_bucket import TokenBucket
bucket = TokenBucket(capacity=5, refill_rate=2.0)
for i in range(15):
allowed = bucket.try_acquire(1)
print(f"[{i:02d}] allowed={allowed} tokens={bucket.tokens:.3f}")
time.sleep(0.3)
実行結果:
[00] allowed=True tokens=4.000
[01] allowed=True tokens=3.603
[02] allowed=True tokens=3.213
[03] allowed=True tokens=2.823
[04] allowed=True tokens=2.426
[05] allowed=True tokens=2.030
[06] allowed=True tokens=1.639
[07] allowed=True tokens=1.249
[08] allowed=True tokens=0.859
[09] allowed=True tokens=0.468
[10] allowed=True tokens=0.075
[11] allowed=False tokens=0.684
[12] allowed=True tokens=0.292
[13] allowed=False tokens=0.902
[14] allowed=True tokens=0.508
空になるまでの回数
消費と補充は同時に起きているので、1 回あたりに実際に減るトークン数は 1 - refill_rate × interval になります。
- 1 個消費する
- 0.3 秒経っているので 0.3 × 2.0 = 0.6 個補充されている
- 差し引き 0.4 個しか減らない
そのため、空になるまでの回数は以下の式で求まります。
空になるまでの回数 ≒ capacity / (1 - refill_rate × interval)
(ただし refill_rate × interval < 1 のとき)
今回の値を入れると 5 / (1 - 2.0 × 0.3) = 5 / 0.4 = 12.5。実測の [11] で初めて拒否されるのと合っています。
容量だけでなく、呼び出し間隔と補充レートの組み合わせで挙動が決まるということがわかりますね。