Scala
Akka
ScalaDay 19

Reactive System Toolkit - Akkaのソースコードリーディング

この記事はScala Advent Calendar 2017の12月19日の記事です。

今日は、Akkaのソースコード読んで実装について調べたことについて話したいと思います。

Akkaは、プロセッサコアとネットワークにまたがってスケーラブルでレジリエントなシステムを設計するためのオープンソースのライブラリです。JavaとScalaで使うことができます。
https://doc.akka.io/docs/akka/current/guide/introduction.html

https://twitter.com/lightbend/status/919595014392635392 より引用

Akkaは、分散システムの障害に備える道具を提供してくれます。
https://www.slideshare.net/yugolf/preparing-for-distributed-system-failures-using-akka-scala-matsuri2017-72575226

詳しくAkkaに知りたい人は、Akka in Actionの翻訳本のAkka実践バイブル アクターモデルによる並行・分散システムの実現を読むことをオススメします。

環境構築

No 1

Intellij IDEA Community Editionをダウンロードします。

No 2

File -> New -> Projectを選択します。

スクリーンショット 2017-12-03 22.42.42.png

No 3

Scala -> SBT -> Nextを選択

スクリーンショット 2017-12-03 22.44.50.png

No 4

NameをHelloAkkaにして、Finishを選択

スクリーンショット 2017-12-03 22.46.41.png

No 5

すでにIntelliJ IDEAを開いていれば、以下のポップアップで、New Windowを選択

スクリーンショット 2017-12-03 22.49.35.png

No 6

SBTのダウンロードが終わったあとに、
https://akka.io/docs/
を参考にHelloAkka/build.sbtを以下のように編集します。

build.sbt
name := "HelloAkka"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.7",
  "com.typesafe.akka" %% "akka-testkit" % "2.5.7" % Test
)

編集したら、Refresh projectを選択する

スクリーンショット 2017-12-03 22.57.42.png

No 7

HelloAkka/src/main/scalaフォルダを右クリックして、New->Scala Classを選択

スクリーンショット 2017-12-03 22.51.41.png

No 8

Name: HelloAkka
Kind: Object
にして、OKを選択

スクリーンショット 2017-12-03 22.52.33.png

No 9

HelloAkka.scalaを以下のように編集します

HelloAkka.scala
import akka.actor.ActorSystem

object HelloAkka extends App {
  println(ActorSystem())
  println(actorSystem.toString)
}

No 10

Run -> Edit Configurationを選択

スクリーンショット 2017-12-03 23.44.49.png

No 11

+をクリック、SBT Taskを選択

スクリーンショット 2017-12-03 23.45.37.png

No 12

Name: sbt run
Tasks: run
を入力して、OKを選択

スクリーンショット 2017-12-03 23.46.25.png

No 13

再生ボタンを選択

スクリーンショット 2017-12-03 23.47.44.png

No 14

コンソールにakka://defaultが出力されていることを確認します

コンソール
[info] Running HelloAkka 
akka://default
akka://default

No 15

以下のようにActorSystemを選択して、「Command+b」を押して、applyの定義元にジャンプします

スクリーンショット 2017-12-03 23.49.35.png

No 16

ActorSystem.scala
  def apply(): ActorSystem = apply("default")

上のの定義元を見ると

println(ActorSystem().toString)

は、

println(ActorSystem().apply().toString)

と同じで、さらに

println(ActorSystem().apply("default").toString)

と同じことがわかる。

出力結果は、

コンソール
akka://default

だったので、ActorSystemのコンストラクタの引数は、このURIのdefaultに関係していそうです。

No 17

URLのドメインが、defaultだとなんのことだかわからないので、URIを変更してみましょう!

以下のようにHelloAkka.scalaを変更します

HelloAkka.scala
import akka.actor.ActorSystem

object HelloAkka extends App {
  val actorSystem = ActorSystem("actorSystem")
  println(actorSystem)
  println(actorSystem.toString)
}

No 18

再生ボタンを選択

スクリーンショット 2017-12-03 23.47.44.png

No 19

コンソールにakka://actorSystemが出力されてURLのドメインが変更されていることを確認します。

コンソール
[info] Running HelloAkka 
akka://actorSystem
akka://actorSystem

このactorSystem.toStringは、ActorSystem.scalaActorSystemImplクラスのtoString関数を呼んでいます。

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L933

ActorSystem.scala
 override def toString: String = lookupRoot.path.root.address.toString

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala#L485

ActorRefProvider.scala
  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))

_systemNameに"actorSystem"が渡されて作られています。

No 20

akka://actorSystemが出力される理由を解説します。(長いので読み飛ばしても構いません)

