使用するバージョン
- RabbitMQ 3.3.5
- amqp 1.5.0
条件
- vhostは
/production
- ユーザは
user1
、パスワードはpass
とする
キューにメッセージを追加する(基本)
# -*- coding: utf-8 -*-
require 'amqp'
# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
# channelを作成
channel = AMQP::Channel.new(connection)
# queue1というキューを作成
# キューが無い場合は作成(既にある場合もエラーにはならない)
queue = channel.queue("queue1")
# exchangeを作成
exchange = channel.default_exchange
# queue1にmessageを追加
exchange.publish("Hello, world!", :routing_key => queue.name)
end
キューを確認すると登録されている
$ sudo /usr/sbin/rabbitmqctl list_queues -p /production
Listing queues ...
queue1 1
ただし、このやり方だとRabbitMQサーバーが再起動 or 落ちた場合、キュー+メッセージは消えてしまう
キューにメッセージを追加する(メッセージを永続)
RabbitMQサーバーが再起動 or 落ちてもキュー+メッセージが残るようにするには、durable
とpersistent
を指定する
# -*- coding: utf-8 -*-
require 'amqp'
# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
# channelを作成
channel = AMQP::Channel.new(connection)
# queue1というキューを作成
# :durable => trueとすることで、再起動してもキューは残る
queue = channel.queue("queue1", :nowait => false, :durable => true)
# exchangeを作成
exchange = channel.default_exchange
# queue1にmessageを追加
# :persistent => trueとすることで、再起動してもメッセージは残る
exchange.publish("Hello, world!", :routing_key => queue.name, :persistent => true)
end
キューからメッセージを取得(subscribe)
subscribe
を使う場合、キューにメッセージが登録されるごとにsubscribe
が実行される(メッセージが登録されない限り実行されない)
# -*- coding: utf-8 -*-
require 'amqp'
# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
# channelを作成
channel = AMQP::Channel.new(connection)
# queue1を作成
queue = channel.queue("queue1", :nowait => false, :durable => true)
# queue1からmessageを取得
# :ack => trueとすることで、ackを返さない限り、messageは削除されない
# :ack => falseの場合、messageを取得した時点でRabbitMQサーバーからmessageが削除される
queue.subscribe(:ack => true) do |hdr, msg|
p "queue = #{queue.name}" #=> "queue = queue1"
p "message= #{msg}" #=> "message= Hello, world!"
# RabbitMQのmessageを削除
hdr.ack
end
end
-
:ack => true
の場合、RabbitMQのメッセージを削除するにはackが必要。 - ackする前にアプリが落ちた(RabbitMQとのコネクションが切れた)場合は、メッセージはRabbitMQサーバーに戻るので、再取得が可能(メッセージは消失しない)
キューからメッセージを取得(pop)
キューからメッセージを取得するもう一つの方法はpop
がある。
popは明示的に(同期)してRabbitMQのキューからメッセージを取得する。
subscribe
と違い、キューにメッセージが登録されても、されなくても処理される。その為、メッセージが無い場合は、nilが返るので、nilチェックが必要。
# -*- coding: utf-8 -*-
require 'amqp'
# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
# channelを作成
channel = AMQP::Channel.new(connection)
# queue1を作成
queue = channel.queue("queue1", :nowait => false, :durable => true)
# queue1から1秒おきにmessageを取得(※timerを使わないと取得できない)
EventMachine.add_periodic_timer(1) do
# コネクションが切れるときは処理しない
next if connection and (connection.closing? || connection.closed?)
queue.pop(:ack => true) do |hdr, msg|
if msg
p "queue = #{queue.name}" #=> "queue = queue1"
p "message= #{msg}" #=> "message= Hello, world!"
# RabbitMQのmessageを削除
hdr.ack
end
end
end
end
キューからメッセージを取得(コネクション断時の再接続)
キューを取得する場合、RabbitMQをポーリングすることになる。
その場合、RabbitMQとのコネクションが切れる場合があるので、connection.on_tcp_connection_loss
とchannel.auto_recovery
で
コネクション再接続するようにする
# -*- coding: utf-8 -*-
require 'amqp'
# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
# コネクションが切れたら再接続するようにする
connection.on_tcp_connection_loss do |cl, settings|
# 再接続
cl.reconnect
end
# channelを作成
channel = AMQP::Channel.new(connection)
# channelが切れたら自動回復するようにする
channel.auto_recovery = true
# queue1を作成
queue = channel.queue("queue1", :nowait => false, :durable => true)
# queue1からmessageを取得
queue.subscribe(:ack => true) do |hdr, msg|
p "queue = #{queue.name}" #=> "queue = queue1"
p "message= #{msg}" #=> "message= Hello, world!"
# RabbitMQのmessageを削除
hdr.ack
end
end