8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

fs2 によるリアクティブな温度変換

Last updated at Posted at 2018-02-21

『Functional Programming in Scala』(FP in Scala)の「第15章: Stream processing and incremental I/O」に、いわゆる Streaming I/O についての解説があり、合成可能なProcessを組み合わせて一連のデータのストリームを加工する技法が紹介されている。

fs2(Functional Streams for Scala)は、この Streaming I/O を Cats ベースで実装したもので、doobiehttp4s などでも使われている。『FP in Scala』の Streaming I/O 実装とは若干違いがあるが、今回はこれを試してみる。

お題

『FP in Scala』でも、fs2 の README でも、華氏で温度が書かれたファイルを摂氏に変換する例が載っている。

ただし個人的には、プログラム開始時にすでに存在するデータ群の操作よりも、リアルタイムに発生して流れ込んでくるデータをリアクティブぽく処理する方が、なんとなく Streaming I/O の面白さがよく出る気がするので、ここでは同様の温度変換をコンソールベースのプログラムで試してみたい。

段取りとしては、

  • まず標準入力を単にエコーするだけのコンソールアプリを最初に書いて、(エコー)
  • これに少しずつコードを追加して華氏→摂氏変換ができるようにし、(変換)
  • 最終的に状態を保たせて摂氏⇔華氏の二つのモードを切り替えられるようにしてみる(ステートフル)。

実装

  • fs2-core、fs2-io の実装は 2.0.0 を使っている (2019/09/22 に 1.0.4 から変更)。
  • 動くファイルはここに置いた

エコー

