はじめに
Scala での実装をしはじめてまだ3ヶ月ぐらいしか経っていないので、細かい所はおいといて、Akka Actorsを触ってわかったことをまとめたいと思う。
Akka とは?
Scala や Java で実装できるメッセージ駆動アプリケーションのライブラリ。
詳しくはこちら
今回はその中でも Akka Actors モジュールに絞ってまとめたいと思う。
他のモジュールについては別の機会にでも。
前提条件
基礎的な Scala が理解出来ていればきっと大丈夫。(なはず。
環境
- scala:2.11.8
- sbt:0.13.12
- akka:2.5.3
- logback:1.2.3
ビルドの設定
name := "akka-sample"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= {
val akkaVersion = "2.5.3"
val logbackVersion = "1.2.3"
Seq(
// akka
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
// akka-actor
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
// logging
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
"ch.qos.logback" % "logback-classic" % logbackVersion
)
}
とりあえず最低限の実装で Actor を動かしてみた
説明
import akka.actor.{Actor, ActorLogging}
class SimpleActor extends Actor with ActorLogging {
override def receive = {
case "test" => log.info(s"received test.")
case unknown => log.info(s"received unknown message. ${unknown}")
}
}
object SimpleActor {
val ACTOR_NAME = "simple-actor"
}
最低限 Actor を拡張して receive メソッドさえあれば、メッセージのやり取りができる。
ActorLogging については Actor 内で LoggingAdapter を利用したい場合に利用するトレイト。
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
object BootSimpleActor extends App {
protected val actorSystem = ActorSystem("actor-system")
val actor = actorSystem.actorOf(Props[SimpleActor], SimpleActor.ACTOR_NAME)
val log = Logger(LoggerFactory.getLogger(this.getClass))
log.info("boot start.")
actor ! "test"
actor ! "sample"
log.info("boot finish.")
Thread.sleep(1000)
actorSystem.stop(actor)
Thread.sleep(1000)
actorSystem.terminate
System.exit(0)
}
ActorSystem の生成コストは重たく、アプリケーションで単一となるように実装する。
Props は Actor の設定情報を管理し、それと Actor の名前を Actor のファクトリーメソッドである ActorSystem#actorOf に渡すことによって Actor が生成される。
また、Actor の名前は ActorSystem において一意である必要がある。
ActorSystem を停止させる場合は先に Actor の停止を待って ActorSystem#terminate を呼び出せばよい。
ActorSystem#terminate を呼ばないと、永続的に動き続け、プロセスの停止のタイミングで停止(terminate)処理が自動的に呼ばれる事となる。
実行結果
> runMain com.github.zumappi.akka.sample.actor.simple.BootSimpleActor
[info] Compiling 1 Scala source to /Users/zumappi/Documents/develop/github.com/akka-sample/target/scala-2.11/classes...
[info] Running com.github.zumappi.akka.sample.actor.simple.BootSimpleActor
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-2] [INFO ] BootSimpleActor$ - boot start.
[run-main-2] [INFO ] BootSimpleActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] SimpleActor - received test.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] SimpleActor - received unknown message. sample
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-2"
[actor-system-akka.actor.default-dispatcher-4] [DEBUG] EventStream - shutting down: StandardOutLogger started
実行ログから Actor に送ったメッセージがちゃんと受け取れていることがわかる。
また、Actorの処理自体は非同期で実施されるので、BootSimpleActor のboot finish.
のログより後にメッセージを受け取ったログが出力されている。
Actorに引数を渡してみる
説明
case class Person(val name: String, val age: Int)
Person はただのケースクラスです。
import akka.actor.{Actor, ActorLogging}
class ArgsActor(val person: Person) extends Actor with ActorLogging {
override def receive = {
case "test" => log.info(s"received test. args : ${person}")
case p: Person => log.info(s"received test. args : ${person} : ${p}")
case unknown => log.info(s"received unknown message. args : ${person} : ${unknown}")
}
}
object ArgsActor {
val ACTOR_NAME = "args-actor"
}
Actorのメンバーを自由に実装できる。
今回の場合person: Person
を保持している。
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
object BootArgsActor extends App {
protected val actorSystem = ActorSystem("actor-system")
val person = Person("same", 20)
val actor = actorSystem.actorOf(Props(classOf[ArgsActor], person), ArgsActor.ACTOR_NAME)
val log = Logger(LoggerFactory.getLogger(this.getClass))
log.info("boot start.")
actor ! "test"
actor ! Person("tom", 25)
actor ! "sample"
log.info("boot finish.")
Thread.sleep(1000)
actorSystem.stop(actor)
Thread.sleep(1000)
actorSystem.terminate
System.exit(0)
}
Actor のメンバーに値を渡したい場合は、Props の生成時に値も一緒に渡すことで、Actor に値を渡すことができる。
Props の作成の仕方が違うのに注意、第一引数が Class になっている。
実行結果
> runMain com.github.zumappi.akka.sample.actor.args.BootArgsActor
[info] Running com.github.zumappi.akka.sample.actor.args.BootArgsActor
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-0] [INFO ] BootArgsActor$ - boot start.
[run-main-0] [INFO ] BootArgsActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] ArgsActor - received test. args : Person(same,20)
[actor-system-akka.actor.default-dispatcher-4] [INFO ] ArgsActor - received test. args : Person(same,20) : Person(tom,25)
[actor-system-akka.actor.default-dispatcher-4] [INFO ] ArgsActor - received unknown message. args : Person(same,20) : sample
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
[actor-system-akka.actor.default-dispatcher-4] [DEBUG] EventStream - shutting down: StandardOutLogger started
引数がちゃんと渡されていていることがわかる。
Actorから戻り値を取得してみる
説明
import akka.actor.{Actor, ActorLogging}
class FutureActor extends Actor with ActorLogging {
override def receive = {
case "test" => {
log.info(s"received test.")
sender ! Person("test", 1)
}
case p: Person => {
log.info(s"received test. ${p}")
sender ! Person("same", 20)
}
case unknown => {
log.info(s"received unknown message. ${unknown}")
Thread.sleep(2000)
}
}
}
object FutureActor {
val ACTOR_NAME = "future-actor"
}
Actor で sender に対してメッセージを渡すことによって、呼び出し元に値を返すことができる。
試しに予期していないデータが来た場合は、スレッドを3秒間休止させてタイムアウトを発生させている。
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
import scala.util.{Failure, Success}
object BootFutureActor extends App {
protected val actorSystem = ActorSystem("actor-system")
val actor = actorSystem.actorOf(Props[FutureActor], FutureActor.ACTOR_NAME)
val log = Logger(LoggerFactory.getLogger(this.getClass))
implicit val timeout = Timeout(1 seconds)
log.info("boot start.")
val f1 = (actor ? "test").mapTo[Person]
f1 onComplete {
case Success(p) => log.info(s"Success!. ${p}")
case Failure(t) => log.error(s"Failure!. ${t.getMessage}")
}
val f2 = (actor ? Person("tom", 25)).mapTo[Person]
f2 onComplete {
case Success(p) => log.info(s"Success!. ${p}")
case Failure(t) => log.error(s"Failure!. ${t.getMessage}")
}
val f3 = (actor ? "sample").mapTo[Person]
f3 onComplete {
case Success(p) => log.info(s"Success!. ${p}")
case Failure(t) => log.error(s"Failure!. ${t.getMessage}")
}
log.info("boot finish.")
Thread.sleep(3000)
actorSystem.stop(actor)
Thread.sleep(1000)
actorSystem.terminate
System.exit(0)
}
Actor から戻り値を取得する場合は ask もしくは?
を利用してメッセージを渡す必要がある。
また戻り値の型は Future[Any] なので、戻り値の型がわかっている場合は Future#mapTo を利用して、型変換を行うとよい。
今回の場合 Future[Any] を Future[Person] に変換している。
実行結果
> runMain com.github.zumappi.akka.sample.actor.future.BootFutureActor
[info] Running com.github.zumappi.akka.sample.actor.future.BootFutureActor
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-2] [INFO ] BootFutureActor$ - boot start.
[run-main-2] [INFO ] BootFutureActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] FutureActor - received test.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] FutureActor - received test. Person(tom,25)
[ForkJoinPool-1-worker-5] [INFO ] BootFutureActor$ - Success!. Person(test,1)
[actor-system-akka.actor.default-dispatcher-3] [INFO ] FutureActor - received unknown message. sample
[ForkJoinPool-1-worker-1] [INFO ] BootFutureActor$ - Success!. Person(same,20)
[ForkJoinPool-1-worker-1] [ERROR] BootFutureActor$ - Failure!. Ask timed out on [Actor[akka://actor-system/user/future-actor#-1559953003]] after [1000 ms]. Sender[null] sent message of type "java.lang.String".
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-2"
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - shutting down: StandardOutLogger started
戻り値がちゃんと受け取れていることがわかる。
タイムアウト等で処理が正常に完結できないと、Future は Failure になる。
Actorのライフサイクルを試してみる
説明
import akka.actor.{Actor, ActorLogging}
class LifeCycleActor extends Actor with ActorLogging {
override def preStart() = log.info(s"preStart.")
override def receive = {
case "test" => log.info(s"received test.")
case "error" => {
log.info(s"received error.")
throw new Exception("manual error!")
}
case unknown => log.info(s"received unknown message. ${unknown}")
}
override def preRestart(reason: Throwable, message: Option[Any]) = {
log.info(s"preRestart start. ${reason.getMessage}")
for {
msg <- message
} yield {
log.error(s"message : ${msg}")
}
super.preRestart(reason, message)
log.info(s"preRestart end.")
}
override def postRestart(reason: Throwable) = {
log.info(s"postRestart start. ${reason.getMessage}")
super.postRestart(reason)
log.info(s"postRestart end.")
}
override def postStop() = log.info(s"postStop.")
}
object LifeCycleActor {
val ACTOR_NAME = "life-cycle-actor"
}
メッセージがerror
だった場合は例外を発生させている。
メッセージの受取処理で例外が発生した場合、デフォルトで自分自身の再起動(restar)処理を行う。
Actor には preStart, preRestart, postRestart, postStop が存在する。
-
preStart
- Actor の作成タイミングで呼び出される。
-
preRestart
- 再起動(restart)タイミングの最初に呼び出される。
- 継承元の Actor#preRestart では子Actor をすべて停止させ、自分自身の停止(postStop)も行うので、特に意図しない場合は、継承元の preRestart も呼び出してあげた方が良い。
-
postRestart
- 再起動(restart)タイミングの最後に呼び出される。
- 継承元の Actor#postRestart では自分自身の preStart を呼び出すので、こちらも特に意図しない場合は**、継承元の postRestart も呼び出してあげた方が良い。**
-
postStop
- Actor の停止(stop)タイミングで呼び出される。
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
object BootLifeCycleActor extends App {
protected val actorSystem = ActorSystem("actor-system")
val actor = actorSystem.actorOf(Props[LifeCycleActor], LifeCycleActor.ACTOR_NAME)
val log = Logger(LoggerFactory.getLogger(this.getClass))
log.info("boot start.")
actor ! "test"
actor ! "error"
actor ! "sample"
log.info("boot finish.")
Thread.sleep(1000)
actorSystem.stop(actor)
Thread.sleep(1000)
actorSystem.terminate
System.exit(0)
}
今回は普通にメッセージを送っているだけ。
実行結果
> runMain com.github.zumappi.akka.sample.actor.lifecycle.BootLifeCycleActor
[info] Running com.github.zumappi.akka.sample.actor.lifecycle.BootLifeCycleActor
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-3] [INFO ] BootLifeCycleActor$ - boot start.
[run-main-3] [INFO ] BootLifeCycleActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] LifeCycleActor - preStart.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] LifeCycleActor - received test.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] LifeCycleActor - received error.
[actor-system-akka.actor.default-dispatcher-2] [ERROR] OneForOneStrategy - manual error!
java.lang.Exception: manual error!
at com.github.zumappi.akka.sample.actor.lifecycle.LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:16)
at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
at com.github.zumappi.akka.sample.actor.lifecycle.LifeCycleActor.aroundReceive(LifeCycleActor.scala:8)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - preRestart start. manual error!
[actor-system-akka.actor.default-dispatcher-2] [ERROR] LifeCycleActor - message : error
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postStop.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - preRestart end.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postRestart start. manual error!
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postRestart end.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - received unknown message. sample
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postStop.
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-3"
[actor-system-akka.actor.default-dispatcher-5] [DEBUG] EventStream - shutting down: StandardOutLogger started
最初のメッセージを処理する前に、preStart が呼び出されているのがわかる。
Actor 内で例外が発生すると、再起動(restart)処理され、preRestart, postStop, postRestart, preStart の順番で再起動していることがわかる。
プログラムの停止(terminate)タイミングでは、postStop が呼び出されているのがわかる。
子Actorを試してみる
説明
import akka.actor.{Actor, ActorLogging}
class ChildActor extends Actor with ActorLogging {
override def preStart() = log.info(s"preStart.")
override def receive = {
case "test" => log.info(s"received test.")
case "error" => {
log.info(s"received error.")
throw new Exception("manual child error!")
}
case unknown => log.info(s"received unknown message. ${unknown}")
}
override def preRestart(reason: Throwable, message: Option[Any]) = {
log.info(s"preRestart start. ${reason.getMessage}")
for {
msg <- message
} yield {
log.error(s"message : ${msg}")
}
super.preRestart(reason, message)
log.info(s"preRestart end.")
}
override def postRestart(reason: Throwable) = {
log.info(s"postRestart start. ${reason.getMessage}")
super.postRestart(reason)
log.info(s"postRestart end.")
}
override def postStop() = log.info(s"postStop.")
}
object ChildActor {
val ACTOR_NAME = "child-actor"
}
通常の Actor。
今回はこちらを子Actor とする。
メッセージがerror
だった場合、例外を発生させる。
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, DeathPactException, OneForOneStrategy, Props}
class ParentActor extends Actor with ActorLogging {
val child = context.actorOf(Props[ChildActor], ChildActor.ACTOR_NAME)
override def preStart() = log.info(s"preStart.")
override val supervisorStrategy = OneForOneStrategy() {
case _: ActorInitializationException => Stop
case _: ActorKilledException => Stop
case _: DeathPactException => Stop
case e: Exception => {
log.error(s"exception! ${e.getMessage} => restart")
Restart
}
}
override def receive = {
case "error" => {
log.info(s"received error.")
throw new Exception("manual parent error!")
}
case "child-error" => {
log.info(s"received child error.")
child ! "error"
}
case s: String => {
log.info(s"received string. ${s}")
child ! s
}
case unknown => {
log.info(s"received unknown message. ${unknown}")
}
}
override def preRestart(reason: Throwable, message: Option[Any]) = {
log.info(s"preRestart start. ${reason.getMessage}")
for {
msg <- message
} yield {
log.error(s"message : ${msg}")
}
super.preRestart(reason, message)
log.info(s"preRestart end.")
}
override def postRestart(reason: Throwable) = {
log.info(s"postRestart start. ${reason.getMessage}")
super.postRestart(reason)
log.info(s"postRestart end.")
}
override def postStop() = log.info(s"postStop.")
}
object ParentActor {
val ACTOR_NAME = "parent-actor"
}
親Actor。
メッセージがerror
だった場合、例外を発生させる。
メッセージがchild-error
だった場合、子Actor に対してメッセージerror
を送信する。
メッセージが「String型」だった場合は、そのまま子Actor にメッセージを送信する。
上記以外だった場合は、子Actor に対してメッセージ送信はしない。
supervisorStrategy は子Actor が発生させた例外のハンドリング処理を記述できる。
今回の場合、「Exception型」だった場合、再起動(restart)処理が実行をさせている。
(デフォルトを同じ挙動)
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
object BootChildActor extends App {
protected val actorSystem = ActorSystem("actor-system")
val actor = actorSystem.actorOf(Props[ParentActor], ParentActor.ACTOR_NAME)
val log = Logger(LoggerFactory.getLogger(this.getClass))
log.info("boot start.")
actor ! "test"
actor ! "child-error"
Thread.sleep(1000)
actor ! "error"
actor ! "sample"
actor ! 1
log.info("boot finish.")
Thread.sleep(1000)
actorSystem.stop(actor)
Thread.sleep(1000)
actorSystem.terminate
System.exit(0)
}
child-error
とerror
の間にスリープ処理があるのは、連続するメッセージを処理してしまうと、親Actor・子Actorで発生させた例外処理より再起動タイミングが被ってしまって、タイミング次第では動作が安定しないので、今回はスリープさせて、挙動を固定させている。
実行結果
> runMain com.github.zumappi.akka.sample.actor.child.BootChildActor
[info] Running com.github.zumappi.akka.sample.actor.child.BootChildActor
[actor-system-akka.actor.default-dispatcher-3] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-3] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-3] [DEBUG] EventStream - Default Loggers started
[run-main-5] [INFO ] BootChildActor$ - boot start.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ParentActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ParentActor - received string. test
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ParentActor - received child error.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - received test.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - received error.
[actor-system-akka.actor.default-dispatcher-3] [ERROR] ParentActor - exception! manual child error! => restart
[actor-system-akka.actor.default-dispatcher-2] [ERROR] OneForOneStrategy - manual child error!
java.lang.Exception: manual child error!
at com.github.zumappi.akka.sample.actor.child.ChildActor$$anonfun$receive$1.applyOrElse(ChildActor.scala:16)
at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
at com.github.zumappi.akka.sample.actor.child.ChildActor.aroundReceive(ChildActor.scala:8)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preRestart start. manual child error!
[actor-system-akka.actor.default-dispatcher-2] [ERROR] ChildActor - message : error
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - postStop.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preRestart end.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - postRestart start. manual child error!
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - postRestart end.
[run-main-5] [INFO ] BootChildActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - received error.
[actor-system-akka.actor.default-dispatcher-3] [ERROR] OneForOneStrategy - manual parent error!
java.lang.Exception: manual parent error!
at com.github.zumappi.akka.sample.actor.child.ParentActor$$anonfun$receive$1.applyOrElse(ParentActor.scala:27)
at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
at com.github.zumappi.akka.sample.actor.child.ParentActor.aroundReceive(ParentActor.scala:9)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - preRestart start. manual parent error!
[actor-system-akka.actor.default-dispatcher-3] [ERROR] ParentActor - message : error
[actor-system-akka.actor.default-dispatcher-5] [INFO ] ParentActor - postStop.
[actor-system-akka.actor.default-dispatcher-5] [INFO ] ParentActor - preRestart end.
[actor-system-akka.actor.default-dispatcher-5] [INFO ] ChildActor - postStop.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - postRestart start. manual parent error!
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ChildActor - preStart.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - preStart.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - postRestart end.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - received string. sample
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - received unknown message. 1
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ChildActor - received unknown message. sample
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ChildActor - postStop.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - postStop.
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-5"
子Actor でメッセージ処理が出来ていることが確認できる。
子Actor で例外が発生すると、親Actor で例外処理され、再起動(restart)処理が実行されていることがわかる。
同じように親Actor で例外が発生すると、親Actor では例外処理はされないが、再起動(restart)処理が実行されていることがわかる。
まとめ
5種類のサンプルを通して、Akka Actors の基本的な動作について理解できたと思う。
またこの記事を書くために、色々と調べていたら Publisher や Subscriber より細かい例外ハンドリング処理とかまだまだ奥が深いので、別の機会にまとめたいと思う。
サンプルコード
今回利用したサンプルコードはこちらに保存しておきます。