Akka-Stream学習シリーズ
Source,Flow,Sinkのドキュメントを見てみる
Sourceは型パラメータを2つ取る。
1つはこのSourceから出ていくOutputの型、もう1つはMat。
Sinkは型パラメータを2つ取る。
1つはこのSinkに入ってくるInputの型。もう1つはMat。
Flowは型パラメータを3つ取る。
1つはInputの型、もう1つはOutputの型。もう1つはMat。
これらを組み合わせてできるRunnableGraphは型パラメータを1つ取り、Mat。
Remember those mysterious Mat type parameters on Source[+Out, +Mat], Flow[-In, +Out, +Mat] and Sink[-In, +Mat]? They represent the type of values these processing parts return when materialized. When you chain these together, you can explicitly combine their materialized values
分かったようなわからないような。
具体的な例が欲しい。
例
SinkのMat例
まずはSinkのMat型を取る例。
val foldSink: Sink[Int, Future[String]] = Sink.fold(""){ _ + _.toString }
val result: Future[String] = Source(1 to 10).runWith(foldSink)
この例の場合はSinkへのInput型はIntだが、Mat型はSinkでの集計処理をした結果のString型となっている。
SourceのMat例
val source: Source[Int, ActorRef] = Source.actorPublisher[Int](Props[SampleActor])
val rg: RunnableGraph[ActorRef] = source.to(Sink.foreach(println))
val actor = rg.run()
Thread.sleep(1000) // 調整のためのsleep
actor ! Message(1)
actor ! Message(2)
actor ! Message(3)
actor ! END
ちなみにActorとメッセージはこんな感じ。
case class Message(n: Int)
case object END
class SampleActor extends ActorPublisher[Int] {
def receive = {
case Message(n) => onNext(n)
case END => onComplete() // ※ちゃんと終了シグナルを送らないとfoldのような処理がDownstreamにいた場合、どこがデータの終了かわからなくなってfoldが終わらなくなる
}
}
この例では、SourceからのOutputはInt型、MatはActorRef型。
Actorにメッセージを送ると、メッセージに含まれる数字がOutputとして下流に流れていく。
SourceとSinkにMatがある場合は?
上記例のようにSourceにもSinkにもそれぞれMat型をもつ場合、RunnableGraphをrunした際に取れるMatはどちらになるんだろう?
どちらか一方、ではなくて両方使うことが出来る。
val source: Source[Int, ActorRef] = Source.actorPublisher[Int](Props[SampleActor])
val sink: Sink[Int, Future[String]] = Sink.fold[String,Int](""){ _ + _.toString }
val rg: RunnableGraph[(ActorRef, Future[String])] = source.toMat(sink)(Keep.both)
val (actor, result) = rg.run()
一方だけにすることも出来る。
以下の例はSourceの方のMatを使う。
※今回の例の場合はこうしてしまうとfoldした結果を取得することが出来ない。
val rg: RunnableGraph[ActorRef] = source.toMat(sink)(Keep.left)
以下の例はSinkの方のMatを使う。
※今回の例の場合はこうしてしまうとactorが取得できないので何もデータが流れない。
val rg: RunnableGraph[Future[String]] = source.toMat(sink)(Keep.right)
実装を覗く
ちなみに source.to(sink)
としているtoの実装を見てみるとこうなっている。
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)
おお、toMatがleftで呼ばれている。つまりこの場合はsinkのMatが無視されてsourceのMatが採用される。
これ source.runWith(sink)
はどうだろう?
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run()
おお、こっちはtoMatがrightで呼ばれている。
ではでは source.runForeach(println)
とか source.runFold(""){ _ + _.toString }
はというと、、、
def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] =
runWith(Sink.fold(zero)(f))
def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f))
それぞれrunWithに委譲している。ということはrightだ。