Akka Streamsの後ろではActorが動いているはずです。それを確認したかったので確認しました。
Akka 2.4.9で確認。
LT駆動29の発表内容を文章に落としただけです。
基本
まずはアクターをつくって、生成されているアクターを確認してみます。
アクターの定義
メッセージを受け取ったら標準出力に出力するActorをつくってみます。
class HelloActor extends akka.actor.Actor {
override def receive: Receive = { // receiveメソッドを実装する必要がある
case keyword:String => // 何か文字列でメッセージがきたら…
println(s"Hello, ${keyword}");
}
}
akka.actor.Actor
トレイトをミックスインしてreceive
メソッドを実装します。戻り値のReceive
はtype Receive = PartialFunction[Any, Unit]
と定義されてます。何かをうけとって、何も返さない部分関数です。
undefinedやerrorや網羅的ではないパターンマッチを使った関数も部分関数になるでしょう。
部分関数をどう扱うか(spoonの紹介)
http://qiita.com/techno-tanoC/items/1b725713cd01bdf83b40
とあるように、ここでは網羅的ではないパターンマッチ返すと考えてよいでしょう。
定義したアクターを作成して使ってみます。
アクターの利用
val actorSystem = akka.actor.ActorSystem() // アクターを動かすにはアクターシステムが必要
val HelloActorProps = akka.actor.Props[HelloActor] // アクターを生成するためには`Props`を用意する
val helloActorRef = actorSystem.actorOf(HelloActorProps) // Propsからアクターを生成して、アクターにアクセスするためのActorRefを受け取る
helloActorRef ! "World" // "Wordというメッセージをアクターに送る
// Hello, Wordと標準出力に出力される
// HelloActorRefはHelloActorのインスタンスではないのでreceiveメソッドは直接よべない
アクターの基本的な使い方を確認しました。
この時点で生成されているアクターを確認してみます。
アクターの確認
アクターの確認は今回ActorSystemImpl#printTree
を使いました。(もっと良い方法があれば教えて)
これはアクターシステム内のアクターの木構造を出力します。
アクターには親子関係があります。
ActorSystemImpl
はprivate[akka]
で宣言されているので、注意です。
// package akka
val actorSystem = akka.actor.ActorSystem()
var as = actorSystem.asInstanceOf[ActorSystemImpl] // printeNodeをつかうために無理矢理ActorSystemImplに
println(as.printTree);
/* アクターシステムを作った時点
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 3 children
| ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
| ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
| ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 no children
*/
val helloActorRef = actorSystem.actorOf(HelloActorProps)
println(as.printTree); // HelloActorを一つ作成時点を確認
/* HelloActorを一つ作成
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 3 children
| ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
| ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
| ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
⌊-> $a RepointableActorRef class akka.HelloActor status=0 0 children
*/
helloActorRef ! "World"
/
というルートアクターがいてsystem
とuser
というアクターがいます。system
にはActorSystem
が使うアクターがいるっぽいです。(調べてない)
user
の下に自分の作ったアクターができていきます。
$a
という適当な名前がついていますが、actorOf
の第2引数で名前を指定することもできます。
子アクターをつくってみる
printTree
の動作確認をかねて、作ったアクターから子アクターをつくってみます。context.actorOf
で作成ができます。
class HelloActor extends akka.actor.Actor {
var child: akka.actor.ActorRef = _
override def receive: Receive = {
case str: String =>
child = context.actorOf(HelloActor.props) // メッセージを受け取るたびにアクターを生成
println(s"Hello, $str");
}
}
object HelloActor {
val props = akka.actor.Props[HelloActor]
}
object Sample extends App {
val actorSystem = akka.actor.ActorSystem()
var as = actorSystem.asInstanceOf[ActorSystemImpl]
val helloActorRef = actorSystem.actorOf(HelloActor.props)
helloActorRef ! "World"
println(as.printTree);
}
$aアクターの中に$aができてます。
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 3 children
| ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
| ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
| ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
⌊-> $a RepointableActorRef class akka.HelloActor status=0 1 children
⌊-> $a LocalActorRef class akka.HelloActor status=0 no children
Akka Streamsでも確認してみる
Akka Streamsについては省略。0から数えつづけて何もしないストリームを構築してみます。
val actorSystem = akka.actor.ActorSystem()
val as = actorSystem.asInstanceOf[ActorSystemImpl]
val materializer = akka.stream.ActorMaterializer()(actorSystem)
println(as.printTree) // materializerをつくると、StreamSupervisorアクターが生成される
/*
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 2 children
*/
val graph = Source.fromIterator(() => Iterator.from(0)).to(Sink.ignore())
println(as.printTree)
/* RunnableGraphを作った時点ではアクターはできてない
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=
*/
graph.run()(materializer)
println(as.printTree)
/* flow-0-0-unknown-operationって言うアクターができてる
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 1 children
⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=2 no children
*/
ActorMaterializer
を作成した時点で、StreamSupervisor
が作成されました。
RunnableGraph
を作った時点では、まだアクターは生成されず、runをするとActorGraphInterpreter
が生成されました。ストリームを動かすアクターができたみたいです。
ストリームのデータを流れる数値を2倍になるようにmapを加えてみます。
val graph = Source.fromIterator(() => Iterator.from(0))
.map(_ * 2)
.to(Sink.ignore())
.run()(materializer)
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 1 children
⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=2 no children
特に変わりませんでした。
次はSink.actorRef
をつかってHelloActorにメッセージが流れるようにしてみます。
val helloActor = actorSystem.actorOf(HelloActor.props)
val graph = Source.fromIterator(() => Iterator.from(0))
.map(_.toString)
.to(Sink.actorRef[String](helloActor, ()))
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> $a RepointableActorRef class akka.HelloActor status=0 no children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 2 children
⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=0 no children
⌊-> flow-0-1-actorRefSink RepointableActorRef class akka.stream.impl.ActorRefSinkActor status=2 no children
ActorRefSinkActor
が生成されていました。HelloActorとつなぎをするアクターができているようです。
Source.actorRef
も試してみます。
Source.actorRef
にはActorRefが渡すところがありません。materializeした際にActorRef
が返せるので、このアクターにメッセージを投げるとストリーム上にデータが流れるようです。(ちゃんと調べてない)
val helloActor = actorSystem.actorOf(HelloActor.props)
val graph = Source.actorRef(1, OverflowStrategy.dropTail)
.to(Sink.actorRef[String](helloActor, ()))
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> $a RepointableActorRef class akka.HelloActor status=0 no children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 2 children
⌊-> flow-0-0-actorRefSource RepointableActorRef class akka.stream.impl.ActorRefSourceActor status=0 no children
⌊-> flow-0-1-actorRefSink RepointableActorRef class akka.stream.impl.ActorRefSinkActor status=0 no children
ActorRefSourceActor
が生成されています。
データを処理するところがないためか、ActorGraphInterpreter
は作成されませんでした。
ここに更にmapをくわえてみます。
val graph = Source.actorRef[Int](1, OverflowStrategy.dropTail)
.map(_*2)
.map(_.toString)
.to(Sink.actorRef[String](helloActor, ()))
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> $a RepointableActorRef class akka.HelloActor status=0 no children
⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 3 children
⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=0 no children
⌊-> flow-0-1-actorRefSource RepointableActorRef class akka.stream.impl.ActorRefSourceActor status=0 no children
⌊-> flow-0-2-actorRefSink RepointableActorRef class akka.stream.impl.ActorRefSinkActor status=0 no children
ActorGraphInterpreter
がかえってきました。
まとめ
Akka Streamsでアクターが生成され具合をじっくりと確認しました。
mapひとつ書けばアクターが一つ増えるかというとそうでもなく処理は合成されるようでした。
また、back pressureするためと思われるStreamを繋ぐためのアクターができるようです。
Sink.actorRef
を利用しましたが、Sink.acorSubscriber
をつかえばActorSubscriber
を自分で実装して、渡せばActorRefSinkActor
は作成されずに、自分で実装したアクターがその仕事をします。
サンプルコード
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.9",
)
package akka
import akka.actor.ActorSystemImpl
import akka.stream.OverflowStrategy
import akka.stream.javadsl.Sink
import akka.stream.scaladsl.Source
class HelloActor extends akka.actor.Actor {
var child: akka.actor.ActorRef = _
override def receive: Receive = {
case str: String =>
println(s"Hello, ${str}")
}
}
object HelloActor {
val props = akka.actor.Props[HelloActor]
}
object AkkaStreamActor extends App {
val actorSystem = akka.actor.ActorSystem()
val as = actorSystem.asInstanceOf[ActorSystemImpl]
val materializer = akka.stream.ActorMaterializer()(actorSystem)
val hello = actorSystem.actorOf(HelloActor.props)
val graph = Source.actorRef[Int](1, OverflowStrategy.dropTail)
.map(_*2)
.map(_.toString)
.to(Sink.actorRef(hello, ()))
graph.run()(materializer)
println(as.printTree)
actorSystem.terminate()
}