HelloAkka.scala
  val actorSystem = ActorSystem("actorSystem")

HelloAkka.scalaActorSystemをクリックして、「Option+Command+B」で、実装をみます。

ActorSystem.scala
  def apply(name: String): ActorSystem = apply(name, None, None, None)

上のapply(name, None, None, None)applyをクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)

ActorSystem.scala
  def apply(
    name:                    String,
    config:                  Option[Config]           = None,
    classLoader:             Option[ClassLoader]      = None,
    defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem =
    apply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))

上のapply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))applyをクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)

ActorSystem.scala
  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
  }

上記から、ActorSystem("actorSystem").toStringのtoStringメソッドは、ActorSystemImplのインスタンスのtoString関数が呼ばれていることがわかる。

上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()ActorSystemImplをクリックして、Command+Bで、定義元みたあと、toStirng関数を検索します。

ActorSystemImpl.scala
private[akka] class ActorSystemImpl(
  val name:                String,
  applicationConfig:       Config,
  classLoader:             ClassLoader,
  defaultExecutionContext: Option[ExecutionContext],
  val guardianProps:       Option[Props],
  setup:                   ActorSystemSetup) extends ExtendedActorSystem {
  // 略
  override def toString: String = lookupRoot.path.root.address.toString

上記より、ActorSystem("actorSystem").toStringは、lookupRoot.path.root.address.toStringを呼んでいていることがわかります。
lookupRootをクリックして、「Option+Command+B」で、実装をみます。

ActorSystem.scala
  def lookupRoot: InternalActorRef = provider.rootGuardian

上記のlookupRootをクリックして、「Option+Command+B」で、LocalActorRefProvider.scalaの実装をみます。

スクリーンショット 2017-12-09 16.00.11.png

LocalActorRefProvider.scala
  override lazy val rootGuardian: LocalActorRef =
    new LocalActorRef(
      system,
      Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
      defaultDispatcher,
      defaultMailbox,
      theOneWhoWalksTheBubblesOfSpaceTime,
      rootPath) {
      override def getParent: InternalActorRef = this
      override def getSingleChild(name: String): InternalActorRef = name match {
        case "temp"         tempContainer
        case "deadLetters"  deadLetters
        case other          extraNames.get(other).getOrElse(super.getSingleChild(other))
      }
    }

上記のrootPathをクリックして、「Option+Command+B」で、実装をみます。

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))

上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()startをクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)

  def start(): this.type = _start

上の_startをクリックして、「Option+Command+B」で、実装をみます。

ActorSystem.scala
  private lazy val _start: this.type = try {
    registerOnTermination(stopScheduler())
    // the provider is expected to start default loggers, LocalActorRefProvider does this
    provider.init(this)
    if (settings.LogDeadLetters > 0)
      logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
    eventStream.startUnsubscriber()
    loadExtensions()
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) 
      try terminate() catch { case NonFatal(_)  Try(stopScheduler()) }
      throw e
  }

上記の_start関数は、ActorSystemImplのthisを返しています。
上記のprovider.init(this)initをクリックして、「Option+Command+B」で、LocalActorRefProviderの実装をみます。

スクリーンショット 2017-12-09 11.32.29.png

ActorRefProvidoer.scala
  private[akka] def init(_system: ActorSystemImpl) {
    system = _system
    rootGuardian.start()
    // chain death watchers so that killing guardian stops the application
    systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
    rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
    eventStream.startDefaultLoggers(_system)
  }

このinit関数が呼ばれた時、rootGuardian.start()と、systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))が呼ばれます。
rootGuardianと、systemGuardianは、遅延評価された変数(lazy val)なので、呼ばれた時に初期化されます。
rootGuardianと、systemGuardianをそれぞれクリックして、「Option+Command+B」で、実装をみます。

ActorRefProvidoer.scala
  override lazy val rootGuardian: LocalActorRef =
    new LocalActorRef(
      system,
      Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
      defaultDispatcher,
      defaultMailbox,
      theOneWhoWalksTheBubblesOfSpaceTime,
      rootPath) {
      override def getParent: InternalActorRef = this
      override def getSingleChild(name: String): InternalActorRef = name match {
        case "temp"         tempContainer
        case "deadLetters"  deadLetters
        case other          extraNames.get(other).getOrElse(super.getSingleChild(other))
      }
    }

上のLocalActorRefをクリックして、「Command+B」で、定義元をみます。

