LoginSignup
12
0

More than 5 years have passed since last update.

Akka StreamsでGraphDSLを使ってみよう

Posted at

はじめに

アドベントカレンダー1日目です.

Akka Streamsで複雑なストリーム処理を組むためには必須のGraphDSLの使い方を紹介したいと思います.

準備

sbtプロジェクトに下記の依存を追加するとAkka Streamsが使えるようになります.

build.sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.18"

もしくはサンプルコードをGithubにあげているのでcloneしてお使いください.尚,本記事のコードはimport部分も含めて実行可能なコードを記載していますので,気軽にコピペして実行してみてください.
https://github.com/mattsu6/akka-custom-graphstage

GraphDSLとは

Akka StreamsではSource,Flow,Sinkといった処理ステージ(processing stage or processing steps)を組み合わせてGraphという処理フローを構築していきます.Graph(図)という名の通り,Akka Streamsでは処理フローを紙に書いた時の図をそのままコードに落とし込めることを目指しているようです.
つまりGraphDSLを使うと複雑に分岐するようなストリーム処理を図のように表現できます.

GraphDSLを使わずともストリーム処理は書けるのですが,その場合は一直線の単純なフローしか書けません.一直線なフローであればGraphDSLを使わない方がシンプルなのですが,ここではまずは一直線なフローをGraphDSLで記述してみて両者を比較してみようと思います.

GraphDSLを使わないケース
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object Sample1 extends App {

  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  Source(1 to 5).map(_ * 10).runWith(Sink.seq).foreach(println)
}

// 実行結果
> Vector(10, 20, 30, 40, 50)

1~5の数字を入力に,各要素を10倍した値を表示するというコード例です.線形な処理の場合はコレクションAPI感覚でストリーム処理が組めるのですごく便利ですね.
ではGraphDSLを使った場合です.

GraphDSLを使うケース
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, GraphDSL, RunnableGraph, Sink, Source }
import akka.stream.{ ActorMaterializer, ClosedShape }

object Sample2 extends App {

  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val out = Sink.seq[Int]

  val g = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit builder => o =>
    import GraphDSL.Implicits._
    val in = Source(1 to 5)
    val flow = Flow[Int].map(_ * 10)

    in ~> flow ~> o

    ClosedShape
  })

  g.run().foreach(println)
}

// 実行結果
> Vector(10, 20, 30, 40, 50)

コード例は先程と一緒ですが少し複雑な感じになりましたね.
ただ,ここで注目してほしいポイントは

in ~> f1 ~> o

という部分です.
inf1oへの処理の流れを ~>でつないで表現しています.これがいわゆるホワイトボード等に書かれる図と同じ感覚で書かれる処理フローを指しています.

処理を分岐してみる

単純な例で比較するとGraphDSLは少し複雑な感じがします.さて,先程の例では入力値を10倍していましたが,奇数なら10倍,偶数なら100倍という処理に分岐してみましょう.(Flowの中にif書いて分岐すれば良いと思うかもしれませんが,将来的に並列処理をしたい場合や偶数の場合だけ特別な処理をしたい場合などを考慮するとFlowを分けるメリットはあります)

奇数なら10倍,偶数なら100倍
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source }
import akka.stream.{ ActorMaterializer, ClosedShape }

object Sample3 extends App {

  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val out = Sink.seq[Int]

  val g = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit builder => o =>
    import GraphDSL.Implicits._
    val in = Source(1 to 5)

    // 奇数のみ通過
    val oddFilter = Flow[Int].filter(_ % 2 != 0)
    // 偶数のみ通過
    val evenFilter = Flow[Int].filter(_ % 2 == 0)

    val to100x = Flow[Int].map(_ * 100)
    val to10x = Flow[Int].map(_ * 10)

    val bcast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))

    in ~> bcast ~> oddFilter  ~> to10x  ~> merge ~> o
          bcast ~> evenFilter ~> to100x ~> merge
    ClosedShape
  })

  g.run().foreach(println)
}

// 実行結果
> Vector(10, 200, 30, 400, 50)

下記が処理の流れを示したGraphです.

in ~> bcast ~> oddFilter  ~> to10x  ~> merge ~> o
      bcast ~> evenFilter ~> to100x ~> merge

ここで注目したいポイントはbcastmergeです.bcastinによる入力値をoddFilterevenFilterにブロードキャストしています.これは1つの入力と複数の出力をもつBroadcastと呼ばれるGraphStageです.公式の用語ではこのような分岐する処理ステージはjunctionsと呼ばれています.
一方mergeは複数の入力と1つの出力を持つGraphStageでbcastと対になる動作をします.

このように記述することで,奇数と偶数は並列に操作しても問題ないことが明らかになります.なので,必要であれば両者を並列に実行することも簡単にできます.

コードを詳しく見る

ではコードを具体的に見ていきましょう.

val g = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit builder => o =>
  import GraphDSL.Implicits._
  ...
  ClosedShape
}

GraphDSL.createがGraphを生成するためのファクトリになっています.ファクトリの最後にClosedShapeを記述するとGraphのShapeを定義できます(Shapeには入力と出力の数が定義されています).今回の場合はRunnableGraph(SourceとSinkが繋がっている実行可能なGraph)を作りたいので,ClosedShapeという入力と出力を持たないShapeを定義しています.
そして,GraphDSLによって生成したGraphをRunnableGraph.fromGraph(...)に渡すことで晴れてRunnableGraphが生成できます.

RunnableGraph以外もGraphDSLで作れます

今までRunnableGraphを生成していましたが,部分的にGraphDSLを導入したい場合はSourceだけ,SinkだけなどのGraphもGraphDSLで作ることができます.
例えばSourceだけをGraphDSLで作りたい場合は以下のように生成できます

SourceをGraphDSLで作る
import akka.actor.ActorSystem
import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, SourceShape}

object Sample4 extends App {

  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val out = Sink.seq[Int]

  val source = Source.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val s1 = Source(1 to 5)
    val s2 = Source(6 to 10)

    val merge = builder.add(Merge[Int](2))

    s1 ~> merge
    s2 ~> merge

    SourceShape(merge.out)
  }

  source.map(_ * 10).runWith(Sink.seq).foreach(println)
}

// 実行時間
> Vector(10, 60, 20, 70, 30, 80, 40, 90, 50, 100)

Sourceは出力だけを持つShapeなのでSourceShapeを使います.SouceShapeの引数には2つのSourceをマージさせたmerge.outを指定しています.

おわりに

Akka Streamsでは独自の用語や概念が多く理解が難しいと思いますが,GraphDSLを使うことで複雑なストリーム処理をわかりやすく記述できるので慣れれば非常に使いやすいです.
次回はGraphDSLを扱う上ではかかせないGraphStageについて説明できればと思っています.今回はBroadcastとMergeを紹介しましたが,それ以外に独自で定義することも可能ですので,カスタムなGraphStageを作れるようになると更にAkka Streamsが楽しくなってきます.

参考

12
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
0