背景
子Actorが投げた例外を親ActorのSupervisorStrategyでキャッチし、ActorRef.askで生成したFutureにその例外を返したい。
また、askを使ったときのActor間の情報の流れを解説している記事があまり見当たらなかったのでまとめてみた。
サンプルプログラム
SupervisingActorとThrowExceptionActorという二つのActorを用いる。
MainプログラムではSupervisingActorのaskからFutureを生成し、Await.result
で実行する。
各Actorの挙動は以下の通り。
-
SupervisingActorが子ActorとしてThrowExceptionActorを生成する。
- SupervisingActorは受け取ったメッセージをそのまま子Actor (ThrowExceptionActor) に伝える。
- SupervisingActorは、子アクターからメッセージを受け取ったら"finished"というメッセージをFutureの値として返す。
- もし子アクターが例外を発生させた場合、その例外をFutureの値 (Failure) として返す。
-
ThrowExceptionActorはメッセージとして4を受け取ったら例外(RuntimeException)を発生させ、それ以外の整数ならそのままメッセージとして送り返す。
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, Status, Terminated}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps;
class SupervisingActor extends Actor {
implicit val timeout = Timeout(5 seconds)
private val childActor = context.actorOf(Props[ThrowExceptionActor], "throwException-actor")
// context.watch(childActor)
private var lastSender = sender()
override def preStart() = {
println("preStart() in SupervisorExceptionActor")
println("SupervisingActor is " + self) // akka://system/user/supervising-actor#{address}
}
// handle case of childActor throwing Exception
override def supervisorStrategy = OneForOneStrategy() {
case e =>
println("sender on supervisorStrategy is " + sender()) // akka://system/user/supervising-actor/throwException-actor#{address}
println("sending Failure message to " + lastSender) // akka://system/temp/supervising-actor$a
lastSender ! Status.Failure(e) // send failure status
Stop // stop childActor
}
def receive = {
case x: Int => {
lastSender = sender() // keep internal (temp) actor as variable
println("sender of signal to SupervisingActor is " + lastSender) // akka://system/temp/supervising-actor$a
(childActor ? x).onComplete(
(_) => lastSender ! "finished") // send result back to akka://system/temp/supervising-actor$a
}
case _: Terminated =>
println(s"$childActor is terminated.")
}
}
class ThrowExceptionActor extends Actor {
override def preStart() = {
println("preStart() in ThrowExceptionActor")
println("ThrowExceptionActor is " + self) // akka://system/user/supervising-actor/throwException-actor#{address}
}
def receive = {
case 4 => throw new RuntimeException("Unexpected number")
case x: Int => {
println(s"Received int is $x")
sender() ! x
}
}
override def postStop(): Unit = {
println(s"ThrowExceptionActor: $self is stopped")
}
}
object Main extends App {
implicit val timeout = Timeout(5 minutes)
val system = ActorSystem("system")
try {
val actor = system.actorOf(Props[SupervisingActor], "supervising-actor") // akka://system/user/supervising-actor
// val future = actor ? 4 // throw exception
val future = actor ? 3 // success
try {
val res = Await.result(future, 5 minutes)
println(s"Result is $res")
}
catch {
case e => throw e
}
finally {
system.terminate()
}
}
}
解説
まずは例外が発生しない場合のシーケンス図は以下のようになる (間違ってたらすみません) 。
ここで重要なのは、Mainスレッドで明示的に作成したSupervisingActorのインスタンス (user/supervising-actor
)とは別に、
askを実行する際にはtemp/supervising-actor
という別のインスタンスが内部的に生成されることだ。 1
公式サイト には以下のように書いてある。
"/temp" は短命のシステムが作成したすべてのアクターに対するガーディアンです。例えば、 ActorRef.ask の実装などに使われます。
ActorRef.askでFutureを作成した場合、この内部Actor (temp/supervising-actor
)に対して返したメッセージが、Futureの値としてMainスレッドに返されることになる。
上のシーケンス図からわかるように、user/supervising-actor
にaskからのメッセージを送るのはtemp/supervising-actor
であるから、このときのsender()
を変数lastSender
として保持しておき、子Actorからのメッセージを受け取ったらlastSender
に対して終了メッセージを返せば良い。
// in SupervisingActor
def receive = {
case x: Int => {
lastSender = sender() // sender is temp/supervising-actor
(childActor ? x).onComplete(
(_) => lastSender ! "finished") // send finish msg to temp/supervising-actor, which is returned as Future value
}
...
}
次に、子Actorが例外を発生させる場合のシーケンス図は以下のようになる。
user/supervising-actor
から子アクター(user/supervising-actor/throwException-actor
)にmsgを送るところまでは同じだが、ここでは子アクターがメッセージの処理中にRuntimeExceptionを発生させる。
このとき親ActorのsupervisorStrategyが呼び出されるが、Futureの結果として例外を返すためには、temp/supervising-actor
にメッセージとしてakka.actor.Status.Failure
を送る必要がある。
これについては公式サイトに次のように書かれている。
To complete the Future with an exception you need to send an akka.actor.Status.Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.
Please note that Scala’s Try sub types scala.util.Failure and scala.util.Success are not treated specially, and would complete the ask Future with the given value - only the akka.actor.Status messages are treated specially by the ask pattern.
従って親アクターのsupervisorStrategy内では、子Actorが発生させた例外をakka.actor.Status.Failure
で包んでlastSender
に送れば良い。
// in SupervisingActor
override def supervisorStrategy = OneForOneStrategy() {
case e =>
lastSender ! Status.Failure(e) // send failure status to temp/supervising-actor
Stop // stop childActor
}
補足: context.watch について
親アクター内でcontext.watch(子アクター)
とすると、その子アクターが停止したときにTerminated
という信号を受け取れる。ただし親アクターがこのTerminated
を処理しない場合、親アクター自身もDeathPactException
を発生させてしまうことに注意。公式サイトにもあるように、子アクターの例外を伝播させたいときなどに役立つ。
まとめ
-
askでFutureを生成した場合には、内部的に生成されたアクター (
temp/hogehoge
) に対して戻り値を送り返す必要がある。またFutureから例外を返したい場合は、temp/hogehoge
にakka.actor.Status.Failure
を返す必要がある。 -
メッセージの最初の送り手 (
actor ? msg
のmsgを送るアクター) が内部アクター (temp/hogehoge
) なので、実装上は最初にメッセージを受け取ったときのsender()
を変数として保持しておけば良い。
-
インスタンス生成がFutureを生成したタイミングで行われるのか、Futureを実行したときに行われるのかは定かでないです (Timeoutのことを考えると実行時に生成されると思うのが自然?)。詳しい人教えてください。 ↩