5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ScalaAdvent Calendar 2017

Day 19

Reactive System Toolkit - Akkaのソースコードリーディング

Last updated at Posted at 2017-12-19

この記事はScala Advent Calendar 2017の12月19日の記事です。

今日は、Akkaのソースコード読んで実装について調べたことについて話したいと思います。

Akkaは、プロセッサコアとネットワークにまたがってスケーラブルでレジリエントなシステムを設計するためのオープンソースのライブラリです。JavaとScalaで使うことができます。
https://doc.akka.io/docs/akka/current/guide/introduction.html

https://twitter.com/lightbend/status/919595014392635392 より引用

Akkaは、分散システムの障害に備える道具を提供してくれます。
https://www.slideshare.net/yugolf/preparing-for-distributed-system-failures-using-akka-scala-matsuri2017-72575226

詳しくAkkaに知りたい人は、Akka in Actionの翻訳本のAkka実践バイブル アクターモデルによる並行・分散システムの実現を読むことをオススメします。

環境構築

No 1

Intellij IDEA Community Editionをダウンロードします。

No 2

File -> New -> Projectを選択します。

スクリーンショット 2017-12-03 22.42.42.png

No 3

Scala -> SBT -> Nextを選択

スクリーンショット 2017-12-03 22.44.50.png

No 4

NameをHelloAkkaにして、Finishを選択

スクリーンショット 2017-12-03 22.46.41.png

No 5

すでにIntelliJ IDEAを開いていれば、以下のポップアップで、New Windowを選択

スクリーンショット 2017-12-03 22.49.35.png

No 6

SBTのダウンロードが終わったあとに、
https://akka.io/docs/
を参考にHelloAkka/build.sbtを以下のように編集します。

build.sbt
name := "HelloAkka"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.7",
  "com.typesafe.akka" %% "akka-testkit" % "2.5.7" % Test
)

編集したら、Refresh projectを選択する

スクリーンショット 2017-12-03 22.57.42.png

No 7

HelloAkka/src/main/scalaフォルダを右クリックして、New->Scala Classを選択

スクリーンショット 2017-12-03 22.51.41.png

No 8

Name: HelloAkka
Kind: Object
にして、OKを選択

スクリーンショット 2017-12-03 22.52.33.png

No 9

HelloAkka.scalaを以下のように編集します

HelloAkka.scala
import akka.actor.ActorSystem

object HelloAkka extends App {
  println(ActorSystem())
  println(actorSystem.toString)
}

No 10

Run -> Edit Configurationを選択

スクリーンショット 2017-12-03 23.44.49.png

No 11

+をクリック、SBT Taskを選択

スクリーンショット 2017-12-03 23.45.37.png

No 12

Name: sbt run
Tasks: run
を入力して、OKを選択

スクリーンショット 2017-12-03 23.46.25.png

No 13

再生ボタンを選択

スクリーンショット 2017-12-03 23.47.44.png

No 14

コンソールにakka://defaultが出力されていることを確認します

コンソール
[info] Running HelloAkka 
akka://default
akka://default

No 15

以下のようにActorSystemを選択して、「Command+b」を押して、applyの定義元にジャンプします

スクリーンショット 2017-12-03 23.49.35.png

No 16

ActorSystem.scala
  def apply(): ActorSystem = apply("default")

上のの定義元を見ると

println(ActorSystem().toString)

は、

println(ActorSystem().apply().toString)

と同じで、さらに

println(ActorSystem().apply("default").toString)

と同じことがわかる。

出力結果は、

コンソール
akka://default

だったので、ActorSystemのコンストラクタの引数は、このURIのdefaultに関係していそうです。

No 17

URLのドメインが、defaultだとなんのことだかわからないので、URIを変更してみましょう!

以下のようにHelloAkka.scalaを変更します

HelloAkka.scala
import akka.actor.ActorSystem

object HelloAkka extends App {
  val actorSystem = ActorSystem("actorSystem")
  println(actorSystem)
  println(actorSystem.toString)
}

No 18

再生ボタンを選択

スクリーンショット 2017-12-03 23.47.44.png

No 19

コンソールにakka://actorSystemが出力されてURLのドメインが変更されていることを確認します。

コンソール
[info] Running HelloAkka 
akka://actorSystem
akka://actorSystem

このactorSystem.toStringは、ActorSystem.scalaActorSystemImplクラスのtoString関数を呼んでいます。

ActorSystem.scala
 override def toString: String = lookupRoot.path.root.address.toString

ActorRefProvider.scala
  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))

_systemNameに"actorSystem"が渡されて作られています。

No 20

akka://actorSystemが出力される理由を解説します。(長いので読み飛ばしても構いません)

