Help us understand the problem. What is going on with this article?

Akka Streams を使って簡単な集計を行う Stream を構築してみる

More than 3 years have passed since last update.

Akka Streams って?

Akka Streams は Reactive Streams(ノンブロッキングでback pressureな非同期ストリーム処理の標準仕様)のAkka実装
つまるところ、Back-pressure によってSubscriber側は受けきれない仕事を受けず、Subscriber側でどのくらい仕事を受けることができるかPublisher側に通知する方式をとることで、効率よく処理を行うことができるようにするものです。
また、Akka Streams を実装する時には構成の項で示すようなフローチャートを記載することが多く、コードの読み手にとっても、わかりやすく処理内容を伝えることができます。

Akka Streamsの基本に関しては下記を参考にさせていただきました。
Akka Stream についての基礎概念
Akka Document

今回やってみること

今回はわかりやすいように、簡単な例で集計をやってみたいと思います。

[ケース]
野菜ECにおける発注ログからどの商品がいくつ発注されたか集計する。
ログの例(発注日時など、余計なものは便宜上、全て省きます)

商品名   発注数量
apple 10
orange  8
apple 2
banana  20
banana 100
.
.
.

構成

Akka Streams は Source -> Flow -> Sink という 構成を必ずとります。簡単にいうと、Source は Input、Flowは処理、Sinkはデータの受け手です。

     SourceShape                     FlowShape                      FlowShape
      [String]                   [String, Converted]         [Converted, (String, Int)]
+-------------------+          +-------------------+          +-------------------+
|                   |          |                   |          |                   |
|                   +-+      +-+                   +-+      +-+                   +-+
| Source            | |      | |                   | |      | |                   | |
|  .fromIterator()  | +----> | |   toConverted()   | +----> | |    aggregate()    | +-----+
|                   | |      | |                   | |      | |                   | |     |
|                   +-+      +-+                   +-+      +-+                   +-+     |
|                   |          |                   |          |                   |       |
+-------------------+          +-------------------+          +-------------------+       |
                                                                                          |
                                                                                          |
                                                                                          |
                                     SinkShape                      FlowShape             |
                             [ByteString, Future[Done]]      [(String, Int), ByteString]  |
                               +-------------------+          +-------------------+       |
                               |                   |          |                   |       |
                               |                   +-+      +-+                   +-+     |
                               | FileIOFormatTo    | |      | |                   | |     |
                               |         .toPath() | | <----+ |   toByteString()  | | <---+
                               |                   | |      | |                   | |
                               |                   +-+      +-+                   +-+
                               |                   |          |                   |
                               +-------------------+          +-------------------+

Input

今回のInputは TSVファイル ということにします。

処理の流れ

  1. ファイルから1行ずつ取得
  2. 1行を集計しやすい形にコンバート
  3. 集計
  4. 集計結果(商品ごとの発注数量)をファイルに書き込むためにByteStringという型に変換
  5. 所定のファイルに書き込み

実装

Source[SampleSource.scala]

class SampleSource {

  def source(): Source[String, NotUsed] = Source.fromIterator(() => IOSource.fromFile("product-log.txt").getLines())

}

Source.fromIterator()は Akka Streamsに標準で提供されているもので、引数にとるIterator型の関数が、emptyやエラーになるまで、Iterator から要素を取り続けます。
取り出すタイミングはdownstreamからの要求(Back-pressure)があったときです。今回で言えば、ファイルの最終行に到達するまでですね。

Flow[SampleFlow.scala]

class SampleFlow {
  def toConverted(): Flow[String, Converted, NotUsed] = Flow[String].map { line =>
    val columns = line.split("\t")
    Converted(columns.head, columns(1).toInt)
  }

  def aggregate(): Flow[Converted, (String, Int), NotUsed] = {
    val maxSubStreamSize = 4
    val zero = ("", 0)

    Flow[Converted]
      .groupBy(maxSubStreamSize, { case Converted(product, quantity) => product } )
      .fold(zero) {
        (acc: (String, Int), converted: Converted) =>
          val quantity = acc._2 + converted.quantity
          (converted.product, quantity)
      }.mergeSubstreams
  }

  def toByteString() = Flow[(String, Int)].map { case (product, quantity) => ByteString(s"$product\t$quantity\n")}

}

aggregate()

