この記事はScala Advent Calendar 2017の12月19日の記事です。
今日は、Akkaのソースコード読んで実装について調べたことについて話したいと思います。
Akkaは、プロセッサコアとネットワークにまたがってスケーラブルでレジリエントなシステムを設計するためのオープンソースのライブラリです。JavaとScalaで使うことができます。
https://doc.akka.io/docs/akka/current/guide/introduction.html
https://twitter.com/lightbend/status/919595014392635392 より引用
Akkaは、分散システムの障害に備える道具を提供してくれます。
https://www.slideshare.net/yugolf/preparing-for-distributed-system-failures-using-akka-scala-matsuri2017-72575226
詳しくAkkaに知りたい人は、Akka in Actionの翻訳本のAkka実践バイブル アクターモデルによる並行・分散システムの実現を読むことをオススメします。
環境構築
No 1
Intellij IDEA Community Editionをダウンロードします。
No 2
File -> New -> Projectを選択します。
No 3
Scala -> SBT -> Nextを選択
No 4
NameをHelloAkkaにして、Finishを選択
No 5
すでにIntelliJ IDEAを開いていれば、以下のポップアップで、New Windowを選択
No 6
SBTのダウンロードが終わったあとに、
https://akka.io/docs/
を参考にHelloAkka/build.sbtを以下のように編集します。
name := "HelloAkka"
version := "0.1"
scalaVersion := "2.12.4"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.7",
"com.typesafe.akka" %% "akka-testkit" % "2.5.7" % Test
)
編集したら、Refresh projectを選択する
No 7
HelloAkka/src/main/scalaフォルダを右クリックして、New->Scala Classを選択
No 8
Name: HelloAkka
Kind: Object
にして、OKを選択
No 9
HelloAkka.scalaを以下のように編集します
import akka.actor.ActorSystem
object HelloAkka extends App {
println(ActorSystem())
println(actorSystem.toString)
}
No 10
Run -> Edit Configurationを選択
No 11
+をクリック、SBT Taskを選択
No 12
Name: sbt run
Tasks: run
を入力して、OKを選択
No 13
再生ボタンを選択
No 14
コンソールにakka://defaultが出力されていることを確認します
[info] Running HelloAkka
akka://default
akka://default
No 15
以下のようにActorSystemを選択して、「Command+b」を押して、applyの定義元にジャンプします
No 16
def apply(): ActorSystem = apply("default")
上のの定義元を見ると
println(ActorSystem().toString)
は、
println(ActorSystem().apply().toString)
と同じで、さらに
println(ActorSystem().apply("default").toString)
と同じことがわかる。
出力結果は、
akka://default
だったので、ActorSystemのコンストラクタの引数は、このURIのdefault
に関係していそうです。
No 17
URLのドメインが、default
だとなんのことだかわからないので、URIを変更してみましょう!
以下のようにHelloAkka.scalaを変更します
import akka.actor.ActorSystem
object HelloAkka extends App {
val actorSystem = ActorSystem("actorSystem")
println(actorSystem)
println(actorSystem.toString)
}
No 18
再生ボタンを選択
No 19
コンソールにakka://actorSystem
が出力されてURLのドメインが変更されていることを確認します。
[info] Running HelloAkka
akka://actorSystem
akka://actorSystem
このactorSystem.toString
は、ActorSystem.scala
のActorSystemImpl
クラスのtoString
関数を呼んでいます。
override def toString: String = lookupRoot.path.root.address.toString
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
の_systemName
に"actorSystem"が渡されて作られています。
No 20
akka://actorSystem
が出力される理由を解説します。(長いので読み飛ばしても構いません)
val actorSystem = ActorSystem("actorSystem")
HelloAkka.scala
のActorSystem
をクリックして、「Option+Command+B」で、実装をみます。
def apply(name: String): ActorSystem = apply(name, None, None, None)
上のapply(name, None, None, None)
のapply
をクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)
def apply(
name: String,
config: Option[Config] = None,
classLoader: Option[ClassLoader] = None,
defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem =
apply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))
上のapply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))
のapply
をクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)
def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
val bootstrapSettings = setup.get[BootstrapSetup]
val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)
new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
}
上記から、ActorSystem("actorSystem").toString
のtoStringメソッドは、ActorSystemImpl
のインスタンスのtoString関数が呼ばれていることがわかる。
上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
のActorSystemImpl
をクリックして、Command+B
で、定義元みたあと、toStirng関数を検索します。
private[akka] class ActorSystemImpl(
val name: String,
applicationConfig: Config,
classLoader: ClassLoader,
defaultExecutionContext: Option[ExecutionContext],
val guardianProps: Option[Props],
setup: ActorSystemSetup) extends ExtendedActorSystem {
// 略
override def toString: String = lookupRoot.path.root.address.toString
上記より、ActorSystem("actorSystem").toString
は、lookupRoot.path.root.address.toString
を呼んでいていることがわかります。
lookupRoot
をクリックして、「Option+Command+B」で、実装をみます。
def lookupRoot: InternalActorRef = provider.rootGuardian
上記のlookupRoot
をクリックして、「Option+Command+B」で、LocalActorRefProvider.scala
の実装をみます。
override lazy val rootGuardian: LocalActorRef =
new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher,
defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" ⇒ tempContainer
case "deadLetters" ⇒ deadLetters
case other ⇒ extraNames.get(other).getOrElse(super.getSingleChild(other))
}
}
上記のrootPath
をクリックして、「Option+Command+B」で、実装をみます。
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
上のnew ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
のstart
をクリックして、「Option+Command+B」で、実装をみます。
(nameには、"actorSystem"が渡されている)
def start(): this.type = _start
上の_start
をクリックして、「Option+Command+B」で、実装をみます。
private lazy val _start: this.type = try {
registerOnTermination(stopScheduler())
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
if (settings.LogDeadLetters > 0)
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
eventStream.startUnsubscriber()
loadExtensions()
if (LogConfigOnStart) logConfiguration()
this
} catch {
case NonFatal(e) ⇒
try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) }
throw e
}
上記の_start
関数は、ActorSystemImpl
のthisを返しています。
上記のprovider.init(this)
のinit
をクリックして、「Option+Command+B」で、LocalActorRefProvider
の実装をみます。
private[akka] def init(_system: ActorSystemImpl) {
system = _system
rootGuardian.start()
// chain death watchers so that killing guardian stops the application
systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
eventStream.startDefaultLoggers(_system)
}
このinit
関数が呼ばれた時、rootGuardian.start()
と、systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
が呼ばれます。
rootGuardian
と、systemGuardian
は、遅延評価された変数(lazy val)なので、呼ばれた時に初期化されます。
rootGuardian
と、systemGuardian
をそれぞれクリックして、「Option+Command+B」で、実装をみます。
override lazy val rootGuardian: LocalActorRef =
new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher,
defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" ⇒ tempContainer
case "deadLetters" ⇒ deadLetters
case other ⇒ extraNames.get(other).getOrElse(super.getSingleChild(other))
}
}
上のLocalActorRef
をクリックして、「Command+B」で、定義元をみます。
private[akka] class LocalActorRef private[akka] (
_system: ActorSystemImpl,
_props: Props,
_dispatcher: MessageDispatcher,
_mailboxType: MailboxType,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends ActorRefWithCell with LocalRef {
上の、_system
には、system
、path
には、rootPath
が渡されていることがわかります。
Command+[
で戻って、rootPath
をクリックして、「Option+Command+B」で、実装をみます。
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))`
上の`Address'をクリックして、「Option+Command+B」で、実装をみます。
def apply(protocol: String, system: String) = new Address(protocol, system)
上の`Address'をクリックして、「Command+B」で、実装をみます。
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
def this(protocol: String, system: String) = this(protocol, system, None, None)
@transient
override lazy val toString: String = {
val sb = (new java.lang.StringBuilder(protocol)).append("://").append(system)
if (host.isDefined) sb.append('@').append(host.get)
if (port.isDefined) sb.append(':').append(port.get)
sb.toString
}
上のthis
関数のprotocol
に"akka"、system
に"actorSystem"が渡されて、akka://actorSystem
のURIが作られたと思われます。
No 21
Akkaを使ってみる
以下のような構成のアクターシステムをHelloAkka.scala
で作ります。
HelloAkka.scala
を以下のように編集して、sbt run
で実行して見ましょう!
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, ActorSystem, DeathPactException, Inbox, Kill, OneForOneStrategy, Props, SupervisorStrategy}
import scala.concurrent.duration._
// 送るメッセージの型宣言
case object ReplyMessage {
// println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class WhoToMessage(who: String) {
// println(s"Thead Name: ${Thread.currentThread().getName}")
}
case class PrintMessage(message: String) {
// println(s"Thead Name: ${Thread.currentThread().getName}")
}
class MessageActor extends Actor with ActorLogging {
// println(s"Thead Name: ${Thread.currentThread().getName}")
var message = ""
// メッセージを受信した時に呼ばれるメソッド
override def receive: Receive = {
case WhoToMessage(who) => {
// メッセージをログ出力
log.info(s"${WhoToMessage(who)}")
message = s"hello, $who"
}
case ReplyMessage => {
// ReplyMessageメッセージを受け取ったらログ出力
log.info(s"${ReplyMessage}")
// sender(送信元)に、PrintMessage(message)のメッセージを返信します
sender ! PrintMessage(message)
}
case _ => {
// 期待したメッセージが来なかったことをログに出力
log.info(s"unexpected message")
}
}
}
class PrintMessageActor extends Actor with ActorLogging {
// println(s"Thead Name: ${Thread.currentThread().getName}")
// メッセージを受信した時に呼ばれるメソッド
override def receive: Receive = {
case PrintMessage(message) => {
// PrintMessage(message)メッセージを受け取ったらログ出力
log.info(s"${PrintMessage(message)}")
}
case _ => {
// 期待したメッセージが来なかったことをログに出力
log.info(s"unexpected message")
}
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
case _: ActorInitializationException ⇒ Stop
case _: ActorKilledException ⇒ Stop
case _: DeathPactException ⇒ Stop
case _: Exception ⇒ Restart // それ以外の例外は再起動
}
}
object HelloActor extends App {
// println(s"Thead Name: ${Thread.currentThread().getName}")
// ActorSystemを作成
val actorSystem = ActorSystem("actorSystem")
println(actorSystem)
println(actorSystem.toString)
// MessageActor型のmessageActorアクターを作成
val messageActor = actorSystem.actorOf(Props[MessageActor], "messageActor")
println(messageActor.path)
println(messageActor.path.parent)
println(messageActor.path.parent.parent)
// ActorSystemに、受信Boxアクターを作成
val inbox = Inbox.create(actorSystem)
println(inbox.getRef().path)
// messageActorアクターに、PrintMessage("akka")というメッセージを送信
messageActor ! PrintMessage("akka")
// PrintMessageは、WhoToMessageでも、PrintMessage型でもないので、unexpected messageを出力
// messageActorアクターに、WhoToMessage("akka")というメッセージを送信
messageActor ! WhoToMessage("akka")
// messageActorアクターは、WhoToMessage("akka")というメッセージを受け取ると
// message = "hello, akka"
// messageActorアクターのmessageg変数に"hello, akka"が代入される
// messageActorアクターにReplyMessageメッセージを送信します
inbox.send(messageActor, ReplyMessage)
// messageActorアクターは、ReplyMessageメッセージを受信すると
// sender(送信元)に、PrintMessage("hello, akka")を返信します
// その返信メッセージは、受信Boxアクターに届く
// PrintMessage(message1)に、受信Boxアクターに、5秒以内に届いたPrintMessage("hello, akka")メッセージを代入
val PrintMessage(message1) = inbox.receive(5.seconds)
println(s"Message: $message1") // Message: hello, akka
// messageActorアクターに、WhoToMessage("playframework")というメッセージを送信
messageActor ! WhoToMessage("playframework")
// messageActorアクターは、WhoToMessage("playframework")メッセージを受信すると、
// message = "hello, playframework"
// messageActorアクターのmessage変数に"hello, playframework"が代入される
// messageActorアクターに、ReplyMessageメッセージを送信します
inbox.send(messageActor, ReplyMessage)
// messageActorアクターは、ReplyMessageメッセージを受信すると
// sender(送信元)に、PrintMessage("hello, playframework")メッセージを返信します
// その返信メッセージは、受信Boxアクターに届く
// PrintMessage(message2)に、受信Boxアクターに、5秒以内に届いたPrintMessage"hello, playframework")を代入
val PrintMessage(message2) = inbox.receive(5.seconds)
println(s"Message: $message2") // Greeting: hello, playframework
// PrintMessageActor型のprintMessageActorアクターを作成します
val printMessageActor = actorSystem.actorOf(Props[PrintMessageActor], "printMessageActor")
// アクターシステムは、3秒遅れ、1秒間隔で、
// messageActorアクターが、1秒間隔でReplyMessageメッセージをprintMessageActorアクターに送信します
actorSystem.scheduler.schedule(
3.seconds,
1.seconds,
messageActor,
ReplyMessage
)(actorSystem.dispatcher, printMessageActor)
// messageActorアクターは、ReplyMessageメッセージを受け取ると
// sender(送信者)にPrintMessage("hello, playframework")メッセージを返信します
// printMessageActorアクターは、PrintMessage("hello, playframework")メッセージを受け取ると、
// "hello, playframework"を出力します
// ActorのデフォルトのスーパバイザーストラテジーはExceptionの例外のメッセージの場合は、再起動するので、上の出力は止まらない
printMessageActor ! new Exception("printMessageActor exception")
messageActor ! new Exception("messageActor exception")
}
実行結果
akka://actorSystem
akka://actorSystem
akka://actorSystem/user/messageActor
akka://actorSystem/user
akka://actorSystem/
akka://actorSystem/system/dsl/inbox-1
[INFO] [12/19/2017 02:51:26.079] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] WhoToMessage(akka)
[INFO] [12/19/2017 02:51:26.080] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
Message: hello, akka
Message: hello, playframework
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] WhoToMessage(playframework)
[INFO] [12/19/2017 02:51:26.105] [actorSystem-akka.actor.default-dispatcher-2] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/messageActor] unexpected message
[INFO] [12/19/2017 02:51:26.107] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] unexpected message
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:29.124] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:30.122] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:30.123] [actorSystem-akka.actor.default-dispatcher-5] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
[INFO] [12/19/2017 02:51:31.122] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/printMessageActor] PrintMessage(hello, playframework)
[INFO] [12/19/2017 02:51:32.123] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/messageActor] ReplyMessage
// 略
actorSystemというrootGuardianアクターを作ると、
userというuserGuardianアクターと、systeGuardianアクターが作られています
actorOfでアクターを作ると、userというuserGuardianアクターの下にアクターが作られる
今回はmessageActorと、printMessageActorを作成しています。
ただし、inboxアクターは特殊で、systeGuardianアクターの下に作られます。
messageActorや、printMessageActorにExceptionのメッセージを送っても、
デフォルトのスーパバイザーストラテジーはExceptionの場合は、
再起動なので、messageActorと、printMessageActorを再起動して出力され続けることが確認できると思います。
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
final val defaultDecider: Decider = {
case _: ActorInitializationException ⇒ Stop
case _: ActorKilledException ⇒ Stop
case _: DeathPactException ⇒ Stop
case _: Exception ⇒ Restart
}
final val defaultStrategy: SupervisorStrategy = {
OneForOneStrategy()(defaultDecider)
}
デフォルトのSupervisorStrategyは以下のようにして、上書きすることもできます。
class PrintMessageActor extends Actor with ActorLogging {
// println(s"Thead Name: ${Thread.currentThread().getName}")
// メッセージを受信した時に呼ばれるメソッド
override def receive: Receive = {
case PrintMessage(message) => {
// PrintMessage(message)メッセージを受け取ったらログ出力
log.info(s"${PrintMessage(message)}")
}
case _ => {
// 期待したメッセージが来なかったことをログに出力
log.info(s"unexpected message")
}
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minutes) {
case _: ActorInitializationException ⇒ Resume
case _: ActorKilledException ⇒ Stop
case _: DeathPactException ⇒ Escalate
case _: Exception ⇒ Restart
}
}
Escalate
などを使うと、例外処理の判断を親アクターに任せることができますが、printMessageActorの親アクターは、user
というuserGuardianアクターなので、何も設定してなければ、デフォルトのsupervisorStrategy
が適用されます。
例外処理の判断を親アクターに任せる時は、親アクターのSupervisorStrategyを確認して、変更したい場合は、SupervisorStrategyをoverrideしましょう!
以上、12月19日のScalaアドベントカレンダーでした!