6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

shoryukenのconcurrencyと実際のスレッド数が違っていたので調べた

Last updated at Posted at 2018-09-20

AWS SQSからメッセージを取得して処理してくれるワーカーのshoryukenと非同期処理ライブラリであるconcurrent-rubyについて調べてみました。

バージョンは以下です。

  • ruby 2.5.1
  • shoryuken (3.2.3)
  • concurrent-ruby (1.0.5)

tl; dr

  • スレッドを制御しているのはconcurrent-ruby
  • shoryukenconcurrencyは並列処理数は制限しているが実際のスレッド数と直接は関係ない
  • スレッドは必要になれば増えて、使ってなければ勝手に削除される

背景

shoryukenでスレッド毎のログを出したいと思って、以下のようなコードでログを出力するようにしてみたところconfig/shoryuken.ymlで設定したconcurrency以上のログが作成されました。
concurrency = スレッド数だと思っていたので不思議に思いコードを読んでみました。

必要なところ以外は省略してます。

shoryuken.yml
concurrency: 4
delay: 60
queues:
  - [test_queue, 1]
  - [hoge_queue, 1]
  - [fuga_queue, 1]
  - [piyo_queue, 1]

test_worker.rb
class TestWorker
  include Shoryuken::Worker
  
  # ...
  
  def perform
  	thread_logger.info('perform!')
  end
  
  def thread_logger
    ActiveSupport::Logger.new("log/worker.thread.#{thread_number}.log")
  end

  def thread_number
    Thread.current.to_s.match(/Thread\:([\da-zA-Z]+)/)[1]
  end
end

これでshoryukenを実行すると以下のようにconcurrency以上のログが出ます。

スクリーンショット 2018-09-19 16.55.03.png

ソースコードリーディング

shoryuken

shoryukenの処理はざっくりと以下のような流れになってます。

  1. RAILS_ENV=development bundle exec shoryuken -R -C config/shoryuken.ymlとかを実行
  2. Shoryuken::CLI::RunnerShoryuken::Runnerのインスタンスを実行
  3. Shoryuken::RunnerShoryuken::Launcherを呼ぶ
  4. Shoryuken::LauncherShoryuken::Managerを作成し実行
  5. Shoryuken::Managerは無限ループ内でSQSの各キューからメッセージを受信してShoryuken::Processor(自分で定義したワーカー)に処理を委譲

5では無限ループ内でconcurrent-rubyによる非同期処理が行われます。

以下、主な部分のコードだけ乗っけます。


Shoryuken::Runner

Shoryuken::Launcher.newしてstartしているくらいなのでスルー。


Shoryuken::Launcher

shoryuken-3.2.3/lib/shoryuken/launcher.rb
module Shoryuken
  class Launcher
    include Util

    def start
      # ...
      
      start_managers
    end

    def start_managers
      @managers.each do |manager|
        Concurrent::Future.execute { manager.start }
      end
    end
    
    def executor
      @_executor ||= Shoryuken.launcher_executor || Concurrent.global_io_executor
    end

    def create_managers
      Shoryuken.groups.map do |group, options|
        Shoryuken::Manager.new(
          Shoryuken::Fetcher.new(group),
          Shoryuken.polling_strategy(group).new(options[:queues]),
          options[:concurrency],
          executor
        )
      end
    end
  end
end  

Shoryuken::Managerインスタンスを作成してConcurrent::Future.executeの中で実行しています。

Concurrent::Futureconcurrent-rubyが提供している並列実行クラスで
executeメソッドは渡されたブロックをスレッドセーフに非同期実行できます。
参考

https://techracho.bpsinc.jp/hachi8833/2018_08_07/60339

manager.startでは無限ループ内でスレッドを呼び出しているので上記メソッドを使用していると思われます。

Shoryuken::Managerのイニシャライザには以下を定義してます。

  • Shoryuken::Fetcher - SQSとの通信処理クラス
  • Shoryuken.polling_strategy - ポーリング戦略クラス
  • concurrency - 並列実行数
  • executor - 非同期処理を実現するクラス

Shoryuken::Manager

shoryukenの動作の本質的な部分を担ってます。具体処理は各クラスに移譲しているのでまさしくManagerです。
以下はメッセージのバッチ取得ではなく単一取得処理に絞ってます。

