LoginSignup
8
4

More than 5 years have passed since last update.

Ruby の RabbitMQクライアントのBunnyってスゴイと思った話

Last updated at Posted at 2018-06-12

はじめに

前回、「RabbitMQからRubyで(同時に2つの)メッセージを取り出して処理する」で worker.rb が動き続けている途中で、RabbitMQが死んだらどうなるのかと思って実験してみました。

RabbitMQサービスを途中で停止して再開してみる

worker.rb を動作させて、RabbitMQサービスを停止して、また再開してみました。

$ ruby worker.rb
START 'message-0:7'
END   'message-0:7'
START 'message-1:1'
END   'message-1:1'
START 'message-2:19' # ← このあと RabbitMQ サービス停止
W, [2018-06-07T16:26:09.336672 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Recovering from connection.close (CONNECTION_FORCED - broker forced connection closure with reason 'shutdown')
W, [2018-06-07T16:26:09.338514 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (no retry limit)...
W, [2018-06-07T16:26:19.345854 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Retrying connection on next host in line: rabbitmq_server:5672
W, [2018-06-07T16:26:20.347697 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Could not establish TCP connection to rabbitmq_server:5672: Connection refused - connect(2) for rabbitmq_server:5672
W, [2018-06-07T16:26:20.347790 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: TCP connection failed, reconnecting in 5.0 seconds
W, [2018-06-07T16:26:20.347815 #93678]  WARN -- #<Bunny::Session:0x7feac101f988 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (no retry limit)...
...
START 'message-2:19' # ← ここで RabbitMQ サービス再開
END   'message-2:19'
...

予想では、例外が起こって停止すると思ってたのですが、そうはならなかったです。サービスが再開するまで待ち続けるんですね。自前で例外処理を拾う必要があると思っていたのですが、そんなことしなくて良いみたいです。

試しにRabbitMQのサーバーごと再起動した場合も同じでした。

worker_multi_thread.rb の場合も同様でした。

これらの動作は、 Bunny.new するときのオプションで、ある程度変更できそうです。

指定回数だけリトライする

recovoery_attempts を指定します。

connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'xxxxx',
  recovery_attempts: 3
)

こうすると3回まではリトライしますが、その後はリトライしません。また、リトライの間にRabbitMQを再起動すれば処理は再開しますが、3回リトライしたあとは、サービスを再起動しても処理を再開しません。
ですが、例外も発生しません。

...  # 最後は、ERRORになる。
E, [2018-06-11T16:29:42.988276 #61657] ERROR -- #<Bunny::Session:0x7ff8cd0cb520 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Ran out of recovery attempts (limit set to 3)

リトライの間隔を変更する

network_recovery_interval を指定します。 以下の例だと15秒間隔で再接続を試みます。

worker.rb
connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'xxxxx',
  recovery_attempts: 3,
  network_recovery_interval: 15
)

3回リトライして失敗したらプログラムを終了させる

worker.rb は、無限ループで回り続けるので、3回リトライしても終了しません。また、3回リトライして失敗したら、例外を発生させて停止するようにしたかったのですが、その方法がわかりませんでした。
コードを見る限り は、リトライ回数の上限に達したら、logにエラーメッセージを出力するだけっぽいんですよねー。

ということで、 worker.rb の while ループの中を、 should_retry_recovery? を使って書き換えてみます。

worker.rb
  # 無限ループで待ち続ける
  while 1
    sleep 10
    unless connection.should_retry_recovery?
      STDERR.puts 'retry failured now stopping worker...'
      break
    end
  end

worker.rb を実行して、その実行中にRabbitMQを停止してみたところ、うまく停止しました。

...
W, [2018-06-12T09:32:35.167906 #31447]  WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Could not establish TCP connection to rabbitmq_server:5672: Connection refused - connect(2) for rabbitmq_server:5672
W, [2018-06-12T09:32:35.168018 #31447]  WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: TCP connection failed, reconnecting in 15 seconds
W, [2018-06-12T09:32:35.168060 #31447]  WARN -- #<Bunny::Session:0x7fd88c14f1d8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>: Will recover from a network failure (1 out of 3 left)...
retry failured now stopping worker... # ← これで停止しました

全体のコードは以下の通りです。

worker.rb
require 'bunny'

connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'password',
  recovery_attempts: 3,
  network_recovery_interval: 15
)
connection.start

begin
  channel = connection.create_channel
  queue = channel.queue('task_queue', durable: true)

  # 一度に取得するメッセージは1つ
  channel.prefetch(1)

  # メッセージを取り出すようにする
  # 取得したメッセージは、自動的に削除しない。
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
    STDERR.puts "START '#{body}'"
    sleep body.sub(/^message.*:/, '').to_i
    STDERR.puts "END   '#{body}'"
    # 取得したメッセージを削除して良いことをRabbitMQに伝える
    channel.ack(delivery_info.delivery_tag)
  end

  # 無限ループで待ち続ける
  while 1
    sleep 10
    unless connection.should_retry_recovery?
      STDERR.puts 'retry failured now stopping worker...'
      break
    end
  end
rescue Interrupt => _ # Ctrl + C で終了
  connection.close
end

まとめ

  • RabbitMQサーバーを停止しても Bunny は停止しないで再接続を試みる。
  • RabbitMQサーバーが再開したら、再開したところから、Bunnyはメッセージを取り出して処理を継続する。
  • 停止する方法がイマイチなので、もっと良いスマートな方法があったら教えてください。

追記

RabbitMQサーバーから停止したときに retry せずに停止するので良ければ、以下のオプション
:automatic_recovery:recover_from_connection_close を指定すると良さそうです。

worker.rb
connection = Bunny.new(
  host: 'rabbitmq_server',
  vhost: '/nest',
  user: 'peter',
  password: 'password',
  automatic_recovery: false, # 自動リカバリしない
  recover_from_connection_close: false #接続が切られたときのリカバリをしない
)
8
4
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4