Play Framework Streamingとかで検索するとこういうページが出てきて、
def index = Action {
val data = getDataStream
val dataContent: Enumerator[Array[Byte]] = Enumerator.fromStream(data)
Ok.chunked(dataContent)
}
とやると Content-Encoding: chuked
でレスポンスを返すことができるらしい。
ふむふむ、さてコマンドの実行結果をchunkedで送るにはとなると初心者には厳しい。
まずコマンドの実行は
"ls" !
Process("xargs test -z") run()
など色々方法があり、Enumeratorにするにはどうするのかよくわからない。
PlayCLI
import playcli._
Ok.chunked(CLI.enumerate("tail -f aLogFile"))
こんな感じで使えるライブラリ。まさにやりたかったことができてる。
ただ、ちょっと放置気味なライブラリだし、CLI.enumerate(Process("ls") #| Process("tee /tmp/log.txt"))
などとやるとwarningが出るので、自分で実装しようと思った。
InputStreamを使う
Process("ls") lineStream
などとやると Stream[String]
というのが得られるらしい。
これと Enumerator.fromStream
というのを組み合わせられるのかと思いきや、Enumerator.fromStream
はJavaのInputStreamのことであってScalaでいうところのStreamとは別物だそうな。
結論から言うと、こんなふうにやった。
object ProcessEnumerator {
def apply(process: ProcessBuilder): Enumerator[Array[Byte]] = {
val in = new PipedInputStream()
val out = new PipedOutputStream(in)
Future(process.#>(out).run())
Enumerator.fromStream(in)
}
}
これでコマンドの実行結果がEnumeratorになり、Content-Encoding: chunkedで送れるようになった。
もしかしたら
Enumerator.outputStream { out => Future(process.#>(out).run()) }
という手もあったのかもしれないが試してないのでわからない。
(さらっと書いているが、ここに至るまでProcessとProcessBuilderのことやEnumeratorとかStreamとInputStreamについてだいぶ調べた)
ハマりどころ
世の中には Content-Encoding: chunked
をきちんと扱えないプロキシがあって、nginxですらデフォルトでは(設定ファイルを書かないと)いけない。
プロキシでバッファリングされるケースもある。Playからはストリーミングしているつもりなのに、クライアントには全然流れてこないケース。
あとハマったのが、UNIXのコマンドは ターミナル出力でない場合は 4096バイトずつバッファリングされる。ターミナル出力の場合は行バッファなのでちゃんと動いているように見えても、ウェブを介するとなぜか詰まるということになる。これはlibcレベルの問題で、まっとうな方法で解決するのはけっこうたいへん。検索してみると、ターミナル出力に見せかけるためのコマンド使う方法などが見つかった。自分はscriptコマンドを使った。Mac, Linux両対応なのと、標準エラー出力を標準出力にリダイレクトしてくれるため。
WebSocket対応
プロキシの問題があったのでContent-Encoding: chunked
は諦めてWebSocketでやることにした。
幸いなことに、EnumeratorになってしまえばWebSocketにするのは簡単。
…と思ってた時期が僕にもありました。
問題は、Enumerator.fromStream
でできるEnumeratorは、 Enumerator[Array[Byte]]
だということ。
これだと、WebSocketのBinaryFrameで送られちゃうので、JavaScriptからはBlobやArrayBufferでしか受け取れない。JSでBlobからStringに変換するのは結構面倒なので、きちんとテキストとして返したい。
Enumerator[Array[Byte]]
を Enumerator[String]
に変換できるラッパーを書いたら良かったのだけど、そこまでは調べきれなかったので、コマンドの出力を行ごとに読んで出力するという方向でやることにした。
object ProcessEnumerator {
def apply(process: ProcessBuilder): Enumerator[String] = {
val in = new PipedInputStream()
val out = new PipedOutputStream(in)
Future(process.#>(out).run())
val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
Enumerator.fromCallback1[String](_ => Future {
reader.readLine match {
case line: String => Some(line + "\n")
case _ => None
}
}, { () => // oncomplete
in.close()
})
}
}
Enumerator.fromCallback1
という、第一引数の関数をひたすら実行してくれてEnumeratorを作れるものがあった。
詳しくはこのあたりをどうぞ。
追記
色々試行錯誤してたらもっと簡単にできた。
object ProcessEnumerator {
def apply(process: ProcessBuilder): Enumerator[String] = {
Enumerator.enumerate[String](
process.lineStream_!(ProcessLogger(line => ())).map { line => line + "\n" }
)
}
}