はじめに
前回、「RabbitMQからRubyで(同時に2つの)メッセージを取り出して処理する」で worker.rb が動き続けている途中で、RabbitMQが死んだらどうなるのかと思って実験してみました。
RabbitMQサービスを途中で停止して再開してみる
worker.rb
を動作させて、RabbitMQサービスを停止して、また再開してみました。
$ ruby worker.rb
START 'message-0:7'
END 'message-0:7'
START 'message-1:1'
END 'message-1:1'
START 'message-2:19' # ← このあと RabbitMQ サービス停止
W, [2018-06-07T16:26:09.336672 #93678] WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Recovering from connection.close (CONNECTION_FORCED - broker forced connection closure with reason 'shutdown')
W, [2018-06-07T16:26:09.338514 #93678] WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (no retry limit)...
W, [2018-06-07T16:26:19.345854 #93678] WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Retrying connection on next host in line: rabbitmq_server:5672
W, [2018-06-07T16:26:20.347697 #93678] WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Could not establish TCP connection to rabbitmq_server:5672: Connection refused - connect(2) for rabbitmq_server:5672
W, [2018-06-07T16:26:20.347790 #93678] WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: TCP connection failed, reconnecting in 5.0 seconds
W, [2018-06-07T16:26:20.347815 #93678] WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (no retry limit)...
...
START 'message-2:19' # ← ここで RabbitMQ サービス再開
END 'message-2:19'
...
予想では、例外が起こって停止すると思ってたのですが、そうはならなかったです。サービスが再開するまで待ち続けるんですね。自前で例外処理を拾う必要があると思っていたのですが、そんなことしなくて良いみたいです。
試しにRabbitMQのサーバーごと再起動した場合も同じでした。
worker_multi_thread.rb
の場合も同様でした。
これらの動作は、 Bunny.new
するときのオプションで、ある程度変更できそうです。
指定回数だけリトライする
recovoery_attempts
を指定します。
connection = Bunny.new(
host: 'rabbitmq_server',
vhost: '/nest',
user: 'peter',
password: 'xxxxx',
recovery_attempts: 3
)
こうすると3回まではリトライしますが、その後はリトライしません。また、リトライの間にRabbitMQを再起動すれば処理は再開しますが、3回リトライしたあとは、サービスを再起動しても処理を再開しません。
ですが、例外も発生しません。
... # 最後は、ERRORになる。
E, [2018-06-11T16:29:42.988276 #61657] ERROR -- #<Bunny::Session:0x7ff8cd0cb520 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Ran out of recovery attempts (limit set to 3)
リトライの間隔を変更する
network_recovery_interval
を指定します。 以下の例だと15秒間隔で再接続を試みます。
connection = Bunny.new(
host: 'rabbitmq_server',
vhost: '/nest',
user: 'peter',
password: 'xxxxx',
recovery_attempts: 3,
network_recovery_interval: 15
)
3回リトライして失敗したらプログラムを終了させる
worker.rb は、無限ループで回り続けるので、3回リトライしても終了しません。また、3回リトライして失敗したら、例外を発生させて停止するようにしたかったのですが、その方法がわかりませんでした。
コードを見る限り は、リトライ回数の上限に達したら、logにエラーメッセージを出力するだけっぽいんですよねー。
ということで、 worker.rb の while ループの中を、 should_retry_recovery?
を使って書き換えてみます。
# 無限ループで待ち続ける
while 1
sleep 10
unless connection.should_retry_recovery?
STDERR.puts 'retry failured now stopping worker...'
break
end
end
worker.rb を実行して、その実行中にRabbitMQを停止してみたところ、うまく停止しました。
...
W, [2018-06-12T09:32:35.167906 #31447] WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Could not establish TCP connection to rabbitmq_server:5672: Connection refused - connect(2) for rabbitmq_server:5672
W, [2018-06-12T09:32:35.168018 #31447] WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: TCP connection failed, reconnecting in 15 seconds
W, [2018-06-12T09:32:35.168060 #31447] WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (1 out of 3 left)...
retry failured now stopping worker... # ← これで停止しました
全体のコードは以下の通りです。
require 'bunny'
connection = Bunny.new(
host: 'rabbitmq_server',
vhost: '/nest',
user: 'peter',
password: 'password',
recovery_attempts: 3,
network_recovery_interval: 15
)
connection.start
begin
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
# 一度に取得するメッセージは1つ
channel.prefetch(1)
# メッセージを取り出すようにする
# 取得したメッセージは、自動的に削除しない。
queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
STDERR.puts "START '#{body}'"
sleep body.sub(/^message.*:/, '').to_i
STDERR.puts "END '#{body}'"
# 取得したメッセージを削除して良いことをRabbitMQに伝える
channel.ack(delivery_info.delivery_tag)
end
# 無限ループで待ち続ける
while 1
sleep 10
unless connection.should_retry_recovery?
STDERR.puts 'retry failured now stopping worker...'
break
end
end
rescue Interrupt => _ # Ctrl + C で終了
connection.close
end
まとめ
- RabbitMQサーバーを停止しても Bunny は停止しないで再接続を試みる。
- RabbitMQサーバーが再開したら、再開したところから、Bunnyはメッセージを取り出して処理を継続する。
- 停止する方法がイマイチなので、もっと良いスマートな方法があったら教えてください。
追記
RabbitMQサーバーから停止したときに retry せずに停止するので良ければ、以下のオプション
:automatic_recovery
と :recover_from_connection_close
を指定すると良さそうです。
connection = Bunny.new(
host: 'rabbitmq_server',
vhost: '/nest',
user: 'peter',
password: 'password',
automatic_recovery: false, # 自動リカバリしない
recover_from_connection_close: false #接続が切られたときのリカバリをしない
)