12
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PlayでOSコマンドの出力をリアルタイムに返したい

Last updated at Posted at 2014-11-27

Play Framework Streamingとかで検索するとこういうページが出てきて、

def index = Action {

  val data = getDataStream
  val dataContent: Enumerator[Array[Byte]] = Enumerator.fromStream(data)
  
  Ok.chunked(dataContent)
}

とやると Content-Encoding: chuked でレスポンスを返すことができるらしい。

ふむふむ、さてコマンドの実行結果をchunkedで送るにはとなると初心者には厳しい。

まずコマンドの実行は

"ls" !
Process("xargs test -z") run()

など色々方法があり、Enumeratorにするにはどうするのかよくわからない。

PlayCLI

import playcli._
Ok.chunked(CLI.enumerate("tail -f aLogFile"))

こんな感じで使えるライブラリ。まさにやりたかったことができてる。

ただ、ちょっと放置気味なライブラリだし、CLI.enumerate(Process("ls") #| Process("tee /tmp/log.txt")) などとやるとwarningが出るので、自分で実装しようと思った。

InputStreamを使う

Process("ls") lineStream

などとやると Stream[String] というのが得られるらしい

これと Enumerator.fromStream というのを組み合わせられるのかと思いきや、Enumerator.fromStream はJavaのInputStreamのことであってScalaでいうところのStreamとは別物だそうな。

結論から言うと、こんなふうにやった。

object ProcessEnumerator {
  def apply(process: ProcessBuilder): Enumerator[Array[Byte]] = {
    val in = new PipedInputStream()
    val out = new PipedOutputStream(in)
    Future(process.#>(out).run())
    Enumerator.fromStream(in)
  }
}

これでコマンドの実行結果がEnumeratorになり、Content-Encoding: chunkedで送れるようになった。

もしかしたら

Enumerator.outputStream { out => Future(process.#>(out).run()) }

という手もあったのかもしれないが試してないのでわからない。

(さらっと書いているが、ここに至るまでProcessとProcessBuilderのことやEnumeratorとかStreamとInputStreamについてだいぶ調べた)

ハマりどころ

世の中には Content-Encoding: chunked をきちんと扱えないプロキシがあって、nginxですらデフォルトでは(設定ファイルを書かないと)いけない。

プロキシでバッファリングされるケースもある。Playからはストリーミングしているつもりなのに、クライアントには全然流れてこないケース。

あとハマったのが、UNIXのコマンドは ターミナル出力でない場合は 4096バイトずつバッファリングされる。ターミナル出力の場合は行バッファなのでちゃんと動いているように見えても、ウェブを介するとなぜか詰まるということになる。これはlibcレベルの問題で、まっとうな方法で解決するのはけっこうたいへん。検索してみると、ターミナル出力に見せかけるためのコマンド使う方法などが見つかった。自分はscriptコマンドを使った。Mac, Linux両対応なのと、標準エラー出力を標準出力にリダイレクトしてくれるため。

WebSocket対応

プロキシの問題があったのでContent-Encoding: chunkedは諦めてWebSocketでやることにした。

幸いなことに、EnumeratorになってしまえばWebSocketにするのは簡単

…と思ってた時期が僕にもありました。

問題は、Enumerator.fromStreamでできるEnumeratorは、 Enumerator[Array[Byte]] だということ。

これだと、WebSocketのBinaryFrameで送られちゃうので、JavaScriptからはBlobやArrayBufferでしか受け取れない。JSでBlobからStringに変換するのは結構面倒なので、きちんとテキストとして返したい。

Enumerator[Array[Byte]]Enumerator[String] に変換できるラッパーを書いたら良かったのだけど、そこまでは調べきれなかったので、コマンドの出力を行ごとに読んで出力するという方向でやることにした。

object ProcessEnumerator {
  def apply(process: ProcessBuilder): Enumerator[String] = {
    val in = new PipedInputStream()
    val out = new PipedOutputStream(in)
    Future(process.#>(out).run())

    val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

    Enumerator.fromCallback1[String](_ => Future {
      reader.readLine match {
        case line: String => Some(line + "\n")
        case _ => None
      }
    }, { () => // oncomplete
      in.close()
    })
  }
}

Enumerator.fromCallback1という、第一引数の関数をひたすら実行してくれてEnumeratorを作れるものがあった。

詳しくはこのあたりをどうぞ。

追記

色々試行錯誤してたらもっと簡単にできた。

object ProcessEnumerator {
  def apply(process: ProcessBuilder): Enumerator[String] = {
    Enumerator.enumerate[String](
      process.lineStream_!(ProcessLogger(line => ())).map { line => line + "\n" }
    )
  }
}
12
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?