RabbitMQのチュートリアル3
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
の翻訳です。
翻訳の誤りなどあればご指摘お待ちしております。
前提条件
このチュートリアルでは、RabbitMQのがインストールされ、ローカルホストの標準のポート(5672)上で実行されている前提とします。別のホスト、ポート、または資格情報を使用する場合には、接続設定の調整が必要です。
問題が発生した場合
このチュートリアルを通して問題が発生した場合、メーリングリストを通して私たちに連絡することができます。
パブリッシュ/サブスクライブ
(pika 0.9.8 Python clientを使用)
前のチュートリアルでは、ワークキューを作成しました。ワークキューの仮定は、各タスクが正確に1つのワーカーに配信されることです。このパートでは、まったく異なることをします、すなわち、複数の消費者にメッセージを配信します。このパターンは、「パブリッシュ/サブスクライブ」として知られています。
パターンを説明するために、簡単なロギング・システムを構築します。これは、2つのプログラムで構成されます、1つ目は、ログメッセージを発信し、2つ目は受信して印刷します。
ロギング・システムにおいて、レシーバー・プログラムの各実行中コピーは、メッセージを取得します。そのため、1つのレシーバーを実行してディスクへログを書き、同時に、他のレシーバーを実行して画面上でログを確認できるようになります。
基本的に、パブリッシュされたログメッセージは、すべてのレシーバーにブロードキャストされるようになります。
エクスチェンジ
チュートリアルの前の部分では、キューへ(キューから)メッセージを送受信しました。ここでは Rabbit のフル・メッセージング・モデルを導入します。
前のチュートリアルで説明したことを振り返りましょう:
- 生産者は、メッセージを送信するユーザー・アプリケーションです。
- キューは、メッセージを格納するバッファです。
- 消費者は、メッセージを受信するユーザー・アプリケーションです。
RabbitMQ の中のメッセージング・モデルの核となるアイデアは、生産者がキューに直接メッセージを送信しない、ということです。実際には、しばしば、生産者は、メッセージがキューに配信されるかどうかさえ、まったく知りません。
かわりに、生産者は1つの「エクスチェンジ」にのみメッセージを送信することができます。エクスチェンジは非常に単純なものです。一方で生産者からメッセージを受信し、他方でそれらをキューにプッシュします。エクスチェンジは、受信したメッセージについて何をすべきか正確に把握する必要があります。特定のキューに追加すべきか?多くのキューに追加すべきか?それとも破棄するべきか。それらの法則は、エクスチェンジのタイプで定義されます。
利用可能ないくつかのエクスチェンジ・タイプがあります、直接、トピック、ヘッダー、ファンアウトです。最後の1つに焦点を当てましょう、すなわち、ファンアウトです。では、そのタイプのエクスチェンジを作成し、それを"logs"と呼びましょう:
channel.exchange_declare(exchange='logs',
type='fanout')
ファンアウト・エクスチェンジは非常に単純です。名前から想像できるように、受信したすべてのメッセージを知っているすべてのキューにブロードキャストします。そして、それはまさに私たちのロガーに必要なものです。
エクスチェンジのリスト
サーバー上のエクスチェンジを一覧表示するには、 rabbitmqctl を利用します。
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.
このリストには、いくつかの"amq.*"というエクスチェンジとデフォルト(無名の)エクスチェンジがあります。これらは、デフォルトで作成されますが、現時点ではそれらを使用する必要はほとんどありません。
無名のエクスチェンジ
チュートリアルの前の部分では、エクスチェンジについて何も知りませんでしたが、キューにメッセージを送信することができました。空の文字列("")によって識別されるデフォルト・エクスチェンジを使用していたためです。
以前メッセージをパブリッシュしていた方法を思い出してください:
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
exchange パラメータは、エクスチェンジの名前です。空の文字列は、デフォルトまたは無名のエクスチェンジを意味します、それが存在する場合メッセージは routing_key で指定された名前を持つキューにルーティングされます。
かわりに、名前付きのエクスチェンジにパブリッシュすることができます:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
テンポラリ・キュー
これまでは、指定された名前を持つキューを使用していました(helloとtask_queueを覚えていますか?)。キューに名前を付けられることは、とても重要です、ワーカーが同じキューを指すために必要でした。生産者と消費者の間でキューを共有したいときには、キューに名前を付けることは重要です。
しかし、私たちのロガーの場合はそうではありません。ログメッセージの一部ではなく、すべてを聞きたいのです。私たちはまた、現在流れているメッセージにのみ興味があり、古いメッセージには関心がありません。これを解決するために、2つのことが必要です。
第一に、 Rabbit に接続するときにはいつでも、新鮮な、空のキューが必要です。そのために、ランダムな名前でキューを作成することができます、または、よりベターなのは、サーバーがランダムなキュー名を選択してくれることです。 queue_declare に queue パラメータを与えないことによってこれを行うことができます:
result = channel.queue_declare()
ここで、 result.method.queue にはランダムなキュー名が含まれています。例えば、"amq.gen-JzTY20BRgKO-HjmUJj0wLg"のような場合があります。
第二に、一旦消費者が切断されると、キューが削除されるようにするべきです。そのために exclusive フラグがあります:
result = channel.queue_declare(exclusive=True)
バインディング
ここまで、ファンアウト・エクスチェンジおよびキューを作成しました。次に、キューにメッセージを送信するようにエクスチェンジに指示する必要があります。エクスチェンジとキューとの間の関係は、バインディングと呼ばれます。
channel.queue_bind(exchange='logs',
queue=result.method.queue)
これで、"logs"エクスチェンジはメッセージを私たちのキューに追加します。
バインディングのリスト
ご想像の通り、 rabbitmqctl list_bindings を使用して、既存のバインディングを一覧表示することができます。
すべてのまとめ
ログメッセージを発信する生産者プログラムは、前のチュートリアルと大差ありません。最も重要な変更は、無名のエクスチェンジの代わりに"logs"エクスチェンジにメッセージをパブリッシュすることです。送信時に routing_key を与える必要がありますが、ファンアウト・エクスチェンジなのでその値は無視されます。emit_log.pyスクリプトのコード:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='logs',
10 type='fanout')
11
12 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
13 channel.basic_publish(exchange='logs',
14 routing_key='',
15 body=message)
16 print " [x] Sent %r" % (message,)
17 connection.close()
ご覧のように、接続を確立した後、エクスチェンジを宣言します。存在しないエクスチェンジへのパブリッシュは禁止されているため、このステップは必要です。
いかなるキューもエクスチェンジにバインドされていない場合はメッセージが失われますが、問題ありません。いかなる消費者も受信していないのであれば、メッセージは安全に破棄することができます。
receive_logs.pyのコード:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='logs',
9 type='fanout')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 channel.queue_bind(exchange='logs',
15 queue=queue_name)
16
17 print ' [*] Waiting for logs. To exit press CTRL+C'
18
19 def callback(ch, method, properties, body):
20 print " [x] %r" % (body,)
21
22 channel.basic_consume(callback,
23 queue=queue_name,
24 no_ack=True)
25
26 channel.start_consuming()
これで完了です。ログをファイルに保存したい場合は、コンソールを開き、タイプします:
$ python receive_logs.py > logs_from_rabbit.log
画面上のログを参照したい場合は、新しい端末を起動して実行します:
$ python receive_logs.py
そしてもちろん、ログを発信するには、タイプします:
$ python emit_log.py
rabbitmqctl list_bindings を使用して、コードが実際に望みどおりのバインディングとキューを作成していることを確認することができます。2つのreceive_logs.pyプログラムを実行すると、以下のような表示がされます:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
結果の解釈は直接的です:"logs"エクスチェンジから来たデータは、サーバから割り当てられた名前を持つ2つのキューに行きます。まさに意図したとおりです。
メッセージのサブセットを受信する方法を識るために、チュートリアル4に進みましょう。