0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RabbitMQ入門【チュートリアル触ってみた】

Posted at

はじめに

RabbitMQによるメッセージのやり取りを行うシステムに関わっているものの体系的な知識がなかったため、RabbitMQ公式のチュートリアルを触ってみました。
https://www.rabbitmq.com/getstarted.html

そこで得た知識と追加で調べた情報などを記載します。

想定読者

  • RabbitMQをこれから使う人
  • RabbitMQを使っているけどよくわかってない人
  • 記憶を失った時の自分

RabbitMQとは

RabbitMQはオープンソースのメッセージブローカー。
ブローカー(Broker)は直訳で「仲立ち人」。

複数のメッセージングプロトコルに対応しているがコアとなるのはAMQP。

登場する概念

名前 説明
Producer メッセージを送るプログラム(=Publisher)
Exchange Producerのメッセージはここに送られる。後述のtypeに応じてQueueへ送信
Queue メッセージを格納する場所
routing key ExchangeがQueueをbindするために指定される値
Consumer メッセージを受け取るプログラム(=Subscriber)

Exchange Type

Exchangeには複数のタイプがある。

fanout

受信したメッセージを全てのQueueにブロードキャストする。

direct

bindする時に指定するrouting key(binding key)がメッセージのrouting keyと一致するQueueに送信する。
言い換えると、routing keyによって送信するQueueを指定する。

topic

複数の条件に対応するため . 区切りで複数の単語を指定する。syslogの「facility.severity」みたいに。
*# でワイルドカード的な指定ができる。

チュートリアル内容

自分が関わるシステムがExchange type=direct でrouting_keyによるQueueの指定をしていることもあり、それに近い「4 Routing」について書きます。
言語はRubyを選択。bunny gem を読み込んで実行する形でした。

やること

ログレベルをrouting_keyとしてそれに応じて送信するQueueを振り分ける。
図はRabbitMQ公式より
こんなのを作る

手順

公式のDockerイメージがあるので、まずはそれでブローカーを起動

$ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management

以下、コード
正直、公式側を見た方がいいのでざっくり。

Publisher( emit_log_direct.rb )

#!/usr/bin/env ruby
require 'bunny'

# コネクション開始
connection = Bunny.new
connection.start

# Channelの作成
channel = connection.create_channel

# Exchangeの指定
exchange = channel.direct('direct_logs')

# 引数をいい感じにする
severity = ARGV.shift || 'info'
message = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')

# Publish!
exchange.publish(message, routing_key: severity)
puts " [x] Sent '#{message}'"

# コネクションクローズ
connection.close

Subscriber( receive_logs_direct.rb )

#!/usr/bin/env ruby
require 'bunny'

abort "Usage: #{$PROGRAM_NAME} [info] [warning] [error]" if ARGV.empty?

# コネクション開始
connection = Bunny.new
connection.start

# Channel作成
channel = connection.create_channel

# Exchange指定
exchange = channel.direct('direct_logs')

# Queueの宣言。冪等で、ない場合は作成されるらしい
# exclusive: true でコネクション終了時にQueueを削除する
queue = channel.queue('', exclusive: true)

# 実行(Subscriber起動)時に引数で指定するログレベルごとにQueueをbind
ARGV.each do |severity|
  queue.bind(exchange, routing_key: severity)
end

puts ' [*] Waiting for logs. To exit press CTRL+C'

# メッセージが送信されるとsubscribeメソッドが実行される
begin
  queue.subscribe(block: true) do |delivery_info, _properties, body|
    puts " [x] #{delivery_info.routing_key}:#{body}"
  end
rescue Interrupt => _
  channel.close
  connection.close

  exit(0)
end

Subscriberプログラムを起動して

$ ruby receive_logs_direct.rb info error warning
 [*] Waiting for logs. To exit press CTRL+C

Publisherプログラム実行

$ ruby emit_log_direct.rb error "Aka---n!"
 [x] Sent 'Aka---n!'

やったぜ。

$ ruby receive_logs_direct.rb info error warning
 [*] Waiting for logs. To exit press CTRL+C
 [x] error:Aka---n!

上記の図のように、引数として指定するログレベルを変えて(routing_keyを変えて)複数のSubscriberを立ち上げても意図した通りに動く。

$ ruby emit_log_direct.rb warning "Akankamoshiren"
 [x] Sent 'Akankamoshiren'

それぞれ別ターミナルで起動したSubscriber↓

$ ruby receive_logs_direct.rb error
 [*] Waiting for logs. To exit press CTRL+C
$ ruby receive_logs_direct.rb info error warning
 [*] Waiting for logs. To exit press CTRL+C
 [x] warning:Akankamoshiren

おまけ

Virtual Hosts

RabbitMQではVirtual Hosts(vhost)としてリソースを論理的なグループに分離することができる。
https://www.rabbitmq.com/vhosts.html

Namespace的なものだと解釈した。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?