shoryuken-3.2.3/lib/shoryuken/manager.rb
module Shoryuken
  class Manager
    include Util

    BATCH_LIMIT = 10
    # See https://github.com/phstc/shoryuken/issues/348#issuecomment-292847028
    MIN_DISPATCH_INTERVAL = 0.1
    
    def initialize(fetcher, polling_strategy, concurrency, executor)
      @fetcher          = fetcher
      @polling_strategy = polling_strategy
      @max_processors   = concurrency
      @busy_processors  = Concurrent::AtomicFixnum.new(0)
      @executor         = executor
      @running          = Concurrent::AtomicBoolean.new(true)
    end

    def start
      dispatch_loop
    end
    
    def dispatch_loop
      return unless running?

      @executor.post { dispatch }
    end
    
    def dispatch
      return unless running?

      if ready <= 0 || (queue = @polling_strategy.next_queue).nil?
        return sleep(MIN_DISPATCH_INTERVAL)
      end

      fire_event(:dispatch, false, queue_name: queue.name)

      logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

      batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
    rescue => ex
      handle_dispatch_error(ex)
    ensure
      dispatch_loop
    end
    
    def busy
      @busy_processors.value
    end

    def ready
      @max_processors - busy
    end
    
    def processor_done
      @busy_processors.decrement
    end

    def assign(queue_name, sqs_msg)
      return unless running?

      logger.debug { "Assigning #{sqs_msg.message_id}" }

      @busy_processors.increment

      Concurrent::Promise.execute(
        executor: @executor
      ) { Processor.process(queue_name, sqs_msg) }.then { processor_done }.rescue { processor_done }
    end

    def dispatch_single_messages(queue)
      messages = @fetcher.fetch(queue, ready)

      @polling_strategy.messages_found(queue.name, messages.size)
      messages.each { |message| assign(queue.name, message) }
    end
  end
end

Shoryuken::Manager@busy_processorsという変数の整数値によって並列性を制御しています
readybusy等も参照しているが、readybusyの値に依っているので実質1つの変数で管理)。

大まかに言えば、次々メッセージを取得してワーカーにタスクがアサインされたらreadyが-1、busyが+1、
タスクが完了したらreadyが+1、busyが-1されます。

readyが0の場合はsleepするのでこれによって並列実行数が制限されることになります。

@fetcherの中身はShoryuken::Fetcherfetchメソッドは指定にかかわらずバッチで取得するようになっていて、第二引数がメッセージ最大受信数なのでreadyを渡すことでアサインできるだけのメッセージを取得しています。

処理の流れは、まずdispatch_loopで無限ループさせて、その中でキューからのメッセージ取得と取得したメッセージをShoryuken::Processorにアサインする処理(dispatchメソッド)を@executorに行わせています。
実はこの@executorの中身はConcurrent::RubyExecutorServicecrubyの場合)でこの中でThread.newしています。

ちなみにShoryuken::Processorではユーザが定義したワーカー(test_worker.rb)のperformメソッドが呼ばれています。


ここまで見てきてスレッドを作成する処理はconcurrent-rubyに移譲されていたのでそちらを見ていきます。

concurrent-ruby

並列処理のためのライブラリで、これ自体はshoryukenが無くてもrailsアプリであればactivesupportがすでに一緒にインストールされています。

こっちもざっくり処理の流れを追います。

  1. Shoryuken::Managerが無限ループ内で@executor.postを呼び出す
  2. Concurrent::RubyExecutorService#postが呼び出され、それを継承したクラスであるConcurrent::RubyThreadPoolExecutor#ns_executeが実行される
  3. RubyThreadPoolExecutorns_executeによってタスクが渡される度にワーカー(Thread.new)があればアサイン、無ければ起動してアサインする
  4. アサインされたタスクが完了したらワーカーはプールされ、次のタスクがアサインされるまで待機します

大体こんな感じになっています。


Concurrent::RubyExecutorService

色々なクラスを継承していて、crubyjrubyなどの処理系によっても継承クラスが変わったりしますが
LockableObjectという特徴的な名前のクラスを継承しているのでロック可能なオブジェクトなんだろうと思います(そのまま)。
実際オブジェクトをロックしてから処理を行うMutex#synchronizeを使ったりしていました。

concurrent-ruby-1.0.5/lib/concurrent/executor/ruby_executor_service.rb
module Concurrent
  class RubyExecutorService < AbstractExecutorService
    safe_initialization!

    def post(*args, &task)
      raise ArgumentError.new('no block given') unless block_given?
      synchronize do
        # If the executor is shut down, reject this task
        return handle_fallback(*args, &task) unless running?
        ns_execute(*args, &task)
        true
      end
    end
  end
end

主に呼ばれるのはpostメソッドでその中でこのクラスを継承したConcurrent::RubyThreadPoolExecutor#ns_executeを呼んでいます。
主な処理はそちらになります。

