はじめに
RabbitMqはソフトウェア間でメッセージのやり取りやメッセージのキューイングなど(AMQP)ができるものです。使用方法については多くのサイトで書かれていますが実際にメッセージの管理については書かれているところが少ないため、RabbitMqやpythonのpikaの用意からメッセージの確認までをまとめました。
環境
- python:3.6.5
- イメージ:rabbitmq:3-management
RabbitMqを使うのに必要なもの
- RabbitMqサーバ:メッセージを受けたり送ったりするもの(メッセージをためる場所)
- producer:メッセージを送るもの(クライアント)
- consumer:メッセージを受け取るもの(ホスト)
1. RabbitMqの用意
dockerのRabbitMqイメージを取得
RabbitMqのイメージは標準のものと管理プラグインが有効なものの2つがあります。今回はキューの状況を見たいため管理プラグインのものをpullします。rabbitmqという名前でコンテナが提供されているのでイメージをpullします。
docker pull rabbitmq:3-management
普通のrabbitmqの場合は、rabbitmq:3-management
ではなくrabbitmq
をpullしてください。
RabbitMqの起動
RabbitMqを起動します。今回はバックグラウンドで動き続けてほしいため-d
オプションをつけています。また、ポートは管理画面を見たいため、コンテナ内の15672をホストの8080に、実際にキューのやり取りをするポートをコンテナ内の5672とホストの5672に紐づけています。さらにキューを特定するために--hostname
を分かる名前にしています。
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management
普通のrabbitmqの場合は、rabbitmq:3-management
ではなくrabbitmq
をrunして、-p 8080:15672
も不要です。
RabbitMqの起動確認
今回は、管理プラグインが有効なコンテナを起動したのでlocalhost:8080
にブラウザからアクセスして中身を見てみます。アクセスするとログイン画面になるため、デフォルトのguest
で入ります。現在の状態がみれます。Nodesのところに先ほど--hostname
で指定した名前が入っています。
2. producer:メッセージを送るもの(クライアント)の用意
producerの作成にはpythonでキューのやり取りを行うため、pikaというライブラリを使用してrabbitmqにアクセスします。
pikaのインストール
pikaのインストールは普通にpipでインストールします。
pip install pika
producerの実装
メッセージの送信には次の手順が必要です。これらの手順はすべてrabbitmqに対して行われるため、consumerは必要ありません。
- コネクション作成
- チャンネル作成・取得
- キュー作成・取得
- メッセージの送信
コネクション作成
pythonからRabbitMqに向けて接続を行います。作成するパラメータにホスト名(IPアドレス)やポート番号、タイムアウトなどの設定を与えます。RabbitMqコンテナはホストの5672ポートに紐づいているため、パラメータにlocalhostを与えて、ポートはデフォルトの5672のままなので指定はしていません。その後、パラメータを与えてコネクションを生成して接続完了になります。
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
チャンネル作成・取得
接続が完了したら次はチャンネルの作成を行います。チャンネルとはRabbitMqへの道のようなイメージです。チャンネルが同じproducerとconsumerがメッセージのやり取りの対象になります。
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
最後の1行だけが追加されています。必要であればこの引数にチャンネルの識別番号を入力します。
キュー作成・取得
チャンネルを作成できたらキューの作成を行います。このキューとはメッセージをためる場所のようなイメージになります。そのため、キューの名前が同じでなければメッセージのやり取りは行えません。チャンネルとは違い必ず指定が必要になります。
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
最後の1行だけが追加されています。必要であればこの引数にキューの設定を入力します。
メッセージの送信
事前の用意ができたため、メッセージの送信を行います。basic_publish()
のrouting_key
にキューの名前を指定してbody
に送信したいメッセージを指定します。
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()
最後に送信が完了したらコネクションを閉じます。
実際に送信してみる
RabbitMqへの送信用ソースができたため、実行してみます。コマンドはすぐに帰ってきます。
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
RabbitMqの管理画面でも確認してみます。Queuesタブを見てみるとMessageがReady:1になっているのがわかります。
3. consumer:メッセージを受け取るもの(ホスト)の用意
consumerも、pythonでキューのやり取りを行うため、pikaというライブラリを使用してRabbitMqにアクセスします。
consumerの実装
メッセージの送信には次の手順が必要です。これらの手順はすべてRabbitMqに対して行われるため、起動にproducerは必要ありません。コネクション作成からキュー作成まではproducerと同じです。
- コネクション作成
- チャンネル作成・取得
- キュー作成・取得
- コールバック(受信時処理)の作成
- queueメッセージ受付開始
コールバック(受信時処理)の作成
メッセージを受信したときに処理させたい関数を記載します。関数の最後に受信したからキューからメッセージを削除するために応答関数basic_ack()
を指定します。今回は例として受信したメッセージを表示する関数callback
を記載します。
import pika
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
![queue_receive.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/481713/d78bd155-4b9a-9093-998d-c85ff4af2cc7.png)
![queue_receive.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/481713/f6fbd5ef-322d-77d0-8e85-68626ac91783.png)
def callback(ch, method, properties, body):
print("{} Received".format(body))
ch.basic_ack(delivery_tag = method.delivery_tag)
queueメッセージ受付開始
作成したチャンネルのbasic_consume()
にキュー名とコールバック関数を指定します。その後にstart_consuming()
関数でメッセージの受信を開始します。この関数を開始すると関数内で延々とメッセージ待ちをするため終了するときは強制終了かコールバック関数に終了の契機を入れておく必要があります。
import pika
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("{} Received".format(body))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
実際に受信してみる
RabbitMqからの受信用ソースができたため、実行してみます。
PS C:\Users\xxxx\program\python\pika> python .\host_main.py
b'Hello World!' Received
producerで送信したメッセージが受信できて標準出力されていることが確認できます。
RabbitMqの管理画面でも確認してみます。Queuesタブを見てみるとMessageがReady:0になっているのがわかります。
おわりに
RabbitMqを使用してpythonでメッセージのやり取りを行う方法をまとめました。とはいえほとんど公式の内容と同じになってしまいました。これを使用することで容易に非同期な処理やキューに関する処理を実施することができそうです。