はじめに
RabbitMQによるメッセージのやり取りを行うシステムに関わっているものの体系的な知識がなかったため、RabbitMQ公式のチュートリアルを触ってみました。
https://www.rabbitmq.com/getstarted.html
そこで得た知識と追加で調べた情報などを記載します。
想定読者
- RabbitMQをこれから使う人
- RabbitMQを使っているけどよくわかってない人
- 記憶を失った時の自分
RabbitMQとは
RabbitMQはオープンソースのメッセージブローカー。
ブローカー(Broker)は直訳で「仲立ち人」。
複数のメッセージングプロトコルに対応しているがコアとなるのはAMQP。
登場する概念
名前 | 説明 |
---|---|
Producer | メッセージを送るプログラム(=Publisher) |
Exchange | Producerのメッセージはここに送られる。後述のtypeに応じてQueueへ送信 |
Queue | メッセージを格納する場所 |
routing key | ExchangeがQueueをbindするために指定される値 |
Consumer | メッセージを受け取るプログラム(=Subscriber) |
Exchange Type
Exchangeには複数のタイプがある。
fanout
受信したメッセージを全てのQueueにブロードキャストする。
direct
bindする時に指定するrouting key(binding key)がメッセージのrouting keyと一致するQueueに送信する。
言い換えると、routing keyによって送信するQueueを指定する。
topic
複数の条件に対応するため .
区切りで複数の単語を指定する。syslogの「facility.severity」みたいに。
*
と #
でワイルドカード的な指定ができる。
チュートリアル内容
自分が関わるシステムがExchange type=direct
でrouting_keyによるQueueの指定をしていることもあり、それに近い「4 Routing」について書きます。
言語はRubyを選択。bunny gem を読み込んで実行する形でした。
やること
ログレベルをrouting_keyとしてそれに応じて送信するQueueを振り分ける。
図はRabbitMQ公式より
手順
公式のDockerイメージがあるので、まずはそれでブローカーを起動
$ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
以下、コード
正直、公式側を見た方がいいのでざっくり。
Publisher( emit_log_direct.rb
)
#!/usr/bin/env ruby
require 'bunny'
# コネクション開始
connection = Bunny.new
connection.start
# Channelの作成
channel = connection.create_channel
# Exchangeの指定
exchange = channel.direct('direct_logs')
# 引数をいい感じにする
severity = ARGV.shift || 'info'
message = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
# Publish!
exchange.publish(message, routing_key: severity)
puts " [x] Sent '#{message}'"
# コネクションクローズ
connection.close
Subscriber( receive_logs_direct.rb
)
#!/usr/bin/env ruby
require 'bunny'
abort "Usage: #{$PROGRAM_NAME} [info] [warning] [error]" if ARGV.empty?
# コネクション開始
connection = Bunny.new
connection.start
# Channel作成
channel = connection.create_channel
# Exchange指定
exchange = channel.direct('direct_logs')
# Queueの宣言。冪等で、ない場合は作成されるらしい
# exclusive: true でコネクション終了時にQueueを削除する
queue = channel.queue('', exclusive: true)
# 実行(Subscriber起動)時に引数で指定するログレベルごとにQueueをbind
ARGV.each do |severity|
queue.bind(exchange, routing_key: severity)
end
puts ' [*] Waiting for logs. To exit press CTRL+C'
# メッセージが送信されるとsubscribeメソッドが実行される
begin
queue.subscribe(block: true) do |delivery_info, _properties, body|
puts " [x] #{delivery_info.routing_key}:#{body}"
end
rescue Interrupt => _
channel.close
connection.close
exit(0)
end
Subscriberプログラムを起動して
$ ruby receive_logs_direct.rb info error warning
[*] Waiting for logs. To exit press CTRL+C
Publisherプログラム実行
$ ruby emit_log_direct.rb error "Aka---n!"
[x] Sent 'Aka---n!'
やったぜ。
$ ruby receive_logs_direct.rb info error warning
[*] Waiting for logs. To exit press CTRL+C
[x] error:Aka---n!
上記の図のように、引数として指定するログレベルを変えて(routing_keyを変えて)複数のSubscriberを立ち上げても意図した通りに動く。
$ ruby emit_log_direct.rb warning "Akankamoshiren"
[x] Sent 'Akankamoshiren'
それぞれ別ターミナルで起動したSubscriber↓
$ ruby receive_logs_direct.rb error
[*] Waiting for logs. To exit press CTRL+C
$ ruby receive_logs_direct.rb info error warning
[*] Waiting for logs. To exit press CTRL+C
[x] warning:Akankamoshiren
おまけ
Virtual Hosts
RabbitMQではVirtual Hosts(vhost)としてリソースを論理的なグループに分離することができる。
https://www.rabbitmq.com/vhosts.html
Namespace的なものだと解釈した。