Edited at
ScalaDay 19

Reactive System Toolkit - Akkaのソースコードリーディング

More than 1 year has passed since last update.

この記事はScala Advent Calendar 2017の12月19日の記事です。

今日は、Akkaのソースコード読んで実装について調べたことについて話したいと思います。

Akkaは、プロセッサコアとネットワークにまたがってスケーラブルでレジリエントなシステムを設計するためのオープンソースのライブラリです。JavaとScalaで使うことができます。

https://doc.akka.io/docs/akka/current/guide/introduction.html

https://twitter.com/lightbend/status/919595014392635392 より引用

Akkaは、分散システムの障害に備える道具を提供してくれます。

https://www.slideshare.net/yugolf/preparing-for-distributed-system-failures-using-akka-scala-matsuri2017-72575226

詳しくAkkaに知りたい人は、Akka in Actionの翻訳本のAkka実践バイブル アクターモデルによる並行・分散システムの実現を読むことをオススメします。


環境構築


No 1

Intellij IDEA Community Editionをダウンロードします。


No 2

File -> New -> Projectを選択します。

スクリーンショット 2017-12-03 22.42.42.png


No 3

Scala -> SBT -> Nextを選択

スクリーンショット 2017-12-03 22.44.50.png


No 4

NameをHelloAkkaにして、Finishを選択

スクリーンショット 2017-12-03 22.46.41.png


No 5

すでにIntelliJ IDEAを開いていれば、以下のポップアップで、New Windowを選択

スクリーンショット 2017-12-03 22.49.35.png


No 6

SBTのダウンロードが終わったあとに、

https://akka.io/docs/

を参考にHelloAkka/build.sbtを以下のように編集します。


build.sbt

name := "HelloAkka"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.7",
"com.typesafe.akka" %% "akka-testkit" % "2.5.7" % Test
)


編集したら、Refresh projectを選択する

スクリーンショット 2017-12-03 22.57.42.png


No 7

HelloAkka/src/main/scalaフォルダを右クリックして、New->Scala Classを選択

スクリーンショット 2017-12-03 22.51.41.png


No 8

Name: HelloAkka

Kind: Object

にして、OKを選択

スクリーンショット 2017-12-03 22.52.33.png


No 9

HelloAkka.scalaを以下のように編集します


HelloAkka.scala

import akka.actor.ActorSystem

object HelloAkka extends App {
println(ActorSystem())
println(actorSystem.toString)
}



No 10

Run -> Edit Configurationを選択

スクリーンショット 2017-12-03 23.44.49.png


No 11

+をクリック、SBT Taskを選択

スクリーンショット 2017-12-03 23.45.37.png


No 12

Name: sbt run

Tasks: run

を入力して、OKを選択

スクリーンショット 2017-12-03 23.46.25.png


No 13

再生ボタンを選択

スクリーンショット 2017-12-03 23.47.44.png


No 14

コンソールにakka://defaultが出力されていることを確認します


コンソール

[info] Running HelloAkka 

akka://default
akka://default


No 15

以下のようにActorSystemを選択して、「Command+b」を押して、applyの定義元にジャンプします

スクリーンショット 2017-12-03 23.49.35.png


No 16


ActorSystem.scala

  def apply(): ActorSystem = apply("default")


上のの定義元を見ると

println(ActorSystem().toString)

は、

println(ActorSystem().apply().toString)

と同じで、さらに

println(ActorSystem().apply("default").toString)

と同じことがわかる。

出力結果は、


コンソール

akka://default


だったので、ActorSystemのコンストラクタの引数は、このURIのdefaultに関係していそうです。


No 17

URLのドメインが、defaultだとなんのことだかわからないので、URIを変更してみましょう!

以下のようにHelloAkka.scalaを変更します


HelloAkka.scala

import akka.actor.ActorSystem

object HelloAkka extends App {
val actorSystem = ActorSystem("actorSystem")
println(actorSystem)
println(actorSystem.toString)
}



No 18

再生ボタンを選択

スクリーンショット 2017-12-03 23.47.44.png


No 19

コンソールにakka://actorSystemが出力されてURLのドメインが変更されていることを確認します。


コンソール

[info] Running HelloAkka 

akka://actorSystem
akka://actorSystem

このactorSystem.toStringは、ActorSystem.scalaActorSystemImplクラスのtoString関数を呼んでいます。

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L933


ActorSystem.scala

 override def toString: String = lookupRoot.path.root.address.toString


https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala#L485


ActorRefProvider.scala

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))


