Token Bucket Rate Limiter を Python で作ってみた

3 min
backendpython

最近読み始めたシステム設計の面接試験Rate Limiter の章が出てきたので、紹介されていたアルゴリズム Token Bucket を Python で書いて動作をみてみました。

Token Bucket

Token Bucket は、リクエストをトークンとして扱うレート制限アルゴリズムの一種です。 バケツの中にトークンが溜まっていて、1 リクエストにつき 1 個消費し、バケツは一定レートで自動的に補充されるが、容量 (capacity) を超えては溜まりません。

┌─────────────────────────────┐
│ capacity = 5                │   ← バケツの最大容量 (= バーストの上限)
│  ●●●●●                      │
│                             │ ← 1 秒あたり refill_rate 個のペースで補充
│ tokens 残量                  │
└─────────────────────────────┘
         │
         ▼ try_acquire()
   tokens >= 1 なら消費して通す
   足りなければ拒否

性質の特徴としては以下の2つがあります。

  • バーストを許す: しばらくアイドルだった後の連続アクセスは捌ける
  • 長期平均は補充レートに収まる: 持続的な過剰トラフィックは抑え込まれる

実装

状態は tokens (現在の残量) と last_refill (前回補充した時刻) の 2 つだけを持たせます。 実装を簡単にするため、補充は別スレッドで回さずに try_acquire() が呼ばれた瞬間に前回からの経過時間分を一気に補充する遅延補充にしています。

import time
from typing import Callable


class TokenBucket:
    def __init__(
        self,
        capacity: float,
        refill_rate: float,
        now: Callable[[], float] = time.monotonic,
    ) -> None:
        # capacity: バケツの最大容量 (= 一度に許す最大バースト)
        # refill_rate: 1 秒あたりに補充されるトークン数 (= 長期平均レートの上限)
        # now: 現在時刻を返す関数。テストで FakeClock を差し込めるよう依存注入にする
        self.capacity = float(capacity)
        self.refill_rate = float(refill_rate)
        self._now = now
        # 初期はバケツを満タンにしておく。最初の capacity 個までは連続で通る
        self.tokens = float(capacity)
        self.last_refill = now()

    def _refill(self) -> None:
        # 前回補充時刻からの経過秒数だけ、レートに比例してトークンを増やす。
        # ただし capacity を超えて溜まらないように上限でクランプする。
        current = self._now()
        elapsed = current - self.last_refill
        if elapsed <= 0:
            return
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = current

    def try_acquire(self, n: float = 1.0) -> bool:
        # 1. まず現在時刻に応じた補充を反映 (遅延補充)
        # 2. 残量が足りていれば消費して True、足りなければ消費せず False
        # 拒否時に消費しないのが重要 — 減らすと「拒否地獄」で復帰が遅れる
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

動かしてみる

capacity=5, refill_rate=2.0/s のバケツに、0.3 秒間隔で 15 回リクエストを投げてみます。

import time
from token_bucket import TokenBucket

bucket = TokenBucket(capacity=5, refill_rate=2.0)
for i in range(15):
    allowed = bucket.try_acquire(1)
    print(f"[{i:02d}] allowed={allowed} tokens={bucket.tokens:.3f}")
    time.sleep(0.3)

実行結果:

[00] allowed=True  tokens=4.000
[01] allowed=True  tokens=3.603
[02] allowed=True  tokens=3.213
[03] allowed=True  tokens=2.823
[04] allowed=True  tokens=2.426
[05] allowed=True  tokens=2.030
[06] allowed=True  tokens=1.639
[07] allowed=True  tokens=1.249
[08] allowed=True  tokens=0.859
[09] allowed=True  tokens=0.468
[10] allowed=True  tokens=0.075
[11] allowed=False tokens=0.684
[12] allowed=True  tokens=0.292
[13] allowed=False tokens=0.902
[14] allowed=True  tokens=0.508

空になるまでの回数

消費と補充は同時に起きているので、1 回あたりに実際に減るトークン数は 1 - refill_rate × interval になります。

  • 1 個消費する
  • 0.3 秒経っているので 0.3 × 2.0 = 0.6 個補充されている
  • 差し引き 0.4 個しか減らない

そのため、空になるまでの回数は以下の式で求まります。

空になるまでの回数 ≒ capacity / (1 - refill_rate × interval)
                      (ただし refill_rate × interval < 1 のとき)

今回の値を入れると 5 / (1 - 2.0 × 0.3) = 5 / 0.4 = 12.5。実測の [11] で初めて拒否されるのと合っています。 容量だけでなく、呼び出し間隔と補充レートの組み合わせで挙動が決まるということがわかりますね。

参考

Stripe Engineering: Scaling your API with rate limiters