Scala の streaming I/O ライブラリ fs2 の並行処理。今回は Queue について。
はじめに
fs2 の concurrency
パッケージには、Queue
、Topic
、Balance
、Broadcast
、Signal
といった、concurrency primitives がある。今回は、その中の Queue
を紹介する。
※ Topic と Signal についてはこの記事、Balance と Broadcast についてはこの記事に書いた。
お題
以前にも fs2の記事 を書いたが、そこでも入力側のストリームとしては標準入力を使った。理由は、既存ファイルのデータやStream(1, 2, 3)
1 だと普通のコレクション操作との違いが体感しにくいが、標準入力だとリアクティブっぽさが醸せて面白いという理由だった。今回もそれを踏襲する。また、それに加えてイベント発生源としてタイマーも使ってみる。
以下のような仕様になる。
- イベントソースのストリーム
- コンソールからの標準入力:入力テキストを行に区切って、トリムした上で空行を取り除いたストリーム
- タイマー起動の現在秒:2秒毎にシステム現在時刻の秒部分を整数として取り出したストリーム
- キュー操作
- 書き込み側: 上記イベントストリームからデータがエミットされるたびに、適当に修飾してキューに入れる
- 読み出し側: FIFO でキューから値を取り出して標準出力に書き出す
- プログラム開始後15秒で、キューからの新たな読み出しをやめてストリームを閉じる。
<標準入力> → [文字列の加工] → [キュー入れ]─┐
├─→ [キュー出し] → [標準出力に表示]─┐
<タイマー> → [現在秒取得] → [キュー入れ] ──┘ ├→ 終
[15秒スリープ]──┘
実装
fs2 は ver 2.0.0 を使った。動くソースはここに置いた。
標準入力からのストリーム
コンソールからの入力行ストリームは以下のように定義する(説明のために若干コメントを過剰に書いた)。
def stdinStream[F[_] : Sync : ContextShift](bl: Blocker): Stream[F, String] =
io.stdin[F](4096, bl) // Stream[F, Byte] ... 標準入力からの読み出し
.through(text.utf8Decode) // Stream[F, String]... UTF8バイト列から文字列(複数行もあり)に
.through(text.lines) // Stream[F, String]... 文字列を一行ずつに分離
.map(_.trim) // Stream[F, String]... 両端のスペースを除去
.filter(_.nonEmpty) // Stream[F, String]... 非空行のみエミット
これで「入力文字列を改行で区切って、トリムして空行を取り除いた文字列ストリーム」が得られる。
タイマーで発生するデータのストリーム
タイマーで値を発生させるには、下のコードのように zipRight
や zipLeft
を使うのが、fs2 のイディオムらしい2。ここではシステムミリ秒から秒を取り出して、2秒ごとにエミットするストリームを作っている。
def timerStream[F[_] : Timer](implicit F: Concurrent[F]): Stream[F, Long] = {
val timer: Stream[F, Unit] = Stream.fixedDelay[F](2.second)
val seconds: Stream[F, Long] = F.delay(System.currentTimeMillis).eval.repeat
(timer zipRight seconds) map (_ / 1000 % 60)
}
ちなみに F.delay
に続く eval
は、value class を使って Stream.eval()
を 拡張メソッドにしたもの。3
implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {
def eval: Stream[F, O] = Stream.eval(fo)
}
キュー を介してストリームを組み立てるコード
下のメソッドは下記ようなことをしている。
- 入力行ストリームと、キューに入れるシンク4をつなげる
- 現在秒ストリームと、キューに入れるシンクをつなげる
- キューから取り出すストリームと、標準出力に書き出すシンクをつなげる
- シンクにつながった各ストリームを合流して、一つの並列ストリームにする
こう書くと SRP(単一責任原則)5 に反しているように見えるが、要するに既存のストリームをキューを介して束ねているだけ(このコードも説明のためあえて少しくどい書き方をしている)。
def join[F[_]: ContextShift : Timer](q: Queue[F, String])
(implicit F: Concurrent[F], bl: Blocker): Stream[F, Unit] = {
val timerSink: Pipe[F, Long, Unit] = _.evalMap(n => q.enqueue1(s"$n from timer\n"))
val stdinSink: Pipe[F, String, Unit] = _.evalMap(s => q.enqueue1(s"$s from stdin\n"))
val stdoutSink: Pipe[F, String, Unit] = s => text.utf8Encode(s) through io.stdout[F](bl)
val enq1: Stream[F, Unit] = timerStream[F] through timerSink
val enq2: Stream[F, Unit] = stdinStream[F](bl) through stdinSink
val deq: Stream[F, Unit] = q.dequeue through stdoutSink
Stream(enq1, enq2, deq) parJoin 3
}
parJoin
に渡している 3 は並列実行するストリームの上限だが、このあたりの仕様などは APIドキュメントに詳細に書かれている。
キューを生成して全体をまとめるストリーム
キューを生成して上で組み立てたストリームに与えて、15秒スリープするストリームと組み合わせるのが下記コード。
object QueueTestApp extends ConcurrencyDemoApp {
...
def stream[F[_] : Concurrent : Timer : ContextShift]
(implicit bl: Blocker): Stream[F, Unit] = for {
q <- Queue.bounded[F, String](100).eval
_ <- Stream.sleep_[F](15.seconds) concurrently join(q).drain
} yield ()
...
concurrently
の左辺のストリームが終了すると、右辺に指定したバックグラウンドストリームも含む「結果ストリーム」全体が終了する。この場合、ストリーム開始時から15秒過ぎると新たな読み出しをやめる。ConcurrencyTestApp
は次に記述する。
ストリーム全体を実行するコード
ここまでに記述したストリームを実行するコードを、下記のように書いてみた。他の concurrency primitive のサンプルでも使いまわしできるように、IOApp
を拡張した共通コードを別途定義した ConcurrencyTestApp
に置いている。
trait ConcurrencyDemoApp extends IOApp {
def run(args: List[String]): IO[ExitCode] =
Stream.resource(Blocker[IO])
.flatMap(implicit bl => stream[IO])
.compile.drain
.as(ExitCode.Success)
def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
}
IOApp
は ContextShift
や Timer
を使うために、また ExecutionContext
は fs2.io
の各種APIを使うために必要になる6 7。
結果例
下記のような実行になる(コメントはあとで追加したもの)
hello // 手入力
hello from stdin // 標準入力からのデータとして表示
49 from timer // タイマかからのデータとして出力
51 from timer
line1 // line1 と line2 の2行をまとめてコピペ入力
line2
line1 from stdin // 行ごとに異なるイベントとして表示
line2 from stdin
53 from timer
55 from timer
57 from timer
59 from timer
1 from timer
bye // 15秒すぎているので新たな読み出しをやめて終了
Process finished with exit code 0
おわりに
他の concurrency primitive についても続けて書く。次は Topic と Signal。
-
fs2 の文脈での Stream は、Scala 標準の Stream とは別物なので注意 ↩
-
Useful with timed streams, the example below will emit a number every 100 milliseconds. ↩
-
Stream.eval(F.delay(System.currentTimeMillis)).repeat
やStream.eval(Queue.bounded[F, String](100))
みたいなコードは読みづらくて仕方がないので、拡張メソッドを利用して、視線が左右に行ったり来たりするのをなるべく減らすようにしている。 ↩ -
Single Responsibility Principle ↩
-
migration guide 1.0 の Concurrency https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/docs/migration-guide-1.0.md#concurrent ↩