Token Bucket Rate Limiter を Redis で分散環境対応してみた

4 min
backendredispython

前回 は Python で 1 プロセス用の Token Bucket を書きました。 今回はその続きで、複数プロセス・複数マシンから同じ Bucket を共有できる分散版に Redis を使って拡張してみます。

なぜ分散版がいるか

それぞれが独立したバケツを持つと全体として設定したレートを超えてしまいます。

ユーザ user-42 のリクエスト
       │
       ├── API server 1 (TokenBucket: 残量 10)
       ├── API server 2 (TokenBucket: 残量 10)
       └── API server 3 (TokenBucket: 残量 10)
                    ↓
   user-42 は実質 30 req/sec まで通せてしまう

そこで状態 (トークンの残数と最後に補充した時刻) を Redis に集約し、全 API サーバが同じバケツを見にいくようにします。

設計上の論点

分散版を Redis で組むときに決めることは大きく 4 つでした。

アトミック性をどう保証するか

愚直に「補充 → 残量チェック → 消費」を行うと、複数プロセスが同時に補充したり、同じトークンを消費してしまう可能性があります。 これを防ぐために Lua スクリプトを使います。Redis は Lua 実行中に他コマンドを処理しない ので、一連の処理が自動的にアトミックになります。

時刻をどこで取るか

補充計算には現在時刻が必要ですが、サーバ間の時計ずれを防ぐ必要があります。 そこで TIME コマンドを使って、Redis のサーバ側の時刻を基準にします。

状態をどう持つか

トークン残数と最後に補充した時刻を別々のキーに分けると、読み書きが 2 回に増えてしまいアトミックに扱いづらくなります。 そこで HASH 型で 1 キーに集約して、フィールド tokenslast_refill を持つ形にしました。

TTL を設定するか

一定時間アクセスがないと、トークンは capacity まで満タンに溜まりきるため状態を永続化しておく必要はありません。 満タンになるまでの最大時間 capacity / refill_rate に少し余裕を見た TTL を設定して、EXPIRE を使って削除します。

Lua スクリプトの実装

-- KEYS[1] : レート制限の対象キー (例: ratelimit:user:42)
-- ARGV[1] : capacity    … バケツの最大容量
-- ARGV[2] : refill_rate … 1 秒あたりに補充するトークン数
-- ARGV[3] : requested   … 今回消費したいトークン数
-- ARGV[4] : ttl         … キーの生存秒数 (アイドル時の自動削除用)

local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])

-- 現在時刻はクライアントではなく Redis サーバから取得する。
-- 全クライアントが同じ時計を見るので、サーバ間の時計ずれが起きない。
-- TIME は { 秒, マイクロ秒 } を返すので、秒の小数 1 つにまとめる。
local t = redis.call('TIME')
local now = tonumber(t[1]) + tonumber(t[2]) / 1e6

-- バケツの状態 (残量と最終更新時刻) を 1 回の HMGET でまとめて読む。
-- キーが無い (初アクセス or TTL で消えた後) なら満タン・now 起点で開始。
local raw = redis.call('HMGET', KEYS[1], 'tokens', 'last_refill')
local tokens = tonumber(raw[1]) or capacity
local last_refill = tonumber(raw[2]) or now

-- 遅延補充: 前回からの経過時間分だけトークンを足す。
-- capacity を超えないよう min でクランプする。
local elapsed = now - last_refill
if elapsed > 0 then
  tokens = math.min(capacity, tokens + elapsed * rate)
end

-- 残量が要求数以上なら消費して許可、足りなければ消費せず拒否。
-- 拒否時はトークンを減らさない(拒否でも減ると不公平)。
local allowed = 0
if tokens >= requested then
  tokens = tokens - requested
  allowed = 1
end

-- 更新後の状態を書き戻す。last_refill は now で上書きして、
-- 次回呼び出し時の「経過時間」計算の起点にする。
redis.call('HMSET', KEYS[1], 'tokens', tostring(tokens), 'last_refill', tostring(now))
-- アクセスのたびに TTL を延長。使われ続ける限り消えず、放置されれば自然消滅する。
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[4]))

-- 許可フラグ (1/0) と現在の残量を返す。tokens は float なので文字列化して返す。
return { allowed, tostring(tokens) }

複数プロセス同時アクセスで効いていることを確認

Python でマルチプロセスを立てて、同じバケツに同時にアクセスしてみます。 条件は次のとおりです。

  • トークン上限: 10
  • 補充速度: 5.0/s
  • 各リクエストの間隔: 0.1 秒
  • ワーカー数: 3
  • 各ワーカーのリクエスト数: 30
import multiprocessing as mp
import time
import redis
from distributed.lua_bucket import LuaBucket

KEY = "ratelimit:demo:shared"

def worker(worker_id, result_queue):
    # 各ワーカーは別プロセス・別接続だが、見にいくキー (バケツ) は同じ
    r = redis.Redis(host="localhost", port=6379, decode_responses=True)
    bucket = LuaBucket(r, KEY, capacity=10, refill_rate=5.0)

    allowed = 0
    for i in range(30):
        ok = bucket.try_acquire(1)
        if ok:
            allowed += 1
        print(f"[worker {worker_id}] {i:02d} allowed={ok}")
        time.sleep(0.1)
    result_queue.put((worker_id, allowed))

def main():
    redis.Redis(decode_responses=True).delete(KEY)  # 満タンから始めるためクリア

    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, queue)) for i in (1, 2, 3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    results = sorted(queue.get() for _ in procs)
    for worker_id, allowed in results:
        print(f"worker {worker_id}: allowed={allowed}")
    print(f"合計許可数: {sum(a for _, a in results)}")

実行すると次の結果になりました。

[worker 1] 00 allowed=True
[worker 2] 00 allowed=True
[worker 3] 00 allowed=True
[worker 2] 01 allowed=True
[worker 1] 01 allowed=True
[worker 3] 01 allowed=True
[worker 2] 02 allowed=True
[worker 1] 02 allowed=True
[worker 3] 02 allowed=True
[worker 2] 03 allowed=True
[worker 1] 03 allowed=True
[worker 3] 03 allowed=False
...
worker 1: allowed=6
worker 2: allowed=11
worker 3: allowed=8
合計許可数: 25

開始直後はバケツが満タン (capacity=10) なので、まず 10 個分は無条件で通ります。 それに加えて、リクエストを捌いている数百ミリ秒の間にも少しずつ補充が進みます。最初の拒否までの約 0.3 秒分の補充を足すと、

補充分     = refill_rate × 経過秒 = 5.0 × 0.3 ≒ 1.5 個
通った回数  ≒ capacity + 補充分 = 10 + 1.5 ≒ 11.5

となり、11 回ほど通ったところで最初の拒否が出るのは自然な挙動です。

合計が 25 に収まる理由

3 ワーカー合計で 90 回 (30 × 3) リクエストしましたが、通ったのは 25 回だけです。これは理論上限とほぼ一致しています。

理論上限 = capacity + refill_rate × 経過秒
        = 10 + 5.0 × (30 回 × 0.1 秒)
        = 10 + 5.0 × 3.0
        = 25

ワーカーをいくら増やしても許可されるのは、初期の満タン分 + 経過時間に応じた補充分だけです。 ワーカー間の取り分 (6 / 11 / 8) はタイミング次第でブレますが、合計が上限を超えないのがレート制御が分散環境で効いている証拠になります。

感想

無事に分散版の Token Bucket Rate Limiter が作れました。 Redis をちゃんと触ったのも初めてだったので面白かったです。