10
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Scala で Akka Actors を触ってみた

Posted at

はじめに

Scala での実装をしはじめてまだ3ヶ月ぐらいしか経っていないので、細かい所はおいといて、Akka Actorsを触ってわかったことをまとめたいと思う。

Akka とは?

Scala や Java で実装できるメッセージ駆動アプリケーションのライブラリ。
詳しくはこちら

今回はその中でも Akka Actors モジュールに絞ってまとめたいと思う。
他のモジュールについては別の機会にでも。

前提条件

基礎的な Scala が理解出来ていればきっと大丈夫。(なはず。

環境

  • scala:2.11.8
  • sbt:0.13.12
  • akka:2.5.3
  • logback:1.2.3

ビルドの設定

build.sbt
name := "akka-sample"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val akkaVersion = "2.5.3"
  val logbackVersion = "1.2.3"

  Seq(
    // akka
    "com.typesafe.akka" %% "akka-slf4j"   % akkaVersion,
    "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,

    // akka-actor
    "com.typesafe.akka" %% "akka-actor"   % akkaVersion,

    // logging
    "com.typesafe.scala-logging" %% "scala-logging"   % "3.1.0",
    "ch.qos.logback"             %  "logback-classic" % logbackVersion
  )
}

とりあえず最低限の実装で Actor を動かしてみた

説明

SimpleActor.scala
import akka.actor.{Actor, ActorLogging}

class SimpleActor extends Actor with ActorLogging {

  override def receive = {
    case "test"  => log.info(s"received test.")
    case unknown => log.info(s"received unknown message. ${unknown}")
  }
}

object SimpleActor {
  val ACTOR_NAME = "simple-actor"
}

最低限 Actor を拡張して receive メソッドさえあれば、メッセージのやり取りができる。
ActorLogging については Actor 内で LoggingAdapter を利用したい場合に利用するトレイト。

BootSimpleActor.scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object BootSimpleActor extends App {

  protected val actorSystem = ActorSystem("actor-system")

  val actor = actorSystem.actorOf(Props[SimpleActor], SimpleActor.ACTOR_NAME)

  val log = Logger(LoggerFactory.getLogger(this.getClass))

  log.info("boot start.")
  actor ! "test"
  actor ! "sample"
  log.info("boot finish.")
  Thread.sleep(1000)

  actorSystem.stop(actor)
  Thread.sleep(1000)

  actorSystem.terminate
  System.exit(0)
}

ActorSystem の生成コストは重たく、アプリケーションで単一となるように実装する。
Props は Actor の設定情報を管理し、それと Actor の名前を Actor のファクトリーメソッドである ActorSystem#actorOf に渡すことによって Actor が生成される。
また、Actor の名前は ActorSystem において一意である必要がある。
ActorSystem を停止させる場合は先に Actor の停止を待って ActorSystem#terminate を呼び出せばよい。
ActorSystem#terminate を呼ばないと、永続的に動き続け、プロセスの停止のタイミングで停止(terminate)処理が自動的に呼ばれる事となる。

実行結果

> runMain com.github.zumappi.akka.sample.actor.simple.BootSimpleActor
[info] Compiling 1 Scala source to /Users/zumappi/Documents/develop/github.com/akka-sample/target/scala-2.11/classes...
[info] Running com.github.zumappi.akka.sample.actor.simple.BootSimpleActor 
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-2] [INFO ] BootSimpleActor$ - boot start.
[run-main-2] [INFO ] BootSimpleActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] SimpleActor - received test.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] SimpleActor - received unknown message. sample

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-2"
[actor-system-akka.actor.default-dispatcher-4] [DEBUG] EventStream - shutting down: StandardOutLogger started

実行ログから Actor に送ったメッセージがちゃんと受け取れていることがわかる。
また、Actorの処理自体は非同期で実施されるので、BootSimpleActor のboot finish.のログより後にメッセージを受け取ったログが出力されている。

Actorに引数を渡してみる

説明

Person.scala
case class Person(val name: String, val age: Int)

Person はただのケースクラスです。

ArgsActor.scala
import akka.actor.{Actor, ActorLogging}

class ArgsActor(val person: Person) extends Actor with ActorLogging {

  override def receive = {
    case "test"    => log.info(s"received test. args : ${person}")
    case p: Person => log.info(s"received test. args : ${person} : ${p}")
    case unknown   => log.info(s"received unknown message. args : ${person} : ${unknown}")
  }
}