_systemNameに"actorSystem"が渡されて作られています。


No 20

akka://actorSystemが出力される理由を解説します。(長いので読み飛ばしても構いません)


HelloAkka.scala

  val actorSystem = ActorSystem("actorSystem")


HelloAkka.scalaActorSystemをクリックして、「Option+Command+B」で、実装をみます。


ActorSystem.scala

  def apply(name: String): ActorSystem = apply(name, None, None, None)


上のapply(name, None, None, None)applyをクリックして、「Option+Command+B」で、実装をみます。

(nameには、"actorSystem"が渡されている)


ActorSystem.scala

  def apply(

name: String,
config: Option[Config] = None,
classLoader: Option[ClassLoader] = None,
defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem =
apply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))

上のapply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))applyをクリックして、「Option+Command+B」で、実装をみます。

(nameには、"actorSystem"が渡されている)


ActorSystem.scala

  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {

val bootstrapSettings = setup.get[BootstrapSetup]
val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
}


上記から、ActorSystem("actorSystem").toStringのtoStringメソッドは、ActorSystemImplのインスタンスのtoString関数が呼ばれていることがわかる。

上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()ActorSystemImplをクリックして、Command+Bで、定義元みたあと、toStirng関数を検索します。


ActorSystemImpl.scala

private[akka] class ActorSystemImpl(

val name: String,
applicationConfig: Config,
classLoader: ClassLoader,
defaultExecutionContext: Option[ExecutionContext],
val guardianProps: Option[Props],
setup: ActorSystemSetup) extends ExtendedActorSystem {
// 略
override def toString: String = lookupRoot.path.root.address.toString

上記より、ActorSystem("actorSystem").toStringは、lookupRoot.path.root.address.toStringを呼んでいていることがわかります。

lookupRootをクリックして、「Option+Command+B」で、実装をみます。


ActorSystem.scala

  def lookupRoot: InternalActorRef = provider.rootGuardian


上記のlookupRootをクリックして、「Option+Command+B」で、LocalActorRefProvider.scalaの実装をみます。

スクリーンショット 2017-12-09 16.00.11.png


LocalActorRefProvider.scala

  override lazy val rootGuardian: LocalActorRef =

new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher,
defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" tempContainer
case "deadLetters" deadLetters
case other extraNames.get(other).getOrElse(super.getSingleChild(other))
}
}

上記のrootPathをクリックして、「Option+Command+B」で、実装をみます。

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))

上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()startをクリックして、「Option+Command+B」で、実装をみます。

(nameには、"actorSystem"が渡されている)

  def start(): this.type = _start

上の_startをクリックして、「Option+Command+B」で、実装をみます。


ActorSystem.scala

  private lazy val _start: this.type = try {

registerOnTermination(stopScheduler())
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
if (settings.LogDeadLetters > 0)
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
eventStream.startUnsubscriber()
loadExtensions()
if (LogConfigOnStart) logConfiguration()
this
} catch {
case NonFatal(e)
try terminate() catch { case NonFatal(_) Try(stopScheduler()) }
throw e
}

上記の_start関数は、ActorSystemImplのthisを返しています。

上記のprovider.init(this)initをクリックして、「Option+Command+B」で、LocalActorRefProviderの実装をみます。

スクリーンショット 2017-12-09 11.32.29.png


ActorRefProvidoer.scala

  private[akka] def init(_system: ActorSystemImpl) {

system = _system
rootGuardian.start()
// chain death watchers so that killing guardian stops the application
systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
eventStream.startDefaultLoggers(_system)
}

このinit関数が呼ばれた時、rootGuardian.start()と、systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))が呼ばれます。

rootGuardianと、systemGuardianは、遅延評価された変数(lazy val)なので、呼ばれた時に初期化されます。

rootGuardianと、systemGuardianをそれぞれクリックして、「Option+Command+B」で、実装をみます。


ActorRefProvidoer.scala

  override lazy val rootGuardian: LocalActorRef =

new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher,
defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" tempContainer
case "deadLetters" deadLetters
case other extraNames.get(other).getOrElse(super.getSingleChild(other))
}
}

上のLocalActorRefをクリックして、「Command+B」で、定義元をみます。


ActorRefProvider.scala

