はじめに
前回の記事「RubyからRabbitMQにメッセージを送信する」で、Rubyから送信する方法を試してみましたが、今回は、メッセージをRubyで取り出すことをやってみたいと思います。
メッセージの取り出しは1つのプロセスから、取り出す場合と2つのプロセスから取り出す場合と1つのプロセスで2つのスレッドから取り出す場合をやってみます。
1つのプロセスからメッセージを取り出す
何も考えずにメッセージを取り出すスクリプトです。
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
delivery_info, metadata, payload = queue.pop
p delivery_info
p metadata
p payload
connection.close
実行結果は、以下のようになります。
$ ruby worker.rb
{:delivery_tag=>#<Bunny::VersionedDeliveryTag:0x0000562b7809aa20 @tag=1, @version=0>, :redelivered=>true, :exchange=>"", :routing_key=>"task_queue", :channel=>#<Bunny::Channel:47372348815340 @id=1 @connection=#<Bunny::Session:0x562b780b1ba8 peter@rabbitmq_server:5672, vhost=/nest, addresses=[rabbitmq_server:5672]>>}
{:content_type=>"application/octet-stream", :delivery_mode=>2, :priority=>0}
"message-0"
rabbitmq サーバー側で確認すると1つメッセージが消えていることがわかります。
$ sudo rabbitmqctl list_queues -p /nest name messages messages_persistent
Timeout: 60.0 seconds ...
Listing queues for vhost /nest ...
task_queue 9 9
1つのプロセスからメッセージを待ち受けて取り出す
ただ、取り出すだけだと面白くないので、rabbitmqにメッセージが入ったら取り出すようにしてみたいと思います。
Bunny::Channel#subscribe
を使います。
引数に manual_ack:true
を指定して自動的にメッセージを削除しないようにします。
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
begin
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
# 一度に取得するメッセージは1つ
channel.prefetch(1)
# rabbitmq にメッセージが届けば、それを取り出す
# 取得したメッセージは、自動的に削除しない。
queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
STDERR.puts "'#{body}'"
# 取得したメッセージを削除して良いことをRabbitMQに伝える
channel.ack(delivery_info.delivery_tag)
end
# 無限ループで待ち続ける。
sleep 10 while true
rescue Interrupt => _ # Ctrl + C で終了
connection.close
end
コンソールから worker.rb を実行してみます。
$ ruby worker.rb
'message-1'
'message-2'
'message-3'
'message-4'
'message-5'
'message-6'
'message-7'
'message-8'
'message-9'
別のコンソールから RabbitMQ にメッセージを送信する(前回の記事「RubyからRabbitMQにメッセージを送信する」を参照してください)と直ちにメッセージが worker.rb 側で取り出されます。
2つのプロセスでメッセージを取り出す
今度は、もう1つ別のコンソールを複数立ち上げて、worker.rb を実行してから、RabbitMQに送信してみます。
$ ruby worker.rb
'message-0'
'message-2'
'message-4'
'message-6'
'message-8'
$ ruby worker.rb
'message-1'
'message-3'
'message-5'
'message-7'
'message-9'
2つのプロセスにメッセージが別れて取り出されていることが確認できました。
2つのプロセスからメッセージを取り出して処理させる
これだけだとつまらないので、時間のかかる作業をやらせてみたいと思います。送信側で乱数を発生させて送信し、メッセージ受信側は、その乱数の秒数だけ sleep するようにします。
メッセージ送信側は、 rand(20)
をつけて送信するだけです。
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
channel = connection.create_channel
# queue を永続化するために durable: true をセット
queue = channel.queue('task_queue', durable: true)
# 10個のメッージを送る
10.times do |i|
# メッセージの最後に20までの乱数をつける
message = "message-#{i}:#{rand(20)}" # ← 修正箇所
# メッセージも消えないように永続化する
queue.publish(message, persistent: true)
end
channel.close
connection.close
メッセージ受信側では、dataから乱数を抜き出して sleep するようにします。
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
begin
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
# 一度に取得するメッセージは1つ
channel.prefetch(1)
# メッセージを取り出すようにする
# 取得したメッセージは、自動的に削除しない。
queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
STDERR.puts "START '#{body}'" # ← 修正箇所
# 乱数秒だけ sleep する
sleep body.sub(/^message.*:/, '').to_i # ← 修正箇所
STDERR.puts "END '#{body}'" # ← 修正箇所
# 取得したメッセージを削除して良いことをRabbitMQに伝える
channel.ack(delivery_info.delivery_tag)
end
# 無限ループで待ち続ける
sleep 10 while true
rescue Interrupt => _ # Ctrl + C で終了
connection.close
end
2つのコンソールから worker.rb を起動して、RabbitMQに送信してみます。
$ ruby worker.rb
START 'message-0:18'
END 'message-0:18'
START 'message-3:1'
END 'message-3:1'
START 'message-4:17'
END 'message-4:17'
START 'message-7:2'
END 'message-7:2'
START 'message-8:1'
END 'message-8:1'
START 'message-9:3'
END 'message-9:3'
$ ruby worker.rb
START 'message-1:2'
END 'message-1:2'
START 'message-2:18'
END 'message-2:18'
START 'message-5:11'
END 'message-5:11'
START 'message-6:12'
END 'message-6:12'
ちょっとわかりにくいかもですが、コンソール1で message-0
を処理している間に、コンソール2は、 message-1
の処理が終わって、message-2
を処理しています。
プロセス1つで2つのThreadでメッセージを取り出して処理する
今度は、worker.rb のプロセス1つで メッセージを取り出して処理する Thread を2つに増やしてみたいと思います。
require 'bunny'
connection = Bunny.new(host: 'rabbitmq_server', vhost: '/nest', user: 'peter', password: 'xxxxx')
connection.start
puts Thread.main # ← 修正箇所。メインスレッドを表示する。
begin
2.times do # ← 追加 2つのThreadで処理させるため、2回繰り返す。
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
# 一度に取得するメッセージは1つ
channel.prefetch(1)
# メッセージを取り出すようにする
# 取得したメッセージは、自動的に削除しない。
queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
STDERR.puts "START #{Thread.current.to_s.gsub(/@.*\z/, '>')} '#{body}'" # ← 修正箇所。カレントスレッドを表示する
sleep body.sub(/^message.*:/, '').to_i
STDERR.puts "END #{Thread.current.to_s.gsub(/@.*\z/, '>')} '#{body}'" # ← 修正箇所。カレントスレッドを表示する。
# 取得したメッセージを削除して良いことをRabbitMQに伝える
channel.ack(delivery_info.delivery_tag)
end
end
# 無限ループで待ち続ける
sleep 10 while true
rescue Interrupt => _ # Ctrl + C で終了
connection.close
end
1つのコンソールから worker_multi_thread.rb のみ実行してみます。
$ ruby worker_multi_thread.rb
#<Thread:0x00007fa3c887d7c8 run>
START #<Thread:0x00007fa3c89d5b48> 'message-0:1'
START #<Thread:0x00007fa3c89e4a58> 'message-1:10'
END #<Thread:0x00007fa3c89d5b48> 'message-0:1'
START #<Thread:0x00007fa3c89d5b48> 'message-2:9'
END #<Thread:0x00007fa3c89e4a58> 'message-1:10'
START #<Thread:0x00007fa3c89e4a58> 'message-3:1'
END #<Thread:0x00007fa3c89d5b48> 'message-2:9'
START #<Thread:0x00007fa3c89d5b48> 'message-4:15'
END #<Thread:0x00007fa3c89e4a58> 'message-3:1'
START #<Thread:0x00007fa3c89e4a58> 'message-5:15'
END #<Thread:0x00007fa3c89d5b48> 'message-4:15'
START #<Thread:0x00007fa3c89d5b48> 'message-6:6'
END #<Thread:0x00007fa3c89e4a58> 'message-5:15'
START #<Thread:0x00007fa3c89e4a58> 'message-7:5'
END #<Thread:0x00007fa3c89d5b48> 'message-6:6'
END #<Thread:0x00007fa3c89e4a58> 'message-7:5'
START #<Thread:0x00007fa3c89d5b48> 'message-8:18'
START #<Thread:0x00007fa3c89e4a58> 'message-9:18'
END #<Thread:0x00007fa3c89e4a58> 'message-9:18'
END #<Thread:0x00007fa3c89d5b48> 'message-8:18'
メインスレッド Thread:0x00007fa3c887d7c8
の他に2つのスレッド Thread:0x00007fa3c89d5b48
と Thread:0x00007fa3c89e4a58
があることがわかります。
ちょっとわかりにくいかもですが、2つのスレッドが同時に動いてメッセージを処理しています(message-0 と message-1 が同時に取り出されて処理がスタートしています)。
まとめ
複数のRubyスクリプトのプロセスからRabbitMQのメッセージを取り出す例を書いてみました。
また、1つのプロセスで複数のスレッドからRabbitMQのメッセージを取り出す例を書いてみました。
どちらも同時並行で処理していることが確認できました。