Akka-Stream学習シリーズ
はじめに
概要的に部品群を色々とさわろうと思ったけれども、いかんせん数が多いのでとりあえず重要っぽいものの使い方をさらっと確認。
Akka-Streamでの処理実行の確認
通常のmap関数の振る舞い
以下のプログラムを実行すると、
(1 to 3)
.map{ i => println(s"A: $i"); i }
.map{ i => println(s"B: $i"); i }
.map{ i => println(s"C: $i"); i }
こうなる。
A: 1
A: 2
A: 3
B: 1
B: 2
B: 3
C: 1
C: 2
C: 3
Akka-Streamでのmap関数の振る舞い
Source(1 to 3)
.map{ i => println(s"A: $i"); i }
.map{ i => println(s"B: $i"); i }
.map{ i => println(s"C: $i"); i }
.runWith(Sink.ignore)
こうなる。
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
The next element is processed by a stage as soon as it is emitted the previous one.
通常のCollection系のmapの振る舞いとは異なり終わったものからどんどん次のステージに処理が送られるという振る舞いを確認することができた(filter等も同様の振る舞いをする)。
実行基盤としてActorが使われていることを考えると、この動きはまさに1つ1つの要素がメッセージとしてActor間を駆け巡っている感じですね。
登場人物の整理
Source,Flow,Sink,RunnableGraph
代表的なものはSource、Flow、Sink。それらを結合して出来るRunnableGraph。
なおオフィシャルドキュメントでの用語の解説はこちらを参照
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future
object Main extends App {
implicit val system = ActorSystem("toyo-sample")
implicit val materializer = ActorMaterializer()
val source: Source[Int, Unit] = Source(1 to 10)
val flow1: Flow[Int, Int, Unit] = Flow[Int].filter(_ % 3 == 0)
val flow2: Flow[Int, Int, Unit] = Flow[Int].map(_ * 2)
val sink: Sink[Int, Future[Unit]] = Sink.foreach(println)
val g: RunnableGraph[Unit] = source.via(flow1).via(flow2).to(sink)
g.run()
Thread.sleep(1000)
system.shutdown()
system.awaitTermination()
}
パーツを分けて宣言しなくてもチェーンさせてこう書くことも出来る。
Source(1 to 10)
.filter(_ % 3 == 0)
.map(_ * 2)
.runForeach(println)
なおこの場合 runForeach
からは戻り値 Future[Unit]
が返ってくるので、Futureを使ってActorSystemの終了処理もまとめちゃう。
Source(1 to 10)
.filter(_ % 3 == 0)
.map(_ * 2)
.runForeach(println)
.andThen { case _ =>
system.shutdown()
system.awaitTermination()
}
FlowGraph
FlowGraphを使うと上記と同様のコードを以下の様にも書ける。
val g: RunnableGraph[Unit] = FlowGraph.closed() { implicit builder =>
import FlowGraph.Implicits._
source ~> flow1 ~> flow2 ~> sink
}
g.run()
正直、これだけだとちょっと微妙なのだがオフィシャルドキュメントでのbroadcastやmergeを使った例はグラフを直感的に表現できておりDSLがイケていると感じることが出来る。
Stage
Stageと呼ばれるものは色んな種類存在するが、ここでは一番基本になりそうなPushPullStageでその振る舞いを確認する。
Stageは、SourceやFlowで用意されているmapやfilterやfoldといった関数では表現できないような処理を書くときに重宝する。
例として1から9の要素を持つSourceから下流に対してそのまま流すのではなく2要素の和を求めた上で次へ流すという事をやってみようと思う。
期待する結果はこういう出力。
3
7
11
15
9 // 要素数が奇数でペアができなかったものについてはそのまま出力
コード例。
Source(1 to 9)
.transform(() => new PushPullStage[Int, Int] {
var cache: Option[Int] = None
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = cache match {
case Some(n) =>
cache = None
ctx.push(n + elem) // 次(Downstream)に送出!
case None =>
cache = Some(elem)
ctx.pull() // 次の値をくれと(Upstreamに)要求!
}
override def onPull(ctx: Context[Int]): SyncDirective = {
if (ctx.isFinishing && cache.isDefined) ctx.pushAndFinish(cache.get) // 最後の要素を送出
else ctx.pull()
}
override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = {
// If the stream is finished, we need to emit the last element in the onPull block.
// It is not allowed to directly emit elements from a termination block
// (onUpstreamFinish or onUpstreamFailure)
ctx.absorbTermination()
}
})
.runForeach(println)
なお、作った後に気づいたが今回のサンプルは既存の関数の組み合わせで実現可能だった。。
Source(1 to 9)
.grouped(2)
.map(_.sum)
.runForeach(println)
ちなみに map
や filter
に渡した関数は akka.stream.impl.fusing.Ops.scala で定義されたMapやFilterといったcase classで保持され、実行される。
で、これらのcase classはPushStageを継承している。
private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem))
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def decide(t: Throwable): Supervision.Directive = decider(t)
}