Redis Streams でイベントを残せる通知システムを書いてみる

4 min

前回 Redis Pub/Sub で通知システムを書いたとき、subscriber が落ちている場合などにイベントを取りこぼすという弱点が残りました。 そこで、イベントを残せる Redis Streams を試してみました。

[Publisher] XADD order_stream {...}      イベントをストリームに追記(消えない)
    │
    ▼
  order_stream                            ログとして蓄積される
    │
    ▼
[Consumer] XREAD order_stream <last_id>   続きから読む(過去分も読める)

題材は前回と同じ通知シナリオ(注文完了でメール通知とポイント付与)です。Pub/Sub が撃ちっぱなしだったのに対し、Streams はイベントをログに残すので、consumer がいない間に追記されたものも後から読めます。

XADD と XREAD の役割

Stream は追記専用のログ、つまり末尾に足すだけで過去を書き換えないデータ構造です。redis-cliXADDXREAD を直接叩くと、動きが分かりやすいです。

XADD はエントリを1件、ストリームの末尾に追記します。

$ redis-cli XADD order_stream * data '{"order_id":1234,"user":"alice"}'
1781706981036-0
$ redis-cli XADD order_stream * data '{"order_id":5678,"user":"bob"}'
1781706981040-0

* は ID を自動採番する指定で、返ってくる 1781706981036-0<ミリ秒>-<連番>)がそのエントリの ID です。追記したエントリは読み手がいなくても残ります。

XREAD はストリームからエントリを読むコマンドで、開始位置を指定してその続きから読めます。

$ redis-cli XREAD STREAMS order_stream 0
order_stream
  1781706981036-0  data {"order_id":1234,"user":"alice"}
  1781706981040-0  data {"order_id":5678,"user":"bob"}

渡す ID で読む範囲が変わります。0 を渡すと、先頭から(過去分すべて)読めます。

$ redis-cli XREAD STREAMS order_stream 1781706981036-0
order_stream
  1781706981040-0  data {"order_id":5678,"user":"bob"}

ここでは1件目の ID を渡したので、2件目だけが返ります。

$ redis-cli XREAD STREAMS order_stream $
(nil)

$ を渡すと、今ある最後より後、つまりこれから来る新着だけが対象になります。 既存は読まないので新着が無ければ何も返りません。

実装

Publisher: XADD でストリームに追記する

import asyncio
import json

import redis.asyncio as redis

STREAM = "order_stream"


async def main() -> None:
    client = redis.from_url("redis://localhost:6379", decode_responses=True)

    event = {"order_id": 1234, "user": "alice", "amount": 5000}
    # XADD でストリームに追記。ID を省略すると自動採番される(redis-cli の * に相当)
    entry_id = await client.xadd(STREAM, {"data": json.dumps(event)})

    print(f"XADD {STREAM} → id={entry_id}: {event}")
    await client.aclose()


if __name__ == "__main__":
    asyncio.run(main())

前回 publisher が publish を呼んでいたところが、今回は xadd に変わっています。 publish は配るだけで何も残しませんでしたが、xadd はストリームへ1件追記し、自動採番された ID を返します。追記されたエントリはストリームに残り続けるので、読み手がいなくても消えません。

Consumer: XREAD で続きから読む

import asyncio
import json
import sys

import redis.asyncio as redis

STREAM = "order_stream"


async def handle_email(event: dict) -> None:
    print(f"[email]  {event['user']} に注文完了メールを送信(order {event['order_id']})")


async def handle_points(event: dict) -> None:
    points = event["amount"] // 100   # 100円で1ポイント
    print(f"[points] {event['user']}{points} ポイント付与(order {event['order_id']})")


HANDLERS = {"email": handle_email, "points": handle_points}


async def main(role: str) -> None:
    handler = HANDLERS[role]
    client = redis.from_url("redis://localhost:6379", decode_responses=True)

    # "0" = ストリームの最初から読む(過去分も全部)。"$" にすると新着のみ
    last_id = "0"
    print(f"[{role}] '{STREAM}' を id={last_id} から読み始めます...")

    while True:
        # block=0 で新着が来るまで待機。last_id より後のエントリだけ返る
        resp = await client.xread({STREAM: last_id}, block=0)
        for _stream, entries in resp:
            for entry_id, fields in entries:
                event = json.loads(fields["data"])
                await handler(event)
                last_id = entry_id   # 読んだ位置を進める(次はこの続きから)


if __name__ == "__main__":
    role = sys.argv[1] if len(sys.argv) > 1 else "email"
    if role not in HANDLERS:
        print(f"unknown role: {role!r}(選べるのは: {', '.join(HANDLERS)})")
        sys.exit(1)
    try:
        asyncio.run(main(role))
    except KeyboardInterrupt:
        pass

ロールごとの処理(email / points)を引数で切り替える構成は前回と同じです。 違うのは読み方で、xread に開始位置 last_id を渡してその続きを読みます。最初は "0" を渡しているのでストリームの先頭から(起動前に追記されたイベントも含めて)読み始めます。 読み終わったエントリの ID を last_id に保存しておき、次の xread ではその続きから受け取ります。block=0 を付けることで続きが無いときは待機します。

動かしてみる

# ターミナル1:Redis
redis-server

# ターミナル2:consumer はまだ起動せず、publisher だけ2回実行
python publisher.py
python publisher.py

この時点で consumer は1つも起動していませんが、ストリームの長さを確認すると2件残っています。

# ターミナル2
$ redis-cli xlen order_stream
2

Pub/Sub なら誰も購読していない瞬間の publish は消えていましたが、Streams は追記した2件がそのまま残っています。 ここで consumer(email)を起動します。

# ターミナル3:consumer(email)を起動
$ python consumer.py email
[email] 'order_stream'id=0 から読み始めます...
[email]  alice に注文完了メールを送信(order 1234)
[email]  alice に注文完了メールを送信(order 1234)

起動直後に、起動前へ追記しておいた2件をまとめて読みました。 consumer が動き始めたのは publisher の実行よりあとですが、id=0 から読んでいるので取りこぼしていません。

# ターミナル2:consumer が起動したあとに、もう1回 publisher
python publisher.py

続けて publisher をもう1回実行すると、3件目はリアルタイムで届きます。

# ターミナル3(email)
[email]  alice に注文完了メールを送信(order 1234)

block=0 で待機していた xread が新着を受け取り、すぐに処理されました。

おわりに

Pub/Sub での取りこぼしが、わりとシンプルなコードで解決できました。 とはいえ Redis 自体が落ちた場合はまた別の対応が必要なので、そのあたりの工夫も追々考えてみたいです。

0