RabbitMQ チュートリアル2(ワークキュー)

More than 3 years have passed since last update.

RabbitMQのチュートリアル2

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

の翻訳です。

翻訳の誤りなどあればご指摘お待ちしております。


前提条件

このチュートリアルでは、RabbitMQのがインストールされ、ローカルホストの標準のポート(5672)上で実行されている前提とします。別のホスト、ポート、または資格情報を使用する場合には、接続設定の調整が必要です。


問題が発生した場合

このチュートリアルを通して問題が発生した場合、メーリングリストを通して私たちに連絡することができます。


ワークキュー


(pika 0.9.8 Python clientを使用)

最初のチュートリアルでは、名前付きキューからメッセージを送受信するためのプログラムを書きました。これでは、複数のワーカーの間で時間のかかるタスクを分散するために使用されるワークキューを作成します。

ワークキュー(通称:タスクキュー)の背後にある主な考え方は、リソースを大量に消費するタスクを即時に実行し、完了するまで待機することを避けるためです。代わりに、タスクを後で実行するようにスケジュールします。メッセージとしてタスクをカプセル化し、キューに送信します。バックグラウンドで実行中のワーカープロセスがタスクを取り出し、最終的にジョブを実行します。多くのワーカーを実行すると、タスクは、それらの間で共有されます。

この概念はWebアプリケーションで特に有用です、短いHTTP要求の間に複雑なタスクを処理することは不可能です。


準備

このチュートリアルの前の部分では、"Hello World!"が含まれているメッセージを送信しました。これから、複雑なタスクを表す文字列を送信します。サイズを変更する画像やレンダリングされるPDFファイルのような、実世界のタスクを持っていないため、忙しくしているふりをすることによってそれの偽物をさせましょう、time.sleep()関数を使用して。文字列に含まれるドットの数によって複雑さを示しましょう、各々のドットは「仕事」の一秒を占めることとします。例えば、「Hello...」と記される偽のタスクは、3秒かかります。

以前の例からsend.pyコードを少しだけ変更します、任意のメッセージが、コマンドラインから送信されるように。このプログラムは、ワークキューにタスクをスケジュールするので、new_task.pyと名付けましょう:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print " [x] Sent %r" % (message,)

以前のreceive.pyスクリプトもまた、いくつかの変更を必要とします。メッセージ本体に含まれる各々のドットの、偽の一秒の仕事を必要とします。キューからメッセージを取り出し、タスクを実行するので、これをworker.pyと呼びましょう:

import time

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"


ラウンド・ロビン・ディスパッチ

タスクキューを使用する利点の1つは、簡単に作業を並列化することができる点です。ワークのバックログを構築すると、より多くのワーカーを追加することが可能であり、そのため、容易にスケールします。

まずは、同時に2つのworker.pyスクリプトを実行してみましょう。それらは両方のキューからメッセージを取得しますが、正確にはどのように?見てみましょう。

3つのコンソールオープンする必要があります。2つはworker.pyスクリプトを実行します。これらのコンソールは、2つの消費者、「C1」と「C2」になります。

shell1$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

shell2$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

3つ目のコンソールで、新しいタスクをパブリッシュします。消費者を開始した後、いくつかのメッセージをパブリッシュすることができます。

shell3$ python new_task.py First message.

shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

ワーカーに配信されるか見てみましょう:

shell1$ python worker.py