groupByメソッドは指定されたキーでInputを振り分けていきます。振り分け先はSubstreamとよばれるもので、キー毎にサブストリームが作られ、流れていくイメージです。従ってこのメソッドの返り値は普通のFlowではなく、SubFlowとなります。
メソッドで1番目にSubstreamSizeを指定していますが、これはキーの数だけストリームを作る必要が有りますので、今回の場合、最低でも商品数の数だけ指定しておく必要があります。
fold はSubFlow クラスが提供しているメソッドになります。挙動は見ておわかりの通り、collectionのfoldとだいたい同じです。
最後のmergeSubstreamsは先程のSubFlowによって分裂したstreamをmergeします。SubFlowをmergeすることで、このFlow全体の返り値を(String, Int)のように、商品とその発注数のような形になっています。 後述しますが、mergeをするということはInputの「終わり」がわからないといけません。

Sink[SampleSink]

class SampleSink {

  def sink(): Sink[ByteString, Future[IOResult]] = FileIO.toPath(new File("result.tsv").toPath)

}

FileIO クラスも Akka Streamsが提供しているクラスです。upstreamからの「終わり」が伝えられるまで指定されたpathに書き込みを続けます。

実行

def main(args: Array[String]) = {

  implicit val system = ActorSystem("sample-system")
  implicit val mat = ActorMaterializer()

  val source         = new SampleSource()
  val flow           = new SampleFlow()
  val sink           = new SampleSink()

  val iteratorSource   = source.source()
  val toConvertedFlow  = flow.toConverted()
  val aggregateFlow    = flow.aggregate()
  val toByteStringFlow = flow.toByteString()
  val writeSink        = sink.sink()

  val runnable: RunnableGraph[NotUsed] = iteratorSource via toConvertedFlow via aggregateFlow via toByteStringFlow to writeSink

  runnable.run()
}

runnable のように先程作成した streamを連結していくことでRunnableGraphを作成し、run()を実行することではじめて、実際にstreamが実行されます。

出力結果例

result.tsv
apple 1422
banana  549
orange  5004
.
. //略
.

上記のように、集計された結果がtsvファイルとして所定の場所に格納されていれば成功です。

集計の際に注意すべきところ

集計するということは何かを境界として集計します。本件で言えば、1TSVファイルがその[境界]に相当するのでしょう。
aggregateFlowはupstreamとなるtoConvertedFlowに対してBack-preasureを行いますが、upstreamが「もう終わり!ファイル読み終わったよ!」と伝えない限り、永遠に次をまち続けることとなり、集計が終わりません。さらにtoConvertedFlowも、upstreamが終わりを伝えてくれない限り、「終わり」とdownstraem(aggregateFlow)に伝えることができません。
このように辿っていくと最終的にsourceの中のSource.fromIterator()にたどり着くのですが、このfromIteratorは前述したとおり、iteratorが空をrunしたら「終わり」を宣言してくれます。

例えば下記のような実装をすると、終わりがわからないために、groupByによって作られたSubStreamをmergeすることができず、永遠に集計が終わらなくなります。

失敗例

例)SQSからキューを取得し、キューのメッセージに入っているpathからファイルをとってきて、上記と同じ処理をする。
SQSからキューを取得してakka streamsに流すには alpakka が提供しているコネクターを使用するのがポピュラーです。

  def source() = SqsSource(queue)(awsSqsClient).flatMapConcat{ message: Message =>
    Source.fromIterator(() => IOSource.fromFile(new File(message.getBody)).getLines())
  }

そもそもこんな面倒なことやらんわっていう感じかもしれないですが、例なのであしからず(実際やって詰まった。。)。
恐らく、このような実装になったりするのですが、元となるSqsSourceがポイントになります。このSqsSourceには終わりがないです。なぜならSQSに入ってきたqueueを取得するものだから、いつ「終わる」なんてものがないのです。だから、うまーく Source を作ったつもりになってこのまま集計まで実装すると青ざめることになります。処理が永遠に終わりません。

まとめ

簡単ではありますが、Akka Streams をつかった集計方法を紹介させていただきました。
あまり理解せずにAkka Stramsに手を出してしまった結果、最初に上記のような失敗をしてしまいました。「いつBack-pressureするのか」、「いつ終わりになるのか」をもっとちゃんと理解していたら、防げたなと感じております。Akka StreamsのドキュメントやコードのDescriptionにも、丁寧に解説があるので、きっちり一読していただくことをおすすめしますm(_ _)m
ここでは紹介できていませんが、Flowのチューニングを行うことで実行速度が大きく変わったりします。

参考資料

Akka Stream についての基礎概念
【Akka】Akka Streamsがめっちゃ便利すぎて脳汁が出た話し
Akka Document
2014 akka-streams-tokyo-japanese
Alpakkaとの連携

showmant
サーバーサイドエンジニアです。Scalaをよく書き、よく愛しています。 アルプ株式会社を創業しました。エンジニア大募集中です。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away