Redis Streams の Consumer Group で処理済みを管理する

4 min

前回 Redis Streams で通知システムを書いたとき、consumer は読んだ位置 last_id をメモリに持つだけでした。再起動すると last_id0 に戻り、また最初から読み直してしまいます。 読んだ位置を Redis 側に覚えさせ、処理が終わったものを ack で管理したかったので、Consumer Group を試してみました。

[Publisher] XADD → order_stream
                     ├─→ group: email   メール送信して XACK
                     └─→ group: points  ポイント付与して XACK

email と points を別グループにすると、両方が同じイベントを独立して受け取ります。同じグループ内に consumer を増やすと、メッセージはそのグループ内で分担されます。

XGROUP / XREADGROUP / XACK の役割

redis-cli で動きを追ってみます。

$ redis-cli XADD order_stream * data '{"order_id":1234,"user":"alice"}'
1781709769104-0
$ redis-cli XADD order_stream * data '{"order_id":5678,"user":"bob"}'
1781709769108-0

まずイベントを2件追記しました。

$ redis-cli XGROUP CREATE order_stream email 0 MKSTREAM
OK

それからグループを作ります。 0 は読み始める位置(先頭から)、MKSTREAM はストリームが無ければ作る指定です。

$ redis-cli XREADGROUP GROUP email c1 STREAMS order_stream >
order_stream
  1781709769104-0  data {"order_id":1234,"user":"alice"}
  1781709769108-0  data {"order_id":5678,"user":"bob"}

XREADGROUP に特殊な ID > を渡すと、このグループにまだ配っていないメッセージだけが返ります。last_id を自分で持たなくても、どこまで配ったかはグループが覚えています。

$ redis-cli XPENDING order_stream email
2                  # 処理中(未 ack)の件数
1781709769104-0    # 最小 ID
1781709769108-0    # 最大 ID
c1  2              # consumer c1 が 2 件かかえている

配られたメッセージは、ack されるまで Pending として記録されます。XPENDING で確認すると2件あることが分かります。

$ redis-cli XACK order_stream email 1781709769104-0 1781709769108-0
2
$ redis-cli XREADGROUP GROUP email c1 STREAMS order_stream >
(nil)

処理が終わったら XACK を打ちます。 すると Pending から外れ、もう一度 > で読んでも返ってきません。

注意したいのは XACK してもストリームからエントリは消えないことです。 Stream はログなので、データは XDEL / XTRIM で削るまで残ります。XACK は、このグループの、この consumer が処理し終えたという印を Pending から外すだけです。

実装

Publisher

publisher は前回と同じで、XADD で追記するだけです。

import asyncio
import json

import redis.asyncio as redis

STREAM = "order_stream"


async def main() -> None:
    client = redis.from_url("redis://localhost:6379", decode_responses=True)

    event = {"order_id": 1234, "user": "alice", "amount": 5000}
    entry_id = await client.xadd(STREAM, {"data": json.dumps(event)})

    print(f"XADD {STREAM} → id={entry_id}: {event}")
    await client.aclose()


if __name__ == "__main__":
    asyncio.run(main())

Consumer

consumer は引数のロールをグループ名にして、> で新規だけ読み、処理後に ack します。

import asyncio
import json
import sys

import redis.asyncio as redis
from redis.exceptions import ResponseError

STREAM = "order_stream"


async def handle_email(event: dict) -> None:
    print(f"[email]  {event['user']} に注文完了メールを送信(order {event['order_id']})")


async def handle_points(event: dict) -> None:
    points = event["amount"] // 100   # 100円で1ポイント
    print(f"[points] {event['user']}{points} ポイント付与(order {event['order_id']})")


HANDLERS = {"email": handle_email, "points": handle_points}


async def ensure_group(client: redis.Redis, group: str) -> None:
    # グループを作る。id="0" で先頭から、無ければストリームごと作成。
    # 既にあれば BUSYGROUP になるので握りつぶす。
    try:
        await client.xgroup_create(STREAM, group, id="0", mkstream=True)
    except ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise


async def main(role: str, consumer_name: str) -> None:
    group = role
    handler = HANDLERS[role]
    client = redis.from_url("redis://localhost:6379", decode_responses=True)

    await ensure_group(client, group)
    print(f"[{group}/{consumer_name}] グループ '{group}' で待機中...")

    while True:
        # ">" = このグループにまだ配っていない新規メッセージだけ受け取る
        resp = await client.xreadgroup(group, consumer_name, {STREAM: ">"}, block=0)
        for _stream, entries in resp:
            for entry_id, fields in entries:
                event = json.loads(fields["data"])
                await handler(event)
                await client.xack(STREAM, group, entry_id)   # 処理済みを ack


if __name__ == "__main__":
    role = sys.argv[1] if len(sys.argv) > 1 else "email"
    consumer_name = sys.argv[2] if len(sys.argv) > 2 else f"{role}-1"
    asyncio.run(main(role, consumer_name))

前回との違いは、開始位置に last_id ではなく > を渡すところと、処理後に xack するところです。位置はグループが Redis 側で覚えるので自分で持つ必要がなくなりました。

ensure_groupXGROUP CREATE は、同じグループが既にあると BUSYGROUP エラーになります。 起動のたびに呼ぶので2回目以降は必ず出ますが、グループは一度あれば十分なのでその場合だけ無視しています。

動かしてみる

# ターミナル1:Redis
redis-server

# ターミナル2:consumer はまだ起動せず、publisher だけ2回
python publisher.py
python publisher.py

consumer を起動する前に、publisher を2回実行しておきます。

# ターミナル3
$ python consumer.py email
[email/email-1] グループ 'email' で待機中...
[email]  alice に注文完了メールを送信(order 1234)
[email]  alice に注文完了メールを送信(order 1234)

email グループの consumer を起動します。グループを 0 で作るので、起動前の2件も受け取って ack します。

# ターミナル3:再起動しても、ack 済みは再配送されない
$ python consumer.py email
[email/email-1] グループ 'email' で待機中...

ここで consumer を一度止めて、また起動してみます。> は配っていない分だけを返すので、ack 済みの2件は読み直しません。 last_id を持っていないのに 0 に戻らないのは、位置をグループが覚えているからです。この状態で publisher をもう1回実行すると、新規の1件だけが届きます。

# ターミナル4
$ python consumer.py points
[points/points-1] グループ 'points' で待機中...
[points] alice に 50 ポイント付与(order 1234)
[points] alice に 50 ポイント付与(order 1234)
[points] alice に 50 ポイント付与(order 1234)

points グループの consumer を起動すると、email とは独立に全件を受け取ります。 グループごとに位置が別なので email が処理済みでも points は最初から全部受け取っています。

おわりに

位置の管理を Redis に任せることで再起動しても続きから受け取れるようになりました。 ただし今の実装では、処理の途中で落ちたメッセージは > では拾えないのでそこは別途考えたいです。

0