LoginSignup
9
10

More than 5 years have passed since last update.

RabbitMQ チュートリアル6(RPC)

Posted at

RabbitMQのチュートリアル6
https://www.rabbitmq.com/tutorials/tutorial-six-python.html
の翻訳です。
翻訳の誤りなどあればご指摘お待ちしております。

前提条件

このチュートリアルでは、RabbitMQのがインストールされ、ローカルホストの標準のポート(5672)上で実行されている前提とします。別のホスト、ポート、または資格情報を使用する場合には、接続設定の調整が必要です。

問題が発生した場合

このチュートリアルを通して問題が発生した場合、メーリングリストを通して私たちに連絡することができます。

リモート・プロシージャ・コール(RPC)

(pika 0.9.8 Python clientを使用)

第2のチュートリアルでは、複数のワーカーの間で時間のかかるタスクを分散するためにワークキューを使用する方法を学びました。

しかし、リモートコンピュータ上で関数を実行し、その結果を待つ必要がある場合にはどうでしょうか?それは別の話です。このパターンは、一般的にリモート・プロシージャ・コールまたはRPCとして知られています。

このチュートリアルでは、RPCシステム(クライアントとスケーラブルなRPCサーバー)を構築するために RabbitMQ を使用します。値を供給する、時間のかかるタスクを持っていないため、フィボナッチ数を返すダミーのRPCサービスを作成しましょう。

クライアント・インターフェース

RPCサービスがどのように使用されるかを説明するために、簡単なクライアント・クラスを作成します。このクラスは、RPCリクエストを送信して答えを受信するまでブロックする、"call"という名前のメソッドを公開します:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
RPCに関する注意

RPCは、コンピューティングではかなり一般的なパターンですが、それはしばしば批判されます。問題は、プログラマが関数呼び出しがローカルであるかどうかを認識していない場合や、遅いRPCの場合に発生します。そのような混乱は、予測できないシステムおよびデバッグに不要な複雑さを加える結末となります。ソフトウェアを単純にするかわりに、RPCの誤った使い方は保守不能なスパゲッティコードをもたらします。

心の中で、次のアドバイスを検討してください:

どの関数呼び出しがローカルであり、どれがリモートであるかが明らかであることを確認してください。
お使いのシステムを文書化します。コンポーネント間の依存関係を明確にします。
エラーケースを処理します。RPCサーバーが長時間ダウンしているときはクライアントはどのように反応するべきですか?

疑問がある場合はRPCは避けてください。可能であれば、RPCのようにブロックする方法のかわりに、非同期パイプラインを使用するべきです、結果は非同期に次の計算ステージにプッシュされます。

コールバック・キュー

一般的に、RabbitMQでのRPCは簡単です。クライアントが要求メッセージを送信すると、サーバーは応答メッセージで応答します。応答を受信するために、クライアントは、要求に「コールバック」キューのアドレスを送信する必要があります。では、それを試してみましょう:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...
メッセージ・プロパティ

AMQPプロトコルは、メッセージとともに運ばれる14のプロパティを定義しています。大部分のプロパティは、以下の例外を除いて、使用されることは稀です:

delivery_mode: メッセージを永続(値 2)または一時的(他の値)としてマークします。チュートリアル2で、このプロパティを使用しました。
content_type: エンコードのMIMEタイプを記述するために使用します。例えば、頻繁に使用されるJSONエンコーディングでは、このプロパティを「application/json」に設定することをお勧めします。
reply_to: 普通コールバック・キューに名前を付けるために使用されます。
correlation_id: RPC応答を要求に対応づけるために有用です。

コリレーションid

上記のメソッドの中で、すべてのRPC要求のためのコールバック・キューを生成することを提示しました。これはかなり非効率的ですが、幸い、より良い方法があります、クライアントごとに1つのコールバック・キューを生成しましょう。

ここで、キューで応答を受信した場合にその応答が属する要求が明らかでないという、新たな問題が発生します。そのため、correlation_idプロパティを使用します。すべての要求に一意の値を設定します。後にコールバック・キューにメッセージを受信したとき、このプロパティに基づいて、応答を要求に対応づけることができます。知らないcorrelation_idの値を持つ場合、安全にメッセージを廃棄することができます、それは私たちの要求に属していません。

コールバック・キューの中の未知のメッセージを、エラーで失敗するのではなく、無視する必要がありますが、なぜでしょうか?これは、サーバー側での競合状態の可能性に起因します。ありそうにないことですが、RPCサーバーが、ちょうど答えを発信した直後で、要求の確認応答メッセージを送信する前に、死ぬ可能性があります。その場合は、再起動したRPCサーバは再び要求を処理します。そういうわけでクライアント上で重複した応答は優雅に処理しなければならず、また、RPCは理想的には冪等であるべきです。

要約

私たちのRPCは、次のように動作します:

クライアントが起動すると、匿名の排他的なコールバック・キューを生成します。
RPCリクエストのために、クライアントは2つのプロパティを伴うメッセージを送信します:コールバック・キューを設定したreply_toと、リクエストごとに一意の値に設定されるcorrelation_idです。
要求は"rpc_queue"キューに送信されます。
RPCワーカー(通称:サーバー)がそのキューの要求を待っています。リクエストが現れたら、仕事をし、reply_toフィールドに指定されたキューを使用して、クライアントに結果とメッセージを送信します。
クライアントは、コールバック・キューのデータを待ちます。メッセージが現れたら、correlation_idプロパティをチェックします。要求からの値と一致する場合には、アプリケーションに応答を返します。

