LoginSignup
5
3

More than 5 years have passed since last update.

Akka StreamsでステートフルなFlowを作る方法

Last updated at Posted at 2016-09-13

Flow[T].statefulMapConcat を使う。状態を持ったActorをStreamで書き直したいときに良さそう。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Source }

implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()

val flow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 10)
val flow2: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 3 == 0)
val flow3: Flow[Int, Int, NotUsed] = Flow[Int].statefulMapConcat { () =>
  var sequence = 0
  i =>
    sequence = sequence + i
    List(sequence)
}

Source(1 to 10)
  .via(flow1)
  .via(flow2)
  .via(flow3)
  .runForeach(println)
結果
30
90
180
5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3