HelloAkka.scala
  val actorSystem = ActorSystem("actorSystem")

HelloAkka.scalaActorSystemをクリックして、「Option+Command+B」で、実装をみます。

ActorSystem.scala
  def apply(name: String): ActorSystem = apply(name, None, None, None)

上のapply(name, None, None, None)applyをクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)

ActorSystem.scala
  def apply(
    name:                    String,
    config:                  Option[Config]           = None,
    classLoader:             Option[ClassLoader]      = None,
    defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem =
    apply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))

上のapply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))applyをクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)

ActorSystem.scala
  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
  }

上記から、ActorSystem("actorSystem").toStringのtoStringメソッドは、ActorSystemImplのインスタンスのtoString関数が呼ばれていることがわかる。

上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()ActorSystemImplをクリックして、Command+Bで、定義元みたあと、toStirng関数を検索します。

ActorSystemImpl.scala
private[akka] class ActorSystemImpl(
  val name:                String,
  applicationConfig:       Config,
  classLoader:             ClassLoader,
  defaultExecutionContext: Option[ExecutionContext],
  val guardianProps:       Option[Props],
  setup:                   ActorSystemSetup) extends ExtendedActorSystem {
  // 略
  override def toString: String = lookupRoot.path.root.address.toString

上記より、ActorSystem("actorSystem").toStringは、lookupRoot.path.root.address.toStringを呼んでいていることがわかります。
lookupRootをクリックして、「Option+Command+B」で、実装をみます。

ActorSystem.scala
  def lookupRoot: InternalActorRef = provider.rootGuardian

上記のlookupRootをクリックして、「Option+Command+B」で、LocalActorRefProvider.scalaの実装をみます。

スクリーンショット 2017-12-09 16.00.11.png

LocalActorRefProvider.scala
  override lazy val rootGuardian: LocalActorRef =
    new LocalActorRef(
      system,
      Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
      defaultDispatcher,
      defaultMailbox,
      theOneWhoWalksTheBubblesOfSpaceTime,
      rootPath) {
      override def getParent: InternalActorRef = this
      override def getSingleChild(name: String): InternalActorRef = name match {
        case "temp"         tempContainer
        case "deadLetters"  deadLetters
        case other          extraNames.get(other).getOrElse(super.getSingleChild(other))
      }
    }

上記のrootPathをクリックして、「Option+Command+B」で、実装をみます。

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))

上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()startをクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)

  def start(): this.type = _start

上の_startをクリックして、「Option+Command+B」で、実装をみます。

ActorSystem.scala
  private lazy val _start: this.type = try {
    registerOnTermination(stopScheduler())
    // the provider is expected to start default loggers, LocalActorRefProvider does this
    provider.init(this)
    if (settings.LogDeadLetters > 0)
      logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
    eventStream.startUnsubscriber()
    loadExtensions()
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) 
      try terminate() catch { case NonFatal(_)  Try(stopScheduler()) }
      throw e
  }

上記の_start関数は、ActorSystemImplのthisを返しています。
上記のprovider.init(this)initをクリックして、「Option+Command+B」で、LocalActorRefProviderの実装をみます。

スクリーンショット 2017-12-09 11.32.29.png

ActorRefProvidoer.scala
  private[akka] def init(_system: ActorSystemImpl) {
    system = _system
    rootGuardian.start()
    // chain death watchers so that killing guardian stops the application
    systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
    rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
    eventStream.startDefaultLoggers(_system)
  }

このinit関数が呼ばれた時、rootGuardian.start()と、systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))が呼ばれます。
rootGuardianと、systemGuardianは、遅延評価された変数(lazy val)なので、呼ばれた時に初期化されます。
rootGuardianと、systemGuardianをそれぞれクリックして、「Option+Command+B」で、実装をみます。

ActorRefProvidoer.scala
  override lazy val rootGuardian: LocalActorRef =
    new LocalActorRef(
      system,
      Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
      defaultDispatcher,
      defaultMailbox,
      theOneWhoWalksTheBubblesOfSpaceTime,
      rootPath) {
      override def getParent: InternalActorRef = this
      override def getSingleChild(name: String): InternalActorRef = name match {
        case "temp"         tempContainer
        case "deadLetters"  deadLetters
        case other          extraNames.get(other).getOrElse(super.getSingleChild(other))
      }
    }

上のLocalActorRefをクリックして、「Command+B」で、定義元をみます。

