前回は、Scala の Streaming I/O ライブラリ fs2 のキューを紹介した。今回はトピックとシグナルについて。
はじめに
fs2 の並行処理関連パッケージ fs2.concurrency
には、Queue
、Topic
、Balance
、Broadcast
、Signal
といった concurrency primitives がある。前回の記事で紹介した Queue
に続いて、今回は Topic
と Signal
について書く。
トピックとは
トピック とは、Publish/Subscribe モデル1の非同期メッセージングの一つで、トピックと呼ばれる論理チャンネルに対してある発行者がデータを発行すると、そのトピックを購読している全ての購読者がそのデータを受信するというもの。fs2 では、ストリームベースの実装が Topic[F[_], A]
クラスで提供されている。
シグナルとは
ここでのシグナル はストリーム間でデータの更新を通知する仕組みで、ストリームの中止を指示するために使う(注:必ずしもトピックと併用されているわけではない)。
fs2 では Ref
と Signal
を併せ持つ SignallingRef
が提供されている。Ref
2 は、cats.effect で提供されているトレイトで、いわばアトミックに読み書きできる「非同期並行ミュータブル参照」といったもの。Signal
は fs2の concurrent primitive の一つで、F の文脈(effect) で読み出される値(この場合 Boolean 値)の容れもの。
お題
- 一つのトピックを、3つの購読者 A、B、C が購読している
- コンソールから文字列を入力すると発行者がトピックに発行する。
- 3つの購読者はそれぞれトピックから読み出して、指定秒数だけ待ってから標準出力に書き出す。
- コンソールにコマンド文字列 ":q" を入力すると、プログラムが終了する。
以下のような構成になる。
/ [購読者A] >>> [標準出力]
[標準入力] >>> [発行者] >>> [トピック] ー [購読者B] >>> [標準出力]
\ [購読者C] >>> [標準出力]
実装
公式ガイドの "Single Publisher / Multiple Subscriber" サンプルを参考にしながら実装する。
fs2 のバージョンは ver 2.0.0。動くソースはここに置いた。
標準入力からのストリーム
前回のキューのサンプルコードとほぼ同じ。ただし終了コマンド ":q" を読むと、それ以上読み込むのをやめる。
def stdinStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
io.stdin[F](4096, bl)
.through(text.utf8Decode)
.through(text.lines)
.map(_.trim)
.filter(_.nonEmpty)
.takeWhile(_ != ":q", takeFailure = true) // ":q" まで読み込む
標準出力へのストリーム
1行の文字列を標準出力に書き出すストリーム。
def printlnStream[F[_]: Sync : ContextShift](line: String)
(implicit bl: Blocker): Stream[F, Unit] =
Stream(s"$line\n")
.through(text.utf8Encode)
.through(io.stdout[F](bl))
イベント/トピック/シグナル
トピックに発行するデータ型として、ここでは以下のような3種類のイベントを定義する。
sealed trait Event
case object Start extends Event // 開始イベント
case class Text(value: String) extends Event // 文字列イベント
case object Quit extends Event // 終了イベント
公式サンプルに倣って、下のような EventService
クラスにトピックとシグナルを置いた。関連するストリーム publisher
、subscriber
、start
(後述)もこのクラスに置いた。
class EventService[F[_] : ContextShift : Timer : Concurrent]
(topic: Topic[F, Event], haltWhenTrue: SignallingRef[F, Boolean])
(implicit ec: Blocker) {
...
def publisher: Stream[F, Unit] = ???
def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] = ???
def start: Stream[F, INothing] = ???
...
発行者
発行側のストリーム publisher
は以下のようになる。
def publisher: Stream[F, Unit] = stdinStream flatMap {
case ":q" => topic.publish1(Quit).eval
case s => topic.publish1(Text(s"$s")).eval
} interruptWhen haltWhenTrue
まず、上で定義した標準入力からの文字列行ストリーム stdinStream
を読み込んで、Text
イベントとしてトピックに発行。ただし、終了を指示するコマンド ":q" が入力されたときは、Quit
イベントを発行する。また Stream#interruptWhen
メソッドを使って、シグナル haltWhenTrue
の値が true
になったときに、ストリーム読み込みを終了するようにしている。
購読者
購読側のストリームは、購読者の識別 id
とともに「時間のかかる処理」をエミュレートする秒数を受け取って、購読者を生成する。
def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] =
topic.subscribe(maxQueued = 1) flatMap {
case Start => printlnStream(s"#$id started")
case Text(s) => Stream.sleep_[F](d) ++ printlnStream(s"#$id processing text event: $s")
case Quit => haltWhenTrue.set(true).eval
}
Text
イベントでは、指定秒数だけ sleep してから、識別子とともに文字列を出力している。
開始ストリーム
このストリームでは、発行者と購読者(×3)を並行実行させている。
def start: Stream[F, INothing] = {
val subscribers: Stream[F, Unit] =
Stream(("A", 1.second), ("B", 2.second), ("C", 3.second))
.map { case (id, d) => subscriber(id, d) }
.parJoin(3)
Stream(publisher concurrently subscribers) parJoin 3 drain
}
3つ購読者 A、B、C の所用秒数は、それぞれ 1秒、2秒、3秒と指定した。
全体をまとめるストリーム
前回定義した ConcurrencyTestApp
のメソッド stream
を、以下のように実装した。このメソッドで、トピック、シグナル、サービスが組み合わされた一つのストリームになる (eval
は、前回も使っていた Stream.eval
を適用する拡張メソッド)。
override def stream[F[_] : Concurrent : Timer : ContextShift]
(implicit bl: Blocker): Stream[F, Unit] = {
for {
topic <- Topic[F, Event](Start).eval
signal <- SignallingRef(false).eval
_ <- new EventService[F](topic, signal).start
} yield ()
}
一応補足すると、Topic[F, Event](Start)
の型は F[Topic[F, Event]]
で、これを eval
3 すると Stream[F, Topic[F, Event]]
型となり、さらに <-
の左辺で求める型 Topic[F, Event]
が得られる。SignallinRef
を生成している行も同様。
実行結果
以下のような挙動になる(コメントはあとで書き足したもの)。
#B started // プログラム開始直後
#C started // A、B、C がどの順で実行されるかはその時々で異なりうる
#A started
test // "test" と入力した
#A processing text event: test // 1 秒後に 購読者A が出力
#B processing text event: test // 2 秒後に 購読者B が出力
#C processing text event: test // 3 秒後に 購読者C が出力
hello // 同様に "hello"
#A processing text event: hello
#B processing text event: hello
#C processing text event: hello
:q // 終了コマンド ":q" を入力した
Process finished with exit code 0 // 終わった
おわりに
次は v1.0.0 で追加された Balance と Broadcast について。
-
cats.effect の公式ガイドに詳しい。もとは fs2 にあったものをマイグレートしたらしい。 ↩
-
F[T] => Stream[F, T]
↩