safe_initialization!はクラスをロードしたタイミングでスレッドセーフに初期化処理を行っている、と理解しました。


Concurrent::RubyThreadPoolExecutor

上記のクラスを継承しています。
スレッドプールを管理してスレッドを追加したり削除したりということをしています。

concurrent-ruby-1.0.5/lib/concurrent/executor/ruby_thread_pool_executor.rb
module Concurrent
  class RubyThreadPoolExecutor < RubyExecutorService
  
    def ns_execute(*args, &task)
      ns_reset_if_forked

      if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
        @scheduled_task_count += 1
      else
        handle_fallback(*args, &task)
      end

      ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
    end
  
    def ns_assign_worker(*args, &task)
      # keep growing if the pool is not at the minimum yet
      worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
      if worker
        worker << [task, args]
        true
      else
        false
      end
    rescue ThreadError
      # Raised when the operating system refuses to create the new thread
      return false
    end

    def ns_add_busy_worker
      return if @pool.size >= @max_length

      @pool << (worker = Worker.new(self))
      @largest_length = @pool.length if @pool.length > @largest_length
      worker
    end
    
    def ns_prune_pool
      return if @pool.size <= @min_length

      last_used = @ready.shift
      last_used << :idle_test if last_used

      @next_gc_time = Concurrent.monotonic_time + @gc_interval
    end
  end
end

メインになるのはこの辺りです。
最初にns_executeが呼ばれるとns_assign_worker -> ns_add_busy_workerという流れで
ビジー状態のワーカーが@poolに追加されます。@pool.size@min_length以上になるまで追加が行われます。

ワーカーが十分にプールされている場合は@readyから待機状態のWorkerインスタンスを取得します。
待機状態のワーカーがない場合はまたns_add_busy_workerによって追加されます。

また、ns_executeでは一定時間ごとに最後にGCのためにns_prune_poolを呼んでいます。

@readyの先頭のワーカーに:idle_testという特別なメッセージを割り振り、
このメッセージをワーカーが受け取ると、ワーカーがアイドル状態の時間を計算し
設定した時間を超えていた場合はプールから削除し、超えてない場合はまた先頭に戻す、ということを行っています。
つまり一定時間ごとに古いワーカーを削除しているということになります。

@readyについて補足すると、ワーカーは基本的に末尾に追加され、末尾のワーカーからアサインされていきます。
つまり先頭にあるほど古いワーカーということになります。

@readyを操作するメソッド一覧

  • ns_assign_worker: @ready.pop 末尾から取得
  • ns_ready_worker: @ready.push 末尾に追加
  • ns_worker_not_old_enough: @ready.unshift 先頭に追加
  • ns_prune_pool: @ready.shift 先頭から取得

ワーカーのプール数が@max_lengthに達すると以下のように@queueにタスクをエンキューします。
ワーカーが処理を完了するとns_ready_workerが呼ばれ、@readyに戻るタイミングで@queueからタスクが取り出されアサインされます。

    def ns_enqueue(*args, &task)
      if !ns_limited_queue? || @queue.size < @max_queue
        @queue << [task, args]
        true
      else
        false
      end
    end
    
    def ns_ready_worker(worker, success = true)
      task_and_args = @queue.shift
      if task_and_args
        worker << task_and_args
      else
        # stop workers when !running?, do not return them to @ready
        if running?
          @ready.push(worker)
        else
          worker.stop
        end
      end
    end

しかしns_enqueueshoryukenを通して呼ぶ限りはほぼ呼ばれない処理になります(理由は後述します)。


Concurrent::RubyThreadPoolExecutor::Worker

また、ワーカーの実態はThreadで同ファイルに定義されています。

concurrent-ruby-1.0.5/lib/concurrent/executor/ruby_thread_pool_executor.rb
module Concurrent
  class RubyThreadPoolExecutor < RubyExecutorService
    # ...
    
    class Worker
      include Concern::Logging

      def initialize(pool)
        # instance variables accessed only under pool's lock so no need to sync here again
        @queue  = Queue.new
        @pool   = pool
        @thread = create_worker @queue, pool, pool.idletime
      end
      
      # ...

      def create_worker(queue, pool, idletime)
        Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
          last_message = Concurrent.monotonic_time
          catch(:stop) do
            loop do

              case message = my_queue.pop
              when :idle_test
                if (Concurrent.monotonic_time - last_message) > my_idletime
                  my_pool.remove_busy_worker(self)
                  throw :stop
                else
                  my_pool.worker_not_old_enough(self)
                end

              when :stop
                my_pool.remove_busy_worker(self)
                throw :stop

              else
                task, args = message
                run_task my_pool, task, args
                last_message = Concurrent.monotonic_time

                my_pool.ready_worker(self)
              end
            end
          end
        end
      end

      def run_task(pool, task, args)
        task.call(*args)
        pool.worker_task_completed
      rescue => ex
        # let it fail
        log DEBUG, ex
      rescue Exception => ex
        log ERROR, ex
        pool.worker_died(self)
        throw :stop
      end
    end
  end
