はじめに
RabbitMQのチュートリアルをやるついでに、日本語版がなかったので雑に訳してみました。
言語はPythonです。
この表示はチュートリアルの内容から脱線していることを意味します。
これより下がチュートリアルの雑訳です。
Introduction
前提
RabbitMQがローカルにインストールされていて、localhost:5672
で起動していること。
異なるホストとかで起動している場合は自分で頑張って設定してね。
早速少し脱線
ローカルにRabbitMQをインストールしたくない人も多いと思うので、私のおすすめとしてDocker(Docker Compose)で起動する方法を紹介します。
以下のdocker-compose.yml
を作成してdocker compose up -d
するだけで起動できます。
version: "3"
services:
rabbitmq:
container_name: rabbitmq
image: rabbitmq:latest
ports:
- 5672:5672
- 15672:15672
RabbitMQはメッセージの受付と転送をするBrokerで、例えると、郵便ポスト/郵便局/郵便配達員すべてだと思っておけばOK。
郵便ポスト/郵便局/郵便配達員と違うところといえば、紙ではなくバイナリーデータを扱うところくらい。
RabbitMQなどのメッセージングにはいくつかの特有の言葉がある。
- Producer
- メッセージを送るプログラム
- Queue
- キューはキュー!
- キューの容量はホストマシンのディスクに依存する
- 多くのProducerがメッセージを送信することができて、多くのConsumerがそれを受信できる
- Consumer
- メッセージを待ち構えるプログラム
Hello World!
(pikaというRabbitMQクライアントを使う)
Producerが"Hello World"という文字列をhello
という名前のキューに送信して、受信したConsumerがコンソールに出力するという単純なプログラムを書いていく。
図にするとこんな感じ
RabbitMQ libraries
(かくかくしかじか)
Pikaは以下のコマンドでインストールできます。
python -m pip install pika --upgrade
Pikaのインストールができたら、実際にコードを書いていく。
Sending
send.py
に単一のメッセージをキューに送るプログラムを書いていく。
とりあえず、RabbitMQとの間にコネクションを確立しないと始まらないので、コネクションを張る。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
接続完了。
もし異なるマシンに接続したい場合は、単純にIPアドレスをlocalhost
の代わりに指定すれば良い。
次に、キューが存在するか確認する必要がある。
存在しないキューに対してメッセージを送信しても、RabbitMQはそれをシカトする。
メッセージをやり取りするためにhello
キューを作成する。
channel.queue_declare(queue="hello")
メッセージを送信する準備が整ったので、"Hello World!"
とhello
キューに送信していく。
RabbitMQではメッセージが直接キューに送られることはなく、必ずexchange
を経由する。
exchange
の詳細は第3パートでやる。
現時点では、「空文字を指定するとデフォルトのexchange
を使用することができる」ということだけ覚えておけばOK。
また、キュー名はrouting_key
パラメータで指定する必要がある。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
処理を終了する前に、ネットワークバッファーから情報が吐き出されたことと送信したメッセージがRabbitMQに運ばれたことを確認する必要があるが、コネクションをクローズすることでそれを実現することが出来る。
connection.close()
Sending doesn't work!
コンソールに"Sent"と表示されていない場合があるかもしれない。RabbitMQの起動には最低でも200MBは必要なので、細かい設定の変更が必要な場合はRabbitMQのログを見るなり、disk_free_limit
をいじるなりして頑張ろう。
Receiving
2つ目はメッセージをキューから受信して表示させるプログラムreceive.py
を書いていく。
コネクションの確立まではsend.py
と同じ。
次も、同じようにキューが存在するか確認する。
queue_declare
は冪等であるので、何度実行しても一回しかキューは生成されない。
channel.queue_declare(queue="hello")
なぜまたqueue_declare
を呼び出すのか?と思うかもしれないが、send.py
とreceive.py
の呼び出し順が決まっているわけではないので、どちらのプログラムでもキューの宣言をしている。
Listing queues
RabbitMQが持っているキューの数とそのキューの中に幾つメッセージがあるか確認したい場合には、rabbitmqctl
を使うと良い
sudo rabbitmq list_queues
Windowsの場合は、
rabbitmqctl.bat list_queues
メッセージの受信は送信に比べて複雑だ。
メッセージを受信した時に、callback
関数がPikaから呼ばれる。
今回は、コンソールに受信したメッセージを表示させるコールバック関数を作成する。
def callback(ch, method, properties, body):
print("[x] Received %r" % body)
作成した関数をhello
キューからメッセージを受信した時に呼び出されるように登録する。
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
queue
パラーメーターには存在するキューを指定する(今回はhello
)。
auto_ack
パラメーターについては後で説明するよ。
最後に、無限ループをしながらキューにデータが入るのを待ち構えるようにして、KeyboardInterrupt
をキャッチして処理を終了できるようにする。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
Putting it all together
これまでの全体
実際のコードはここ↓
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
コードが書けたら実際に動かしてみよう。
初めに、Consumer(receive.py)を起動する。
python receive.py
# => [*] Waiting for messages. To exit press CTRL+C
そしたら、新しいターミナルを開いてProducer(send.py)からメッセージを送信してみよう。
python send.py
# => [x] Sent 'Hello World!'
Consumerが起動しているウィンドウを見てみるとログが確認できるはず!
# => [x] Received 'Hello World!'
Consumerは次のメッセージも待ち構えているため、プログラムは終了しない。
停止したい場合は、CTRL+Cとすれば良い。
これで、私たちはメッセージの送受信の方法を学ことができたのでPart2でさらに理解を深めていこう。