object ArgsActor {
  val ACTOR_NAME = "args-actor"
}

Actorのメンバーを自由に実装できる
今回の場合person: Personを保持している。

BootArgsActor.scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object BootArgsActor extends App {

  protected val actorSystem = ActorSystem("actor-system")

  val person = Person("same", 20)
  val actor = actorSystem.actorOf(Props(classOf[ArgsActor], person), ArgsActor.ACTOR_NAME)

  val log = Logger(LoggerFactory.getLogger(this.getClass))

  log.info("boot start.")
  actor ! "test"
  actor ! Person("tom", 25)
  actor ! "sample"
  log.info("boot finish.")
  Thread.sleep(1000)

  actorSystem.stop(actor)
  Thread.sleep(1000)

  actorSystem.terminate
  System.exit(0)
}

Actor のメンバーに値を渡したい場合は、Props の生成時に値も一緒に渡すことで、Actor に値を渡すことができる。
Props の作成の仕方が違うのに注意、第一引数が Class になっている。

実行結果

> runMain com.github.zumappi.akka.sample.actor.args.BootArgsActor
[info] Running com.github.zumappi.akka.sample.actor.args.BootArgsActor 
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-0] [INFO ] BootArgsActor$ - boot start.
[run-main-0] [INFO ] BootArgsActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] ArgsActor - received test. args : Person(same,20)
[actor-system-akka.actor.default-dispatcher-4] [INFO ] ArgsActor - received test. args : Person(same,20) : Person(tom,25)
[actor-system-akka.actor.default-dispatcher-4] [INFO ] ArgsActor - received unknown message. args : Person(same,20) : sample

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
[actor-system-akka.actor.default-dispatcher-4] [DEBUG] EventStream - shutting down: StandardOutLogger started

引数がちゃんと渡されていていることがわかる。

Actorから戻り値を取得してみる

説明

FutureActor.scala
import akka.actor.{Actor, ActorLogging}

class FutureActor extends Actor with ActorLogging {

  override def receive = {
    case "test"    => {
      log.info(s"received test.")
      sender ! Person("test", 1)
    }
    case p: Person => {
      log.info(s"received test. ${p}")
      sender ! Person("same", 20)
    }
    case unknown   => {
      log.info(s"received unknown message. ${unknown}")
      Thread.sleep(2000)
    }
  }
}

object FutureActor {
  val ACTOR_NAME = "future-actor"
}

Actor で sender に対してメッセージを渡すことによって、呼び出し元に値を返すことができる
試しに予期していないデータが来た場合は、スレッドを3秒間休止させてタイムアウトを発生させている。

BootFutureActor.scala
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

import scala.util.{Failure, Success}

object BootFutureActor extends App {

  protected val actorSystem = ActorSystem("actor-system")

  val actor = actorSystem.actorOf(Props[FutureActor], FutureActor.ACTOR_NAME)

  val log = Logger(LoggerFactory.getLogger(this.getClass))

  implicit val timeout = Timeout(1 seconds)

  log.info("boot start.")
  val f1 = (actor ? "test").mapTo[Person]
  f1 onComplete {
    case Success(p) => log.info(s"Success!. ${p}")
    case Failure(t) => log.error(s"Failure!. ${t.getMessage}")
  }

  val f2 = (actor ? Person("tom", 25)).mapTo[Person]
  f2 onComplete {
    case Success(p) => log.info(s"Success!. ${p}")
    case Failure(t) => log.error(s"Failure!. ${t.getMessage}")
  }

  val f3 = (actor ? "sample").mapTo[Person]
  f3 onComplete {
    case Success(p) => log.info(s"Success!. ${p}")
    case Failure(t) => log.error(s"Failure!. ${t.getMessage}")
  }

  log.info("boot finish.")
  Thread.sleep(3000)

  actorSystem.stop(actor)
  Thread.sleep(1000)

  actorSystem.terminate
  System.exit(0)
}

Actor から戻り値を取得する場合は ask もしくは?を利用してメッセージを渡す必要がある。
また戻り値の型は Future[Any] なので、戻り値の型がわかっている場合は Future#mapTo を利用して、型変換を行うとよい。
今回の場合 Future[Any] を Future[Person] に変換している。

実行結果

