25
20

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Akka-Stream]Matって何?

Last updated at Posted at 2015-09-10

Akka-Stream学習シリーズ

Source,Flow,Sinkのドキュメントを見てみる

Sourceは型パラメータを2つ取る。
1つはこのSourceから出ていくOutputの型、もう1つはMat。
source.png

Sinkは型パラメータを2つ取る。
1つはこのSinkに入ってくるInputの型。もう1つはMat。
sink.png

Flowは型パラメータを3つ取る。
1つはInputの型、もう1つはOutputの型。もう1つはMat。
flow.png

これらを組み合わせてできるRunnableGraphは型パラメータを1つ取り、Mat。
rg.png

Matについてはここここに説明あり。

Remember those mysterious Mat type parameters on Source[+Out, +Mat], Flow[-In, +Out, +Mat] and Sink[-In, +Mat]? They represent the type of values these processing parts return when materialized. When you chain these together, you can explicitly combine their materialized values

分かったようなわからないような。
具体的な例が欲しい。

SinkのMat例

まずはSinkのMat型を取る例。

val foldSink: Sink[Int, Future[String]] = Sink.fold(""){ _ + _.toString }
val result: Future[String] = Source(1 to 10).runWith(foldSink)

この例の場合はSinkへのInput型はIntだが、Mat型はSinkでの集計処理をした結果のString型となっている。

SourceのMat例

val source: Source[Int, ActorRef] = Source.actorPublisher[Int](Props[SampleActor])
val rg: RunnableGraph[ActorRef] = source.to(Sink.foreach(println))
val actor = rg.run()

Thread.sleep(1000) // 調整のためのsleep

actor ! Message(1)
actor ! Message(2)
actor ! Message(3)
actor ! END

ちなみにActorとメッセージはこんな感じ。

case class Message(n: Int)
case object END

class SampleActor extends ActorPublisher[Int] {
  def receive = {
    case Message(n) => onNext(n)
    case END => onComplete() // ※ちゃんと終了シグナルを送らないとfoldのような処理がDownstreamにいた場合、どこがデータの終了かわからなくなってfoldが終わらなくなる
  }
}

この例では、SourceからのOutputはInt型、MatはActorRef型。
Actorにメッセージを送ると、メッセージに含まれる数字がOutputとして下流に流れていく。

SourceとSinkにMatがある場合は?

上記例のようにSourceにもSinkにもそれぞれMat型をもつ場合、RunnableGraphをrunした際に取れるMatはどちらになるんだろう?

どちらか一方、ではなくて両方使うことが出来る。

val source: Source[Int, ActorRef] = Source.actorPublisher[Int](Props[SampleActor])
val sink: Sink[Int, Future[String]] = Sink.fold[String,Int](""){ _ + _.toString }
  
val rg: RunnableGraph[(ActorRef, Future[String])] = source.toMat(sink)(Keep.both)

val (actor, result) = rg.run()

一方だけにすることも出来る。
以下の例はSourceの方のMatを使う。
※今回の例の場合はこうしてしまうとfoldした結果を取得することが出来ない。

val rg: RunnableGraph[ActorRef] = source.toMat(sink)(Keep.left)

以下の例はSinkの方のMatを使う。
※今回の例の場合はこうしてしまうとactorが取得できないので何もデータが流れない。

val rg: RunnableGraph[Future[String]] = source.toMat(sink)(Keep.right)

実装を覗く

ちなみに source.to(sink) としているtoの実装を見てみるとこうなっている。

def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)

おお、toMatがleftで呼ばれている。つまりこの場合はsinkのMatが無視されてsourceのMatが採用される。

これ source.runWith(sink) はどうだろう?

def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run()

おお、こっちはtoMatがrightで呼ばれている。

ではでは source.runForeach(println) とか source.runFold(""){ _ + _.toString } はというと、、、

def runFold[U](zero: U)(f: (U, Out)  U)(implicit materializer: Materializer): Future[U] =
    runWith(Sink.fold(zero)(f))

def runForeach(f: Out  Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f))

それぞれrunWithに委譲している。ということはrightだ。

ここに色んなサンプルあり

25
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
20

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?