2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

IO.select処理で役に立つセルフパイプテクニック

Last updated at Posted at 2020-11-03

はじめに

IO.select処理をしていて困った問題としてブロッキング問題があると思います。ブロッキングとは、バッファにデータが全くなくてデータ読み込み処理で待機が発生したり、バッファにデータがあるが、読み込みたいバイト数のデータがなくてデータ読み込み処理が待機したりすることで起きる現象です(書き込みも同様)。IO.select処理でブロッキングが発生したら後続の処理が待たされてしまい非効率です。この問題を解決する方法としてセルフパイプテクニックがあります。セルフパイプテクニックの歴史は1990年頃から始まり、現在も私たちが普段使っているライブラリーで使われてます。セルフパイプテクニック

IO.selectでブロッキングが起きる例と解決例

例えば、rubyでIO.select処理を行いたい場合は、IO.pipeというものを使いこのように書くことができます。これはブロッキングが起きてしまう例です。

例1
eg_1 = -> {
 async_heavy_process = -> { puts "heavy" }
 
 r, w = IO.pipe
 fork { sleep 5; w.puts "hoge" }
 
 IO.select([r])
 async_heavy_process.()
 puts r.gets 
}
eg_1.()

5秒立たないと、async_heavy_process処理が走らなく効率が悪いプログラムになってしまってます。非同期で実行される重い処理は早く実行したいものです。では問題を解決する事を考えましょう。IO.selectに渡されたIOオブジェクトの中に常に準備完了状態のものがあればIO.selectでブロッキングされることは無くなります。 このアイディアの元生まれたのがセルフパイプテクニックです。ではこのアイディアを埋め込んだ例1の修正版のコード例2をみてみましょう。

例2
eg_2 = -> {
  async_heavy_process = -> { puts "heavy"; }
  self_reader, self_writer = IO.pipe
  self_writer.puts 0

  r, w = IO.pipe
  fork { sleep 5; w.puts "hoge" }

  IO.select([r, self_reader])
  async_heavy_process.()
  puts r.gets
}
eg_2.()

実行したらすぐに"heavy"が表示されてIO.selectの部分でブロッキングされない事がわります。他にも用途があると思うがもし知っていたら教えて欲しいです。

セルフパイプテクニックが使われているライブラリ例

例で紹介した例は非常にシンプルで実用性に乏しいので最後に短なライブラリーでこのセルフパイプテクニックがどのように使われているのかを紹介します。

  • foreman
  • unicorn

foreman

foremanは起動したいコマンドを定義したProcfileを読み取り、マルチプロセスで実行、各プロセスで発生した標準出力(標準エラー)はパイプを通してメインプロセスで起動しているプログラムに渡されて、標準出力で表示されるツールです。具体的にはこんな感じのもの

Procfile
app: sleep 5 && echo 'app' && exit 1;          # 子プロセス1
web: while :; do sleep 1 && echo 'web'; done;  # 子プロセス2
bash
$ foreman start
00:57:43 app.1  | started with pid 21149              # メインプロセス/メインスレッドで出力
00:57:43 web.1  | started with pid 21150              # メインプロセス/メインスレッドで出力
00:57:44 web.1  | web                                 # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:45 web.1  | web                                 # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:46 web.1  | web                                 # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:47 web.1  | web                                 # 子プロセス2にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:48 app.1  | app                                 # 子プロセス1にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:48 web.1  | web                                 # 子プロセス1にwriterを渡して書き込ませ、readerを通して、メインプロセス/スレッド2で出力
00:57:48 app.1  | exited with code 1                  # メインプロセス/スレッド2で子プロセス1の終了を確認
00:57:48 system | sending SIGTERM to all processes    # メインプロセス/メインスレッドから子プロセスへSIGTERMが送られた時の出力(windowsだとSIGKILL)
00:57:48 web.1  | terminated by SIGTERM               # メインプロセス/メインスレッドから全ての子プロセスのterminatedを確認した時の出力

である。でどこの処理で使われているかというと、子プロセスに渡したパイプからの標準出力(標準エラー)を取得する処理(watch_for_output)で使われている。コードでいえばここである。

  # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L406-L420
  def watch_for_output
    Thread.new do
      begin
        loop do
          io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)
          read_self_pipe
          handle_signals
          handle_io(io ? io.first : [])
        end
      rescue Exception => ex
        puts ex.message
        puts ex.backtrace
      end
    end
  end
io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)

もしこのセルフパイプがなければどうなるかというと、何らかの原因でIO.selectが永久的にブロッキングされてしまったら、watch_for_outputの後続の処理である子プロセスの終了確認チェック処理(wait_for_shutdown_or_child_termination)が実行されなくなってしまうという事である。それはforemanでは子プロセスをkillする事ができなくなる事を意味しており最悪の事態です。

 # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L54-L63
 def start
    register_signal_handlers
    startup
    spawn_processes
    watch_for_output
    sleep 0.1
    wait_for_shutdown_or_child_termination
    shutdown
    exit(@exitstatus) if @exitstatus
  end

unicorn

こちらはコードを丁寧に読んでないのであまり詳しくは言及しませんがコードをgrepするとセルフパイプが使われている事がわかります。

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L82-L91

    # We use @self_pipe differently in the master and worker processes:
    #
    # * The master process never closes or reinitializes this once
    # initialized.  Signal handlers in the master process will write to
    # it to wake up the master from IO.select in exactly the same manner
    # djb describes in https://cr.yp.to/docs/selfpipe.html
    #
    # * The workers immediately close the pipe they inherit.  See the
    # Unicorn::Worker class for the pipe workers use.
    @self_pipe = []

そして多分この処理でブロッキングを回避するのに役立ってると思います。

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L748

  def worker_loop(worker)
   #
   # 省略
   #
   ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
   #
   # 省略
   #
  end

興味があったらコードを読んでみたらいいと思います。

まとめ

IO.select処理で発生するブロッキングを回避するためのセルフパイプテクニックを紹介しました。普段お世話になっているforemanやunicornなどのライブラリーで結構使われているテクニックなのでこの際習得してものにしてみたらどうでしょうか。日本での記事が全くなかったので記事にしてみました。至らぬところはあるかと思いますが役に立つと幸いです。

参考

2
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?