LoginSignup
27
16

More than 5 years have passed since last update.

Akka Streams を使って処理を図(Graph)にする

Last updated at Posted at 2016-08-22

A little Advanced Akka Streams API

前の記事で、Akka Streams の基礎概念のSource、Flow、Sinkについて説明しました。それらを繋げば、線形の処理をシンプルに実装出来る。
でも非線形の処理はどうでしょうか?

Graph

Akka Streams ではストリームの実行トポロジーを表す概念をGraph(図)と呼ぶ。
線形、非線形、分岐のあるデータ処理は全部Graphである。

Junctions

まず、非線型処理をするため、Akka Streams が提供した分岐API(Junction)を見てみよう。

Fan-out 複数出力
  • Broadcast[T] – (1 input, N outputs) inputをすべてのoutputに出す。
  • Balance[T] – (1 input, N outputs) inputを任意一つのoutputに出す。
  • UnZip[A,B] – (1 input, 2 outputs) Tuple[A, B]のinputをAとBに分割して、2つのoutputに別々で送る。
  • UnzipWith[In,A,B,...] – (1 input, N outputs) inputを受け取って、N個の結果お返す関数を渡すことで、N個の結果を別々のoutputへ別々で送る (N <= 20)。
Fan-in 複数入力
  • Merge[In] – (N inputs , 1 output) すべてのinputを一つのoutputに出す。
  • MergePreferred[In] – 指定ポートを優先でmergeする。
  • ZipWith[A,B,...,Out] – (N inputs, 1 output) 複数の入力を受け取って、一つの結果お返す関数を渡すことで、N個のinputを処理する。
  • Zip[A,B] – (2 inputs, 1 output) inputのAとBをTuple[A, B]に合成する。
  • Concat[A] – (2 inputs, 1 output) 2つのstreamを繋がる

必要な部件を揃うところで、Graphは以下のように作成出来る。

simple-graph

※Image taken from doc.akka.io

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
              bcast ~> f4 ~> merge

  ClosedShape
})

上記のコードでtweetsのSourceを2つのFlowにブロードキャストしている。

  • GraphDSL.create() { implicit b => でGraphを作成

  • GraphDSL.Implicits._ をimportすることで、~> (connect, via, toなどへ自動変換)みたいなGraph DSL記号を使うことが出来る。反対の <~ もある。

  • ClosedShape (閉じた図形)はこのGraphは完全に繋がっている(SourceからSinkまで)の意味

  • ClosedShapeにすると、このGraphはRunnableGraphになる、 run() で実行出来る。

Shape

図の形状(Shape)は、ClosedShape以外にも色いろある。
ClosedShape閉じ違って、完全に繋がっていないShapeを持つGraphは、 Partial graph と呼ぶ。
以下のように、Shapeを定義出来る。

// Shapeは任意のinputとoutputポートを持つ再利用できる処理モジュールのこと
case class PriorityWorkerPoolShape[In, Out](
  jobsIn:         Inlet[In],
  priorityJobsIn: Inlet[In],
  resultsOut:     Outlet[Out]) extends Shape {

  // 固定の順番でポートのリストを定義することが重要、重複はできない。
  override val inlets: immutable.Seq[Inlet[_]] =
    jobsIn :: priorityJobsIn :: Nil
  override val outlets: immutable.Seq[Outlet[_]] =
    resultsOut :: Nil

  // Shapeは自分のコピーを作れる
  override def deepCopy() = PriorityWorkerPoolShape(
    jobsIn.carbonCopy(),
    priorityJobsIn.carbonCopy(),
    resultsOut.carbonCopy())

  // 既存のポートから作ることも必要
  override def copyFromPorts(
    inlets:  immutable.Seq[Inlet[_]],
    outlets: immutable.Seq[Outlet[_]]) = {
    assert(inlets.size == this.inlets.size)
    assert(outlets.size == this.outlets.size)
    // ここでなぜ順番が重要なのかがわかる
    PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out])
  }
}
Akka Streams では、以下のShapeを用意している。
  • SourceShape, SinkShape, FlowShape 普通のShapeを代表する,
  • UniformFanInShape, UniformFanOutShape 複数かつ同じ型のinput、もしくはoutputを持つShape,
  • FanInShape1, FanInShape2, ..., FanOutShape1, FanOutShape2, ... 複数かつ違うじ型のinput、もしくはoutputを持つShape。
Shapeの使い方

FanInShapeを使えば、上と同じPriorityWorkerPoolShapeを定義出来る。

import FanInShape.{ Init, Name }

class PriorityWorkerPoolShape2[In, Out](_init: Init[Out] = Name("PriorityWorkerPool"))
  extends FanInShape[Out](_init) {
  protected override def construct(i: Init[Out]) = new PriorityWorkerPoolShape2(i)

  val jobsIn = newInlet[In]("jobsIn")
  val priorityJobsIn = newInlet[In]("priorityJobsIn")
  //  "out"の名前のOutlet[Out]ポートは自動で作られる
}

PriorityWorkerPoolShapeを使って、Graphを作成する。

object PriorityWorkerPool {
  def apply[In, Out](
    worker:      Flow[In, Out, Any],
    workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], NotUsed] = {

    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val priorityMerge = b.add(MergePreferred[In](1))
      val balance = b.add(Balance[In](workerCount))
      val resultsMerge = b.add(Merge[Out](workerCount))

      priorityMerge ~> balance

      for (i <- 0 until workerCount)
        balance.out(i) ~> worker ~> resultsMerge.in(i)

      PriorityWorkerPoolShape(
        jobsIn = priorityMerge.in(0),
        priorityJobsIn = priorityMerge.preferred,
        resultsOut = resultsMerge.out)
    }

  }

}

ちょっと複雑な例

以上の概念の組み合わせて行けば、下記のような複雑な処理をそのまま書ける。

ClosedShape

compose_graph

※Image taken from doc.akka.io

import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val A: Outlet[Int]                  = builder.add(Source.single(0)).out
  val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
  val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
  val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
  val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).in

                C     <~      F
  A  ~>  B  ~>  C     ~>      F
         B  ~>  D  ~>  E  ~>  F
                       E  ~>  G

  ClosedShape
})

Source、Flow、Sinkはaddする必要ないので、下記のようにも書ける。

import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

  Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
  C.in(0) <~ F.out

  B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
  E.out(1) ~> Sink.foreach(println)
  ClosedShape
})
Partial graph

compose_graph_partial

※Image taken from doc.akka.io

import GraphDSL.Implicits._
val partial = GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

                                   C  <~  F
  B  ~>                            C  ~>  F
  B  ~>  Flow[Int].map(_ + 1)  ~>  E  ~>  F
  FlowShape(B.in, E.out(1))
}.named("partial")

※ named()はモジュールに名付けことが出来る、デバッグ時が有用。

FlowShapeになるので、Flowのように使える。

Source.single(0).via(partial).to(Sink.ignore)

まとめ

今回はここまで、もうちょっと複雑なAPIの紹介しました。
Junctionsを使って、複雑なGraphを作成することが出来た。
これから、紙上で書いたフローチャートはそのまま書くことが出来るでしょう。

参考資料

27
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
27
16