はじめに
- 仕事でRabbitMQを触る必要性が出てきたのでDocker Composeで立てて遊んでみる
- 尚、本格的に構築する場合にはクラスタリング等も検討したほうが良い: https://www.rabbitmq.com/clustering.html
概要
RabbitMQとは
- 主にメッセージキューを実現するためのOSSで、メッセージブローカと呼ばれる
- Erlangで書かれている
- 紆余曲折あり、現在はVMware社がサポートを行っている
- 他の選択肢としては、OSSだとApache KafkaやActiveMQ、パブリッククラウドだとAmazon SQSやAmazon MQ(内部的にはRabbitMQやActiveMQが使われている)等がある
- GitHubリポジトリ: https://github.com/rabbitmq
メッセージキューとは
- 送信側と受信側の間にキューを挟み、送信側が任意のタイミングでキューにデータを溜め込み、受信側が任意のタイミングでキューに溜まったデータを取り出すような通信形態のこと
- 主に以下のようなメリットを享受することが期待できる
- 送信側は受信側の処理を待つことなく後続の処理ができる
- 送信側と受信側の間にメッセージブローカを挟むことによって疎結合なアーキテクチャを実現できる
- スケーラブルなアーキテクチャを実現できる
使用するプロトコル
- プロトコルはデフォルトではAMQP(Advanced Message Queuing Protocol)が使用される
- AMQPは元々、金融機関向けのプロトコルとして提供されており、JPモルガンやバンク・オブ・アメリカでも利用されていた
- 現在はOASISが標準化している他、ISO/IEC 19464として国際標準にもなっている
- MQTT(Message Queuing Telemetry Transport)等、他のプロトコルを使用することもできる
- これらのプロトコルを実装したシステムは金融機関のシステムの他、WebやIoTの文脈でも利用されている
基本的な概念
- プロデューサー
- メッセージを送信するプログラム
- コンシューマー
- メッセージを受信するプログラム
- キュー
- メッセージをバッファリングしておく仕組み
- エクスチェンジ
- この記事では触れないが、重要が概念であるため説明
- どのメッセージをどのように配送するかを決定する仕組み
- チャンネル
- 1つのTCPコネクションを分割して共有するためのパフォーマンス上の仕組み
- ドキュメント: https://www.rabbitmq.com/channels.html
Docker Composeの設定
- Docker Compose以外のインストール方法: https://www.rabbitmq.com/download.html
docker-compose.yml
services:
rabbitmq:
# -management を付与したイメージの場合、15672ポートでManagement Plugin(Web UIのようなもの)が利用できるようになる
# Management Pluginのドキュメント: https://www.rabbitmq.com/management.html
# イメージは現時点で最新のものを指定している
image: rabbitmq:3.11.7-management
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
# ユーザ名/パスワードの設定
# 特に設定しない場合、guest/guestが暗黙的に設定される
environment:
- RABBITMQ_DEFAULT_USER=root
- RABBITMQ_DEFAULT_PASS=password
# データの永続化
volumes:
- ./docker/rabbitmq/data:/var/lib/rabbitmq
- Webブラウザ上で
http://localhost:15672
を開くとRabbitMQのWeb UIにアクセスできる - username と password には
docker-compose.yml
に設定したRABBITMQ_DEFAULT_USER
とRABBITMQ_DEFAULT_PASS
を入力する
- ログイン後の画面
- RabbitMQの一通りの操作が可能
- e.g. コネクションやキューの確認、メッセージの監視、etc ...
Pythonから接続
- Pythonから接続する場合、pikaを利用する
- GitHubリポジトリ: https://github.com/pika/pika
- 以下はpikaを使ってメッセージの送信、受信をするサンプル
- だいたいここに書いてあることと同じ: https://www.rabbitmq.com/tutorials/tutorial-one-python.html
インストール
$ pip install pika
ソースコード
- 送信側
producer.py
import pika
# コネクションの確立
# usernameとpasswordにguest/guestが設定されている場合は認証情報を指定しなくて良い
credentials = pika.PlainCredentials("root", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials))
# チャンネルの作成・キューの指定
queue_name = "greet"
channel = connection.channel()
channel.queue_declare(queue=queue_name)
# メッセージの送信
message = "hello, world!"
channel.basic_publish(exchange="", routing_key=queue_name, body=message)
print("Sent:", message)
# コネクションの切断
connection.close()
- 受信側
consumer.py
import pika
# コネクションの確立
# usernameとpasswordにguest/guestが設定されている場合は認証情報を指定しなくて良い
credentials = pika.PlainCredentials("root", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials))
# チャンネルの作成・キューの指定
queue_name = "greet"
channel = connection.channel()
channel.queue_declare(queue=queue_name)
# メッセージの受信
# ちなみに auto_ack=True にすると、メッセージ送信直後に正常に配信されたとみなされる
# スループットは向上するが、実際に配信されるよりも前にコネクションやチャンネルが切断されると、送信されたメッセージは失われるため注意
# ドキュメント: https://www.rabbitmq.com/confirms.html
def callback(ch, method, props, body):
print("Received:", body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print("Waiting for messages ...")
channel.start_consuming()
実行結果
$ python consumer.py
Waiting for messages ...
Received: b'hello, world!'
$ python producer.py
Sent: hello, world!
- きちんと受信できていることが分かる
Web UI
- せっかくなので確認しておく
- Overview
- メッセージが配送・取得されていることが分かる
- Queues
- ソースコードから指定したキューが作成されている
- Channels
-
greet
キューに紐付いたチャンネルが作成されている
-