Help us understand the problem. What is going on with this article?

PlayでOSコマンドの出力をリアルタイムに返したい

More than 3 years have passed since last update.

Play Framework Streamingとかで検索するとこういうページが出てきて、

https://www.playframework.com/documentation/2.3.x/ScalaStream

def index = Action {

  val data = getDataStream
  val dataContent: Enumerator[Array[Byte]] = Enumerator.fromStream(data)

  Ok.chunked(dataContent)
}

とやると Content-Encoding: chuked でレスポンスを返すことができるらしい。

ふむふむ、さてコマンドの実行結果をchunkedで送るにはとなると初心者には厳しい。

まずコマンドの実行は

"ls" !
Process("xargs test -z") run()

など色々方法があり、Enumeratorにするにはどうするのかよくわからない。

PlayCLI

https://github.com/gre/playCLI

import playcli._
Ok.chunked(CLI.enumerate("tail -f aLogFile"))

こんな感じで使えるライブラリ。まさにやりたかったことができてる。

ただ、ちょっと放置気味なライブラリだし、CLI.enumerate(Process("ls") #| Process("tee /tmp/log.txt")) などとやるとwarningが出るので、自分で実装しようと思った。

InputStreamを使う

Process("ls") lineStream

などとやると Stream[String] というのが得られるらしい

これと Enumerator.fromStream というのを組み合わせられるのかと思いきや、Enumerator.fromStream はJavaのInputStreamのことであってScalaでいうところのStreamとは別物だそうな。

結論から言うと、こんなふうにやった。

object ProcessEnumerator {
  def apply(process: ProcessBuilder): Enumerator[Array[Byte]] = {
    val in = new PipedInputStream()
    val out = new PipedOutputStream(in)
    Future(process.#>(out).run())
    Enumerator.fromStream(in)
  }
}

これでコマンドの実行結果がEnumeratorになり、Content-Encoding: chunkedで送れるようになった。

もしかしたら

Enumerator.outputStream { out => Future(process.#>(out).run()) }

という手もあったのかもしれないが試してないのでわからない。

(さらっと書いているが、ここに至るまでProcessとProcessBuilderのことやEnumeratorとかStreamとInputStreamについてだいぶ調べた)

ハマりどころ

世の中には Content-Encoding: chunked をきちんと扱えないプロキシがあって、nginxですらデフォルトでは(設定ファイルを書かないと)いけない。

プロキシでバッファリングされるケースもある。Playからはストリーミングしているつもりなのに、クライアントには全然流れてこないケース。

あとハマったのが、UNIXのコマンドは ターミナル出力でない場合は 4096バイトずつバッファリングされる。ターミナル出力の場合は行バッファなのでちゃんと動いているように見えても、ウェブを介するとなぜか詰まるということになる。これはlibcレベルの問題で、まっとうな方法で解決するのはけっこうたいへん。検索してみると、ターミナル出力に見せかけるためのコマンド使う方法などが見つかった。自分はscriptコマンドを使った。Mac, Linux両対応なのと、標準エラー出力を標準出力にリダイレクトしてくれるため。

WebSocket対応

プロキシの問題があったのでContent-Encoding: chunkedは諦めてWebSocketでやることにした。

幸いなことに、EnumeratorになってしまえばWebSocketにするのは簡単

…と思ってた時期が僕にもありました。

問題は、Enumerator.fromStreamでできるEnumeratorは、 Enumerator[Array[Byte]] だということ。

これだと、WebSocketのBinaryFrameで送られちゃうので、JavaScriptからはBlobやArrayBufferでしか受け取れない。JSでBlobからStringに変換するのは結構面倒なので、きちんとテキストとして返したい。

Enumerator[Array[Byte]]Enumerator[String] に変換できるラッパーを書いたら良かったのだけど、そこまでは調べきれなかったので、コマンドの出力を行ごとに読んで出力するという方向でやることにした。

object ProcessEnumerator {
  def apply(process: ProcessBuilder): Enumerator[String] = {
    val in = new PipedInputStream()
    val out = new PipedOutputStream(in)
    Future(process.#>(out).run())

    val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

    Enumerator.fromCallback1[String](_ => Future {
      reader.readLine match {
        case line: String => Some(line + "\n")
        case _ => None
      }
    }, { () => // oncomplete
      in.close()
    })
  }
}

Enumerator.fromCallback1という、第一引数の関数をひたすら実行してくれてEnumeratorを作れるものがあった。

詳しくはこのあたりをどうぞ。

追記

色々試行錯誤してたらもっと簡単にできた。

object ProcessEnumerator {
  def apply(process: ProcessBuilder): Enumerator[String] = {
    Enumerator.enumerate[String](
      process.lineStream_!(ProcessLogger(line => ())).map { line => line + "\n" }
    )
  }
}
uzabase
企業活動の意思決定を支える情報インフラの提供
https://www.uzabase.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away