3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RabbitMQのチュートリアルを雑に日本語に訳してみた -Hello World-

Posted at

はじめに

RabbitMQのチュートリアルをやるついでに、日本語版がなかったので雑に訳してみました。
言語はPythonです。

この表示はチュートリアルの内容から脱線していることを意味します。

これより下がチュートリアルの雑訳です。


Introduction

前提
RabbitMQがローカルにインストールされていて、localhost:5672で起動していること。
異なるホストとかで起動している場合は自分で頑張って設定してね。

早速少し脱線
ローカルにRabbitMQをインストールしたくない人も多いと思うので、私のおすすめとしてDocker(Docker Compose)で起動する方法を紹介します。
以下のdocker-compose.ymlを作成してdocker compose up -dするだけで起動できます。

docker-compose.yml
version: "3"

services:
  rabbitmq:
    container_name: rabbitmq
    image: rabbitmq:latest
    ports:
      - 5672:5672
      - 15672:15672

RabbitMQはメッセージの受付と転送をするBrokerで、例えると、郵便ポスト/郵便局/郵便配達員すべてだと思っておけばOK。
郵便ポスト/郵便局/郵便配達員と違うところといえば、紙ではなくバイナリーデータを扱うところくらい。

RabbitMQなどのメッセージングにはいくつかの特有の言葉がある。

  • Producer
    • メッセージを送るプログラム
  • Queue
    • キューはキュー!
    • キューの容量はホストマシンのディスクに依存する
    • 多くのProducerがメッセージを送信することができて、多くのConsumerがそれを受信できる
  • Consumer
    • メッセージを待ち構えるプログラム

Hello World!

(pikaというRabbitMQクライアントを使う)

Producerが"Hello World"という文字列をhelloという名前のキューに送信して、受信したConsumerがコンソールに出力するという単純なプログラムを書いていく。

図にするとこんな感じ

RabbitMQ libraries

(かくかくしかじか)
Pikaは以下のコマンドでインストールできます。

python -m pip install pika --upgrade

Pikaのインストールができたら、実際にコードを書いていく。

Sending

send.pyに単一のメッセージをキューに送るプログラムを書いていく。
とりあえず、RabbitMQとの間にコネクションを確立しないと始まらないので、コネクションを張る。

send.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

接続完了。
もし異なるマシンに接続したい場合は、単純にIPアドレスをlocalhostの代わりに指定すれば良い。

次に、キューが存在するか確認する必要がある。
存在しないキューに対してメッセージを送信しても、RabbitMQはそれをシカトする。
メッセージをやり取りするためにhelloキューを作成する。

send.py
channel.queue_declare(queue="hello")

メッセージを送信する準備が整ったので、"Hello World!"helloキューに送信していく。

RabbitMQではメッセージが直接キューに送られることはなく、必ずexchangeを経由する。
exchangeの詳細は第3パートでやる。
現時点では、「空文字を指定するとデフォルトのexchangeを使用することができる」ということだけ覚えておけばOK。
また、キュー名はrouting_keyパラメータで指定する必要がある。

send.py
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

処理を終了する前に、ネットワークバッファーから情報が吐き出されたことと送信したメッセージがRabbitMQに運ばれたことを確認する必要があるが、コネクションをクローズすることでそれを実現することが出来る。

send.py
connection.close()

Sending doesn't work!
コンソールに"Sent"と表示されていない場合があるかもしれない。RabbitMQの起動には最低でも200MBは必要なので、細かい設定の変更が必要な場合はRabbitMQのログを見るなり、disk_free_limitをいじるなりして頑張ろう。

Receiving

2つ目はメッセージをキューから受信して表示させるプログラムreceive.pyを書いていく。

コネクションの確立まではsend.pyと同じ。

次も、同じようにキューが存在するか確認する。
queue_declareは冪等であるので、何度実行しても一回しかキューは生成されない。

receive.py
channel.queue_declare(queue="hello")

なぜまたqueue_declareを呼び出すのか?と思うかもしれないが、send.pyreceive.pyの呼び出し順が決まっているわけではないので、どちらのプログラムでもキューの宣言をしている。

Listing queues
RabbitMQが持っているキューの数とそのキューの中に幾つメッセージがあるか確認したい場合には、rabbitmqctlを使うと良い

sudo rabbitmq list_queues

Windowsの場合は、

rabbitmqctl.bat list_queues

メッセージの受信は送信に比べて複雑だ。
メッセージを受信した時に、callback関数がPikaから呼ばれる。
今回は、コンソールに受信したメッセージを表示させるコールバック関数を作成する。

receive.py
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)

作成した関数をhelloキューからメッセージを受信した時に呼び出されるように登録する。

receive.py
channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

queueパラーメーターには存在するキューを指定する(今回はhello)。

auto_ackパラメーターについては後で説明するよ。

最後に、無限ループをしながらキューにデータが入るのを待ち構えるようにして、KeyboardInterruptをキャッチして処理を終了できるようにする。

receive.py
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Putting it all together

これまでの全体
実際のコードはここ↓

send.py
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

コードが書けたら実際に動かしてみよう。
初めに、Consumer(receive.py)を起動する。

python receive.py
# => [*] Waiting for messages. To exit press CTRL+C

そしたら、新しいターミナルを開いてProducer(send.py)からメッセージを送信してみよう。

python send.py
# =>  [x] Sent 'Hello World!'

Consumerが起動しているウィンドウを見てみるとログが確認できるはず!

# => [x] Received 'Hello World!'

Consumerは次のメッセージも待ち構えているため、プログラムは終了しない。
停止したい場合は、CTRL+Cとすれば良い。

これで、私たちはメッセージの送受信の方法を学ことができたのでPart2でさらに理解を深めていこう。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?