Redis Pub/Sub でシンプルな通知システムを書いてみる

4 min

Pub/Sub の理解を深めたかったので 1つのイベントを複数の処理に配るシンプルな通知システムを書いてみました。 asyncio + Redis Pub/Sub を使います。

1つのイベントに対して複数の処理を走らせたいので、注文が完了したらメールを送ってポイントも付与するという題材にします。

[Publisher]
    │ PUBLISH "order_completed"
    ├─→ [Subscriber: email]   メール通知を送る
    └─→ [Subscriber: points]  ポイントを付与する

発行する側(publisher)は通知を1回投げるだけで、受け取る側(subscriber)がそれぞれ独立して反応します。

実装

Publisher: イベントを投げるだけ

import asyncio
import json

import redis.asyncio as redis

CHANNEL = "order_completed"


async def main() -> None:
    # Redis に接続する
    client = redis.from_url("redis://localhost:6379")

    # 発火するイベント。JSON 文字列にしてチャンネルへ流す
    event = {"order_id": 1234, "user": "alice", "amount": 5000}
    # チャンネルに1回 PUBLISH するだけ。戻り値は受け取った subscriber の数
    received = await client.publish(CHANNEL, json.dumps(event))

    print(f"published to '{CHANNEL}': {event}")
    print(f"  → 受け取った subscriber 数: {received}")
    await client.aclose()


if __name__ == "__main__":
    asyncio.run(main())

チャンネルに1回 publish するだけです。publish の戻り値は、そのイベントを受け取った subscriber の数なので、ついでに表示しています。

Subscriber: 購読して待ち続ける

import asyncio
import json
import sys

import redis.asyncio as redis

CHANNEL = "order_completed"


# ロールごとの処理。email はメール送信、points はポイント付与を担当する
async def handle_email(event: dict) -> None:
    print(f"[email]  {event['user']} に注文完了メールを送信(order {event['order_id']})")


async def handle_points(event: dict) -> None:
    points = event["amount"] // 100   # 100円で1ポイント
    print(f"[points] {event['user']}{points} ポイント付与(order {event['order_id']})")


# 起動引数の文字列から担当ハンドラを引くための対応表
HANDLERS = {"email": handle_email, "points": handle_points}


async def main(role: str) -> None:
    handler = HANDLERS[role]          # このプロセスが担当する処理を1つ決める
    client = redis.from_url("redis://localhost:6379")

    # チャンネルを購読する
    pubsub = client.pubsub()
    await pubsub.subscribe(CHANNEL)
    print(f"[{role}] '{CHANNEL}' を購読開始。イベント待機中...")

    # メッセージが届くたびに処理する。届くまではここで待ち続ける
    async for message in pubsub.listen():
        # 購読開始時の確認メッセージなどは type != "message" なので飛ばす
        if message["type"] != "message":
            continue
        event = json.loads(message["data"])   # JSON 文字列を dict に戻す
        await handler(event)                  # ロールに応じた処理を呼ぶ


if __name__ == "__main__":
    # 起動時の引数でロールを切り替える(例: python subscriber.py points)
    role = sys.argv[1] if len(sys.argv) > 1 else "email"
    asyncio.run(main(role))

subscribe でチャンネルを購読し、メッセージが来るたびに処理します。引数でロール(email / points)を切り替えるようにしています。 ロールごとの違いはハンドラーだけで、購読のループは同じです。新しい通知先を足したいときは、ハンドラを1つ増やすだけで済みます。

動かしてみる

# ターミナル1:Redis
redis-server

# ターミナル2:subscriber(メール通知担当)
python subscriber.py email

# ターミナル3:subscriber(ポイント付与担当)
python subscriber.py points

Redis と subscriber 2つをそれぞれ起動します。

# ターミナル4:publisher
python publisher.py

publisher を実行してイベントを発火させます。

# ターミナル2(email)
[email]  alice に注文完了メールを送信(order 1234)

# ターミナル3(points)
[points] alice に 50 ポイント付与(order 1234)

すると subscriber 側にそれぞれの処理結果が出ます。 1回の PUBLISH が両方に独立して届いてハンドラごとに別の処理をしています。

published to 'order_completed': {'order_id': 1234, 'user': 'alice', 'amount': 5000}
  → 受け取った subscriber 数: 2

publisher 側には受け取った subscriber の数が出ています。

イベントは保存されない

Redis の Pub/Sub は publish した瞬間に購読している接続へ配るだけでイベントを保存しません。 そのため subscriber が落ちている間に発火したものは復帰しても受け取れることができません。

email を止めてから publish してみます。

published to 'order_completed': {'order_id': 1234, 'user': 'alice', 'amount': 5000}
  → 受け取った subscriber 数: 1

points だけが受け取り数も 1 に減りました。 取りこぼしたくない通知では、Docs に書いてあるようにイベントを残しておける Redis Streams のような仕組みが必要そうです。

おわりに

前日に試したタスクキューとは対照的でした。 タスクキューは1つのタスクを1人のワーカーが取って消えるのに対し、Pub/Sub は1つのイベントを聞いている全員に配ってその場で消えます。 取りこぼしたくない通知では、Docs に書いてあるように Redis Streams をイベントを残しておくような工夫が必要そうです。

0