> runMain com.github.zumappi.akka.sample.actor.future.BootFutureActor
[info] Running com.github.zumappi.akka.sample.actor.future.BootFutureActor 
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-2] [INFO ] BootFutureActor$ - boot start.
[run-main-2] [INFO ] BootFutureActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] FutureActor - received test.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] FutureActor - received test. Person(tom,25)
[ForkJoinPool-1-worker-5] [INFO ] BootFutureActor$ - Success!. Person(test,1)
[actor-system-akka.actor.default-dispatcher-3] [INFO ] FutureActor - received unknown message. sample
[ForkJoinPool-1-worker-1] [INFO ] BootFutureActor$ - Success!. Person(same,20)
[ForkJoinPool-1-worker-1] [ERROR] BootFutureActor$ - Failure!. Ask timed out on [Actor[akka://actor-system/user/future-actor#-1559953003]] after [1000 ms]. Sender[null] sent message of type "java.lang.String".

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-2"
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - shutting down: StandardOutLogger started

戻り値がちゃんと受け取れていることがわかる。
タイムアウト等で処理が正常に完結できないと、Future は Failure になる。

Actorのライフサイクルを試してみる

説明

LifeCycleActor.scala
import akka.actor.{Actor, ActorLogging}

class LifeCycleActor extends Actor with ActorLogging {

  override def preStart() = log.info(s"preStart.")

  override def receive = {
    case "test"  => log.info(s"received test.")
    case "error" => {
      log.info(s"received error.")
      throw new Exception("manual error!")
    }
    case unknown => log.info(s"received unknown message. ${unknown}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]) = {
    log.info(s"preRestart start. ${reason.getMessage}")
    for {
      msg <- message
    } yield {
      log.error(s"message : ${msg}")
    }
    super.preRestart(reason, message)
    log.info(s"preRestart end.")
  }

  override def postRestart(reason: Throwable) = {
    log.info(s"postRestart start. ${reason.getMessage}")
    super.postRestart(reason)
    log.info(s"postRestart end.")
  }

  override def postStop() = log.info(s"postStop.")
}

object LifeCycleActor {
  val ACTOR_NAME = "life-cycle-actor"
}

メッセージがerrorだった場合は例外を発生させている。
メッセージの受取処理で例外が発生した場合、デフォルトで自分自身の再起動(restar)処理を行う。
Actor には preStart, preRestart, postRestart, postStop が存在する。

  • preStart
    • Actor の作成タイミングで呼び出される。
  • preRestart
    • 再起動(restart)タイミングの最初に呼び出される。
    • 継承元の Actor#preRestart では子Actor をすべて停止させ、自分自身の停止(postStop)も行うので、特に意図しない場合は、継承元の preRestart も呼び出してあげた方が良い。
  • postRestart
    • 再起動(restart)タイミングの最後に呼び出される。
    • 継承元の Actor#postRestart では自分自身の preStart を呼び出すので、こちらも特に意図しない場合は**、継承元の postRestart も呼び出してあげた方が良い。**
  • postStop
    • Actor の停止(stop)タイミングで呼び出される。
scala:BootLifeCycleActor.scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object BootLifeCycleActor extends App {

  protected val actorSystem = ActorSystem("actor-system")

  val actor = actorSystem.actorOf(Props[LifeCycleActor], LifeCycleActor.ACTOR_NAME)

  val log = Logger(LoggerFactory.getLogger(this.getClass))

  log.info("boot start.")
  actor ! "test"
  actor ! "error"
  actor ! "sample"
  log.info("boot finish.")
  Thread.sleep(1000)

  actorSystem.stop(actor)
  Thread.sleep(1000)

  actorSystem.terminate
  System.exit(0)
}

今回は普通にメッセージを送っているだけ。

実行結果

> runMain com.github.zumappi.akka.sample.actor.lifecycle.BootLifeCycleActor
[info] Running com.github.zumappi.akka.sample.actor.lifecycle.BootLifeCycleActor 
[actor-system-akka.actor.default-dispatcher-2] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-2] [DEBUG] EventStream - Default Loggers started
[run-main-3] [INFO ] BootLifeCycleActor$ - boot start.
[run-main-3] [INFO ] BootLifeCycleActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] LifeCycleActor - preStart.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] LifeCycleActor - received test.
[actor-system-akka.actor.default-dispatcher-4] [INFO ] LifeCycleActor - received error.
[actor-system-akka.actor.default-dispatcher-2] [ERROR] OneForOneStrategy - manual error!
java.lang.Exception: manual error!
        at com.github.zumappi.akka.sample.actor.lifecycle.LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:16)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
        at com.github.zumappi.akka.sample.actor.lifecycle.LifeCycleActor.aroundReceive(LifeCycleActor.scala:8)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
        at akka.actor.ActorCell.invoke(ActorCell.scala:496)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - preRestart start. manual error!
