LoginSignup
0

More than 3 years have passed since last update.

fs2 の並行処理 〜 Topic+Signal編

Last updated at Posted at 2019-01-24

前回は、Scala の Streaming I/O ライブラリ fs2 のキューを紹介した。今回はトピックとシグナルについて。

はじめに

fs2 の並行処理関連パッケージ fs2.concurrency には、QueueTopicBalanceBroadcastSignal といった concurrency primitives がある。前回の記事で紹介した Queue に続いて、今回は TopicSignal について書く。

トピックとは

トピック とは、Publish/Subscribe モデル1の非同期メッセージングの一つで、トピックと呼ばれる論理チャンネルに対してある発行者がデータを発行すると、そのトピックを購読している全ての購読者がそのデータを受信するというもの。fs2 では、ストリームベースの実装が Topic[F[_], A] クラスで提供されている。

シグナルとは

ここでのシグナル はストリーム間でデータの更新を通知する仕組みで、ストリームの中止を指示するために使う(注:必ずしもトピックと併用されているわけではない)。

fs2 では RefSignal を併せ持つ SignallingRef が提供されている。Ref2 は、cats.effect で提供されているトレイトで、いわばアトミックに読み書きできる「非同期並行ミュータブル参照」といったもの。Signal は fs2の concurrent primitive の一つで、F の文脈(effect) で読み出される値(この場合 Boolean 値)の容れもの。

お題

  • 一つのトピックを、3つの購読者 A、B、C が購読している
  • コンソールから文字列を入力すると発行者がトピックに発行する。
  • 3つの購読者はそれぞれトピックから読み出して、指定秒数だけ待ってから標準出力に書き出す。
  • コンソールにコマンド文字列 ":q" を入力すると、プログラムが終了する。

以下のような構成になる。

                                   / [購読者A] >>> [標準出力]
[標準入力] >>> [発行者] >>> [トピック] ー [購読者B] >>> [標準出力]
                                   \ [購読者C] >>> [標準出力]

実装

公式ガイドの "Single Publisher / Multiple Subscriber" サンプルを参考にしながら実装する。

fs2 のバージョンは ver 2.0.0。動くソースはここに置いた。

標準入力からのストリーム

前回のキューのサンプルコードとほぼ同じ。ただし終了コマンド ":q" を読むと、それ以上読み込むのをやめる。

def stdinStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
  io.stdin[F](4096, bl)
    .through(text.utf8Decode)
    .through(text.lines)
    .map(_.trim)
    .filter(_.nonEmpty)
    .takeWhile(_ != ":q", takeFailure = true) // ":q" まで読み込む

標準出力へのストリーム

1行の文字列を標準出力に書き出すストリーム。

def printlnStream[F[_]: Sync : ContextShift](line: String)
  (implicit bl: Blocker): Stream[F, Unit] =
    Stream(s"$line\n")
      .through(text.utf8Encode)
      .through(io.stdout[F](bl))

イベント/トピック/シグナル

トピックに発行するデータ型として、ここでは以下のような3種類のイベントを定義する。

sealed trait Event
case object Start               extends Event // 開始イベント
case class  Text(value: String) extends Event // 文字列イベント
case object Quit                extends Event // 終了イベント

公式サンプルに倣って、下のような EventService クラスにトピックとシグナルを置いた。関連するストリーム publishersubscriberstart(後述)もこのクラスに置いた。

class EventService[F[_] : ContextShift : Timer : Concurrent]
  (topic: Topic[F, Event], haltWhenTrue: SignallingRef[F, Boolean])
  (implicit ec: Blocker) {
  ...
  def publisher: Stream[F, Unit] = ???
  def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] = ???
  def start: Stream[F, INothing] = ???
  ...

発行者

発行側のストリーム publisher は以下のようになる。

def publisher: Stream[F, Unit] = stdinStream flatMap {
  case ":q" => topic.publish1(Quit).eval
  case s    => topic.publish1(Text(s"$s")).eval
} interruptWhen haltWhenTrue

まず、上で定義した標準入力からの文字列行ストリーム stdinStream を読み込んで、Text イベントとしてトピックに発行。ただし、終了を指示するコマンド ":q" が入力されたときは、Quit イベントを発行する。また Stream#interruptWhen メソッドを使って、シグナル haltWhenTrue の値が true になったときに、ストリーム読み込みを終了するようにしている。

購読者

購読側のストリームは、購読者の識別 id とともに「時間のかかる処理」をエミュレートする秒数を受け取って、購読者を生成する。

def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] =
  topic.subscribe(maxQueued = 1) flatMap {
    case Start   => printlnStream(s"#$id started")
    case Text(s) => Stream.sleep_[F](d) ++ printlnStream(s"#$id processing text event: $s")
    case Quit    => haltWhenTrue.set(true).eval
  }

Text イベントでは、指定秒数だけ sleep してから、識別子とともに文字列を出力している。

開始ストリーム

このストリームでは、発行者と購読者(×3)を並行実行させている。

def start: Stream[F, INothing] = {
  val subscribers: Stream[F, Unit] =
    Stream(("A", 1.second), ("B", 2.second), ("C", 3.second))
      .map { case (id, d) => subscriber(id, d) }
      .parJoin(3)
  Stream(publisher concurrently subscribers) parJoin 3 drain
}

3つ購読者 A、B、C の所用秒数は、それぞれ 1秒、2秒、3秒と指定した。

全体をまとめるストリーム

前回定義した ConcurrencyTestApp のメソッド stream を、以下のように実装した。このメソッドで、トピック、シグナル、サービスが組み合わされた一つのストリームになる (eval は、前回も使っていた Stream.eval を適用する拡張メソッド)。

override def stream[F[_] : Concurrent : Timer : ContextShift]
  (implicit bl: Blocker): Stream[F, Unit] = {
    for {
      topic   <- Topic[F, Event](Start).eval
      signal  <- SignallingRef(false).eval
      _       <- new EventService[F](topic, signal).start
    } yield ()
  }

一応補足すると、Topic[F, Event](Start) の型は F[Topic[F, Event]] で、これを eval3 すると Stream[F, Topic[F, Event]] 型となり、さらに <- の左辺で求める型 Topic[F, Event] が得られる。SignallinRef を生成している行も同様。

実行結果

以下のような挙動になる(コメントはあとで書き足したもの)。

#B started                       // プログラム開始直後
#C started                       // A、B、C がどの順で実行されるかはその時々で異なりうる
#A started
test                             // "test" と入力した
#A processing text event: test   // 1 秒後に 購読者A が出力
#B processing text event: test   // 2 秒後に 購読者B が出力
#C processing text event: test   // 3 秒後に 購読者C が出力
hello                            // 同様に "hello"
#A processing text event: hello
#B processing text event: hello
#C processing text event: hello
:q                               // 終了コマンド ":q" を入力した

Process finished with exit code 0 // 終わった

おわりに

次は v1.0.0 で追加された Balance と Broadcast について。


  1. Wikipedia: 出版-購読型モデル 

  2. cats.effect の公式ガイドに詳しい。もとは fs2 にあったものをマイグレートしたらしい。 

  3. F[T] => Stream[F, T] 

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0