まずシンプルなエコーから始める。fs2-io を使って標準入力をそのまま標準出力に流すような Stream を定義した。

  def echo[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
    io.stdin(4096, bl)
      .through(io.stdout(bl)) // Stream[F, Byte]=>Stream[F, Unit]

through は 受け取った関数を使ってストリームを変換するメソッドで、ここでは io.stdout を用いて io.stdinStream[F, Byte]Stream[F, Unit] につなげている。

ちなみに ストリームの変換関数には下記のような型がある1

定義 変換 メモ
Pipe Stream[F, I] => Stream[F, O] through 一般的なストリーム変換
Pipe2 (Stream[F,I],Stream[F,I2])=>Stream[F,O] through2 ストリームの合流

io.stdinio.stdout には Blocker を渡す必要があるが2、ここでは利便性のため下記のような trait を作って、runWithEC に関数を渡すと Blocker が持つ ExecutionContext 上でストリームが評価されるようにしてみた。

trait StreamApp extends IOApp {
  def runWithEC(s: Blocker => Stream[IO, Unit]): IO[ExitCode] =
    Stream.resource(Blocker[IO]).flatMap(s).compile.drain.as(ExitCode.Success)
}

以下のように IOApp#run を実装すると、コンソールアプリとして実行できるようになる。

  def run(args: List[String]): IO[ExitCode] = runWithEC(echo[IO])

先のechoはバイト列を入出力していたが、文字列として扱うには以下のようにデコード/エンコードを組み込む。

  def echo[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
    io.stdin(4096, bl)
      .through(text.utf8Decode) // Stream[F, Byte]  => Stream[F, String]
      .through(text.lines)      // Stream[F, String]=> Stream[F, String]
      .map(line => s"$line\n")
      .through(text.utf8Encode) // Stream[F, String]=> Stream[F, Byte]
      .through(io.stdout(bl))   // Stream[F, Byte]  => Stream[F, Unit]

ついでに":q"が入力されるとセッションを終了するようにしておく。

  def echo[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
    io.stdin(4096, bl)
      .through(text.utf8Decode)
      .through(text.lines)
      .takeWhile(s => s != ":q") // <-- ここ
      .map(line => s"$line\n")
      .through(text.utf8Encode)
      .through(io.stdout(bl))

ここまでで下記のように実行できる。

hello
hello
こんにちは
こんにちは
:q

Process finished with exit code 0

変換

上のコードに華氏→摂氏の変換を追加してみる。

  def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0)

  def convertF2C(line: String): String =
      s"${ Try(line.toDouble).fold(_ => "error", fahrenheitToCelsius(_).toString) }\n"

  def convert[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
    io.stdin(4096, bl)
      .through(text.utf8Decode)
      .through(text.lines)
      .takeWhile(s => s != ":q")
      .through(_ map convertF2C) // Pipe[F, String, String]
      .through(text.utf8Encode)
      .through(io.stdout(bl))

convertF2C は、_ map convertF2C のように Pipe にして through に与えて、ストリームに組み込むことができる。Double型に変換できない場合には、"error" を結果とするようにした。下のように実行できる(コメントは後でつけたもの)。

0                  // 入力
-17.77777777777778
foo                // 入力
error
100                // 入力
37.77777777777778
:q                 // 入力

Process finished with exit code 0

ステートフル

FS2: The Official Guide に Statefully transforming stream の節がある。これをヒントにして、状態を持たせるようにしてみたい。具体的には、華氏→摂氏の変換(デフォルト)と、摂氏→華氏の変換を、":f"と":c"で切り替えられるようにし、切り替わった状態を保持するようにする。

まず温度変換の関数と、どちら方向の変換かを表すモードを以下のように定義する。

  def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0)
  def celsiusToFahrenheit(c: Double): Double = c / (5.0/9.0) + 32.0

  sealed trait Mode
  case object F2C extends Mode
  case object C2F extends Mode

次に入力によってモードを切り替えるパイプ changeMode と、モードに応じて関数を選択して計算結果を文字列化するパイプ convertTemperature を定義する。

def changeMode[F[_]]: Pipe[F, String, (Mode, String)] =
  _.scan((F2C: Mode, "")) { case ((mode, _), b) =>
      (b match {
        case ":c" => F2C
        case ":f" => C2F
        case _    => mode
      }, b)
    }
    .filter(p => p._2 != ":c" && p._2 != ":f")
    .drop(1)

def convertTemperature[F[_]]: Pipe[F, (Mode, String), String] =
  _.map { case (mode, line) =>
    val (f, s) = if (mode == F2C) (fahrenheitToCelsius _, "%s°F ⇒ %s°C")
                 else             (celsiusToFahrenheit _, "%s°C ⇒ %s°F")
    s"${ Try(line.trim.toDouble).fold(_ => "error", d => s.format(d, f(d))) }\n"
  }

この二つのパイプを andThen で合成して、以下のようにストリームに組み込む。

  def convert[F[_]: Sync: ContextShift](bl: Blocker): Stream[F, Unit] =
    io.stdin(4096, bl)
      .through(text.utf8Decode)
      .through(text.lines)
      .takeWhile(s => s != ":q")
      .through(changeMode andThen convertTemperature) // <-- ここ
      .through(text.utf8Encode)
      .through(io.stdout(bl))

実行すると下のようになる(コメントは後で付け足した)。

32           // はじめは華氏→摂氏モード
32.0°F ⇒ 0.0°C
:f              // モードを摂氏→華氏に切り替え
0            
0.0°C ⇒ 32.0°F
test
error
:c              // モードを華氏→摂氏に切り替え
0
0.0°F ⇒ -17.77777777777778°C
:q

Process finished with exit code 0

所感

他にも、ストリームを組み立てる各種の部品や、リソース管理、並列処理なども提供されてるので、今後すこしずつ試してみたい。

参照

  1. ver 1.0.x の頃は Sink = Stream[F,I] => Stream[F,Unit] とメソッド to があったが、現在は deprecated とされていて、Pipethrough を使うべしとされている。

  2. ver 1.0 への変更で、それまでは渡す必要がなかった ExecutionContext を渡すようになり、さらに ver 1.1.0 で ExecutionContext が Blocker に置き換えられた。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?