Quorum Consensus を Python で試してみた
分散データストアで、書き込みを何台に・読み取りを何台から成功とみなすかを決める Quorum を Python で書いて動かしてみました。
なぜ全員一致ではダメなのか
データを複数台に複製しておくと耐障害性が上がりますが、全 N 台に書けたら成功にすると 1 台落ちただけで書き込みが止まります。 逆に 1 台でも書けたら成功だと、その台が壊れるとデータを失います。クォーラムはその間を取る方式で、N・W・R という 3 つの数字でその塩梅を決めます。
N / W / R とは何か
N: レプリカの総数W: 書き込みは N 台中W台の ACK で成功とみなすR: 読み取りは N 台中R台の応答で確定する
W と R を小さくすると可用性が、大きくすると一貫性が上がります。
W + R > N で最新が読める
W + R > N なら、書いた W 台と読んだ R 台は必ず 1 台以上重なります。鳩の巣原理というやつですね。 重なった台が最新バージョンを持つので、読み取りは必ず最新値を拾えます。
N=3, W=2, R=2(W+R=4 > 3)なら、書いた台と読んだ台は必ず重なります。
| s0 | s1 | s2 | |
|---|---|---|---|
| 書いた台 (W=2) | ● | ● | |
| 読んだ台 (R=2) | ● | ● |
この場合、s1 が両方に入ってるので最新を拾えます。
一方 N=3, W=1, R=1(W+R=2 ≤ 3)だと重ならないことがあります。
| s0 | s1 | s2 | |
|---|---|---|---|
| 書いた台 (W=1) | ● | ||
| 読んだ台 (R=1) | ● |
この読み方の場合は s1 が最新を持たないので、古い値が返ります。
実装
レプリカ 1 台を Server、N 台の仲介役を Coordinator として書きます。
Server はキーごとに (値, バージョン) を持つだけのストアです。バージョンは整数で、読み取り時に一番大きいものを最新とみなします(last-writer-wins)。
class Server:
def __init__(self, id: str):
self.id = id
self._alive = True
self._store: dict[str, tuple[str, int]] = {} # key -> (値, バージョン)
def kill(self): self._alive = False # ダウンさせる
def recover(self): self._alive = True # 復帰(落ちてた間の書き込みは持っていない)
def write(self, key, value, version):
if not self._alive:
raise ServerDownError(f"{self.id} はダウン中")
# 古いバージョンで最新値を上書きしないよう、新しいときだけ反映する
cur = self._store.get(key)
if cur is None or version > cur[1]:
self._store[key] = (value, version)
def read(self, key):
if not self._alive:
raise ServerDownError(f"{self.id} はダウン中")
return self._store.get(key) # (値, バージョン) または None
Coordinator が put / get でクォーラムを判定します。put は全台へ書き込みを送り、受け取れた(生存)台数を ACK として数え、W 未満なら失敗とします。
class Coordinator:
def __init__(self, servers, N, W, R):
self.servers, self.N, self.W, self.R = servers, N, W, R
def put(self, key, value) -> int:
# 1. このキーに付ける「新しいバージョン番号」を決める。
# 読み取り時にバージョンを比べて最新を選ぶので、書き込むたびに
# 番号が必ず増えるようにしたい。そこで「いま生きている台が持つ
# 最大バージョン」を調べ、それに +1 する。
cur_max = 0
for s in self.servers:
# r = (値, バージョン)。まだ誰も書いていなければ read は None
if s.is_alive() and (r := s.read(key)) is not None:
cur_max = max(cur_max, r[1]) # r[1] がバージョン番号
version = cur_max + 1 # 既存の最大より 1 大きい = これが最新
# 2. 全台へ送り、受け取れた(生存)台数を ACK として数える
acks = 0
for s in self.servers:
if s.is_alive():
s.write(key, value, version)
acks += 1
# 3. クォーラム判定: ACK が W 台に満たなければ失敗
if acks < self.W:
raise QuorumNotMetError(f"書き込み未達: ACK {acks} < W={self.W}")
return version
get は先頭から生存サーバを R 台集め、その中で一番バージョンが大きい値を返します。 R が小さいと古いレプリカだけを読んでしまうことがあり、古い値が返る原因になります。
def get(self, key) -> str | None:
responses = []
for s in self.servers:
if not s.is_alive():
continue
responses.append(s.read(key))
if len(responses) >= self.R:
break # R 台集まったら確定(早く返った R 台で決める)
if len(responses) < self.R:
raise QuorumNotMetError(f"読み取り未達: 応答 {len(responses)} < R={self.R}")
# R 台の応答のうち、最新バージョン(最大)の値を返す
present = [r for r in responses if r is not None]
if not present:
return None
value, _version = max(present, key=lambda r: r[1])
return value
これらを KVStore(N, W, R) でまとめ、store.put(...) / store.get(...) / store.servers["s0"].kill() で触れるようにしておきます。
動かしてみる
1 台ダウンしても書き込める
N=3, W=2, R=2 で、サーバを落としながら書き込んでみます。
>>> store = KVStore(N=3, W=2, R=2)
>>> store.put("name", "Alice"); store.get("name")
'Alice'
>>> store.servers["s0"].kill() # 1 台ダウン
>>> store.put("name", "Bob") # 生存 2 台 = W なので成功
2
>>> store.get("name")
'Bob'
>>> store.servers["s1"].kill() # さらに 1 台ダウン(生存 1 台)
>>> store.put("name", "Carol")
QuorumNotMetError: 書き込み未達: ACK 1 < W=2
1 台落ちても W=2 で書けますが、2 台落ちると ACK が足りず失敗します。W が「何台までの同時故障に耐えるか」を決めます。
W+R>N なら常に最新が返る
v1 を全台に書いたあと s0 を落として v2 を書き、s0 を復帰させると、s0 だけ v1 のまま取り残されます。この状態を W=2, R=2(W+R=4 > 3)で読みます。
>>> store = KVStore(N=3, W=2, R=2)
>>> store.put("k", "v1") # 全台 v1
>>> store.servers["s0"].kill()
>>> store.put("k", "v2") # s1, s2 だけ v2
>>> store.servers["s0"].recover() # s0 は v1 のまま
>>> store.get("k")
'v2'
読むのは先頭 2 台の s0, s1 です。古い s0 (v1) も読みますが、同時に読んだ s1 が新しい v2 を持つので、バージョンの大きい v2 が返ります。W+R>N なら、どの R 台を読んでも最新を持つ台が必ず 1 つ混ざります。
W+R≤N だと古い値が返ることがある
同じ手順で古いレプリカを作り、今度は W=1, R=1(W+R=2 ≤ 3)で読みます。
>>> store = KVStore(N=3, W=1, R=1)
>>> store.put("k", "v1")
>>> store.servers["s0"].kill()
>>> store.put("k", "v2") # s1, s2 だけ v2
>>> store.servers["s0"].recover() # s0 は v1 のまま
>>> store.get("k") # 先頭の s0 だけ読む
'v1'
s1, s2 が v2 を持つのに、R=1 だと先頭の古い s0 だけ読んで確定してしまい、v1 (古い値) が返ります。
おわりに
W と R を動かすだけで、可用性と一貫性のトレードオフを選べるのは面白いですね。
今回は試していませんが、性能のチューニングにも効きそうです。W を大きくすると 1 回の書き込みで待つ台数が増え、R を大きくすると読み取りで待つ台数が増えます。 そのため Read と Write のうち量が少ないほうに大きい値を寄せれば、よく使う操作を軽くできて有利そうです。