Quorum Consensus を Python で試してみた

4 min
backendpython

分散データストアで、書き込みを何台に・読み取りを何台から成功とみなすかを決める Quorum を Python で書いて動かしてみました。

なぜ全員一致ではダメなのか

データを複数台に複製しておくと耐障害性が上がりますが、全 N 台に書けたら成功にすると 1 台落ちただけで書き込みが止まります。 逆に 1 台でも書けたら成功だと、その台が壊れるとデータを失います。クォーラムはその間を取る方式で、N・W・R という 3 つの数字でその塩梅を決めます。

N / W / R とは何か

  • N: レプリカの総数
  • W: 書き込みは N 台中 W 台の ACK で成功とみなす
  • R: 読み取りは N 台中 R 台の応答で確定する

W と R を小さくすると可用性が、大きくすると一貫性が上がります。

W + R > N で最新が読める

W + R > N なら、書いた W 台と読んだ R 台は必ず 1 台以上重なります。鳩の巣原理というやつですね。 重なった台が最新バージョンを持つので、読み取りは必ず最新値を拾えます。

N=3, W=2, R=2(W+R=4 > 3)なら、書いた台と読んだ台は必ず重なります。

s0s1s2
書いた台 (W=2)
読んだ台 (R=2)

この場合、s1 が両方に入ってるので最新を拾えます。

一方 N=3, W=1, R=1(W+R=2 ≤ 3)だと重ならないことがあります。

s0s1s2
書いた台 (W=1)
読んだ台 (R=1)

この読み方の場合は s1 が最新を持たないので、古い値が返ります。

実装

レプリカ 1 台を Server、N 台の仲介役を Coordinator として書きます。

Server はキーごとに (値, バージョン) を持つだけのストアです。バージョンは整数で、読み取り時に一番大きいものを最新とみなします(last-writer-wins)。

class Server:
    def __init__(self, id: str):
        self.id = id
        self._alive = True
        self._store: dict[str, tuple[str, int]] = {}  # key -> (値, バージョン)

    def kill(self):    self._alive = False  # ダウンさせる
    def recover(self): self._alive = True   # 復帰(落ちてた間の書き込みは持っていない)

    def write(self, key, value, version):
        if not self._alive:
            raise ServerDownError(f"{self.id} はダウン中")
        # 古いバージョンで最新値を上書きしないよう、新しいときだけ反映する
        cur = self._store.get(key)
        if cur is None or version > cur[1]:
            self._store[key] = (value, version)

    def read(self, key):
        if not self._alive:
            raise ServerDownError(f"{self.id} はダウン中")
        return self._store.get(key)  # (値, バージョン) または None

Coordinator が put / get でクォーラムを判定します。put は全台へ書き込みを送り、受け取れた(生存)台数を ACK として数え、W 未満なら失敗とします。

class Coordinator:
    def __init__(self, servers, N, W, R):
        self.servers, self.N, self.W, self.R = servers, N, W, R

    def put(self, key, value) -> int:
        # 1. このキーに付ける「新しいバージョン番号」を決める。
        #    読み取り時にバージョンを比べて最新を選ぶので、書き込むたびに
        #    番号が必ず増えるようにしたい。そこで「いま生きている台が持つ
        #    最大バージョン」を調べ、それに +1 する。
        cur_max = 0
        for s in self.servers:
            # r = (値, バージョン)。まだ誰も書いていなければ read は None
            if s.is_alive() and (r := s.read(key)) is not None:
                cur_max = max(cur_max, r[1])  # r[1] がバージョン番号
        version = cur_max + 1  # 既存の最大より 1 大きい = これが最新

        # 2. 全台へ送り、受け取れた(生存)台数を ACK として数える
        acks = 0
        for s in self.servers:
            if s.is_alive():
                s.write(key, value, version)
                acks += 1

        # 3. クォーラム判定: ACK が W 台に満たなければ失敗
        if acks < self.W:
            raise QuorumNotMetError(f"書き込み未達: ACK {acks} < W={self.W}")
        return version

get は先頭から生存サーバを R 台集め、その中で一番バージョンが大きい値を返します。 R が小さいと古いレプリカだけを読んでしまうことがあり、古い値が返る原因になります。

    def get(self, key) -> str | None:
        responses = []
        for s in self.servers:
            if not s.is_alive():
                continue
            responses.append(s.read(key))
            if len(responses) >= self.R:
                break  # R 台集まったら確定(早く返った R 台で決める)

        if len(responses) < self.R:
            raise QuorumNotMetError(f"読み取り未達: 応答 {len(responses)} < R={self.R}")

        # R 台の応答のうち、最新バージョン(最大)の値を返す
        present = [r for r in responses if r is not None]
        if not present:
            return None
        value, _version = max(present, key=lambda r: r[1])
        return value

これらを KVStore(N, W, R) でまとめ、store.put(...) / store.get(...) / store.servers["s0"].kill() で触れるようにしておきます。

動かしてみる

1 台ダウンしても書き込める

N=3, W=2, R=2 で、サーバを落としながら書き込んでみます。

>>> store = KVStore(N=3, W=2, R=2)
>>> store.put("name", "Alice"); store.get("name")
'Alice'
>>> store.servers["s0"].kill()      # 1 台ダウン
>>> store.put("name", "Bob")        # 生存 2 台 = W なので成功
2
>>> store.get("name")
'Bob'
>>> store.servers["s1"].kill()      # さらに 1 台ダウン(生存 1 台)
>>> store.put("name", "Carol")
QuorumNotMetError: 書き込み未達: ACK 1 < W=2

1 台落ちても W=2 で書けますが、2 台落ちると ACK が足りず失敗します。W が「何台までの同時故障に耐えるか」を決めます。

W+R>N なら常に最新が返る

v1 を全台に書いたあと s0 を落として v2 を書き、s0 を復帰させると、s0 だけ v1 のまま取り残されます。この状態を W=2, R=2(W+R=4 > 3)で読みます。

>>> store = KVStore(N=3, W=2, R=2)
>>> store.put("k", "v1")            # 全台 v1
>>> store.servers["s0"].kill()
>>> store.put("k", "v2")            # s1, s2 だけ v2
>>> store.servers["s0"].recover()   # s0 は v1 のまま
>>> store.get("k")
'v2'

読むのは先頭 2 台の s0, s1 です。古い s0 (v1) も読みますが、同時に読んだ s1 が新しい v2 を持つので、バージョンの大きい v2 が返ります。W+R>N なら、どの R 台を読んでも最新を持つ台が必ず 1 つ混ざります。

W+R≤N だと古い値が返ることがある

同じ手順で古いレプリカを作り、今度は W=1, R=1(W+R=2 ≤ 3)で読みます。

>>> store = KVStore(N=3, W=1, R=1)
>>> store.put("k", "v1")
>>> store.servers["s0"].kill()
>>> store.put("k", "v2")            # s1, s2 だけ v2
>>> store.servers["s0"].recover()   # s0 は v1 のまま
>>> store.get("k")                  # 先頭の s0 だけ読む
'v1'

s1, s2 が v2 を持つのに、R=1 だと先頭の古い s0 だけ読んで確定してしまい、v1 (古い値) が返ります。

おわりに

W と R を動かすだけで、可用性と一貫性のトレードオフを選べるのは面白いですね。

今回は試していませんが、性能のチューニングにも効きそうです。W を大きくすると 1 回の書き込みで待つ台数が増え、R を大きくすると読み取りで待つ台数が増えます。 そのため Read と Write のうち量が少ないほうに大きい値を寄せれば、よく使う操作を軽くできて有利そうです。