Celery + Redis で画像リサイズの非同期タスクキューを作ってみた
タスクキューを使った処理を書きたかったので、画像 URL を受け取ってリサイズする API を書いてみました。 FastAPI + Celery + Redis を使っています。
全体の構成
重い画像のリサイズをリクエスト内で処理するとレスポンスが詰まるため、受付だけして即座に返し、実処理はタスクキューを挟んで別プロセスのワーカーに任せます。
[クライアント] → [FastAPI] → [Redis(キュー)] → [Celery ワーカー]
│ │
└───── Redis(ステータス) ←───────────┘
API は依頼をキューに積んだら、完了を待たずに task_id を返します。
あとはワーカーが裏で処理してステータスを書き込み、クライアントは別途その task_id で進捗を確認して結果を受け取ります。
Redis はこのキューと進捗を記録するステータス置き場の両方に使います。
Redis をキューに使う仕組み
Redis は KV ストアですが、値として List を持てるので、これをキューに使えます。 タスクを List に push し、ワーカーが pop する。入れた順に取り出せば FIFO の待ち行列になります。
ただ pop するだけだと、タスクが無いときは空振りしてしまうので、BRPOP でタスクが積まれるまで待機し、入った瞬間に取り出します。
BRPOP myqueue 0 # 来るまで待ち、積まれた瞬間に取り出す
ステータスをどう持つか
クライアントは task_id だけ持っていて、あとからステータスを確認するので、ジョブの状態をどこかに記録する必要があります。
Celery でも状態を持てるっぽいんですが、今回は勉強も兼ねて Redis に状態を自前で書き込む方式にしています。
ステータスの遷移は次のとおりです。
queued → processing → done
└→ failed
queued: API がキューに積んだ直後processing: ワーカーが処理を始めたdone: 保存まで成功(output_pathも一緒に記録)failed: ダウンロードやリサイズで失敗(errorも一緒に記録)
書き込みは API とワーカーの両方から行うので、共通のモジュールを用意して Redis にアクセスさせます。
import json
import redis
# Redis への接続。decode_responses=True で取得値を str として受け取る(bytes にしない)
client = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
def set_status(task_id: str, data: dict) -> None:
# dict を JSON 文字列にして "task:{task_id}" というキーで保存する
client.set(f"task:{task_id}", json.dumps(data))
def get_status(task_id: str) -> dict | None:
# キーを引いて、あれば JSON を dict に戻す。無ければ None(=未登録のID)
raw = client.get(f"task:{task_id}")
return json.loads(raw) if raw else None
実装
API: キューを積んですぐ返す
# main.py
@app.post("/resize")
def resize(req: ResizeRequest):
# width / height は正の値だけ受け付ける(0 や負値は弾く)
if req.width <= 0 or req.height <= 0:
raise HTTPException(status_code=400, detail="width and height must be positive")
task_id = uuid.uuid4().hex[:12] # ジョブの識別子を自分で発行
store.set_status(task_id, {"status": "queued"}) # まず queued を記録
resize_task.delay(task_id, req.url, req.width, req.height) # キューに積むだけ(処理は待たない)
return {"task_id": task_id, "status": "queued"} # 処理完了を待たず即レスポンス
POST /resize は、自分で task_id を発行し、Redis にステータスとして queued を書いてからタスクをキューに積みます。
.delay(...) はここでリサイズを実行するわけではなく、タスクの内容(関数名と引数)を Redis のキューに追加するだけなので、ブロックせずすぐ返ります。
GET /status/{task_id} は Redis を読むだけです。
@app.get("/status/{task_id}")
def status(task_id: str):
data = store.get_status(task_id) # Redis からステータスを引く
if data is None: # 未登録の ID なら 404
raise HTTPException(status_code=404, detail="task not found")
return {"task_id": task_id, **data} # 保存した status / output_path などをそのまま返す
ワーカー: 実際の処理
# worker.py
# broker = タスクを積むキュー、backend = 結果置き場。どちらも Redis を使う
celery_app = Celery("image_resizer", broker=BROKER_URL, backend=BROKER_URL)
@celery_app.task(name="resize_task")
def resize_task(task_id: str, url: str, width: int, height: int) -> None:
store.set_status(task_id, {"status": "processing"}) # 取り出した瞬間に processing へ
try:
resp = requests.get(url, timeout=10) # 画像をダウンロード
resp.raise_for_status() # 4xx/5xx ならここで例外
data = resize_image(resp.content, width, height) # ← 純粋なリサイズ(後述)
os.makedirs(OUTPUT_DIR, exist_ok=True) # 保存先が無ければ作る
with open(os.path.join(OUTPUT_DIR, f"{task_id}.jpg"), "wb") as f:
f.write(data) # output/{task_id}.jpg に保存
# 成功。出力先パスも一緒に記録しておく
store.set_status(task_id, {"status": "done", "output_path": f"./output/{task_id}.jpg"})
except Exception as e:
# ダウンロード・リサイズ・保存のどこで失敗しても failed として記録
# (例外を握るのでワーカー自体は止まらず、次のタスクに進める)
store.set_status(task_id, {"status": "failed", "error": str(e)})
Celery がキューから resize_task を取り出すとこの関数が呼ばれ、ステータスを processing に更新し、ダウンロード・リサイズ・保存を実行、結果に応じてステータスを done / failed に更新という処理を行います。
動かしてみる
3つのプロセスを、それぞれ別のターミナルで起動します。
# Redis(キュー兼ステータス置き場)
redis-server
# ワーカー(処理役)— 起動しっぱなしにする常駐プロセス
celery -A worker worker --loglevel=info
# API サーバー(受付役)
uvicorn main:app
ワーカーは API とは別プロセスで常駐しており、BRPOP でキューを見張り続けてタスクが積まれるまで待機 → 積まれた瞬間に取り出して処理、を繰り返しています。
API はタスクを積むだけなので、ワーカーを起動していないとステータスは queued のまま進みません。
この状態でジョブを投げます。
$ curl -s -X POST http://localhost:8000/resize \
-H "Content-Type: application/json" \
-d '{"url": "https://picsum.photos/400/300", "width": 100, "height": 100}'
{"task_id":"09124dfdbc4e","status":"queued"}
即座に queued で task_id が返ってきました。あとはこのIDでステータスを覗きます。
$ curl -s http://localhost:8000/status/09124dfdbc4e
{"task_id":"09124dfdbc4e","status":"processing"}
$ curl -s http://localhost:8000/status/09124dfdbc4e
{"task_id":"09124dfdbc4e","status":"done","output_path":"./output/09124dfdbc4e.jpg"}
queued → processing → done と、設計したとおりに状態が動きました。
>>> from PIL import Image
>>> im = Image.open("output/09124dfdbc4e.jpg")
>>> im.size, im.mode, im.format
((100, 100), 'RGB', 'JPEG')
保存された画像も確かに 100x100 の JPEG になっています。
おわりに
受け付けて即返す API と後ろで処理するワーカーを分けるだけで、重い処理も詰まらずにさばけるようになりました。 複数ワーカーでの分散処理なども試してみたいです。