search
LoginSignup
36

More than 5 years have passed since last update.

posted at

updated at

[Akka-Stream]触ってみた

Akka-Stream学習シリーズ

はじめに

概要的に部品群を色々とさわろうと思ったけれども、いかんせん数が多いのでとりあえず重要っぽいものの使い方をさらっと確認。

Akka-Streamでの処理実行の確認

通常のmap関数の振る舞い

以下のプログラムを実行すると、

(1 to 3)
  .map{ i => println(s"A: $i"); i }
  .map{ i => println(s"B: $i"); i }
  .map{ i => println(s"C: $i"); i }

こうなる。

A: 1
A: 2
A: 3
B: 1
B: 2
B: 3
C: 1
C: 2
C: 3

Akka-Streamでのmap関数の振る舞い

Source(1 to 3)
  .map{ i => println(s"A: $i"); i }
  .map{ i => println(s"B: $i"); i }
  .map{ i => println(s"C: $i"); i }
  .runWith(Sink.ignore)

こうなる。

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

プログラム例はこちらから引用

The next element is processed by a stage as soon as it is emitted the previous one.

通常のCollection系のmapの振る舞いとは異なり終わったものからどんどん次のステージに処理が送られるという振る舞いを確認することができた(filter等も同様の振る舞いをする)。
実行基盤としてActorが使われていることを考えると、この動きはまさに1つ1つの要素がメッセージとしてActor間を駆け巡っている感じですね。

登場人物の整理

Source,Flow,Sink,RunnableGraph

代表的なものはSource、Flow、Sink。それらを結合して出来るRunnableGraph。
なおオフィシャルドキュメントでの用語の解説はこちらを参照

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.concurrent.Future

object Main extends App {
  implicit val system = ActorSystem("toyo-sample")
  implicit val materializer = ActorMaterializer()

  val source: Source[Int, Unit] = Source(1 to 10)
  val flow1: Flow[Int, Int, Unit] = Flow[Int].filter(_ % 3 == 0)
  val flow2: Flow[Int, Int, Unit] = Flow[Int].map(_ * 2)
  val sink: Sink[Int, Future[Unit]] = Sink.foreach(println)

  val g: RunnableGraph[Unit] = source.via(flow1).via(flow2).to(sink)

  g.run()

  Thread.sleep(1000)
  system.shutdown()
  system.awaitTermination()
}

パーツを分けて宣言しなくてもチェーンさせてこう書くことも出来る。

Source(1 to 10)
  .filter(_ % 3 == 0)
  .map(_ * 2)
  .runForeach(println)

なおこの場合 runForeach からは戻り値 Future[Unit] が返ってくるので、Futureを使ってActorSystemの終了処理もまとめちゃう。

Source(1 to 10)
  .filter(_ % 3 == 0)
  .map(_ * 2)
  .runForeach(println)
  .andThen { case _ =>
    system.shutdown()
    system.awaitTermination()  
  }     

FlowGraph

FlowGraphを使うと上記と同様のコードを以下の様にも書ける。

val g: RunnableGraph[Unit] = FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._
  source ~> flow1 ~> flow2 ~> sink
}

g.run()

正直、これだけだとちょっと微妙なのだがオフィシャルドキュメントでのbroadcastやmergeを使った例はグラフを直感的に表現できておりDSLがイケていると感じることが出来る。

Stage

Stageと呼ばれるものは色んな種類存在するが、ここでは一番基本になりそうなPushPullStageでその振る舞いを確認する。
Stageは、SourceやFlowで用意されているmapやfilterやfoldといった関数では表現できないような処理を書くときに重宝する。

例として1から9の要素を持つSourceから下流に対してそのまま流すのではなく2要素の和を求めた上で次へ流すという事をやってみようと思う。
期待する結果はこういう出力。

3
7
11
15
9 // 要素数が奇数でペアができなかったものについてはそのまま出力

コード例。

Source(1 to 9)
  .transform(() => new PushPullStage[Int, Int] {
    var cache: Option[Int] = None
    override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = cache match {
      case Some(n) =>
        cache = None
        ctx.push(n + elem) // 次(Downstream)に送出!
      case None =>
        cache = Some(elem)
        ctx.pull() // 次の値をくれと(Upstreamに)要求!
    }

    override def onPull(ctx: Context[Int]): SyncDirective = {
      if (ctx.isFinishing && cache.isDefined) ctx.pushAndFinish(cache.get)  // 最後の要素を送出
      else ctx.pull()
    }

    override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = {
      // If the stream is finished, we need to emit the last element in the onPull block.
      // It is not allowed to directly emit elements from a termination block
      // (onUpstreamFinish or onUpstreamFailure)
      ctx.absorbTermination()
    }
  })
  .runForeach(println)

参考コード

なお、作った後に気づいたが今回のサンプルは既存の関数の組み合わせで実現可能だった。。

Source(1 to 9)
  .grouped(2)
  .map(_.sum)
  .runForeach(println)

ちなみに mapfilter に渡した関数は akka.stream.impl.fusing.Ops.scala で定義されたMapやFilterといったcase classで保持され、実行される。
で、これらのcase classはPushStageを継承している。

akka.stream.impl.fusing.Ops.scala
private[akka] final case class Map[In, Out](f: In  Out, decider: Supervision.Decider) extends PushStage[In, Out] {
  override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem))

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}

private[akka] final case class Filter[T](p: T  Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
  override def onPush(elem: T, ctx: Context[T]): SyncDirective =
    if (p(elem)) ctx.push(elem)
    else ctx.pull()

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
36