はじめに
これはRabbitMQのチュートリアルを雑に日本語に訳してみた -Hello World-の続きです。
言語は引き続きPythonです。
この表示はチュートリアルの内容から脱線していることを意味します。
これより下がチュートリアル(part2)の雑訳です。
Work Queues
(pikaというRabbitMQクライアントを使う)
前提
前提
RabbitMQがローカルにインストールされていて、localhost:5672
で起動していること。
異なるホストとかで起動している場合は自分で頑張って設定してね。
個人的なおすすめの動作環境
ローカルにRabbitMQをインストールしたくない人も多いと思うので、私のおすすめとしてDocker(Docker Compose)で起動する方法を紹介します。
以下のcompose.yml
を作成してdocker compose up -d
するだけで起動できます。
services:
rabbitmq:
container_name: rabbitmq
image: rabbitmq:latest
ports:
- 5672:5672
- 15672:15672
※rabbitmq:latest
のイメージを使って上手く動作しない場合はバージョンを少し落として再度起動してみてください。(私はなんかrabbitmq:latest
だと上手くいかなかった...)
What This Tutorial Focuses On
前回のチュートリアルでは、名前付きキューから受送信するためのプログラムを書きました。今回は時間がかかるタスクを複数のワーカーに分配するために使われるワークキューを作成します。
ワークキュー(aka: タスクキュー)の背後にある主な目的は、リソースを集中的に即座に実行して、それが完了するまで待たされるのを避けることにある。その代わりにタスクを後で実行するようにスケジュールします。メッセージとしてタスクをカプセル化してキューに送信します。バックグランドで実行されているワーカープロセスはタスクを取り出して、最終的にジョブが実行されます。たくさんのワーカーを起動した時はワーカー間でそれらを共有します。
このコンセプトは短いHTTPリクエスト間では処理することが不可能なWebアプリケーションで特に役立ちます。
前回のパートでは"Hello World!"
が含まれたメッセージを送信しました。今回は、複雑なタスクを表した文字列を送っていきます。time.sleep()
を使って時間のかかるタスクを偽装します。文字列のドット"."
を複雑さとして、ドットの数分だけ秒数がかかるようにします。例えば、"Hello..."
というメッセージの場合は3秒スリープさせます。
前回作成したsend.py
を修正してコマンドラインから任意のメッセージを送れるようにします。このプログラムはタスクをワークキューにスケジュールするため、new_task.py
という名前にします。
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Sent %r" % message)
receive.py
も受信したメッセージのドットの数だけ仕事をしているフリをするように修正する必要があります。メッセージをキューから取り出してタスクを実行するため、worker.py
とします。
import time
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
Round-robin dispatching
タスクキューを使う利点の一つは作業の並列化が簡単にできることです。もしワーカーのバックログを構築していればワーカーを追加するだけで簡単にスケールすることができます。
2つのworker.py
を同時に起動してみましょう。2つのワーカーは両方ともキューからメッセージを取得しますが、正確にはどのようになっているのでしょうか?みていきましょう。
3つのコンソールを開く必要があります。2つのworker.py
を実行します。これらは2つのコンシューマー(C1,C2)です
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
コンシューマを開始したらら、3つ目のコンソールで以下のようにメッセージをPublishします。
# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
C1とC2のコンソールを確認すると以下のようになっています。
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
デフォルトでは、RabbitMQはメッセージを各コンシューマーに順番に送ります。平均してすべてのコンシューマーは同じ数のメッセージを受け取ります。このような分配方法をラウンドロビンと呼びます。3つ以上のワーカーで試してみてください。
Message acknowledgment
タスクの処理には数秒かかる場合があります。コンシューマーが処理の重いタスクが完了する前に終了してしまった場合はどうなるでしょう。現状のコードではコンシューマーがメッセージを受信するとすぐにメッセージは削除されてしまいます。特定のワーカーに送信されたが、完了していないメッセージも失われてしまいます。
ワーカーが終了した場合、メッセージを他のワーカーに再度送信することができます。
メッセージを失いたくない場合、RabbitMQはmessage acknowledgmentsをサポートしています。特定のメッセージが受信され、そして処理されたことを伝え、RabbitMQが自由にメッセージを消せるようにack(nowledgement)はコンシューマーからRabbitMQに送り返されます。
ackの送信をしないままコンシューマーが(チャンネルのクローズ、接続のクローズ、TCPコネクションの喪失などで)終了した場合、RabbitMQはそれを認識して再びメッセージをキューイングをします。そして、他のコンシューマーがオンライン状態の場合即座にメッセージが受信されます。このようにすることで、コンシューマーが予期せぬ終了をした場合でもメッセージの損失を防ぐことができます。
タイムアウト(デフォルトは30分)は処理がスタックしているコンシューマーの検知に役立ちます。タイムアウト時間の設定はDelivery Acknowledgement Timeoutで可能です。
Manual message acknowledgmentsはデフォルトでONになっています。前回の例ではauto_ack=True
フラグで明示的にOFFに設定していました。このフラグを削除して、タスクを完了させた後に適切なacknowledgmentをワーカーから送信するようにします。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.') )
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
上記のコードを使うことで、CTRL+Cでワーカーがメッセージの処理中に終了してもメッセージを失わずに済みます。全てのunacknowledgedなメッセージは再送されます。
Acknowledgementはメッセージを受け取ったチャネルで送る必要があります。別のチャネルからAcknowledgementを送った場合は例外が発生します。詳しくは→doc guide on confirmations
basic_ack
を送り損ねるのはよくあるミスですが、深刻な問題につながります。RabbitMQは未確認のメッセージを解放できないため多くのメモリを消費してしまいます。
rabbitmqctlを使ってmessages_unacknowledgedを表示させることで、未確認のメッセージがあるか確認することができます。
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windowsの場合はsudoがいらないので↓:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message durability
コンシューマーが異常終了してもメッセージを失わなくてすむ方法を学んだが、RabbitMQサーバーじたいが停止した場合はメッセージを失ってしまう。
RabbitMQが終了したりクラッシュした時は、キューやメッセージを失わないように設定をしないと、キューやメッセージは失われてしまう。キュートメッセージの両方をdurable
としてマークすることでこれを実現できる。
まず、RabbitMQが再起動後もキューが存続することを確認するために、キューにdurable
と宣言する。
channel.queue_declare(queue='hello', durable=True)
すでにdurable
でないhello
キューを定義済みであるため上記のコマンドは期待通り動作してくれない。RabbitMQは存在するキューの再定義は許容していないのでエラーとなる。task_queue
などの別名でキューを定義してこれを回避しましょう。
channel.queue_declare(queue='task_queue', durable=True)
queue_declare
の変更はプロデューサーとコンシューマーの両方に適応します。
この時点で、task_queue
はRabbitMQを再起動した後でも失われないようになりました。
メッセージの永続化をするには、delivery_mode
プロパティにpika.spec.PERSISTENT_DELIVERY_MODE
を設定します。
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
))
メッセージの永続化をするにあたっての注意点
メッセージの永続化を有効にすることはメッセージの損失を完全に保証するものではありません。RabbitMQがメッセージをディスクに保存するまでに短な間隔があります。また、RabbitMQはすべてのメッセージに対してfsync(2)
1を実行するわけではありません。-- つまりキャッシュに保存するだけディスクに書き込まれない場合があります。永続性の保証は強力ではないですが、単純なタスキューには十分です。もし、より強力な保証が必要な場合は、publisher confirmsを使用することができます。
Fair dispatch
お気づきかもしれないが、ディスパッチは私たちが期待した通り動作していません。例えば2つのワーカーが起動しているとします。すべての偶数番目のメッセージが重く、奇数番目のメッセージが軽い場合、特定のワーカーに処理が集中してしまいます。RabbitMQはそんなことは知らずにただ均等にメッセージをディスパッチします。
これは、RabbitMQはメッセージがキューに入った時にメッセージをディスパッチするからです。コンシューマーの未確認(unacknowledged
)メッセージは確認しません。ただn番目のコンシューマーにn番目ごとのメッセージを何も考えずにディスパッチします。
prefetch_count=1
を設定したChannel#basic_qos
メソッドを使ってこれを打開します。これはbasic.qos
プロトコルメソッドを使用して一度に1つ以上のメッセージが同時にワーカーに送信されないようにRabbitMQに指示します。つまり、ワーカーが前のメッセージを処理し確認(acknowledged
)するまで新しいメッセージをディスパッチしないということです。その代わり次のメッセージは別の忙しくないワーカーにディスパッチされます。
キューのサイズについての注意点
すべてのワーカーがビジーな場合、あなたのキューはまんぱんになるかもしれません。キューのメッセージ数を監視してワーカーを追加するか、message TTLを使うことをおすすめします。
Putting it all together
これまでの全体
実際のコードはここ↓
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
))
print(" [x] Sent %r" % message)
connection.close()
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
メッセージ確認(acknowledgments
)とprefetch_count
を使ってワークキューを設定できます。また、durability
オプションでRabbitMQが再起動した場合でもタスクを存続させることができます。
Part3に進んで、複数のコンシューマーに同じメッセージを届ける方法について学びましょう。