Use Flow[T].statefulMapConcat. It looks like a good fit when you want to rewrite a stateful Actor as a Stream.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Source }

implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()

val flow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 10)
val flow2: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 3 == 0)
// The outer function runs once per materialization, so `sequence` is private to each run.
val flow3: Flow[Int, Int, NotUsed] = Flow[Int].statefulMapConcat { () =>
  var sequence = 0
  i =>
    sequence = sequence + i // running total of the elements seen so far
    List(sequence)          // emit exactly one element per input
}

Source(1 to 10)
  .via(flow1)
  .via(flow2)
  .via(flow3)
  .runForeach(println)
Result
30
90
180
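As a quick cross-check of the output above, the same three steps can be sketched with plain Scala collections, without Akka. Here `scanLeft` stands in for the mutable `sequence` variable; the name `runningSums` is made up for this sketch and is not part of the original code.

```scala
// Same steps as flow1, flow2 and flow3, applied to a strict collection.
val runningSums: Vector[Int] =
  (1 to 10).toVector
    .map(_ * 10)          // flow1: multiply by 10
    .filter(_ % 3 == 0)   // flow2: keeps 30, 60, 90
    .scanLeft(0)(_ + _)   // running total, starting from the seed 0
    .tail                 // drop the seed so only the totals remain

runningSums.foreach(println)
```

This only mirrors the arithmetic. Unlike the stream version, it processes the whole collection eagerly and has no backpressure.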