15
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

アクトインディAdvent Calendar 2015

Day 9

test-queueのソース読んでみた

Last updated at Posted at 2015-12-09

この記事はアクトインディ Advent Calendar 2015 10日目になります。

さっそくですが、いこーよはテスティングフレームワークにrspec(3.1.0)を使ってます。
exmpleは2275個もあって開発PC(MacbookPro15 2015)で並列テストを実行すると8分くらいかかってしまっています。遅いです。
放熱を頑張ることで下がっているクロックを取り戻しテスト実行時間短縮を考えていましたがCPUの発熱が激しぎて何やっても効果が見込めない感じがしたのでペルチェ素子の購入を最後に放熱は諦めました。本当にありがとうございました。

damedeshita.png

test-queueのREADMEを読むと分散モードというのがあるようで、これは良いソリューションになるのでは?と思いとりあえずソース読むことにしました。

test-queueについて

テストを並列実行してくれるライブラリです。
masterからforkしたプロセスをworkerとして使って暇になったworkerにテストを振っていきます。

遅いテストから順番に消化するのでworkerの終了時間にばらつきが出にくくなってます。
実行するテストの全情報をすべてのworkerが別々のメモリ空間で保持しているのと、workerプロセス + masterプロセス分が必要なのでメモリをよく食う気がします。

test-queueがやっているだいたいのこと

単一ホストの複数workerで動かしたとします。

  • テストを遅い順番に並び替える
  • 環境変数を読み込む
  • masterプロセスを開始する
    • UnixSocketをopen
  • forkでworkerプロセスを生成する
    • 環境変数TEST_QUEUE_WORKERS か CPUコアの数だけworkerを生成する
  • masterがworkerからの接続を待つ(A)
  • workerがmasterへUnixSocket経由でPOPという文字を送信する
  • POPを受信したmasterは1つのテストをUnixSocket経由でworkerへ送信する
  • テストを受け取ったworkerはテストを実行する
  • masterは終了したworkerがいないかチェックをする
    • 終了したworkerがあったらmasterがworkerの後始末をする
  • 実行待ちテストがなくなるまで(A)に戻る
  • masterを停止する
    • UnixSocketのclose
  • 終了

test-queueのやっていることを詳しく

テストを遅い順番に並び替える

lib/test_queue/runner/rspec.rb:18

module TestQueue
  class Runner
    class RSpec < Runner
      def initialize
        @rspec = ::RSpec::Core::QueueRunner.new
        super(@rspec.example_groups.sort_by{ |s| -(stats[s.to_s] || 0) })
      end
[...]

#statsの戻り値は、前回のテスト実行終了時に保存しておいたテスト毎に掛かった時間のハッシュを返すのでここで遅い順番に並び替えています。
.test_queue_statsというファイルに前回のテスト実行終了時の情報を保存してます。

workerを生成する

lib/test_queue/runner.rb:219

def spawn_workers
  @concurrency.times do |i|
    num = i+1
    pid = fork do
      @server.close if @server
      iterator = Iterator.new(relay?? @relay : @socket, @suites, method(:around_filter))
      after_fork_internal(num, iterator)
      ret = run_worker(iterator) || 0
      cleanup_worker
      Kernel.exit! ret
    end
[...]

@concurrencyの数だけループしてworkerを生成しています。
ここでは、EnumerableをincludeしたIteratorクラスをインスタンス化しており、そのIterator#eachmasterとSocket通信するコードが書かれています。
トリッキーな感じがしましたが、テスティングフレームワークに左右されない共通処理をまとめるためだと思われます。
ちなみにIterator#each#run_workerの中で呼ばれています。

masterがworkerにテストを振ってworkerがテストを実行する

lib/test_queue/runner.rb:318

# master側
until @queue.empty? && remote_workers == 0
  if IO.select([@server], nil, nil, 0.1).nil? # 
    reap_worker(false) if @workers.any? # check for worker deaths
  else
    sock = @server.accept
    cmd = sock.gets.strip
    case cmd
    when /^POP/
      if obj = @queue.shift
        data = Marshal.dump(obj.to_s)
        sock.write(data) # <= workerへテストを1つ送信してる
      end

workerの準備が整う(#run_worker実行してる)とmasterへPOPと送信します。するとmasterはworkerへ1つのテストを返信します。

lib/test_queue/iterator.rb:18

# worker側
def each
  fail "already used this iterator. previous caller: #@done" if @done

  while true
    client = connect_to_master('POP')
    break if client.nil?
    r, w, e = IO.select([client], nil, [client], nil)
    break if !e.empty?
    if data = client.read(65536)
      client.close
      item = Marshal.load(data)
      break if item.nil? || item.empty?
      suite = @suites[item]

      $0 = "#{@procline} - #{suite.respond_to?(:description) ? suite.description : suite}"
      start = Time.now
      if @filter
        @filter.call(suite){ yield suite } # <= ここでテストを実行してる
      else
        yield suite
          end
[...]

workerはテストを受け取ってyeildでテストを実行します。
workerは一度走りだすとnilを受け取るまでmasterにPOPを送信し続けます。
masterがworkerに何もを送信しない時は実行待ちテストが尽きた時です。workerがnilを受け取ると消化したテストを一時ファイルに書き込んでworkerプロセスは役目を終えます。

masterは、workerにテストを振りつつもも終了したworkerがいないかチェックもしています。終了したworkerがいた場合はそのworkerが終了時に残した実行結果が書かれた一時ファイルの収集と削除を行います。(#reap_worker)

workerが全部なくなると、workerから収集したテスト結果をmasterが標準出力に出力して並列テストは終了します。

分散モードについて

slaveホストとして起動したテストはmasterプロセスでSocketをオープンせず、外にあるホストのmasterプロセスに接続しにいくようになります。masterホストから見ると別のホストでテストを実行できるようになります。

環境変数TEST_QUEUE_SOCKETに設定されている値によってUnixSocketかTCPSocketのどちらかを作成します。
分散モードで実行したい(TCPSocketをオープンしたい)場合は、masterホスト側でTEST_QUEUE_SOCKETに自身のIP:Portの設定が必要です。
slaveホスト側では、環境変数TEST_QUEUE_RELAYにmasterホストのIP:Portを設定します。
また、masterとslaveに共通の環境変数TEST_QUEUE_RELAY_TOKENを設定する必要があります。

動かしてみます

TEST_QUEUE_SOCKET=192.168.1.100:12345 bundle exec minitest-queue ./test/sample_minitest5.rb &
sleep 0.1
TEST_QUEUE_RELAY=192.168.1.100:12345 bundle exec minitest-queue ./test/sample_minitest5.rb
wait

slaveホストは、masterのmasterプロセスに接続できるまでリトライするのでどちらから実行しても問題はないですが、masterホスト側を先に実行します。
masterホストが順番待ちテストを1つずつworkerに振っているフェーズ(#distribute_queue)に入ると、slaveホストがTCP経由でテストを受けとってテストを実行してくれます。
以降の流れは単体モードとだいたいと同じです。

ということで分散モードというのは、slaveホストの数に制限はなく、slaveホストをたくさん増やすとその分早くなりそうです。
単体モードだとI/Oがボトルネックになりがちなのでこれは便利そうです。

以上。

15
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?