LoginSignup
4
2

More than 5 years have passed since last update.

Akka StreamsでのActorの生成され具合をみてみる

Last updated at Posted at 2016-09-07

Akka Streamsの後ろではActorが動いているはずです。それを確認したかったので確認しました。

Akka 2.4.9で確認。

LT駆動29の発表内容を文章に落としただけです。

基本

まずはアクターをつくって、生成されているアクターを確認してみます。

アクターの定義

メッセージを受け取ったら標準出力に出力するActorをつくってみます。

class HelloActor extends akka.actor.Actor {
  override def receive: Receive = { // receiveメソッドを実装する必要がある
    case keyword:String => // 何か文字列でメッセージがきたら…
      println(s"Hello, ${keyword}");
  }
}

akka.actor.Actorトレイトをミックスインしてreceiveメソッドを実装します。戻り値のReceivetype Receive = PartialFunction[Any, Unit]と定義されてます。何かをうけとって、何も返さない部分関数です。

undefinedやerrorや網羅的ではないパターンマッチを使った関数も部分関数になるでしょう。

部分関数をどう扱うか(spoonの紹介)
http://qiita.com/techno-tanoC/items/1b725713cd01bdf83b40

とあるように、ここでは網羅的ではないパターンマッチ返すと考えてよいでしょう。

定義したアクターを作成して使ってみます。

アクターの利用

val actorSystem = akka.actor.ActorSystem() // アクターを動かすにはアクターシステムが必要
val HelloActorProps = akka.actor.Props[HelloActor] // アクターを生成するためには`Props`を用意する

val helloActorRef = actorSystem.actorOf(HelloActorProps) // Propsからアクターを生成して、アクターにアクセスするためのActorRefを受け取る
helloActorRef ! "World"  // "Wordというメッセージをアクターに送る
// Hello, Wordと標準出力に出力される

// HelloActorRefはHelloActorのインスタンスではないのでreceiveメソッドは直接よべない

アクターの基本的な使い方を確認しました。
この時点で生成されているアクターを確認してみます。

アクターの確認

アクターの確認は今回ActorSystemImpl#printTreeを使いました。(もっと良い方法があれば教えて)
これはアクターシステム内のアクターの木構造を出力します。
アクターには親子関係があります。

ActorSystemImplprivate[akka]で宣言されているので、注意です。

// package akka

val actorSystem = akka.actor.ActorSystem()
var as = actorSystem.asInstanceOf[ActorSystemImpl] // printeNodeをつかうために無理矢理ActorSystemImplに
println(as.printTree); 
/* アクターシステムを作った時点
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
   ⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 3 children
   |   ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
   |   ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
   |   ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
   ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 no children
*/

val helloActorRef = actorSystem.actorOf(HelloActorProps)
println(as.printTree); // HelloActorを一つ作成時点を確認
/* HelloActorを一つ作成
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
   ⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 3 children
   |   ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
   |   ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
   |   ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
   ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
       ⌊-> $a RepointableActorRef class akka.HelloActor status=0 0 children
*/

helloActorRef ! "World"

/というルートアクターがいてsystemuserというアクターがいます。systemにはActorSystemが使うアクターがいるっぽいです。(調べてない)
userの下に自分の作ったアクターができていきます。
$aという適当な名前がついていますが、actorOfの第2引数で名前を指定することもできます。

子アクターをつくってみる

printTreeの動作確認をかねて、作ったアクターから子アクターをつくってみます。context.actorOfで作成ができます。

class HelloActor extends akka.actor.Actor {
  var child: akka.actor.ActorRef = _
  override def receive: Receive = {
    case str: String =>
      child = context.actorOf(HelloActor.props) // メッセージを受け取るたびにアクターを生成
      println(s"Hello, $str");
  }
}

object HelloActor {
  val props = akka.actor.Props[HelloActor]
}

object Sample extends App {
  val actorSystem = akka.actor.ActorSystem()
  var as = actorSystem.asInstanceOf[ActorSystemImpl]
  val helloActorRef = actorSystem.actorOf(HelloActor.props)
  helloActorRef ! "World"
  println(as.printTree);
}

$aアクターの中に$aができてます。

-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
   ⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 3 children
   |   ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
   |   ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
   |   ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
   ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
       ⌊-> $a RepointableActorRef class akka.HelloActor status=0 1 children
           ⌊-> $a LocalActorRef class akka.HelloActor status=0 no children

Akka Streamsでも確認してみる

Akka Streamsについては省略。0から数えつづけて何もしないストリームを構築してみます。

val actorSystem = akka.actor.ActorSystem()
val as = actorSystem.asInstanceOf[ActorSystemImpl]
val materializer = akka.stream.ActorMaterializer()(actorSystem)
println(as.printTree) // materializerをつくると、StreamSupervisorアクターが生成される
/*
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 2 children
*/

val graph = Source.fromIterator(() => Iterator.from(0)).to(Sink.ignore())
println(as.printTree)
/* RunnableGraphを作った時点ではアクターはできてない
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=
*/