end

Thread.newされるとthrow :stopされるまで無限ループを繰り返してタスクを処理するようになっています。

ところで@max_lengthはいくつになっているのでしょうか。
shoryuken.ymlconcurrencyを4と設定したのでそうなっているかと思いきや、実際は2147483647という値になっていました。これはintの最大値です。

もう一度shoryukenからこのConcurrent::RubyThreadPoolExecutorを呼び出す部分を確認します。

shoryuken-3.2.3/lib/shoryuken/launcher.rb
module Shoryuken
  class Launcher
  	# ...
  	
    def executor
      @_executor ||= Shoryuken.launcher_executor || Concurrent.global_io_executor
    end
    
    # ...
  end
end

Shoryuken.launcher_executorを定義していなければConcurrent.global_io_executorが呼ばれます。
これを追うとConcurrent::RubyThreadPoolExecutorを呼び出す時に以下のように引数が渡されていました。

concurrent-ruby-1.0.5/lib/concurrent/configuration.rb
module Concurrent
  # ...
  
  def self.new_io_executor(opts = {})
    ThreadPoolExecutor.new(
        min_threads:     [2, Concurrent.processor_count].max,
        max_threads:     ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
        # max_threads:     1000,
        auto_terminate:  opts.fetch(:auto_terminate, true),
        idletime:        60, # 1 minute
        max_queue:       0, # unlimited
        fallback_policy: :abort # shouldn't matter -- 0 max queue
    )
  end
end

つまりmin_threadsは2かプロセッサ数(MacBookProだったので4)のうち大きい方、max_threadsConcurrent::ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE(2147483647)としてインスタンス化しています。
shoryukenconcurrency自体はconcurrent-rubyには渡されていません。
concurrencyはあくまでSQSからメッセージを受信してワーカーにアサインという処理の最大数を指定しているだけで、スレッドの数自体はconcurrent-rubyのデフォルトの設定に依っています。

そして最低スレッド数は実行している環境のリソースによっても変わることが分かります。

先程見たようにconcurrent-rubyはタスクが振られたらready状態のワーカーが無ければ追加、
そうでなくても古いワーカーは削除して新しく追加していくのでスレッド自体はどんどん新しくなります。
(デフォルトではidletimeを2で割った値 = 30秒が@gc_intervalなので30秒に1度GCを行おうとしています)

concurrencyとスレッド数の関係

ここでconcurrencyとスレッド数がどういう関係にあるのかを調べるため
以下のようにgemに直接ログを仕込んで試してみました。

concurrent-ruby-1.0.5/lib/concurrent/executor/ruby_thread_pool_executor.rb
    def ns_execute(*args, &task)
      ns_reset_if_forked

      if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
        @scheduled_task_count += 1
      else
        handle_fallback(*args, &task)
      end

      File.open('/Users/kurashita/Desktop/pool.txt', 'a') do |f|
        f.puts "@ready: #{@ready.size} @pool: #{@pool.size}"
      end

      ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
    end

処理するメッセージの数は大体200〜300、処理内容はほぼログを出すだけです。
キューは4つです。

結果

以下はアイドル時の値です。メッセージを処理し始めると@readyの値は下がります。

concurrencyが1の場合
@ready: 1〜2@pool: 4

concurrencyが4の場合
@ready: 4〜5@pool: 6〜7

concurrencyが10の場合
@ready: 10〜12@pool: 12〜14

concurrencyが20の場合
@ready: 〜20@pool: 20〜22

大体concurrency + 1〜2程度のスレッドがready状態にあることが多かったです。
concurrencyの値が大きいと@readyの値が安定せず常に何かしらのタスクを処理していることが多かったです。

その他わかったこと

  • メッセージが多く処理が集中するとワーカーが追加される傾向がある
  • その後アイドル時にns_prune_poolされて一定の数に戻る
  • @pool@readyの最大値よりも2つ多い
  • concurrencyを極端に多くするとshoryukenのプロセスは生きたまま処理が止まることがあった(エラーは出ない、ロックされているような感じ)

とりあえずこんなところです。
また分かり次第追記していきます。

6
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?