[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'

shell2$ python worker.py

[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'

デフォルトでは、RabbitMQはそれぞれのメッセージを、列の次の消費者に送信します。平均すれば、すべての消費者は、同数のメッセージを取得します。メッセージを配信するこの方法は、ラウンド・ロビンと呼ばれています。 3つ、もしくはそれ以上のワーカーでこれを試してみてください。


メッセージの確認応答

タスクを実行することに、数秒かかることがあります。消費者のいずれかが長いタスクを開始し、部分的に完了し、死んだ場合、何が起こるか疑問に思うかもしれません。現在のコードでは、RabbitMQは、一度メッセージを顧客に配信し、すぐにメモリから削除します。この場合、ワーカーをキルしたら、処理中のメッセージを失うことになります。また、特定のワーカーにディスパッチされたが、まだ処理されなかったすべてのメッセージも、失うことになります。

しかし、すべてのタスクを失いたくはありません。ワーカーが死んだ場合、タスクが他のワーカーに配信されるようにしたいと思います。

メッセージが決して失われないようにするために、RabbitMQは、メッセージの確認応答をサポートしています。ack(nowledgement)は、特定のメッセージが受信し、処理されたことをRabbitMQに伝えるために、消費者から返送され、RabbitMQは、それを自由に削除します。

消費者がackを送信せずに死んだ場合、RabbitMQは、メッセージが完全に処理されなかったことを理解し、別の消費者に再配信します。そうすれば、ワーカーが時折死ぬ場合でも、メッセージが失われないことを確信することができます。

いかなるメッセージ・タイムアウトもありません。 RabbitMQは、ワーカーの接続が死んだ場合のみ、メッセージを再配信します。メッセージを処理することは、非常に、非常に長い時間がかかる場合でも構いません。

メッセージの確認応答は、デフォルトでオンになっています。前の例では、no_ack=Trueフラグを介して明示的にオフにしています。このフラグを削除し、タスクが完了したら、ワーカーから適切な確認応答を送信しましょう。

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
queue='hello')

このコードを使用すると、メッセージを処理している間にCTRL+Cキーを使用して、ワーカーをキルしても、何も失われないことを確認することができます。ワーカーが死んだ直後に、すべての未応答のメッセージが再配信されます。

忘れられた確認応答

basic_ackし忘れることは良くある間違いです。それは簡単なエラーですが、結果は深刻です。クライアントが終了したときにメッセージが再配信されますが(ランダムな再配信のように見えるかもしれませんが)、任意のunackedメッセージを解放することができなくなり、RabbitMQは、より多くのメモリを使用するようになります。

この種の誤りをデバッグするためには、messages_unacknowledgedフィールドを出力するrabbitmqctlを使用することができます:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.


メッセージの耐久性

消費者が死んだ場合でも、確実にタスクが失われないようにする方法を学びました。しかし、RabbitMQのサーバが停止した場合にも、ワークはまだ失われてしまいます。

RabbitMQが終了またはクラッシュした場合、そうしないように指示しない限り、キューとメッセージを忘れてしまいます。メッセージが失われないことを確かにするためには、二つのことが必要です:キューおよびメッセージの両方を耐久(durable)としてマークする必要があります。

まず、RabbitMQがキューを失わないようにする必要があります。そのために、durableとして宣言する必要があります:

channel.queue_declare(queue='hello', durable=True)

このコマンドは、それ自体は正しいですが、私たちのセットアップでは動作しません。helloと呼ばれるキューはすでに、非耐久として定義しているからです。 RabbitMQは、異なるパラメータを使用して既存のキューを再定義することはできず、そうしようとするあらゆるプログラムにエラーを返します。しかし、迅速な回避策があります、別の名前でキューを定義しましょう、例えば、task_queueのように:

channel.queue_declare(queue='task_queue', durable=True)

このqueue_declareの変更は生産者と消費者のコードの両方に適用する必要があります。

これで、RabbitMQが再起動してもtask_queueキューが失われないことを確認することができます。次に、メッセージを永続的とマークしましょう、delivery_modeに値2を与えることによって。

channel.basic_publish(exchange='',

routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))

メッセージの永続性に関する注意

永続としてメッセージをマークしたとしても、メッセージが絶対に失われないことを保証するものではありません。それはRabbitMQにメッセージをディスクに保存するように指示していますが、RabbitMQがメッセージを受け入れ、まだそれを保存していない間にまだ短い時間差があります。また、RabbitMQは、メッセージごとにfsync(2)を実行しません、キャッシュに保存されるだけで、実際にディスクには書き込まれないことがあります。永続性の保証は強くないが、それは簡単なタスクキューのためには十分すぎるほどです。強い保証を必要とするなら、パブリッシャーの確認を使用することができます。


公平なディスパッチ

私たちが望むようにディスパッチがまだ正しく動作しないことに気づいたかもしれません。例えば2つのワーカーがある状況で、すべての奇数のメッセージが重く、偶数のメッセージが軽い場合、一方のワーカーは常に忙しく、他方は、ほとんど仕事をしません。RabbitMQはそのことについて何も知らず、引き続き均等にメッセージをディスパッチします。

このようなことが起こるのは、メッセージがキューに入るとRabbitMQは単にメッセージをディスパッチするためです。消費者の未確認メッセージの数を見ていません。単に盲目的にすべてのn番目のメッセージをn番目の消費者へ送出します。

こうしないために、basic.qosメソッドをprefetch_count=1の設定で使用することができます。これは、一つのワーカーに一度に複数のメッセージを与えないよう、RabbitMQに指示します。言い換えると、ワーカーが一つ前のメッセージを処理し、確認応答するまで、ワーカーに新しいメッセージを送出しません。代わりに、まだビジー状態でない、次のワーカーにそれをディスパッチします。

channel.basic_qos(prefetch_count=1)

キューのサイズに関する注意

すべてのワーカーが使用中である場合、キューがいっぱいになることがあります。このような場合には、より多くのワーカーを追加するか、他のいくつかの戦略を取る必要があります。


すべてのまとめ

new_task.pyスクリプトの最終的なコード:

 1    #!/usr/bin/env python

2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='task_queue', durable=True)
10
11 message = ' '.join(sys.argv[1:]) or "Hello World!"
12 channel.basic_publish(exchange='',
13 routing_key='task_queue',
14 body=message,
15 properties=pika.BasicProperties(
16 delivery_mode = 2, # make message persistent
17 ))
18 print " [x] Sent %r" % (message,)
19 connection.close()

そしてワーカー:

 1    #!/usr/bin/env python

2 import pika
3 import time
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='task_queue', durable=True)
10 print ' [*] Waiting for messages. To exit press CTRL+C'
11
12 def callback(ch, method, properties, body):
13 print " [x] Received %r" % (body,)
14 time.sleep( body.count('.') )
15 print " [x] Done"
16 ch.basic_ack(delivery_tag = method.delivery_tag)
17
18 channel.basic_qos(prefetch_count=1)
19 channel.basic_consume(callback,
20 queue='task_queue')
21
22 channel.start_consuming()

メッセージの確認応答とprefetch_countを使用して、ワークキューを設定することが可能です。耐久性のオプションはRabbitMQのが再起動された場合でもタスクが存続するようにします。

これで、チュートリアル3に移り、多くの消費者に同じメッセージを配信する方法を学ぶことができます。