2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RedisでJobQueueを実装する

Last updated at Posted at 2019-09-21

最近仕方なくMySQLでジョブを管理するコードを書いたが、sidekqCeleryなどメジャーなツールのバックエンドによくRedisが使われているということで調べてみた。

実現方法

ジョブはRedisのLISTデータ型に格納する。

ジョブを追加する際はRPUSHでリストの後ろから追加し、ジョブを取得する際はBLPOPを使って先頭から取得する。

BLPOPはリストからの先頭の要素を取得しつつ削除するコマンドだが、リストが空の場合はリストに要素が追加されるかタイムアウトまで待つ(ブロッキング)。このコマンドのおかげでワーカーは何度もジョブの有無を確認(ポーリング)しなくて済むし、ジョブが追加されたら即座に取得して実行できる。

利用コマンド

コマンド 動作
BLPOP
(Blocking Left Pop?)
リストの先頭から要素を取得しつつ削除する。
リストが空の場合は追加されるかタイムアウトまで待つ。
RPUSH
(Right Push?)
リストの後ろに要素を追加する。
CLIENT ID 現在のConnectionを識別するclient_idを取得する。
CLIENT UNBLOCK client_idのブロッキングを解除する。

コード

PythonのRedisライブラリを使って実装した。
スレッドでワーカーを2つ動かし、標準入力からの文字列をジョブとしてキューに追加している。

ワーカーを終了時にBLPOPのタイムアウトを待たないと綺麗に終了できず悩んだが、CLIENT UNBLOCKコマンドを使うことでブロッキング状態を解除し、すぐに終了させることができた。

import redis
from threading import Thread


class WorkerThread(Thread):
  def __init__(self, key):
    super().__init__()
    # single_connection_clientにTrueをセットすると、RedisClientが使うConnectionが固定される
    # Connectionを固定しないとclient_idの違うConnectionでBLPOPが実行される可能性があり、CLIENT UNBLOCKが実行できなくなる
    self.client = redis.Redis(
        host='redis', port=6379, db=0, single_connection_client=True
    )
    self.client_id = self.client.client_id()
    self.key = key
    self.running = True

  def run(self):
    while self.running:
      message = self.client.blpop(self.key, timeout=30)
      if (message):
        print("{}: {}".format(self.client_id, message))

  def stop(self):
    self.running = False
    # self.clientはConnectionが固定されているので、ブロッキング中に追加でコマンドを実行できない
    # CLIENT UNBLOCK実行のため新しいクライアントを作成する
    client = redis.Redis(host='redis', port=6379, db=0)
    client.client_unblock(self.client_id)


def main(n_workers=2):
  client = redis.Redis(host='redis', port=6379, db=0)
  threads = [WorkerThread("queue") for _ in range(n_workers)]

  try:
    for t in threads: t.start()
    # 空文字入力で終了
    s = input()
    while s:
      client.rpush("queue", s)
      s = input()
  finally:
    print("waiting for workers stopped")
    for t in threads:
      t.stop()

    for t in threads: t.join()


if __name__ == '__main__':
  main()

実行結果

文字を入力すると、2つのワーカーが順番にprintすることが確認できた。

/app # python3 redis_queue.py
A
125: (b'queue', b'A')
B
126: (b'queue', b'B')
C
125: (b'queue', b'C')
D
126: (b'queue', b'D')

waiting for workers stopped
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?