graph.run()(materializer)
println(as.printTree)
/* flow-0-0-unknown-operationって言うアクターができてる
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 1 children
           ⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=2 no children
*/

ActorMaterializerを作成した時点で、StreamSupervisorが作成されました。
RunnableGraphを作った時点では、まだアクターは生成されず、runをするとActorGraphInterpreterが生成されました。ストリームを動かすアクターができたみたいです。

ストリームのデータを流れる数値を2倍になるようにmapを加えてみます。

val graph = Source.fromIterator(() => Iterator.from(0))
  .map(_ * 2)
  .to(Sink.ignore())
  .run()(materializer)
 ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 1 children
           ⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=2 no children

特に変わりませんでした。

次はSink.actorRefをつかってHelloActorにメッセージが流れるようにしてみます。

val helloActor = actorSystem.actorOf(HelloActor.props)
val graph = Source.fromIterator(() => Iterator.from(0))
  .map(_.toString)
  .to(Sink.actorRef[String](helloActor, ()))
   ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
       ⌊-> $a RepointableActorRef class akka.HelloActor status=0 no children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 2 children
           ⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=0 no children
           ⌊-> flow-0-1-actorRefSink RepointableActorRef class akka.stream.impl.ActorRefSinkActor status=2 no children

ActorRefSinkActorが生成されていました。HelloActorとつなぎをするアクターができているようです。

Source.actorRefも試してみます。
Source.actorRefにはActorRefが渡すところがありません。materializeした際にActorRefが返せるので、このアクターにメッセージを投げるとストリーム上にデータが流れるようです。(ちゃんと調べてない)

val helloActor = actorSystem.actorOf(HelloActor.props)
val graph = Source.actorRef(1, OverflowStrategy.dropTail)
  .to(Sink.actorRef[String](helloActor, ()))
   ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
       ⌊-> $a RepointableActorRef class akka.HelloActor status=0 no children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 2 children
           ⌊-> flow-0-0-actorRefSource RepointableActorRef class akka.stream.impl.ActorRefSourceActor status=0 no children
           ⌊-> flow-0-1-actorRefSink RepointableActorRef class akka.stream.impl.ActorRefSinkActor status=0 no children

ActorRefSourceActorが生成されています。
データを処理するところがないためか、ActorGraphInterpreterは作成されませんでした。

ここに更にmapをくわえてみます。

val graph = Source.actorRef[Int](1, OverflowStrategy.dropTail)
  .map(_*2)
  .map(_.toString)
  .to(Sink.actorRef[String](helloActor, ()))
   ⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
       ⌊-> $a RepointableActorRef class akka.HelloActor status=0 no children
       ⌊-> StreamSupervisor-0 RepointableActorRef class akka.stream.impl.StreamSupervisor status=0 3 children
           ⌊-> flow-0-0-unknown-operation RepointableActorRef class akka.stream.impl.fusing.ActorGraphInterpreter status=0 no children
           ⌊-> flow-0-1-actorRefSource RepointableActorRef class akka.stream.impl.ActorRefSourceActor status=0 no children
           ⌊-> flow-0-2-actorRefSink RepointableActorRef class akka.stream.impl.ActorRefSinkActor status=0 no children

ActorGraphInterpreterがかえってきました。

まとめ

Akka Streamsでアクターが生成され具合をじっくりと確認しました。
mapひとつ書けばアクターが一つ増えるかというとそうでもなく処理は合成されるようでした。
また、back pressureするためと思われるStreamを繋ぐためのアクターができるようです。
Sink.actorRefを利用しましたが、Sink.acorSubscriberをつかえばActorSubscriberを自分で実装して、渡せばActorRefSinkActorは作成されずに、自分で実装したアクターがその仕事をします。

サンプルコード

build.sbt
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.4.9",
)
AkkaStreamActor.scala
package akka

import akka.actor.ActorSystemImpl
import akka.stream.OverflowStrategy
import akka.stream.javadsl.Sink
import akka.stream.scaladsl.Source

class HelloActor extends akka.actor.Actor {
  var child: akka.actor.ActorRef = _
  override def receive: Receive = {
    case str: String =>
      println(s"Hello, ${str}")
  }
}

object HelloActor {
  val props = akka.actor.Props[HelloActor]
}

object AkkaStreamActor extends App {
  val actorSystem = akka.actor.ActorSystem()
  val as = actorSystem.asInstanceOf[ActorSystemImpl]
  val materializer = akka.stream.ActorMaterializer()(actorSystem)


  val hello = actorSystem.actorOf(HelloActor.props)
  val graph = Source.actorRef[Int](1, OverflowStrategy.dropTail)
    .map(_*2)
    .map(_.toString)
    .to(Sink.actorRef(hello, ()))
  graph.run()(materializer)
  println(as.printTree)
  actorSystem.terminate()
}

関連

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2