AWS SQSからメッセージを取得して処理してくれるワーカーのshoryukenと非同期処理ライブラリであるconcurrent-rubyについて調べてみました。
バージョンは以下です。
ruby 2.5.1
shoryuken (3.2.3)
concurrent-ruby (1.0.5)
tl; dr
- スレッドを制御しているのは
concurrent-ruby
-
shoryuken
のconcurrency
は並列処理数は制限しているが実際のスレッド数と直接は関係ない - スレッドは必要になれば増えて、使ってなければ勝手に削除される
背景
shoryukenでスレッド毎のログを出したいと思って、以下のようなコードでログを出力するようにしてみたところconfig/shoryuken.yml
で設定したconcurrency
以上のログが作成されました。
concurrency = スレッド数
だと思っていたので不思議に思いコードを読んでみました。
必要なところ以外は省略してます。
concurrency: 4
delay: 60
queues:
- [test_queue, 1]
- [hoge_queue, 1]
- [fuga_queue, 1]
- [piyo_queue, 1]
class TestWorker
include Shoryuken::Worker
# ...
def perform
thread_logger.info('perform!')
end
def thread_logger
ActiveSupport::Logger.new("log/worker.thread.#{thread_number}.log")
end
def thread_number
Thread.current.to_s.match(/Thread\:([\da-zA-Z]+)/)[1]
end
end
これでshoryuken
を実行すると以下のようにconcurrency
以上のログが出ます。
ソースコードリーディング
shoryuken
shoryukenの処理はざっくりと以下のような流れになってます。
-
RAILS_ENV=development bundle exec shoryuken -R -C config/shoryuken.yml
とかを実行 -
Shoryuken::CLI::Runner
でShoryuken::Runner
のインスタンスを実行 -
Shoryuken::Runner
がShoryuken::Launcher
を呼ぶ -
Shoryuken::Launcher
がShoryuken::Manager
を作成し実行 -
Shoryuken::Manager
は無限ループ内でSQSの各キューからメッセージを受信してShoryuken::Processor
(自分で定義したワーカー)に処理を委譲
5では無限ループ内でconcurrent-ruby
による非同期処理が行われます。
以下、主な部分のコードだけ乗っけます。
Shoryuken::Runner
Shoryuken::Launcher.new
してstart
しているくらいなのでスルー。
Shoryuken::Launcher
module Shoryuken
class Launcher
include Util
def start
# ...
start_managers
end
def start_managers
@managers.each do |manager|
Concurrent::Future.execute { manager.start }
end
end
def executor
@_executor ||= Shoryuken.launcher_executor || Concurrent.global_io_executor
end
def create_managers
Shoryuken.groups.map do |group, options|
Shoryuken::Manager.new(
Shoryuken::Fetcher.new(group),
Shoryuken.polling_strategy(group).new(options[:queues]),
options[:concurrency],
executor
)
end
end
end
end
Shoryuken::Manager
インスタンスを作成してConcurrent::Future.execute
の中で実行しています。
Concurrent::Future
はconcurrent-ruby
が提供している並列実行クラスで
execute
メソッドは渡されたブロックをスレッドセーフに非同期実行できます。
参考
manager.start
では無限ループ内でスレッドを呼び出しているので上記メソッドを使用していると思われます。
Shoryuken::Manager
のイニシャライザには以下を定義してます。
-
Shoryuken::Fetcher
- SQSとの通信処理クラス -
Shoryuken.polling_strategy
- ポーリング戦略クラス -
concurrency
- 並列実行数 -
executor
- 非同期処理を実現するクラス
Shoryuken::Manager
shoryukenの動作の本質的な部分を担ってます。具体処理は各クラスに移譲しているのでまさしくManagerです。
以下はメッセージのバッチ取得ではなく単一取得処理に絞ってます。
module Shoryuken
class Manager
include Util
BATCH_LIMIT = 10
# See https://github.com/phstc/shoryuken/issues/348#issuecomment-292847028
MIN_DISPATCH_INTERVAL = 0.1
def initialize(fetcher, polling_strategy, concurrency, executor)
@fetcher = fetcher
@polling_strategy = polling_strategy
@max_processors = concurrency
@busy_processors = Concurrent::AtomicFixnum.new(0)
@executor = executor
@running = Concurrent::AtomicBoolean.new(true)
end
def start
dispatch_loop
end
def dispatch_loop
return unless running?
@executor.post { dispatch }
end
def dispatch
return unless running?
if ready <= 0 || (queue = @polling_strategy.next_queue).nil?
return sleep(MIN_DISPATCH_INTERVAL)
end
fire_event(:dispatch, false, queue_name: queue.name)
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
rescue => ex
handle_dispatch_error(ex)
ensure
dispatch_loop
end
def busy
@busy_processors.value
end
def ready
@max_processors - busy
end
def processor_done
@busy_processors.decrement
end
def assign(queue_name, sqs_msg)
return unless running?
logger.debug { "Assigning #{sqs_msg.message_id}" }
@busy_processors.increment
Concurrent::Promise.execute(
executor: @executor
) { Processor.process(queue_name, sqs_msg) }.then { processor_done }.rescue { processor_done }
end
def dispatch_single_messages(queue)
messages = @fetcher.fetch(queue, ready)
@polling_strategy.messages_found(queue.name, messages.size)
messages.each { |message| assign(queue.name, message) }
end
end
end
Shoryuken::Manager
は@busy_processors
という変数の整数値によって並列性を制御しています
(ready
やbusy
等も参照しているが、ready
はbusy
の値に依っているので実質1つの変数で管理)。
大まかに言えば、次々メッセージを取得してワーカーにタスクがアサインされたらready
が-1、busy
が+1、
タスクが完了したらready
が+1、busy
が-1されます。
ready
が0の場合はsleep
するのでこれによって並列実行数が制限されることになります。
@fetcher
の中身はShoryuken::Fetcher
でfetch
メソッドは指定にかかわらずバッチで取得するようになっていて、第二引数がメッセージ最大受信数なのでready
を渡すことでアサインできるだけのメッセージを取得しています。
処理の流れは、まずdispatch_loop
で無限ループさせて、その中でキューからのメッセージ取得と取得したメッセージをShoryuken::Processor
にアサインする処理(dispatch
メソッド)を@executor
に行わせています。
実はこの@executor
の中身はConcurrent::RubyExecutorService
(cruby
の場合)でこの中でThread.new
しています。
ちなみにShoryuken::Processor
ではユーザが定義したワーカー(test_worker.rb
)のperform
メソッドが呼ばれています。
ここまで見てきてスレッドを作成する処理はconcurrent-ruby
に移譲されていたのでそちらを見ていきます。
concurrent-ruby
並列処理のためのライブラリで、これ自体はshoryuken
が無くてもrailsアプリであればactivesupportがすでに一緒にインストールされています。
こっちもざっくり処理の流れを追います。
-
Shoryuken::Manager
が無限ループ内で@executor.post
を呼び出す -
Concurrent::RubyExecutorService#post
が呼び出され、それを継承したクラスであるConcurrent::RubyThreadPoolExecutor#ns_execute
が実行される -
RubyThreadPoolExecutor
はns_execute
によってタスクが渡される度にワーカー(Thread.new
)があればアサイン、無ければ起動してアサインする - アサインされたタスクが完了したらワーカーはプールされ、次のタスクがアサインされるまで待機します
大体こんな感じになっています。
Concurrent::RubyExecutorService
色々なクラスを継承していて、cruby
やjruby
などの処理系によっても継承クラスが変わったりしますが
LockableObject
という特徴的な名前のクラスを継承しているのでロック可能なオブジェクトなんだろうと思います(そのまま)。
実際オブジェクトをロックしてから処理を行うMutex#synchronize
を使ったりしていました。
module Concurrent
class RubyExecutorService < AbstractExecutorService
safe_initialization!
def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
synchronize do
# If the executor is shut down, reject this task
return handle_fallback(*args, &task) unless running?
ns_execute(*args, &task)
true
end
end
end
end
主に呼ばれるのはpost
メソッドでその中でこのクラスを継承したConcurrent::RubyThreadPoolExecutor#ns_execute
を呼んでいます。
主な処理はそちらになります。
safe_initialization!
はクラスをロードしたタイミングでスレッドセーフに初期化処理を行っている、と理解しました。
Concurrent::RubyThreadPoolExecutor
上記のクラスを継承しています。
スレッドプールを管理してスレッドを追加したり削除したりということをしています。
module Concurrent
class RubyThreadPoolExecutor < RubyExecutorService
def ns_execute(*args, &task)
ns_reset_if_forked
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
else
handle_fallback(*args, &task)
end
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
end
def ns_assign_worker(*args, &task)
# keep growing if the pool is not at the minimum yet
worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
if worker
worker << [task, args]
true
else
false
end
rescue ThreadError
# Raised when the operating system refuses to create the new thread
return false
end
def ns_add_busy_worker
return if @pool.size >= @max_length
@pool << (worker = Worker.new(self))
@largest_length = @pool.length if @pool.length > @largest_length
worker
end
def ns_prune_pool
return if @pool.size <= @min_length
last_used = @ready.shift
last_used << :idle_test if last_used
@next_gc_time = Concurrent.monotonic_time + @gc_interval
end
end
end
メインになるのはこの辺りです。
最初にns_execute
が呼ばれるとns_assign_worker
-> ns_add_busy_worker
という流れで
ビジー状態のワーカーが@pool
に追加されます。@pool.size
が@min_length
以上になるまで追加が行われます。
ワーカーが十分にプールされている場合は@ready
から待機状態のWorker
インスタンスを取得します。
待機状態のワーカーがない場合はまたns_add_busy_worker
によって追加されます。
また、ns_execute
では一定時間ごとに最後にGCのためにns_prune_pool
を呼んでいます。
@ready
の先頭のワーカーに:idle_test
という特別なメッセージを割り振り、
このメッセージをワーカーが受け取ると、ワーカーがアイドル状態の時間を計算し
設定した時間を超えていた場合はプールから削除し、超えてない場合はまた先頭に戻す、ということを行っています。
つまり一定時間ごとに古いワーカーを削除しているということになります。
@ready
について補足すると、ワーカーは基本的に末尾に追加され、末尾のワーカーからアサインされていきます。
つまり先頭にあるほど古いワーカーということになります。
@ready
を操作するメソッド一覧
-
ns_assign_worker
:@ready.pop
末尾から取得 -
ns_ready_worker
:@ready.push
末尾に追加 -
ns_worker_not_old_enough
:@ready.unshift
先頭に追加 -
ns_prune_pool
:@ready.shift
先頭から取得
ワーカーのプール数が@max_length
に達すると以下のように@queue
にタスクをエンキューします。
ワーカーが処理を完了するとns_ready_worker
が呼ばれ、@ready
に戻るタイミングで@queue
からタスクが取り出されアサインされます。
def ns_enqueue(*args, &task)
if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
else
false
end
end
def ns_ready_worker(worker, success = true)
task_and_args = @queue.shift
if task_and_args
worker << task_and_args
else
# stop workers when !running?, do not return them to @ready
if running?
@ready.push(worker)
else
worker.stop
end
end
end
しかしns_enqueue
はshoryuken
を通して呼ぶ限りはほぼ呼ばれない処理になります(理由は後述します)。
Concurrent::RubyThreadPoolExecutor::Worker
また、ワーカーの実態はThread
で同ファイルに定義されています。
module Concurrent
class RubyThreadPoolExecutor < RubyExecutorService
# ...
class Worker
include Concern::Logging
def initialize(pool)
# instance variables accessed only under pool's lock so no need to sync here again
@queue = Queue.new
@pool = pool
@thread = create_worker @queue, pool, pool.idletime
end
# ...
def create_worker(queue, pool, idletime)
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
last_message = Concurrent.monotonic_time
catch(:stop) do
loop do
case message = my_queue.pop
when :idle_test
if (Concurrent.monotonic_time - last_message) > my_idletime
my_pool.remove_busy_worker(self)
throw :stop
else
my_pool.worker_not_old_enough(self)
end
when :stop
my_pool.remove_busy_worker(self)
throw :stop
else
task, args = message
run_task my_pool, task, args
last_message = Concurrent.monotonic_time
my_pool.ready_worker(self)
end
end
end
end
end
def run_task(pool, task, args)
task.call(*args)
pool.worker_task_completed
rescue => ex
# let it fail
log DEBUG, ex
rescue Exception => ex
log ERROR, ex
pool.worker_died(self)
throw :stop
end
end
end
end
Thread.new
されるとthrow :stop
されるまで無限ループを繰り返してタスクを処理するようになっています。
ところで@max_length
はいくつになっているのでしょうか。
shoryuken.yml
でconcurrency
を4と設定したのでそうなっているかと思いきや、実際は2147483647という値になっていました。これはintの最大値です。
もう一度shoryuken
からこのConcurrent::RubyThreadPoolExecutor
を呼び出す部分を確認します。
module Shoryuken
class Launcher
# ...
def executor
@_executor ||= Shoryuken.launcher_executor || Concurrent.global_io_executor
end
# ...
end
end
Shoryuken.launcher_executor
を定義していなければConcurrent.global_io_executor
が呼ばれます。
これを追うとConcurrent::RubyThreadPoolExecutor
を呼び出す時に以下のように引数が渡されていました。
module Concurrent
# ...
def self.new_io_executor(opts = {})
ThreadPoolExecutor.new(
min_threads: [2, Concurrent.processor_count].max,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
# max_threads: 1000,
auto_terminate: opts.fetch(:auto_terminate, true),
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :abort # shouldn't matter -- 0 max queue
)
end
end
つまりmin_threads
は2かプロセッサ数(MacBookProだったので4)のうち大きい方、max_threads
はConcurrent::ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE
(2147483647)としてインスタンス化しています。
shoryuken
のconcurrency
自体はconcurrent-ruby
には渡されていません。
concurrency
はあくまでSQSからメッセージを受信してワーカーにアサインという処理の最大数を指定しているだけで、スレッドの数自体はconcurrent-ruby
のデフォルトの設定に依っています。
そして最低スレッド数は実行している環境のリソースによっても変わることが分かります。
先程見たようにconcurrent-ruby
はタスクが振られたらready状態のワーカーが無ければ追加、
そうでなくても古いワーカーは削除して新しく追加していくのでスレッド自体はどんどん新しくなります。
(デフォルトではidletime
を2で割った値 = 30秒が@gc_interval
なので30秒に1度GCを行おうとしています)
concurrencyとスレッド数の関係
ここでconcurrency
とスレッド数がどういう関係にあるのかを調べるため
以下のようにgemに直接ログを仕込んで試してみました。
def ns_execute(*args, &task)
ns_reset_if_forked
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
else
handle_fallback(*args, &task)
end
File.open('/Users/kurashita/Desktop/pool.txt', 'a') do |f|
f.puts "@ready: #{@ready.size} @pool: #{@pool.size}"
end
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
end
処理するメッセージの数は大体200〜300、処理内容はほぼログを出すだけです。
キューは4つです。
結果
以下はアイドル時の値です。メッセージを処理し始めると@ready
の値は下がります。
concurrency
が1の場合
@ready: 1〜2
、@pool: 4
concurrency
が4の場合
@ready: 4〜5
、@pool: 6〜7
concurrency
が10の場合
@ready: 10〜12
、@pool: 12〜14
concurrency
が20の場合
@ready: 〜20
、@pool: 20〜22
大体concurrency
+ 1〜2程度のスレッドがready状態にあることが多かったです。
concurrency
の値が大きいと@ready
の値が安定せず常に何かしらのタスクを処理していることが多かったです。
その他わかったこと
- メッセージが多く処理が集中するとワーカーが追加される傾向がある
- その後アイドル時に
ns_prune_pool
されて一定の数に戻る -
@pool
は@ready
の最大値よりも2つ多い -
concurrency
を極端に多くするとshoryuken
のプロセスは生きたまま処理が止まることがあった(エラーは出ない、ロックされているような感じ)
とりあえずこんなところです。
また分かり次第追記していきます。