ActorRefProvider.scala
private[akka] class LocalActorRef private[akka] (
  _system:           ActorSystemImpl,
  _props:            Props,
  _dispatcher:       MessageDispatcher,
  _mailboxType:      MailboxType,
  _supervisor:       InternalActorRef,
  override val path: ActorPath)
  extends ActorRefWithCell with LocalRef {

上の、_systemには、systempathには、rootPathが渡されていることがわかります。

Command+[で戻って、rootPathをクリックして、「Option+Command+B」で、実装をみます。

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))`

上の`Address'をクリックして、「Option+Command+B」で、実装をみます。

  def apply(protocol: String, system: String) = new Address(protocol, system)

上の`Address'をクリックして、「Command+B」で、実装をみます。

final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
  def this(protocol: String, system: String) = this(protocol, system, None, None)

  @transient
  override lazy val toString: String = {
    val sb = (new java.lang.StringBuilder(protocol)).append("://").append(system)

    if (host.isDefined) sb.append('@').append(host.get)
    if (port.isDefined) sb.append(':').append(port.get)

    sb.toString
  }

上のthis関数のprotocolに"akka"、systemに"actorSystem"が渡されて、akka://actorSystemのURIが作られたと思われます。

No 21

Akkaを使ってみる

以下のような構成のアクターシステムをHelloAkka.scalaで作ります。

SupervisorTree.png

HelloAkka.scalaを以下のように編集して、sbt runで実行して見ましょう!

HelloAkka.scala
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, ActorSystem, DeathPactException, Inbox, Kill, OneForOneStrategy, Props, SupervisorStrategy}

import scala.concurrent.duration._

// 送るメッセージの型宣言
case object ReplyMessage {
//  println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class WhoToMessage(who: String) {
//  println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class PrintMessage(message: String) {
//  println(s"Thead Name: ${Thread.currentThread().getName}")
}

class MessageActor extends Actor with ActorLogging {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  var message = ""

  // メッセージを受信した時に呼ばれるメソッド
  override def receive: Receive = {
    case WhoToMessage(who) => {
      // メッセージをログ出力
      log.info(s"${WhoToMessage(who)}")
      message = s"hello, $who"
    }
    case ReplyMessage => {
      // ReplyMessageメッセージを受け取ったらログ出力
      log.info(s"${ReplyMessage}")
      // sender(送信元)に、PrintMessage(message)のメッセージを返信します
      sender ! PrintMessage(message)
    }
    case _ => {
      // 期待したメッセージが来なかったことをログに出力
      log.info(s"unexpected message")
    }
  }
}

class PrintMessageActor extends Actor with ActorLogging {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  // メッセージを受信した時に呼ばれるメソッド
  override def receive: Receive = {
    case PrintMessage(message) => {
      // PrintMessage(message)メッセージを受け取ったらログ出力
      log.info(s"${PrintMessage(message)}")
    }
    case _ => {
      // 期待したメッセージが来なかったことをログに出力
      log.info(s"unexpected message")
    }
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
    case _: ActorInitializationException  Stop
    case _: ActorKilledException          Stop
    case _: DeathPactException            Stop
    case _: Exception                     Restart // それ以外の例外は再起動
  }
}

object HelloActor extends App {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  // ActorSystemを作成
  val actorSystem = ActorSystem("actorSystem")
  println(actorSystem)
  println(actorSystem.toString)

  // MessageActor型のmessageActorアクターを作成
  val messageActor = actorSystem.actorOf(Props[MessageActor], "messageActor")
  println(messageActor.path)
  println(messageActor.path.parent)
  println(messageActor.path.parent.parent)

  // ActorSystemに、受信Boxアクターを作成
  val inbox = Inbox.create(actorSystem)
  println(inbox.getRef().path)

  // messageActorアクターに、PrintMessage("akka")というメッセージを送信
  messageActor ! PrintMessage("akka")
  // PrintMessageは、WhoToMessageでも、PrintMessage型でもないので、unexpected messageを出力

  // messageActorアクターに、WhoToMessage("akka")というメッセージを送信
  messageActor ! WhoToMessage("akka")
  // messageActorアクターは、WhoToMessage("akka")というメッセージを受け取ると
  // message = "hello, akka"
  // messageActorアクターのmessageg変数に"hello, akka"が代入される

  // messageActorアクターにReplyMessageメッセージを送信します
  inbox.send(messageActor, ReplyMessage)
  // messageActorアクターは、ReplyMessageメッセージを受信すると
  // sender(送信元)に、PrintMessage("hello, akka")を返信します
  // その返信メッセージは、受信Boxアクターに届く

  // PrintMessage(message1)に、受信Boxアクターに、5秒以内に届いたPrintMessage("hello, akka")メッセージを代入
  val PrintMessage(message1) = inbox.receive(5.seconds)

  println(s"Message: $message1") // Message: hello, akka

  // messageActorアクターに、WhoToMessage("playframework")というメッセージを送信
  messageActor ! WhoToMessage("playframework")
  // messageActorアクターは、WhoToMessage("playframework")メッセージを受信すると、
  // message = "hello, playframework"
  // messageActorアクターのmessage変数に"hello, playframework"が代入される

  // messageActorアクターに、ReplyMessageメッセージを送信します
  inbox.send(messageActor, ReplyMessage)
  // messageActorアクターは、ReplyMessageメッセージを受信すると
  // sender(送信元)に、PrintMessage("hello, playframework")メッセージを返信します
  // その返信メッセージは、受信Boxアクターに届く

  // PrintMessage(message2)に、受信Boxアクターに、5秒以内に届いたPrintMessage"hello, playframework")を代入
  val PrintMessage(message2) = inbox.receive(5.seconds)
  println(s"Message: $message2") // Greeting: hello, playframework

  // PrintMessageActor型のprintMessageActorアクターを作成します
  val printMessageActor = actorSystem.actorOf(Props[PrintMessageActor], "printMessageActor")

  // アクターシステムは、3秒遅れ、1秒間隔で、
  // messageActorアクターが、1秒間隔でReplyMessageメッセージをprintMessageActorアクターに送信します
  actorSystem.scheduler.schedule(
    3.seconds,
    1.seconds,
    messageActor,
    ReplyMessage
  )(actorSystem.dispatcher, printMessageActor)
  // messageActorアクターは、ReplyMessageメッセージを受け取ると
  // sender(送信者)にPrintMessage("hello, playframework")メッセージを返信します
  // printMessageActorアクターは、PrintMessage("hello, playframework")メッセージを受け取ると、
  // "hello, playframework"を出力します

  // ActorのデフォルトのスーパバイザーストラテジーはExceptionの例外のメッセージの場合は、再起動するので、上の出力は止まらない
  printMessageActor ! new Exception("printMessageActor exception") 
  messageActor ! new Exception("messageActor exception")
}

実行結果

akka://actorSystem
akka://actorSystem
akka://actorSystem/user/messageActor
akka://actorSystem/user
akka://actorSystem/
akka://actorSystem/system/dsl/inbox-1
[INFO] [12/19/2017 02:51:26.079] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] WhoToMessage(akka)
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
Message: hello, akka
Message: hello, playframework
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] WhoToMessage(playframework)
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] unexpected message
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:30.122] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:30.123] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:32.123] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
// 略

SupervisorTree.png

actorSystemというrootGuardianアクターを作ると、
userというuserGuardianアクターと、systeGuardianアクターが作られています
actorOfでアクターを作ると、userというuserGuardianアクターの下にアクターが作られる
今回はmessageActorと、printMessageActorを作成しています。
ただし、inboxアクターは特殊で、systeGuardianアクターの下に作られます。
messageActorや、printMessageActorにExceptionのメッセージを送っても、
デフォルトのスーパバイザーストラテジーはExceptionの場合は、
再起動なので、messageActorと、printMessageActorを再起動して出力され続けることが確認できると思います。

Actor.scala
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
FaultHandlling.scala
  final val defaultDecider: Decider = {
    case _: ActorInitializationException  Stop
    case _: ActorKilledException          Stop
    case _: DeathPactException            Stop
    case _: Exception                     Restart
  }

  final val defaultStrategy: SupervisorStrategy = {
    OneForOneStrategy()(defaultDecider)
  }

デフォルトのSupervisorStrategyは以下のようにして、上書きすることもできます。

class PrintMessageActor extends Actor with ActorLogging {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  // メッセージを受信した時に呼ばれるメソッド
  override def receive: Receive = {
    case PrintMessage(message) => {
      // PrintMessage(message)メッセージを受け取ったらログ出力
      log.info(s"${PrintMessage(message)}")
    }
    case _ => {
      // 期待したメッセージが来なかったことをログに出力
      log.info(s"unexpected message")
    }
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
    case _: ActorInitializationException  Resume
    case _: ActorKilledException          Stop
    case _: DeathPactException            Escalate
    case _: Exception                     Restart
  }
}

Escalateなどを使うと、例外処理の判断を親アクターに任せることができますが、printMessageActorの親アクターは、userというuserGuardianアクターなので、何も設定してなければ、デフォルトのsupervisorStrategyが適用されます。
例外処理の判断を親アクターに任せる時は、親アクターのSupervisorStrategyを確認して、変更したい場合は、SupervisorStrategyをoverrideしましょう!

以上、12月19日のScalaアドベントカレンダーでした!