『Functional Programming in Scala』(FP in Scala)の「第15章: Stream processing and incremental I/O」に、いわゆる Streaming I/O についての解説があり、合成可能なProcess
を組み合わせて一連のデータのストリームを加工する技法が紹介されている。
fs2(Functional Streams for Scala)は、この Streaming I/O を Cats ベースで実装したもので、doobie や http4s などでも使われている。『FP in Scala』の Streaming I/O 実装とは若干違いがあるが、今回はこれを試してみる。
お題
『FP in Scala』でも、fs2 の README でも、華氏で温度が書かれたファイルを摂氏に変換する例が載っている。
ただし個人的には、プログラム開始時にすでに存在するデータ群の操作よりも、リアルタイムに発生して流れ込んでくるデータをリアクティブぽく処理する方が、なんとなく Streaming I/O の面白さがよく出る気がするので、ここでは同様の温度変換をコンソールベースのプログラムで試してみたい。
段取りとしては、
- まず標準入力を単にエコーするだけのコンソールアプリを最初に書いて、(エコー)
- これに少しずつコードを追加して華氏→摂氏変換ができるようにし、(変換)
- 最終的に状態を保たせて摂氏⇔華氏の二つのモードを切り替えられるようにしてみる(ステートフル)。
実装
- fs2-core、fs2-io の実装は 2.0.0 を使っている (2019/09/22 に 1.0.4 から変更)。
- 動くファイルはここに置いた
エコー
まずシンプルなエコーから始める。fs2-io を使って標準入力をそのまま標準出力に流すような Stream
を定義した。
def echo[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
io.stdin(4096, bl)
.through(io.stdout(bl)) // Stream[F, Byte]=>Stream[F, Unit]
through
は 受け取った関数を使ってストリームを変換するメソッドで、ここでは io.stdout
を用いて io.stdin
の Stream[F, Byte]
を Stream[F, Unit]
につなげている。
ちなみに ストリームの変換関数には下記のような型がある1。
型 | 定義 | 変換 | メモ |
---|---|---|---|
Pipe | Stream[F, I] => Stream[F, O] | through | 一般的なストリーム変換 |
Pipe2 | (Stream[F,I],Stream[F,I2])=>Stream[F,O] | through2 | ストリームの合流 |
io.stdin
、io.stdout
には Blocker
を渡す必要があるが2、ここでは利便性のため下記のような trait を作って、runWithEC
に関数を渡すと Blocker
が持つ ExecutionContext
上でストリームが評価されるようにしてみた。
trait StreamApp extends IOApp {
def runWithEC(s: Blocker => Stream[IO, Unit]): IO[ExitCode] =
Stream.resource(Blocker[IO]).flatMap(s).compile.drain.as(ExitCode.Success)
}
以下のように IOApp#run
を実装すると、コンソールアプリとして実行できるようになる。
def run(args: List[String]): IO[ExitCode] = runWithEC(echo[IO])
先のecho
はバイト列を入出力していたが、文字列として扱うには以下のようにデコード/エンコードを組み込む。
def echo[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
io.stdin(4096, bl)
.through(text.utf8Decode) // Stream[F, Byte] => Stream[F, String]
.through(text.lines) // Stream[F, String]=> Stream[F, String]
.map(line => s"$line\n")
.through(text.utf8Encode) // Stream[F, String]=> Stream[F, Byte]
.through(io.stdout(bl)) // Stream[F, Byte] => Stream[F, Unit]
ついでに":q"が入力されるとセッションを終了するようにしておく。
def echo[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
io.stdin(4096, bl)
.through(text.utf8Decode)
.through(text.lines)
.takeWhile(s => s != ":q") // <-- ここ
.map(line => s"$line\n")
.through(text.utf8Encode)
.through(io.stdout(bl))
ここまでで下記のように実行できる。
hello
hello
こんにちは
こんにちは
:q
Process finished with exit code 0
変換
上のコードに華氏→摂氏の変換を追加してみる。
def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0)
def convertF2C(line: String): String =
s"${ Try(line.toDouble).fold(_ => "error", fahrenheitToCelsius(_).toString) }\n"
def convert[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
io.stdin(4096, bl)
.through(text.utf8Decode)
.through(text.lines)
.takeWhile(s => s != ":q")
.through(_ map convertF2C) // Pipe[F, String, String]
.through(text.utf8Encode)
.through(io.stdout(bl))
convertF2C
は、_ map convertF2C
のように Pipe
にして through
に与えて、ストリームに組み込むことができる。Double
型に変換できない場合には、"error" を結果とするようにした。下のように実行できる(コメントは後でつけたもの)。
0 // 入力
-17.77777777777778
foo // 入力
error
100 // 入力
37.77777777777778
:q // 入力
Process finished with exit code 0
ステートフル
FS2: The Official Guide に Statefully transforming stream の節がある。これをヒントにして、状態を持たせるようにしてみたい。具体的には、華氏→摂氏の変換(デフォルト)と、摂氏→華氏の変換を、":f"と":c"で切り替えられるようにし、切り替わった状態を保持するようにする。
まず温度変換の関数と、どちら方向の変換かを表すモードを以下のように定義する。
def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0)
def celsiusToFahrenheit(c: Double): Double = c / (5.0/9.0) + 32.0
sealed trait Mode
case object F2C extends Mode
case object C2F extends Mode
次に入力によってモードを切り替えるパイプ changeMode
と、モードに応じて関数を選択して計算結果を文字列化するパイプ convertTemperature
を定義する。
def changeMode[F[_]]: Pipe[F, String, (Mode, String)] =
_.scan((F2C: Mode, "")) { case ((mode, _), b) =>
(b match {
case ":c" => F2C
case ":f" => C2F
case _ => mode
}, b)
}
.filter(p => p._2 != ":c" && p._2 != ":f")
.drop(1)
def convertTemperature[F[_]]: Pipe[F, (Mode, String), String] =
_.map { case (mode, line) =>
val (f, s) = if (mode == F2C) (fahrenheitToCelsius _, "%s°F ⇒ %s°C")
else (celsiusToFahrenheit _, "%s°C ⇒ %s°F")
s"${ Try(line.trim.toDouble).fold(_ => "error", d => s.format(d, f(d))) }\n"
}
この二つのパイプを andThen
で合成して、以下のようにストリームに組み込む。
def convert[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
io.stdin(4096, bl)
.through(text.utf8Decode)
.through(text.lines)
.takeWhile(s => s != ":q")
.through(changeMode andThen convertTemperature) // <-- ここ
.through(text.utf8Encode)
.through(io.stdout(bl))
実行すると下のようになる(コメントは後で付け足した)。
32 // はじめは華氏→摂氏モード
32.0°F ⇒ 0.0°C
:f // モードを摂氏→華氏に切り替え
0
0.0°C ⇒ 32.0°F
test
error
:c // モードを華氏→摂氏に切り替え
0
0.0°F ⇒ -17.77777777777778°C
:q
Process finished with exit code 0
所感
他にも、ストリームを組み立てる各種の部品や、リソース管理、並列処理なども提供されてるので、今後すこしずつ試してみたい。
参照
-
ver 1.0.x の頃は
Sink = Stream[F,I] => Stream[F,Unit]
とメソッドto
があったが、現在は deprecated とされていて、Pipe
とthrough
を使うべしとされている。 ↩ -
ver 1.0 への変更で、それまでは渡す必要がなかった
ExecutionContext
を渡すようになり、さらに ver 1.1.0 で ExecutionContext が Blocker に置き換えられた。 ↩