[actor-system-akka.actor.default-dispatcher-2] [ERROR] LifeCycleActor - message : error
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postStop.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - preRestart end.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postRestart start. manual error!
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postRestart end.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - received unknown message. sample
[actor-system-akka.actor.default-dispatcher-2] [INFO ] LifeCycleActor - postStop.

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-3"
[actor-system-akka.actor.default-dispatcher-5] [DEBUG] EventStream - shutting down: StandardOutLogger started

最初のメッセージを処理する前に、preStart が呼び出されているのがわかる。
Actor 内で例外が発生すると、再起動(restart)処理され、preRestart, postStop, postRestart, preStart の順番で再起動していることがわかる。
プログラムの停止(terminate)タイミングでは、postStop が呼び出されているのがわかる。

子Actorを試してみる

説明

ChildActor.scala
import akka.actor.{Actor, ActorLogging}

class ChildActor extends Actor with ActorLogging {

  override def preStart() = log.info(s"preStart.")

  override def receive = {
    case "test"  => log.info(s"received test.")
    case "error" => {
      log.info(s"received error.")
      throw new Exception("manual child error!")
    }
    case unknown => log.info(s"received unknown message. ${unknown}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]) = {
    log.info(s"preRestart start. ${reason.getMessage}")
    for {
      msg <- message
    } yield {
      log.error(s"message : ${msg}")
    }
    super.preRestart(reason, message)
    log.info(s"preRestart end.")
  }

  override def postRestart(reason: Throwable) = {
    log.info(s"postRestart start. ${reason.getMessage}")
    super.postRestart(reason)
    log.info(s"postRestart end.")
  }

  override def postStop() = log.info(s"postStop.")
}

object ChildActor {
  val ACTOR_NAME = "child-actor"
}

通常の Actor。
今回はこちらを子Actor とする。
メッセージがerrorだった場合、例外を発生させる。

ParentActor.scala
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, DeathPactException, OneForOneStrategy, Props}

class ParentActor extends Actor with ActorLogging {
  val child = context.actorOf(Props[ChildActor], ChildActor.ACTOR_NAME)

  override def preStart() = log.info(s"preStart.")

  override val supervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case e: Exception                    => {
      log.error(s"exception! ${e.getMessage} => restart")
      Restart
    }
  }

  override def receive = {
    case "error" => {
      log.info(s"received error.")
      throw new Exception("manual parent error!")
    }
    case "child-error" => {
      log.info(s"received child error.")
      child ! "error"
    }
    case s: String => {
      log.info(s"received string. ${s}")
      child ! s
    }
    case unknown => {
      log.info(s"received unknown message. ${unknown}")
    }
  }

  override def preRestart(reason: Throwable, message: Option[Any]) = {
    log.info(s"preRestart start. ${reason.getMessage}")
    for {
      msg <- message
    } yield {
      log.error(s"message : ${msg}")
    }
    super.preRestart(reason, message)
    log.info(s"preRestart end.")
  }

  override def postRestart(reason: Throwable) = {
    log.info(s"postRestart start. ${reason.getMessage}")
    super.postRestart(reason)
    log.info(s"postRestart end.")
  }

  override def postStop() = log.info(s"postStop.")
}

object ParentActor {
  val ACTOR_NAME = "parent-actor"
}

親Actor。
メッセージがerrorだった場合、例外を発生させる。
メッセージがchild-errorだった場合、子Actor に対してメッセージerrorを送信する。
メッセージが「String型」だった場合は、そのまま子Actor にメッセージを送信する。
上記以外だった場合は、子Actor に対してメッセージ送信はしない。
supervisorStrategy は子Actor が発生させた例外のハンドリング処理を記述できる。
今回の場合、「Exception型」だった場合、再起動(restart)処理が実行をさせている。
(デフォルトを同じ挙動)

