最近仕方なくMySQLでジョブを管理するコードを書いたが、sidekqやCeleryなどメジャーなツールのバックエンドによくRedisが使われているということで調べてみた。
実現方法
ジョブはRedisのLISTデータ型に格納する。
ジョブを追加する際はRPUSHでリストの後ろから追加し、ジョブを取得する際はBLPOPを使って先頭から取得する。
BLPOPはリストからの先頭の要素を取得しつつ削除するコマンドだが、リストが空の場合はリストに要素が追加されるかタイムアウトまで待つ(ブロッキング)。このコマンドのおかげでワーカーは何度もジョブの有無を確認(ポーリング)しなくて済むし、ジョブが追加されたら即座に取得して実行できる。
利用コマンド
コマンド | 動作 |
---|---|
BLPOP (Blocking Left Pop?) |
リストの先頭から要素を取得しつつ削除する。 リストが空の場合は追加されるかタイムアウトまで待つ。 |
RPUSH (Right Push?) |
リストの後ろに要素を追加する。 |
CLIENT ID | 現在のConnectionを識別するclient_idを取得する。 |
CLIENT UNBLOCK | client_idのブロッキングを解除する。 |
コード
PythonのRedisライブラリを使って実装した。
スレッドでワーカーを2つ動かし、標準入力からの文字列をジョブとしてキューに追加している。
ワーカーを終了時にBLPOPのタイムアウトを待たないと綺麗に終了できず悩んだが、CLIENT UNBLOCKコマンドを使うことでブロッキング状態を解除し、すぐに終了させることができた。
import redis
from threading import Thread
class WorkerThread(Thread):
def __init__(self, key):
super().__init__()
# single_connection_clientにTrueをセットすると、RedisClientが使うConnectionが固定される
# Connectionを固定しないとclient_idの違うConnectionでBLPOPが実行される可能性があり、CLIENT UNBLOCKが実行できなくなる
self.client = redis.Redis(
host='redis', port=6379, db=0, single_connection_client=True
)
self.client_id = self.client.client_id()
self.key = key
self.running = True
def run(self):
while self.running:
message = self.client.blpop(self.key, timeout=30)
if (message):
print("{}: {}".format(self.client_id, message))
def stop(self):
self.running = False
# self.clientはConnectionが固定されているので、ブロッキング中に追加でコマンドを実行できない
# CLIENT UNBLOCK実行のため新しいクライアントを作成する
client = redis.Redis(host='redis', port=6379, db=0)
client.client_unblock(self.client_id)
def main(n_workers=2):
client = redis.Redis(host='redis', port=6379, db=0)
threads = [WorkerThread("queue") for _ in range(n_workers)]
try:
for t in threads: t.start()
# 空文字入力で終了
s = input()
while s:
client.rpush("queue", s)
s = input()
finally:
print("waiting for workers stopped")
for t in threads:
t.stop()
for t in threads: t.join()
if __name__ == '__main__':
main()
実行結果
文字を入力すると、2つのワーカーが順番にprintすることが確認できた。
/app # python3 redis_queue.py
A
125: (b'queue', b'A')
B
126: (b'queue', b'B')
C
125: (b'queue', b'C')
D
126: (b'queue', b'D')
waiting for workers stopped