ActorRefProvider.scala
private[akka] class LocalActorRef private[akka] (
  _system:           ActorSystemImpl,
  _props:            Props,
  _dispatcher:       MessageDispatcher,
  _mailboxType:      MailboxType,
  _supervisor:       InternalActorRef,
  override val path: ActorPath)
  extends ActorRefWithCell with LocalRef {

上の、_systemには、systempathには、rootPathが渡されていることがわかります。

Command+[で戻って、rootPathをクリックして、「Option+Command+B」で、実装をみます。

  override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))`

上の`Address'をクリックして、「Option+Command+B」で、実装をみます。

  def apply(protocol: String, system: String) = new Address(protocol, system)

上の`Address'をクリックして、「Command+B」で、実装をみます。

final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
  def this(protocol: String, system: String) = this(protocol, system, None, None)

  @transient
  override lazy val toString: String = {
    val sb = (new java.lang.StringBuilder(protocol)).append("://").append(system)

    if (host.isDefined) sb.append('@').append(host.get)
    if (port.isDefined) sb.append(':').append(port.get)

    sb.toString
  }

上のthis関数のprotocolに"akka"、systemに"actorSystem"が渡されて、akka://actorSystemのURIが作られたと思われます。

No 21

Akkaを使ってみる

以下のような構成のアクターシステムをHelloAkka.scalaで作ります。

SupervisorTree.png

HelloAkka.scalaを以下のように編集して、sbt runで実行して見ましょう!

HelloAkka.scala
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, ActorSystem, DeathPactException, Inbox, Kill, OneForOneStrategy, Props, SupervisorStrategy}

import scala.concurrent.duration._

// 送るメッセージの型宣言
case object ReplyMessage {
//  println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class WhoToMessage(who: String) {
//  println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class PrintMessage(message: String) {
//  println(s"Thead Name: ${Thread.currentThread().getName}")
}

class MessageActor extends Actor with ActorLogging {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  var message = ""

  // メッセージを受信した時に呼ばれるメソッド
  override def receive: Receive = {
    case WhoToMessage(who) => {
      // メッセージをログ出力
      log.info(s"${WhoToMessage(who)}")
      message = s"hello, $who"
    }
    case ReplyMessage => {
      // ReplyMessageメッセージを受け取ったらログ出力
      log.info(s"${ReplyMessage}")
      // sender(送信元)に、PrintMessage(message)のメッセージを返信します
      sender ! PrintMessage(message)
    }
    case _ => {
      // 期待したメッセージが来なかったことをログに出力
      log.info(s"unexpected message")
    }
  }
}

class PrintMessageActor extends Actor with ActorLogging {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  // メッセージを受信した時に呼ばれるメソッド
  override def receive: Receive = {
    case PrintMessage(message) => {
      // PrintMessage(message)メッセージを受け取ったらログ出力
      log.info(s"${PrintMessage(message)}")
    }
    case _ => {
      // 期待したメッセージが来なかったことをログに出力
      log.info(s"unexpected message")
    }
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
    case _: ActorInitializationException  Stop
    case _: ActorKilledException          Stop
    case _: DeathPactException            Stop
    case _: Exception                     Restart // それ以外の例外は再起動
  }
}

object HelloActor extends App {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  // ActorSystemを作成
  val actorSystem = ActorSystem("actorSystem")
  println(actorSystem)
  println(actorSystem.toString)

  // MessageActor型のmessageActorアクターを作成
  val messageActor = actorSystem.actorOf(Props[MessageActor], "messageActor")
  println(messageActor.path)
  println(messageActor.path.parent)
  println(messageActor.path.parent.parent)

  // ActorSystemに、受信Boxアクターを作成
  val inbox = Inbox.create(actorSystem)
  println(inbox.getRef().path)

  // messageActorアクターに、PrintMessage("akka")というメッセージを送信
  messageActor ! PrintMessage("akka")
  // PrintMessageは、WhoToMessageでも、PrintMessage型でもないので、unexpected messageを出力

  // messageActorアクターに、WhoToMessage("akka")というメッセージを送信
  messageActor ! WhoToMessage("akka")
  // messageActorアクターは、WhoToMessage("akka")というメッセージを受け取ると
  // message = "hello, akka"
  // messageActorアクターのmessageg変数に"hello, akka"が代入される

  // messageActorアクターにReplyMessageメッセージを送信します
  inbox.send(messageActor, ReplyMessage)
  // messageActorアクターは、ReplyMessageメッセージを受信すると
  // sender(送信元)に、PrintMessage("hello, akka")を返信します
  // その返信メッセージは、受信Boxアクターに届く

  // PrintMessage(message1)に、受信Boxアクターに、5秒以内に届いたPrintMessage("hello, akka")メッセージを代入
  val PrintMessage(message1) = inbox.receive(5.seconds)

  println(s"Message: $message1") // Message: hello, akka

  // messageActorアクターに、WhoToMessage("playframework")というメッセージを送信
  messageActor ! WhoToMessage("playframework")
  // messageActorアクターは、WhoToMessage("playframework")メッセージを受信すると、
  // message = "hello, playframework"
  // messageActorアクターのmessage変数に"hello, playframework"が代入される

  // messageActorアクターに、ReplyMessageメッセージを送信します
  inbox.send(messageActor, ReplyMessage)
  // messageActorアクターは、ReplyMessageメッセージを受信すると
  // sender(送信元)に、PrintMessage("hello, playframework")メッセージを返信します
  // その返信メッセージは、受信Boxアクターに届く

  // PrintMessage(message2)に、受信Boxアクターに、5秒以内に届いたPrintMessage"hello, playframework")を代入
  val PrintMessage(message2) = inbox.receive(5.seconds)
  println(s"Message: $message2") // Greeting: hello, playframework

  // PrintMessageActor型のprintMessageActorアクターを作成します
  val printMessageActor = actorSystem.actorOf(Props[PrintMessageActor], "printMessageActor")

  // アクターシステムは、3秒遅れ、1秒間隔で、
  // messageActorアクターが、1秒間隔でReplyMessageメッセージをprintMessageActorアクターに送信します
  actorSystem.scheduler.schedule(
    3.seconds,
    1.seconds,
    messageActor,
    ReplyMessage
  )(actorSystem.dispatcher, printMessageActor)
  // messageActorアクターは、ReplyMessageメッセージを受け取ると
  // sender(送信者)にPrintMessage("hello, playframework")メッセージを返信します
  // printMessageActorアクターは、PrintMessage("hello, playframework")メッセージを受け取ると、
  // "hello, playframework"を出力します

  // ActorのデフォルトのスーパバイザーストラテジーはExceptionの例外のメッセージの場合は、再起動するので、上の出力は止まらない
  printMessageActor ! new Exception("printMessageActor exception") 
  messageActor ! new Exception("messageActor exception")
}

実行結果

akka://actorSystem
akka://actorSystem
akka://actorSystem/user/messageActor
akka://actorSystem/user
akka://actorSystem/
akka://actorSystem/system/dsl/inbox-1
[INFO] [12/19/2017 02:51:26.079] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] WhoToMessage(akka)
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
Message: hello, akka
Message: hello, playframework
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] WhoToMessage(playframework)
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] unexpected message
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:30.122] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:30.123] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:32.123] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
// 略

SupervisorTree.png

actorSystemというrootGuardianアクターを作ると、
userというuserGuardianアクターと、systeGuardianアクターが作られています
actorOfでアクターを作ると、userというuserGuardianアクターの下にアクターが作られる
今回はmessageActorと、printMessageActorを作成しています。
ただし、inboxアクターは特殊で、systeGuardianアクターの下に作られます。
messageActorや、printMessageActorにExceptionのメッセージを送っても、
デフォルトのスーパバイザーストラテジーはExceptionの場合は、
再起動なので、messageActorと、printMessageActorを再起動して出力され続けることが確認できると思います。

Actor.scala
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
FaultHandlling.scala
  final val defaultDecider: Decider = {
    case _: ActorInitializationException  Stop
    case _: ActorKilledException          Stop
    case _: DeathPactException            Stop
    case _: Exception                     Restart
  }

  final val defaultStrategy: SupervisorStrategy = {
    OneForOneStrategy()(defaultDecider)
  }

デフォルトのSupervisorStrategyは以下のようにして、上書きすることもできます。

class PrintMessageActor extends Actor with ActorLogging {
//  println(s"Thead Name: ${Thread.currentThread().getName}")

  // メッセージを受信した時に呼ばれるメソッド
  override def receive: Receive = {
    case PrintMessage(message) => {
      // PrintMessage(message)メッセージを受け取ったらログ出力
      log.info(s"${PrintMessage(message)}")
    }
    case _ => {
      // 期待したメッセージが来なかったことをログに出力
      log.info(s"unexpected message")
    }
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
    case _: ActorInitializationException  Resume
    case _: ActorKilledException          Stop
    case _: DeathPactException            Escalate
    case _: Exception                     Restart
  }
}

Escalateなどを使うと、例外処理の判断を親アクターに任せることができますが、printMessageActorの親アクターは、userというuserGuardianアクターなので、何も設定してなければ、デフォルトのsupervisorStrategyが適用されます。
例外処理の判断を親アクターに任せる時は、親アクターのSupervisorStrategyを確認して、変更したい場合は、SupervisorStrategyをoverrideしましょう!

以上、12月19日のScalaアドベントカレンダーでした!

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?