4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQからRubyで(同時に2つの)メッセージを取り出して処理する

Last updated at Posted at 2018-06-06

はじめに

前回の記事「RubyからRabbitMQにメッセージを送信する」で、Rubyから送信する方法を試してみましたが、今回は、メッセージをRubyで取り出すことをやってみたいと思います。

メッセージの取り出しは1つのプロセスから、取り出す場合と2つのプロセスから取り出す場合と1つのプロセスで2つのスレッドから取り出す場合をやってみます。

1つのプロセスからメッセージを取り出す

何も考えずにメッセージを取り出すスクリプトです。

worker.rb
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)

delivery_info, metadata, payload = queue.pop

p delivery_info
p metadata
p payload

connection.close

実行結果は、以下のようになります。

$ ruby worker.rb                                           
{:delivery_tag=>#<Bunny::VersionedDeliveryTag:0x0000562b7809aa20 @tag=1, @version=0>, :redelivered=>true, :exchange=>"", :routing_key=>"task_queue", :channel=>#<Bunny::Channel:47372348815340 @id=1 @connection=#<Bunny::Session:0x562b780b1ba8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>>}
{:content_type=>"application/octet-stream", :delivery_mode=>2, :priority=>0}
"message-0"

rabbitmq サーバー側で確認すると1つメッセージが消えていることがわかります。

$ sudo rabbitmqctl list_queues -p /nest name messages messages_persistent
Timeout: 60.0 seconds ...
Listing queues for vhost /nest ...
task_queue      9       9

1つのプロセスからメッセージを待ち受けて取り出す

ただ、取り出すだけだと面白くないので、rabbitmqにメッセージが入ったら取り出すようにしてみたいと思います。
Bunny::Channel#subscribe を使います。
引数に manual_ack:true を指定して自動的にメッセージを削除しないようにします。

worker.rb
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
begin
  channel = connection.create_channel
  queue = channel.queue('task_queue', durable: true)

  # 一度に取得するメッセージは1つ
  channel.prefetch(1)

  # rabbitmq にメッセージが届けば、それを取り出す
  # 取得したメッセージは、自動的に削除しない。
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
    STDERR.puts "'#{body}'"
    # 取得したメッセージを削除して良いことをRabbitMQに伝える
    channel.ack(delivery_info.delivery_tag)
  end
  # 無限ループで待ち続ける。
  sleep 10 while true
rescue Interrupt => _ # Ctrl + C で終了
  connection.close
end

コンソールから worker.rb を実行してみます。

コンソール1
$ ruby worker.rb
'message-1'
'message-2'
'message-3'
'message-4'
'message-5'
'message-6'
'message-7'
'message-8'
'message-9'

別のコンソールから RabbitMQ にメッセージを送信する(前回の記事「RubyからRabbitMQにメッセージを送信する」を参照してください)と直ちにメッセージが worker.rb 側で取り出されます。

2つのプロセスでメッセージを取り出す

今度は、もう1つ別のコンソールを複数立ち上げて、worker.rb を実行してから、RabbitMQに送信してみます。

コンソール1
$ ruby worker.rb
'message-0'
'message-2'
'message-4'
'message-6'
'message-8'
コンソール2
$ ruby worker.rb
'message-1'
'message-3'
'message-5'
'message-7'
'message-9'

2つのプロセスにメッセージが別れて取り出されていることが確認できました。

2つのプロセスからメッセージを取り出して処理させる

これだけだとつまらないので、時間のかかる作業をやらせてみたいと思います。送信側で乱数を発生させて送信し、メッセージ受信側は、その乱数の秒数だけ sleep するようにします。

メッセージ送信側は、 rand(20) をつけて送信するだけです。

send.rb
require 'bunny'

connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
channel = connection.create_channel

# queue を永続化するために durable: true をセット
queue = channel.queue('task_queue', durable: true)

# 10個のメッージを送る
10.times do |i|
  # メッセージの最後に20までの乱数をつける
  message = "message-#{i}:#{rand(20)}" # ← 修正箇所
  # メッセージも消えないように永続化する
  queue.publish(message, persistent: true)
end
channel.close
connection.close

メッセージ受信側では、dataから乱数を抜き出して sleep するようにします。

woker.rb
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
begin
  channel = connection.create_channel
  queue = channel.queue('task_queue', durable: true)

  # 一度に取得するメッセージは1つ
  channel.prefetch(1)

  # メッセージを取り出すようにする
  # 取得したメッセージは、自動的に削除しない。
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
    STDERR.puts "START '#{body}'"          # ← 修正箇所
    # 乱数秒だけ sleep する
    sleep body.sub(/^message.*:/, '').to_i # ← 修正箇所
    STDERR.puts "END   '#{body}'"          # ← 修正箇所
    # 取得したメッセージを削除して良いことをRabbitMQに伝える
    channel.ack(delivery_info.delivery_tag)
  end

  # 無限ループで待ち続ける
  sleep 10 while true
rescue Interrupt => _ # Ctrl + C で終了
  connection.close
end

2つのコンソールから worker.rb を起動して、RabbitMQに送信してみます。

コンソール1
$ ruby worker.rb
START 'message-0:18'
END   'message-0:18'
START 'message-3:1'
END   'message-3:1'
START 'message-4:17'
END   'message-4:17'
START 'message-7:2'
END   'message-7:2'
START 'message-8:1'
END   'message-8:1'
START 'message-9:3'
END   'message-9:3'
コンソール2
$ ruby worker.rb
START 'message-1:2'
END   'message-1:2'
START 'message-2:18'
END   'message-2:18'
START 'message-5:11'
END   'message-5:11'
START 'message-6:12'
END   'message-6:12'

ちょっとわかりにくいかもですが、コンソール1で message-0 を処理している間に、コンソール2は、 message-1 の処理が終わって、message-2 を処理しています。

プロセス1つで2つのThreadでメッセージを取り出して処理する

今度は、worker.rb のプロセス1つで メッセージを取り出して処理する Thread を2つに増やしてみたいと思います。

worker_multi_thread.rb
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
puts Thread.main # ← 修正箇所。メインスレッドを表示する。
begin
  2.times do # ← 追加 2つのThreadで処理させるため、2回繰り返す。
    channel = connection.create_channel
    queue = channel.queue('task_queue', durable: true)

    # 一度に取得するメッセージは1つ
    channel.prefetch(1)

    # メッセージを取り出すようにする
    # 取得したメッセージは、自動的に削除しない。
    queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
      STDERR.puts "START #{Thread.current.to_s.gsub(/@.*\z/, '>')} '#{body}'" # ← 修正箇所。カレントスレッドを表示する
      sleep body.sub(/^message.*:/, '').to_i
      STDERR.puts "END   #{Thread.current.to_s.gsub(/@.*\z/, '>')} '#{body}'" # ← 修正箇所。カレントスレッドを表示する。
      # 取得したメッセージを削除して良いことをRabbitMQに伝える
      channel.ack(delivery_info.delivery_tag)
    end
  end

  # 無限ループで待ち続ける
  sleep 10 while true
rescue Interrupt => _ # Ctrl + C で終了
  connection.close
end

1つのコンソールから worker_multi_thread.rb のみ実行してみます。

コンソール1
$ ruby worker_multi_thread.rb
#<Thread:0x00007fa3c887d7c8 run>
START #<Thread:0x00007fa3c89d5b48> 'message-0:1'
START #<Thread:0x00007fa3c89e4a58> 'message-1:10'
END   #<Thread:0x00007fa3c89d5b48> 'message-0:1'
START #<Thread:0x00007fa3c89d5b48> 'message-2:9'
END   #<Thread:0x00007fa3c89e4a58> 'message-1:10'
START #<Thread:0x00007fa3c89e4a58> 'message-3:1'
END   #<Thread:0x00007fa3c89d5b48> 'message-2:9'
START #<Thread:0x00007fa3c89d5b48> 'message-4:15'
END   #<Thread:0x00007fa3c89e4a58> 'message-3:1'
START #<Thread:0x00007fa3c89e4a58> 'message-5:15'
END   #<Thread:0x00007fa3c89d5b48> 'message-4:15'
START #<Thread:0x00007fa3c89d5b48> 'message-6:6'
END   #<Thread:0x00007fa3c89e4a58> 'message-5:15'
START #<Thread:0x00007fa3c89e4a58> 'message-7:5'
END   #<Thread:0x00007fa3c89d5b48> 'message-6:6'
END   #<Thread:0x00007fa3c89e4a58> 'message-7:5'
START #<Thread:0x00007fa3c89d5b48> 'message-8:18'
START #<Thread:0x00007fa3c89e4a58> 'message-9:18'
END   #<Thread:0x00007fa3c89e4a58> 'message-9:18'
END   #<Thread:0x00007fa3c89d5b48> 'message-8:18'

メインスレッド Thread:0x00007fa3c887d7c8 の他に2つのスレッド Thread:0x00007fa3c89d5b48Thread:0x00007fa3c89e4a58 があることがわかります。
ちょっとわかりにくいかもですが、2つのスレッドが同時に動いてメッセージを処理しています(message-0 と message-1 が同時に取り出されて処理がスタートしています)。

まとめ

複数のRubyスクリプトのプロセスからRabbitMQのメッセージを取り出す例を書いてみました。
また、1つのプロセスで複数のスレッドからRabbitMQのメッセージを取り出す例を書いてみました。
どちらも同時並行で処理していることが確認できました。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?