Ruby
RabbitMQ

Ruby の RabbitMQクライアントのBunnyってスゴイと思った話

はじめに

前回、「RabbitMQからRubyで(同時に2つの)メッセージを取り出して処理する」で worker.rb が動き続けている途中で、RabbitMQが死んだらどうなるのかと思って実験してみました。

RabbitMQサービスを途中で停止して再開してみる

worker.rb を動作させて、RabbitMQサービスを停止して、また再開してみました。

$ ruby worker.rb
START 'message-0:7'
END   'message-0:7'
START 'message-1:1'
END   'message-1:1'
START 'message-2:19' # ← このあと RabbitMQ サービス停止
W, [2018-06-07T16:26:09.336672 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Recovering from connection.close (CONNECTION_FORCED - broker forced connection closure with reason 'shutdown')
W, [2018-06-07T16:26:09.338514 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (no retry limit)...
W, [2018-06-07T16:26:19.345854 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Retrying connection on next host in line: rabbitmq_server:5672
W, [2018-06-07T16:26:20.347697 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Could not establish TCP connection to rabbitmq_server:5672: Connection refused - connect(2) for rabbitmq_server:5672
W, [2018-06-07T16:26:20.347790 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: TCP connection failed, reconnecting in 5.0 seconds
W, [2018-06-07T16:26:20.347815 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (no retry limit)...
...
START 'message-2:19' # ← ここで RabbitMQ サービス再開
END   'message-2:19'
...

予想では、例外が起こって停止すると思ってたのですが、そうはならなかったです。サービスが再開するまで待ち続けるんですね。自前で例外処理を拾う必要があると思っていたのですが、そんなことしなくて良いみたいです。

試しにRabbitMQのサーバーごと再起動した場合も同じでした。

worker_multi_thread.rb の場合も同様でした。

これらの動作は、 Bunny.new するときのオプションで、ある程度変更できそうです。

指定回数だけリトライする

recovoery_attempts を指定します。

connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'xxxxx',
  recovery_attempts: 3
)

こうすると3回まではリトライしますが、その後はリトライしません。また、リトライの間にRabbitMQを再起動すれば処理は再開しますが、3回リトライしたあとは、サービスを再起動しても処理を再開しません。
ですが、例外も発生しません。

...  # 最後は、ERRORになる。
E, [2018-06-11T16:29:42.988276 #61657] ERROR -- #<Bunny::Session:0x7ff8cd0cb520 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Ran out of recovery attempts (limit set to 3)

リトライの間隔を変更する

network_recovery_interval を指定します。 以下の例だと15秒間隔で再接続を試みます。

worker.rb
connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'xxxxx',
  recovery_attempts: 3,
  network_recovery_interval: 15
)

3回リトライして失敗したらプログラムを終了させる

worker.rb は、無限ループで回り続けるので、3回リトライしても終了しません。また、3回リトライして失敗したら、例外を発生させて停止するようにしたかったのですが、その方法がわかりませんでした。
コードを見る限り は、リトライ回数の上限に達したら、logにエラーメッセージを出力するだけっぽいんですよねー。

ということで、 worker.rb の while ループの中を、 should_retry_recovery? を使って書き換えてみます。

worker.rb
  # 無限ループで待ち続ける
  while 1
    sleep 10
    unless connection.should_retry_recovery?
      STDERR.puts 'retry failured now stopping worker...'
      break
    end
  end

worker.rb を実行して、その実行中にRabbitMQを停止してみたところ、うまく停止しました。

...
W, [2018-06-12T09:32:35.167906 #31447]  WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Could not establish TCP connection to rabbitmq_server:5672: Connection refused - connect(2) for rabbitmq_server:5672
W, [2018-06-12T09:32:35.168018 #31447]  WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: TCP connection failed, reconnecting in 15 seconds
W, [2018-06-12T09:32:35.168060 #31447]  WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (1 out of 3 left)...
retry failured now stopping worker... # ← これで停止しました

全体のコードは以下の通りです。

worker.rb
require 'bunny'

connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'password',
  recovery_attempts: 3,
  network_recovery_interval: 15
)
connection.start

begin
  channel = connection.create_channel
  queue = channel.queue('task_queue', durable: true)

  # 一度に取得するメッセージは1つ
  channel.prefetch(1)

  # メッセージを取り出すようにする
  # 取得したメッセージは、自動的に削除しない。
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
    STDERR.puts "START '#{body}'"
    sleep body.sub(/^message.*:/, '').to_i
    STDERR.puts "END   '#{body}'"
    # 取得したメッセージを削除して良いことをRabbitMQに伝える
    channel.ack(delivery_info.delivery_tag)
  end

  # 無限ループで待ち続ける
  while 1
    sleep 10
    unless connection.should_retry_recovery?
      STDERR.puts 'retry failured now stopping worker...'
      break
    end
  end
rescue Interrupt => _ # Ctrl + C で終了
  connection.close
end

まとめ

  • RabbitMQサーバーを停止しても Bunny は停止しないで再接続を試みる。
  • RabbitMQサーバーが再開したら、再開したところから、Bunnyはメッセージを取り出して処理を継続する。
  • 停止する方法がイマイチなので、もっと良いスマートな方法があったら教えてください。

追記

RabbitMQサーバーから停止したときに retry せずに停止するので良ければ、以下のオプション
:automatic_recovery:recover_from_connection_close を指定すると良さそうです。

worker.rb
connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'password',
  automatic_recovery: false, # 自動リカバリしない
  recover_from_connection_close: false #接続が切られたときのリカバリをしない
)