はじめに
Akka Actorのask(?)まわりをソースコードリーディング でActor内で例外が発生するとFutureのSuccess/Failureには直接影響は及ぼさず、結果的にはtimeoutによってFailureになったことを確認した。
今回はその続きで、Actor内で例外が発生するとどうなるんだっけ?を見ていく。
今回のサンプルプログラム
今回はActorを2種類用意した。
mainプログラムからはSupervisorExceptionActorへ。そこからExceptionActorへ単に横流し。で、例外発生。
単純化のために?(ask)ではなくて!(tell)を使っている。
import akka.actor._
object Main extends App {
val system = ActorSystem()
try {
val actor = system.actorOf(Props[SupervisorExceptionActor])
actor ! 1
actor ! 4
actor ! 2
Thread.sleep(3000)
} finally {
system.shutdown()
}
}
class SupervisorExceptionActor extends Actor {
val actor = context.actorOf(Props[ExceptionActor])
override def preStart() = println("preStart() in SupervisorExceptionActor")
def receive = {
case x => actor ! x
}
}
class ExceptionActor extends Actor {
override def preStart() = println("preStart() in ExceptionActor")
def receive = {
case 4 => sys.error("死")
case x => println(x + " : " + this)
}
}
SupervisorExceptionActor内で context.actorOf
でExceptionActorを生成することでSupervisorExceptionActorはExceptionActorのSupervisor(監督者)となる。要は親子関係が出来る。
なお、 system.actorOf
で生成するとSupervisorにはならないので注意。
Supervisorは監視対象の子Actorで例外が発生した際のエラーハンドリングを行う。
上記プログラムでは特に何も設定していないのでデフォルト設定が使用される。初期設定は以下の通り。
/**
* User overridable definition the strategy to use for supervising
* child actors.
*/
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
SupervisorStragegy.defaultStrategyの宣言部は以下。
https://github.com/akka/akka/blob/release-2.3/akka-actor/src/main/scala/akka/actor/FaultHandling.scala#L166-L168
/**
* When supervisorStrategy is not specified for an actor this
* is used by default. OneForOneStrategy with decider defined in
* [[#defaultDecider]].
*/
final val defaultStrategy: SupervisorStrategy = {
OneForOneStrategy()(defaultDecider)
}
defaultDecider部分で例外毎の振る舞いが定義されている。
https://github.com/akka/akka/blob/release-2.3/akka-actor/src/main/scala/akka/actor/FaultHandling.scala#L154-L159
/**
* When supervisorStrategy is not specified for an actor this
* [[Decider]] is used by default in the supervisor strategy.
* The child will be stopped when [[akka.actor.ActorInitializationException]],
* [[akka.actor.ActorKilledException]], or [[akka.actor.DeathPactException]] is
* thrown. It will be restarted for other `Exception` types.
* The error is escalated if it's a `Throwable`, i.e. `Error`.
*/
final val defaultDecider: Decider = {
case _: ActorInitializationException ⇒ Stop
case _: ActorKilledException ⇒ Stop
case _: DeathPactException ⇒ Stop
case _: Exception ⇒ Restart
}
ここでStopやRestartが登場しているがそれらDirectiveの種類とその意味をまとめる。
Directive種別|意味
----+----
Restart|Actorを再作成し、処理を再開(次のメッセージの処理)。Let it crashと言われるのはこの振る舞いを指している(はず)。
Resume|処理を再開(次のメッセージの処理)。Restartとの違いはActorを再作成しないこと。
Stop|Actorを停止する。Mailboxに溜まっていたその他メッセージは処理されないのでdead letter扱いとなる。
Escalate|さらに上のSupervisorに例外を伝搬させる。子についてはそこで定義したSupervisorStrategyの振る舞いに従う。
実際にこの通りになるのか確認してみる。
Restart
まずは冒頭のプログラムから。何も指定していないのでdefaultStrategyが採用される。RuntimeExceptionが発生するのでRestartが実行される。
preStart() in SupervisorExceptionActor
preStart() in ExceptionActor
1 : ExceptionActor@603ed77d
preStart() in ExceptionActor
2 : ExceptionActor@69d00cce
ExceptionActorが再作成され、別インスタンスになっていることが確認できた。
Resume
次にResumeの確認。
Supervisor側で明示的に定義するように以下を追記。
override def supervisorStrategy = OneForOneStrategy() {
case _ => Resume
}
以下、実行結果。
preStart() in SupervisorExceptionActor
preStart() in ExceptionActor
1 : ExceptionActor@2bc2f760
2 : ExceptionActor@2bc2f760
同じインスタンスで例外後の後続メッセージが実行されている。preStart()も実行されていない。
Stop
次にStopの確認。Resumeとしている部分をStopに変更するだけ。
override def supervisorStrategy = OneForOneStrategy() {
case _ => Stop
}
以下、実行結果。
preStart() in SupervisorExceptionActor
preStart() in ExceptionActor
1 : ExceptionActor@6f2612b9
[INFO] [default-akka.actor.default-dispatcher-5] [akka://default/user/$a/$a] Message [java.lang.Integer] from Actor[akka://default/user/$a#-1970577465] to Actor[akka://default/user/$a/$a#1036814967] was not delivered. [1] dead letters encountered.
Stopしたので、「4」の後続の「2」が処理されず、dead-letter行きが発生している事が確認できた。
Escalate
最後にEscalate。Stopの部分をEscalateに変える。
コレの動作確認を行うには更なるSupervisorを用意する必要あり。
class SupervisorSupervisorExceptionActor extends Actor {
val actor = context.actorOf(Props[SupervisorExceptionActor])
override def supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
def receive = {
case x => actor ! x
}
}
SupervisorStrategyActorはEscalateに指定する。
override def supervisorStrategy = OneForOneStrategy() {
case _ => Escalate
}
以下、結果。
preStart() in SupervisorExceptionActor
preStart() in ExceptionActor
1 : ExceptionActor@428ad712
[INFO] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a/$a/$a] Message [java.lang.Integer] from Actor[akka://default/user/$a/$a#2112516208] to Actor[akka://default/user/$a/$a/$a#1057736964] was not delivered. [1] dead letters encountered.
preStart() in SupervisorExceptionActor
preStart() in ExceptionActor
今までと違ってSupervisorのSupervisorでRestartと宣言したことによって、その子であるSupervisorも再作成が実行されているところ。
ただ、「2」のメッセージはdead-letters行きになるのか...
タイミング?この辺は未だちゃんと理解できていない。
なお、Resumeにすると「2」のメッセージも無事処理された。
例外発生を呼び出し元に伝えるには?
もともとこのシリーズは?(ask)で呼び出した時に例外が呼び出し元に伝わらないというものだった。
呼び出し元に返すにはActor内から!を使ってメッセージを送らないと届かない。
よって例外発生を伝えるにはそれを捕獲して!でメッセージ送信をする必要がある。
成功時と失敗時で送る型が異なるのでEitherを使うのが一般的だろうか。
class ExceptionActor extends Actor {
def receive = {
case a =>
Try { ...処理... } match {
case Success(b) =>
sender ! Right(成功時のメッセージ)
case Failure(c) =>
sender ! Left(c)
throw c
}
}
}
呼び出し元はFuture[Either[Throwable, String]]といった感じの型のメッセージを受け取ることとなる。
モナドトランスフォーマー...
なお、そもそもの話でいうと、?ではなくなるべく!で全体を構築するように設計することが良いとは思う。
このへんの話はまた時間があれば。