すべてのまとめ

rpc_server.pyのコード:

 1    #!/usr/bin/env python
 2    import pika
 3
 4    connection = pika.BlockingConnection(pika.ConnectionParameters(
 5            host='localhost'))
 6
 7    channel = connection.channel()
 8
 9    channel.queue_declare(queue='rpc_queue')
10
11    def fib(n):
12        if n == 0:
13            return 0
14        elif n == 1:
15            return 1
16        else:
17            return fib(n-1) + fib(n-2)
18
19    def on_request(ch, method, props, body):
20        n = int(body)
21
22        print " [.] fib(%s)"  % (n,)
23        response = fib(n)
24
25        ch.basic_publish(exchange='',
26                         routing_key=props.reply_to,
27                         properties=pika.BasicProperties(correlation_id = \
28                                                         props.correlation_id),
29                         body=str(response))
30        ch.basic_ack(delivery_tag = method.delivery_tag)
31
32    channel.basic_qos(prefetch_count=1)
33    channel.basic_consume(on_request, queue='rpc_queue')
34
35    print " [x] Awaiting RPC requests"
36    channel.start_consuming()

サーバーのコードはかなり簡単です:

(4) いつものように、接続を確立し、キューを宣言することから始めます。
(11) フィボナッチ関数を宣言します。これは、有効な正の整数の入力のみを仮定しています。 (大きな数値のために働くことを期待しないでください、これはおそらく、あり得る実装の中で最も遅い再帰的な実装です)。
(19) basic_consumeのためのコールバック、RPCサーバーの核を宣言します。要求を受信したときに実行されます。仕事をして、応答を返信します。
(32) 複数のサーバー・プロセスを実行したいかもしれません。複数のサーバーに均等に負荷を分散するために、prefetch_countを設定する必要があります。

rpc_client.pyのコード:

 1    #!/usr/bin/env python
 2    import pika
 3    import uuid
 4
 5    class FibonacciRpcClient(object):
 6        def __init__(self):
 7            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 8                    host='localhost'))
 9
10            self.channel = self.connection.channel()
11
12            result = self.channel.queue_declare(exclusive=True)
13            self.callback_queue = result.method.queue
14
15            self.channel.basic_consume(self.on_response, no_ack=True,
16                                       queue=self.callback_queue)
17
18        def on_response(self, ch, method, props, body):
19            if self.corr_id == props.correlation_id:
20                self.response = body
21
22        def call(self, n):
23            self.response = None
24            self.corr_id = str(uuid.uuid4())
25            self.channel.basic_publish(exchange='',
26                                       routing_key='rpc_queue',
27                                       properties=pika.BasicProperties(
28                                             reply_to = self.callback_queue,
29                                             correlation_id = self.corr_id,
30                                             ),
31                                       body=str(n))
32            while self.response is None:
33                self.connection.process_data_events()
34            return int(self.response)
35
36    fibonacci_rpc = FibonacciRpcClient()
37
38    print " [x] Requesting fib(30)"
39    response = fibonacci_rpc.call(30)
40    print " [.] Got %r" % (response,)

クライアント・コードはやや入り組んでいます:

(7) 接続、チャネルを確立し、返信用に排他的な「コールバック」キューを宣言します。
(16) RPC応答を受け取ることができるように、「コールバック」キューをサブスクライブします。
(18) すべての応答に対して実行される「on_response」コールバックは非常に簡単な仕事をしています、各応答メッセージに対し、correlation_idが我々が探しているものかどうか、チェックします。もしそうなら、応答をself.responseに保存し、消費ループをブレイクします。
(23) 次に、主要なcallメソッドを定義します、ここでは実際のRPC要求を行います。
(24) このメソッドの中で、まず一意のcorrelation_id番号を生成し、それを保存します、「on_response」コールバック関数は、適切な応答を捕捉するためにこの値を使用します。
(25) 次に、2つのプロパティ(「reply_to」と「correlation_id」)を持つ要求メッセージを発行します。
(32) ここで、落ち着いて、適切な応答が到着するまで待ちます。
(33) そして最後に、戻って、ユーザーへの応答を返します。

RPCサービスの準備ができました。サーバを起動することができます:

$ python rpc_server.py
 [x] Awaiting RPC requests

フィボナッチ数を要求するためにクライアントを実行します:

$ python rpc_client.py
 [x] Requesting fib(30)

この設計は、RPCサービスの唯一の可能な実装ではありませんが、いくつかの重要な利点があります。

RPCサーバーが非常に遅い場合は、単にもう1つ実行するだけでスケールアップすることができます。新しいコンソールで2つ目のrpc_server.pyを実行してみてください。
クライアント側では、RPCは、ただ1つのメッセージのみを送信および受信する必要があります。 queue_declareのような同期呼び出しは必要ありません。その結果、RPCクライアントは、1つのRPC要求に対して1つのネットワーク周回旅行のみを必要とします。

このコードはまだかなり単純化されており、以下のような、より複雑な(しかし重要な)問題を解決しません:

実行中のサーバーがない場合、クライアントはどう反応するべきか?
クライアントはRPCに対して何らかのタイムアウトを持つべきか?
サーバーが故障して、例外が発生した場合は、クライアントに転送する必要があるか?
処理前の、不正な着信メッセージからの保護(例えば、境界チェック)。

試してみる場合、キューを表示するにはrabbitmq-managementプラグインが有効です。
9
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
10