private[akka] class LocalActorRef private[akka] (

_system: ActorSystemImpl,
_props: Props,
_dispatcher: MessageDispatcher,
_mailboxType: MailboxType,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends ActorRefWithCell with LocalRef {

上の、_systemには、systempathには、rootPathが渡されていることがわかります。

Command+[で戻って、rootPathをクリックして、「Option+Command+B」で、実装をみます。

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))`

上の`Address'をクリックして、「Option+Command+B」で、実装をみます。

  def apply(protocol: String, system: String) = new Address(protocol, system)

上の`Address'をクリックして、「Command+B」で、実装をみます。

final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {

def this(protocol: String, system: String) = this(protocol, system, None, None)

@transient
override lazy val toString: String = {
val sb = (new java.lang.StringBuilder(protocol)).append("://").append(system)

if (host.isDefined) sb.append('@').append(host.get)
if (port.isDefined) sb.append(':').append(port.get)

sb.toString
}

上のthis関数のprotocolに"akka"、systemに"actorSystem"が渡されて、akka://actorSystemのURIが作られたと思われます。


No 21

Akkaを使ってみる

以下のような構成のアクターシステムをHelloAkka.scalaで作ります。

SupervisorTree.png

HelloAkka.scalaを以下のように編集して、sbt runで実行して見ましょう!


HelloAkka.scala

import akka.actor.SupervisorStrategy.{Restart, Stop}

import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, ActorSystem, DeathPactException, Inbox, Kill, OneForOneStrategy, Props, SupervisorStrategy}

import scala.concurrent.duration._

// 送るメッセージの型宣言
case object ReplyMessage {
// println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class WhoToMessage(who: String) {
// println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class PrintMessage(message: String) {
// println(s"Thead Name: ${Thread.currentThread().getName}")
}

class MessageActor extends Actor with ActorLogging {
// println(s"Thead Name: ${Thread.currentThread().getName}")

var message = ""

// メッセージを受信した時に呼ばれるメソッド
override def receive: Receive = {
case WhoToMessage(who) => {
// メッセージをログ出力
log.info(s"${WhoToMessage(who)}")
message = s"hello, $who"
}
case ReplyMessage => {
// ReplyMessageメッセージを受け取ったらログ出力
log.info(s"${ReplyMessage}")
// sender(送信元)に、PrintMessage(message)のメッセージを返信します
sender ! PrintMessage(message)
}
case _ => {
// 期待したメッセージが来なかったことをログに出力
log.info(s"unexpected message")
}
}
}

class PrintMessageActor extends Actor with ActorLogging {
// println(s"Thead Name: ${Thread.currentThread().getName}")

// メッセージを受信した時に呼ばれるメソッド
override def receive: Receive = {
case PrintMessage(message) => {
// PrintMessage(message)メッセージを受け取ったらログ出力
log.info(s"${PrintMessage(message)}")
}
case _ => {
// 期待したメッセージが来なかったことをログに出力
log.info(s"unexpected message")
}
}

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
case _: ActorInitializationException Stop
case _: ActorKilledException Stop
case _: DeathPactException Stop
case _: Exception Restart // それ以外の例外は再起動
}
}

object HelloActor extends App {
// println(s"Thead Name: ${Thread.currentThread().getName}")

// ActorSystemを作成
val actorSystem = ActorSystem("actorSystem")
println(actorSystem)
println(actorSystem.toString)

// MessageActor型のmessageActorアクターを作成
val messageActor = actorSystem.actorOf(Props[MessageActor], "messageActor")
println(messageActor.path)
println(messageActor.path.parent)
println(messageActor.path.parent.parent)

// ActorSystemに、受信Boxアクターを作成
val inbox = Inbox.create(actorSystem)
println(inbox.getRef().path)

// messageActorアクターに、PrintMessage("akka")というメッセージを送信
messageActor ! PrintMessage("akka")
// PrintMessageは、WhoToMessageでも、PrintMessage型でもないので、unexpected messageを出力

// messageActorアクターに、WhoToMessage("akka")というメッセージを送信
messageActor ! WhoToMessage("akka")
// messageActorアクターは、WhoToMessage("akka")というメッセージを受け取ると
// message = "hello, akka"
// messageActorアクターのmessageg変数に"hello, akka"が代入される

// messageActorアクターにReplyMessageメッセージを送信します
inbox.send(messageActor, ReplyMessage)
// messageActorアクターは、ReplyMessageメッセージを受信すると
// sender(送信元)に、PrintMessage("hello, akka")を返信します
// その返信メッセージは、受信Boxアクターに届く

// PrintMessage(message1)に、受信Boxアクターに、5秒以内に届いたPrintMessage("hello, akka")メッセージを代入
val PrintMessage(message1) = inbox.receive(5.seconds)

println(s"Message: $message1") // Message: hello, akka

// messageActorアクターに、WhoToMessage("playframework")というメッセージを送信
messageActor ! WhoToMessage("playframework")
// messageActorアクターは、WhoToMessage("playframework")メッセージを受信すると、
// message = "hello, playframework"
// messageActorアクターのmessage変数に"hello, playframework"が代入される

// messageActorアクターに、ReplyMessageメッセージを送信します
inbox.send(messageActor, ReplyMessage)
// messageActorアクターは、ReplyMessageメッセージを受信すると
// sender(送信元)に、PrintMessage("hello, playframework")メッセージを返信します
// その返信メッセージは、受信Boxアクターに届く

// PrintMessage(message2)に、受信Boxアクターに、5秒以内に届いたPrintMessage"hello, playframework")を代入
val PrintMessage(message2) = inbox.receive(5.seconds)
println(s"Message: $message2") // Greeting: hello, playframework

// PrintMessageActor型のprintMessageActorアクターを作成します
val printMessageActor = actorSystem.actorOf(Props[PrintMessageActor], "printMessageActor")

// アクターシステムは、3秒遅れ、1秒間隔で、
// messageActorアクターが、1秒間隔でReplyMessageメッセージをprintMessageActorアクターに送信します
actorSystem.scheduler.schedule(
3.seconds,
1.seconds,
messageActor,
ReplyMessage
)(actorSystem.dispatcher, printMessageActor)
// messageActorアクターは、ReplyMessageメッセージを受け取ると
// sender(送信者)にPrintMessage("hello, playframework")メッセージを返信します
// printMessageActorアクターは、PrintMessage("hello, playframework")メッセージを受け取ると、
// "hello, playframework"を出力します

// ActorのデフォルトのスーパバイザーストラテジーはExceptionの例外のメッセージの場合は、再起動するので、上の出力は止まらない
printMessageActor ! new Exception("printMessageActor exception")
messageActor ! new Exception("messageActor exception")
}



実行結果

akka://actorSystem

akka://actorSystem
akka://actorSystem/user/messageActor
akka://actorSystem/user
akka://actorSystem/
akka://actorSystem/system/dsl/inbox-1
[INFO] [12/19/2017 02:51:26.079] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] WhoToMessage(akka)
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
Message: hello, akka
Message: hello, playframework
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] WhoToMessage(playframework)
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] unexpected message
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:30.122] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:30.123] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:32.123] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
// 略

SupervisorTree.png

actorSystemというrootGuardianアクターを作ると、

userというuserGuardianアクターと、systeGuardianアクターが作られています

actorOfでアクターを作ると、userというuserGuardianアクターの下にアクターが作られる

今回はmessageActorと、printMessageActorを作成しています。

ただし、inboxアクターは特殊で、systeGuardianアクターの下に作られます。

messageActorや、printMessageActorにExceptionのメッセージを送っても、

デフォルトのスーパバイザーストラテジーはExceptionの場合は、

再起動なので、messageActorと、printMessageActorを再起動して出力され続けることが確認できると思います。


Actor.scala

  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy



FaultHandlling.scala

  final val defaultDecider: Decider = {

case _: ActorInitializationException Stop
case _: ActorKilledException Stop
case _: DeathPactException Stop
case _: Exception Restart
}

final val defaultStrategy: SupervisorStrategy = {
OneForOneStrategy()(defaultDecider)
}


デフォルトのSupervisorStrategyは以下のようにして、上書きすることもできます。

class PrintMessageActor extends Actor with ActorLogging {

// println(s"Thead Name: ${Thread.currentThread().getName}")

// メッセージを受信した時に呼ばれるメソッド
override def receive: Receive = {
case PrintMessage(message) => {
// PrintMessage(message)メッセージを受け取ったらログ出力
log.info(s"${PrintMessage(message)}")
}
case _ => {
// 期待したメッセージが来なかったことをログに出力
log.info(s"unexpected message")
}
}

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
case _: ActorInitializationException Resume
case _: ActorKilledException Stop
case _: DeathPactException Escalate
case _: Exception Restart
}
}

Escalateなどを使うと、例外処理の判断を親アクターに任せることができますが、printMessageActorの親アクターは、userというuserGuardianアクターなので、何も設定してなければ、デフォルトのsupervisorStrategyが適用されます。

例外処理の判断を親アクターに任せる時は、親アクターのSupervisorStrategyを確認して、変更したい場合は、SupervisorStrategyをoverrideしましょう!

以上、12月19日のScalaアドベントカレンダーでした!