RQ は、Redis をメッセージブローカー兼データストアとして利用することで分散タスクキューを実現する Python ライブラリ。
RQ: Simple job queues for Python
Redis はリスト型の LPUSH
と RPOP
だけでタスクキューっぽい動きができて、RQ はそれを薄くラップしただけのシンプルな API という印象。
Celery + RabbitMQ の方が高機能だが、シンプルなタスクキューでいいという場合には RQ の方がとっつきやすい (と思っている)。
動かしてみる
手動でプロセスを複数立ち上げるのも面倒なので、 Docker Compose でサクッと試してみる。
実際に運用する際はちゃんと Redis サーバと Worker サーバ複数を用意してやるはず。
FROM python:3.6
RUN pip install rq
Python 3 に RQ を追加しただけの Docker イメージ。
version: '3'
services:
redis:
image: redis
worker:
build: .
depends_on:
- redis
environment:
RQ_REDIS_URL: redis://redis
command: python worker.py
volumes:
- .:/app
working_dir: /app
Redis と Worker を定義した Docker Compose ファイル。
import os
import logging
import redis
import rq
logging.basicConfig(level=logging.DEBUG)
with rq.Connection(redis.from_url(os.environ.get('RQ_REDIS_URL'))):
worker = rq.Worker(['default'])
worker.work()
Worker のエントリポイントとなるスクリプト。
RQ Client を使えば Worker スクリプトを書かなくても Worker を起動できる (rq worker
) が、独自の例外処理をしたい場合 (後述) など、Worker スクリプトで設定を記述するケースが多々あるっぽいので現実的には書いた方がいいと思われる。
Redis と Worker を起動する
$ docker-compose up --scale worker=4
Worker は 4 プロセス起動させてみた。
タスクを実行する
import logging
logger = logging.getLogger(__name__)
def add(a, b):
logger.debug('{} + {} = {}'.format(a, b, a + b))
return a + b
足し算を行うだけのタスク add
を定義した。
import os
from time import sleep
import redis
from rq import Queue
from tasks import add
q = Queue(connection=redis.from_url(os.environ.get('RQ_REDIS_URL')))
# 10個のタスクの実行をキューに投げる
tasks = [q.enqueue(add, args=(i, 1)) for i in range(10)]
# タスク実行が完了するまで少し待つ
sleep(1)
# 結果を出力する
print([task.result for task in tasks])
タスクをキューに投げる処理を記述した。
$ docker-compose run --rm worker python app.py
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
実行すると、正しく実行結果が返ってきていることが確認できる。
Worker の方のログを見ると、10タスクを4つの Worker で分散実行できていることがわかると思う。
その他
実際に運用する上で気になる話など。
例外処理
RQ のデフォルト設定では、例外発生時には move_to_failed_queue
というハンドラが実行され、例外が発生したタスクを failed
キューに移動させる。これは本当にただ失敗タスクを移動してデータが消えないように保持しておいてくれるだけで、特にリトライや通知をしてくれるものではない。そのため、例えば例外が発生したら社内チャットツールに通知するなど、独自の例外処理を行いたい場合は Worker に exception_handlers
を指定する。
import os
import logging
import redis
import rq
from rq.handlers import move_to_failed_queue # デフォルトのエラーハンドラ
logging.basicConfig(level=logging.DEBUG)
def notify_error(job, exc_type, exc_value, traceback):
"""エラー発生をどこかに通知する処理"""
with rq.Connection(redis.from_url(os.environ.get('RQ_REDIS_URL'))):
worker = rq.Worker(
['default'],
exception_handlers=[move_to_failed_queue, notify_error],
)
worker.work()
リトライ
RQ にはリトライの仕組みはないっぽい?ので、やりたければエラーハンドラを追加して自力で実装する必要がありそう。
ちゃんとしたリトライの仕組みがどうしても必要なのであれば、RQ じゃなくて Celery の方を検討した方がいい。
タイムアウト
enqueue
時に timeout
でタスク実行のタイムアウト秒数を指定できる。
(3m
や 1h
のような文字列でも指定できる。)
task = q.enqueue(add, args=(1, 2), timeout=60)
指定した秒数を超えた場合 rq.timeouts.JobTimeoutException
が投げられる。
task.is_failed #=> True
TTL
result_ttl
で実行結果をいつまで保持するかの秒数を指定できる。
task = q.enqueue(add, args=(1, 2), result_ttl=60)
0
を指定すると「保持しない (実行結果はすぐに破棄される)」
-1
を指定すると「有効期限なし」
デフォルトでは 500 秒となっている。
定期実行
Cron のように定期的にあるタスクを実行したいというケースがあるかもしれないが、RQ にはそのような仕組みはないのでこれも自力で実装する必要がある。
Celery であれば Celery Beat という機能がある。