Help us understand the problem. What is going on with this article?

Akka Streams についての基礎概念

More than 1 year has passed since last update.

Akka Streams が2.4以降からexperimentalを外して、正式版をリリースしました。丁度会社で3日のHackerDaysを機に、Akka Streams を勉強しはじめました。
この記事では、AkkaStreamの公式ドキュメントを抜粋し、翻訳しながら、AkkaStreamの基礎概念を説明します。

Akka Streams ってなに

背景

今のInternet上、我々は膨大なデータを消費している。その大量のデータを人々はビッグデータと呼んでいるw。
もう昔みたいにデータを全部ダウンロードして処理、処理完了してアップロード的な処理は時間掛かりすぎ、そもそも一台のサーバに保存しきれないデータは処理できないので、Streamみたいな流れとしての処理が必要になっている。
Akkaが使うActorモデルもその一例、データを分割し、メッセージとしてActorに送る、Actorは只々流れてきたメッセージを処理し、リアルタイムの処理が出来る。

課題

ただし、Actor間の安定メッセージを実現するのは難しい。

メッセージを送る側をPublisherとして、受け取る側をSubscriberと呼ぶ。

  • Publisher側の処理が早い場合Subscriber側のバッファーが溢れてしまう。
  • Subscriberに遠慮してPublisher側の処理を抑えた場合は無駄が多くなってしまう。

Akka Streams という解決案

Back Pressure
  • それをSubscriberが自分を処理できる量をPublisherにリクエストを送ることで無駄なくSubscriberが処理できる量を処理する仕組み。
Reactive Streams
  • 違うStream処理ツール間でもBack Pressureを実現出来るため、Reactive Streams(ノンブロッキングでback pressureな非同期ストリーム処理の標準仕様)を提唱。
その他のメリット
  • Akka Streams はいろんなデータをStreamのように処理することが出来る。
  • 直感的なユーザAPIを使って、処理を共通モジュール化出来る。
  • 処理過程を図を書くようにシンプル。
  • 処理のコンテスト、処理過程と処理の実行を分けている。処理過程を必要なところに持ち込んで、好きなタイミングで実行出来る。
  • Back Pressureがあるので、性能を最大限まで引き出す能力を持っている。
  • Block処理がある場合、実行時間は既存の処理としてあんまり変わらないので。Futureを使って、Non-blockingの実装が望ましい。

Akka Streams API

Akka Streams はバージョン2.4以降、APIを一新させ、experimental(実験的)でなくなった。Akka Streams を使うため、まずAPIの中の基本概念を覚えよう。

Materializer

Stream処理を実行する環境の抽象化、Actorで実行たい場合は、ActorMaterializerを使う。
※ 本文で書かれたコード例は、全部以下のコードを含む。

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

Source

源流、データを作る側、input channelを持たない、一つのoutput channelを持っている。

Source

※Image taken from boldradius.com

Sourceの使い方

  1. まずは、有限のデータをSourceにする方法。遅延評価のため、run* 実行しないと、評価されない。
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
  1. 次は、無限データをSourceにする方法。無限で評価されないため、takeを入れた。
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
  1. そして、Actorに送るメッセージをSourceにする方法。 結果から、Futureが違うスレッドで並行実行されることが分かる。 ※ ただし、この方法はBack Pressure出来ないので、Bufferを超える際の処理を指定する必要がある。
def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Sink

水槽、最終的にデータを受け取る側。Sourceの反対で、output channelを持たない、一つのinput channelを持っている。

Sink

※Image taken from boldradius.com

Sinkの使い方

  1. Sourceと直接繋ぐ

to を使えば、SourceとSinkを繋ぐことが出来る。戻り値は RunnableFlow と呼ぶ。RunnableFlowに対して run() を実行すれば、評価される。

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

runaable-flow

※Image taken from boldradius.com

  1. すべてのデータをActorに渡すことも当然できる。
val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Flow

既存systemのデータをAkkaStreamに渡すだけなら、SourceとSinkが十分ですが、出来ないこともある。
Flowがoutput channelとinput channelの両方を行っている、SourceとSinkの間で、データを好きなように変換出来る。

Flow

※Image taken from boldradius.com

SourceとFlowを繋げば、新しいSourceになる、FlowとSinkを繋げば、新しいSinkになる、すべて繋げば、 RunnableFlow になる。

source-flow-sink

※Image taken from boldradius.com

Flowの使い方

  1. via を使って、SourceとFlowを繋ぐ、inputの型を指定する必要がある。 SourceとFlowとSinkは完全独立なので、再利用出来る。
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
  1. SourceとSinkとFlowを繋ぐ
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

まとめ

以上、Akka Streams における最も基礎の部分を話ししました。
Streamを構成するSource、Flow、Sink、3つの基礎概念がある。これらを使えば、ほとんどの線形処理を簡単に書くことが出来ます。なお、再利用出来る共通モジュールも作ることもてきます。
めでたし、めでたし?

Next

Akka Streams が出来ることはもちろんこれだけではありません。次の記事 では、Akka Streams が提供した他の部品を使って、こんな感じの処理図(Graph)を書く方法について説明します。

compose_graph
※Image taken from doc.akka.io

参考資料

xoyo24
格ゲーマーエンジニアです
https://xoyo24.me/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away