Akka Streams が2.4以降からexperimentalを外して、正式版をリリースしました。丁度会社で3日のHackerDaysを機に、Akka Streams を勉強しはじめました。
この記事では、AkkaStreamの公式ドキュメントを抜粋し、翻訳しながら、AkkaStreamの基礎概念を説明します。
Akka Streams ってなに
背景
今のInternet上、我々は膨大なデータを消費している。その大量のデータを人々はビッグデータと呼んでいるw。
もう昔みたいにデータを全部ダウンロードして処理、処理完了してアップロード的な処理は時間掛かりすぎ、そもそも一台のサーバに保存しきれないデータは処理できないので、Streamみたいな流れとしての処理が必要になっている。
Akkaが使うActorモデルもその一例、データを分割し、メッセージとしてActorに送る、Actorは只々流れてきたメッセージを処理し、リアルタイムの処理が出来る。
課題
ただし、Actor間の安定メッセージを実現するのは難しい。
メッセージを送る側をPublisherとして、受け取る側をSubscriberと呼ぶ。
- Publisher側の処理が早い場合Subscriber側のバッファーが溢れてしまう。
- Subscriberに遠慮してPublisher側の処理を抑えた場合は無駄が多くなってしまう。
Akka Streams という解決案
Back Pressure
- それをSubscriberが自分を処理できる量をPublisherにリクエストを送ることで無駄なくSubscriberが処理できる量を処理する仕組み。
Reactive Streams
- 違うStream処理ツール間でもBack Pressureを実現出来るため、Reactive Streams(ノンブロッキングでback pressureな非同期ストリーム処理の標準仕様)を提唱。
その他のメリット
- Akka Streams はいろんなデータをStreamのように処理することが出来る。
- 直感的なユーザAPIを使って、処理を共通モジュール化出来る。
- 処理過程を図を書くようにシンプル。
- 処理のコンテスト、処理過程と処理の実行を分けている。処理過程を必要なところに持ち込んで、好きなタイミングで実行出来る。
- Back Pressureがあるので、性能を最大限まで引き出す能力を持っている。
- Block処理がある場合、実行時間は既存の処理としてあんまり変わらないので。Futureを使って、Non-blockingの実装が望ましい。
Akka Streams API
Akka Streams はバージョン2.4以降、APIを一新させ、experimental(実験的)でなくなった。Akka Streams を使うため、まずAPIの中の基本概念を覚えよう。
Materializer
Stream処理を実行する環境の抽象化、Actorで実行たい場合は、ActorMaterializerを使う。
※ 本文で書かれたコード例は、全部以下のコードを含む。
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
Source
源流、データを作る側、input channelを持たない、一つのoutput channelを持っている。
※Image taken from boldradius.com
Sourceの使い方
- まずは、有限のデータをSourceにする方法。遅延評価のため、run* 実行しないと、評価されない。
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...
scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
- 次は、無限データをSourceにする方法。無限で評価されないため、takeを入れた。
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
- そして、Actorに送るメッセージをSourceにする方法。 結果から、Futureが違うスレッドで並行実行されることが分かる。 ※ ただし、この方法はBack Pressure出来ないので、Bufferを超える際の処理を指定する必要がある。
def run(actor: ActorRef) = {
Future { Thread.sleep(300); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
.actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(run)
scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1
Sink
水槽、最終的にデータを受け取る側。Sourceの反対で、output channelを持たない、一つのinput channelを持っている。
※Image taken from boldradius.com
Sinkの使い方
- Sourceと直接繋ぐ
to を使えば、SourceとSinkを繋ぐことが出来る。戻り値は RunnableFlow と呼ぶ。RunnableFlowに対して run() を実行すれば、評価される。
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3
※Image taken from boldradius.com
- すべてのデータをActorに渡すことも当然できる。
val actor = system.actorOf(Props(new Actor {
override def receive = {
case msg => println(s"actor received: $msg")
}
}))
scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...
scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
Flow
既存systemのデータをAkkaStreamに渡すだけなら、SourceとSinkが十分ですが、出来ないこともある。
Flowがoutput channelとinput channelの両方を行っている、SourceとSinkの間で、データを好きなように変換出来る。
※Image taken from boldradius.com
SourceとFlowを繋げば、新しいSourceになる、FlowとSinkを繋げば、新しいSinkになる、すべて繋げば、 RunnableFlow になる。
※Image taken from boldradius.com
Flowの使い方
- via を使って、SourceとFlowを繋ぐ、inputの型を指定する必要がある。 SourceとFlowとSinkは完全独立なので、再利用出来る。
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
- SourceとSinkとFlowを繋ぐ
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3
scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1
まとめ
以上、Akka Streams における最も基礎の部分を話ししました。
Streamを構成するSource、Flow、Sink、3つの基礎概念がある。これらを使えば、ほとんどの線形処理を簡単に書くことが出来ます。なお、再利用出来る共通モジュールも作ることもてきます。
めでたし、めでたし?
Next
Akka Streams が出来ることはもちろんこれだけではありません。次の記事 では、Akka Streams が提供した他の部品を使って、こんな感じの処理図(Graph)を書く方法について説明します。
※Image taken from doc.akka.io