BootChildActor.scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object BootChildActor extends App {

  protected val actorSystem = ActorSystem("actor-system")

  val actor = actorSystem.actorOf(Props[ParentActor], ParentActor.ACTOR_NAME)

  val log = Logger(LoggerFactory.getLogger(this.getClass))

  log.info("boot start.")
  actor ! "test"
  actor ! "child-error"
  Thread.sleep(1000)
  actor ! "error"
  actor ! "sample"
  actor ! 1
  log.info("boot finish.")
  Thread.sleep(1000)

  actorSystem.stop(actor)
  Thread.sleep(1000)

  actorSystem.terminate
  System.exit(0)
}

child-errorerrorの間にスリープ処理があるのは、連続するメッセージを処理してしまうと、親Actor・子Actorで発生させた例外処理より再起動タイミングが被ってしまって、タイミング次第では動作が安定しないので、今回はスリープさせて、挙動を固定させている。

実行結果

> runMain com.github.zumappi.akka.sample.actor.child.BootChildActor
[info] Running com.github.zumappi.akka.sample.actor.child.BootChildActor 
[actor-system-akka.actor.default-dispatcher-3] [INFO ] Slf4jLogger - Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-3] [DEBUG] EventStream - logger log1-Slf4jLogger started
[actor-system-akka.actor.default-dispatcher-3] [DEBUG] EventStream - Default Loggers started
[run-main-5] [INFO ] BootChildActor$ - boot start.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ParentActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ParentActor - received string. test
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ParentActor - received child error.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - received test.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - received error.
[actor-system-akka.actor.default-dispatcher-3] [ERROR] ParentActor - exception! manual child error! => restart
[actor-system-akka.actor.default-dispatcher-2] [ERROR] OneForOneStrategy - manual child error!
java.lang.Exception: manual child error!
        at com.github.zumappi.akka.sample.actor.child.ChildActor$$anonfun$receive$1.applyOrElse(ChildActor.scala:16)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
        at com.github.zumappi.akka.sample.actor.child.ChildActor.aroundReceive(ChildActor.scala:8)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
        at akka.actor.ActorCell.invoke(ActorCell.scala:496)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preRestart start. manual child error!
[actor-system-akka.actor.default-dispatcher-2] [ERROR] ChildActor - message : error
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - postStop.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preRestart end.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - postRestart start. manual child error!
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - preStart.
[actor-system-akka.actor.default-dispatcher-2] [INFO ] ChildActor - postRestart end.
[run-main-5] [INFO ] BootChildActor$ - boot finish.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - received error.
[actor-system-akka.actor.default-dispatcher-3] [ERROR] OneForOneStrategy - manual parent error!
java.lang.Exception: manual parent error!
        at com.github.zumappi.akka.sample.actor.child.ParentActor$$anonfun$receive$1.applyOrElse(ParentActor.scala:27)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
        at com.github.zumappi.akka.sample.actor.child.ParentActor.aroundReceive(ParentActor.scala:9)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
        at akka.actor.ActorCell.invoke(ActorCell.scala:496)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - preRestart start. manual parent error!
[actor-system-akka.actor.default-dispatcher-3] [ERROR] ParentActor - message : error
[actor-system-akka.actor.default-dispatcher-5] [INFO ] ParentActor - postStop.
[actor-system-akka.actor.default-dispatcher-5] [INFO ] ParentActor - preRestart end.
[actor-system-akka.actor.default-dispatcher-5] [INFO ] ChildActor - postStop.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - postRestart start. manual parent error!
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ChildActor - preStart.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - preStart.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - postRestart end.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - received string. sample
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - received unknown message. 1
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ChildActor - received unknown message. sample
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ChildActor - postStop.
[actor-system-akka.actor.default-dispatcher-3] [INFO ] ParentActor - postStop.

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-5"

子Actor でメッセージ処理が出来ていることが確認できる。
子Actor で例外が発生すると、親Actor で例外処理され、再起動(restart)処理が実行されていることがわかる。
同じように親Actor で例外が発生すると、親Actor では例外処理はされないが、再起動(restart)処理が実行されていることがわかる。

まとめ

5種類のサンプルを通して、Akka Actors の基本的な動作について理解できたと思う。
またこの記事を書くために、色々と調べていたら Publisher や Subscriber より細かい例外ハンドリング処理とかまだまだ奥が深いので、別の機会にまとめたいと思う。

サンプルコード

今回利用したサンプルコードはこちらに保存しておきます。

10
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?