14
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQ チュートリアル4(ルーティング)

Posted at

RabbitMQのチュートリアル4
https://www.rabbitmq.com/tutorials/tutorial-four-python.html
の翻訳です。
翻訳の誤りなどあればご指摘お待ちしております。

前提条件

このチュートリアルでは、RabbitMQのがインストールされ、ローカルホストの標準のポート(5672)上で実行されている前提とします。別のホスト、ポート、または資格情報を使用する場合には、接続設定の調整が必要です。

問題が発生した場合

このチュートリアルを通して問題が発生した場合、メーリングリストを通して私たちに連絡することができます。

ルーティング

(pika 0.9.8 Python clientを使用)

前のチュートリアルでは、簡単なロギング・システムを構築しました。多くのレシーバーにログメッセージをブロードキャストすることができました。

このチュートリアルでは、それに機能を追加します、メッセージの一部のみをサブスクライブすることを可能にするつもりです。例えば、(ディスク・スペース節約のため)致命的なエラー・メッセージのみをログファイルに出力し、コンソールにはそのまますべてのメッセージを出力し続けることができるようになります。

バインディング

前の例で、すでにバインディングを生成しました。以下のようなコードを思い出すでしょう:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

バインディングは、エクスチェンジとキューとの間の関係です。これは、このキューはこのエクスチェンジからのメッセージに関心がある、と読むことができます。

バインディングは、追加の routing_key パラメータを取ることができます。basic_publish でのパラメータとの混同を避けるために、それをバインディング・キーと呼ぶことにします。キーを持つバインディングは、以下のように生成することができます:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

バインディング・キーの意味は、エクスチェンジ・タイプによって異なります。以前使用したファンアウト・エクスチェンジは、単にその値を無視しました。

ダイレクト・エクスチェンジ

前のチュートリアルのロギング・システムでは、すべてのメッセージをすべての消費者へブロードキャストします。これを、重大度に基づいてメッセージをフィルタリングすることができるように拡張をしたいと思います。例えば、受信した致命的エラーのログ・メッセージのみをディスクに書き、警告または情報レベルのログ・メッセージによってディスク領域を無駄にしないようにできます。

これまで使用してきたファンアウト・エクスチェンジは、あまり柔軟性がありません、それは盲目的なブロードキャストしかできません。

かわりに、ダイレクト・エクスチェンジを使用します。ダイレクト・エクスチェンジのルーティング・アルゴリズムは単純です、メッセージは、ルーティング・キーに正確に一致するバインディング・キーを持つキューに行きます。

説明のために、次の設定を考えてみます:

(図)

この設定では、ダイレクト・エクスチェンジ "X" と、それに結合した2つのキューがあります。最初のキューは、"orange" バインディング・キーにより結合し、2番目のキューは、1つは "black" バインディング・キーにより、もう1つは "green" バインディング・キーによる、2つの結合を持ちます。

このような設定では、ルーティング・キー "orange" を持つメッセージのエクスチェンジへのパブリッシュは"Q1"キューにルーティングされます。"black" や "green" のルーティング・キーを持つメッセージは、"Q2"に行きます。他のすべてのメッセージは破棄されます。

複数のバインディング

同一のバインディング・キーで複数のキューと結合することはまったく問題ありません。この例では、"X"と"Q1"の間に "black" キーによる結合を追加することができます。その場合、ダイレクト・エクスチェンジは、ファンアウトのように動作し、一致するすべてのキューにメッセージをブロードキャストします。ルーティング・キー "black" をもつメッセージは、"Q1"と"Q2"の両方に配信されます。

ログの発行

ロギング・システムにこのモデルを使用します。ファンアウトのかわりにダイレクト・エクスチェンジにメッセージを送信します。ルーティング・キーとしてログの重大度を与えます。そうすることによって、受信側のスクリプトは、受信したい重大度を選択することができます。まずはログの発行に焦点を当ててみましょう。

いつものように、最初にエクスチェンジを生成する必要があります:

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

そして、メッセージを送信する準備が整いました:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

単純化のため、重大度は「info」「warning」「error」のいずれかとします。

サブスクライブ

メッセージの受信は、1つの例外を除き、前のチュートリアルと同じように動作します、関心のある、各重大度のための新しいバインディングを生成します。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

すべてのまとめ

emit_log_direct.pyのコード:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='direct_logs',
10                             type='direct')
11
12    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
13    message = ' '.join(sys.argv[2:]) or 'Hello World!'
14    channel.basic_publish(exchange='direct_logs',
15                          routing_key=severity,
16                          body=message)
17    print " [x] Sent %r:%r" % (severity, message)
18    connection.close()

receive_logs_direct.pyのコード:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='direct_logs',
10                             type='direct')
11
12    result = channel.queue_declare(exclusive=True)
13    queue_name = result.method.queue
14
15    severities = sys.argv[1:]
16    if not severities:
17        print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
18                             (sys.argv[0],)
19        sys.exit(1)
20
21    for severity in severities:
22        channel.queue_bind(exchange='direct_logs',
23                           queue=queue_name,
24                           routing_key=severity)
25
26    print ' [*] Waiting for logs. To exit press CTRL+C'
27
28    def callback(ch, method, properties, body):
29        print " [x] %r:%r" % (method.routing_key, body,)
30
31    channel.basic_consume(callback,
32                          queue=queue_name,
33                          no_ack=True)
34
35    channel.start_consuming()

「wraning」と「error」(「info」は除く)のログメッセージのみをファイルに保存したい場合は、コンソールを開き、タイプします:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

すべてのログメッセージを画面上に表示したい場合は、新しいターミナルを開き、実行します:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

そして、例えば「error」ログ・メッセージを発行するには、単にタイプします:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

パターンに基づきメッセージを受信する方法を識るために、チュートリアル5に移りましょう。

14
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?