
How to build a stateful Flow with Akka Streams


Use Flow[T].statefulMapConcat. It looks like a good fit when you want to rewrite a stateful Actor as a Stream.

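import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Source }

implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()

// Stateless stages: multiply by 10, then keep multiples of 3.
val flow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 10)
val flow2: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 3 == 0)

// Stateful stage: the outer function runs once per materialization and
// creates the state; the inner function runs for every element.
val flow3: Flow[Int, Int, NotUsed] = Flow[Int].statefulMapConcat { () =>
  var sequence = 0 // running total of the elements seen so far
  i =>
    sequence = sequence + i
    List(sequence) // emit the updated total as a single element
}

Source(1 to 10)
  .via(flow1)
  .via(flow2)
  .via(flow3)
  .runForeach(println)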


Result

30
90
180
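
To see why the output is 30, 90, 180, the behavior of statefulMapConcat can be modeled with plain Scala collections. This is only a rough sketch and not how Akka implements the operator: there is no stream, no backpressure, and no Akka dependency. The names StatefulMapConcatModel, runOnce, runningSum, and dropRepeats are hypothetical and exist only for this illustration. It assumes what the code above relies on: the outer function is called once per run to create fresh state, and the inner function returns a collection whose elements are concatenated into the output, so it may also return an empty collection to emit nothing.

```scala
// Plain-Scala model of statefulMapConcat (illustration only, no Akka needed).
// All names in this object are hypothetical helpers for this sketch.
object StatefulMapConcatModel {

  // Calls `factory` once per run to obtain a fresh stateful function,
  // then applies it to each element in order and concatenates the results.
  def runOnce[A, B](in: Seq[A])(factory: () => A => Iterable[B]): Seq[B] = {
    val f = factory()
    in.flatMap(a => f(a))
  }

  // Same logic as flow3 above: a running sum, one output per input.
  val runningSum: () => Int => Iterable[Int] = { () =>
    var sequence = 0
    (i: Int) => {
      sequence = sequence + i
      List(sequence)
    }
  }

  // Emits an element only when it differs from the previous one,
  // showing that the inner function may return an empty collection.
  val dropRepeats: () => Int => Iterable[Int] = { () =>
    var last = Option.empty[Int]
    (i: Int) => {
      val out = if (last.contains(i)) Nil else List(i)
      last = Some(i)
      out
    }
  }

  def main(args: Array[String]): Unit = {
    // Same pipeline as flow1 and flow2: 10..100, keep multiples of 3.
    val input = (1 to 10).map(_ * 10).filter(_ % 3 == 0) // 30, 60, 90

    println(runOnce(input)(runningSum).mkString(", ")) // 30, 90, 180
    // A second run starts again from zero because the factory is called anew.
    println(runOnce(input)(runningSum).mkString(", ")) // 30, 90, 180
    println(runOnce(Seq(1, 1, 2, 2, 1))(dropRepeats).mkString(", ")) // 1, 2, 1
  }
}
```

Only 30, 60, and 90 pass the filter, so the running sum yields 30, 30 + 60 = 90, and 90 + 90 = 180. Keeping the var inside the factory rather than outside it is what stops separate runs from sharing state.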