0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

SupervisorStrategyから例外をAskに伝える方法と、Askの挙動解説

Last updated at Posted at 2023-12-23

背景

子Actorが投げた例外を親ActorのSupervisorStrategyでキャッチし、ActorRef.askで生成したFutureにその例外を返したい。
また、askを使ったときのActor間の情報の流れを解説している記事があまり見当たらなかったのでまとめてみた。

サンプルプログラム

SupervisingActorとThrowExceptionActorという二つのActorを用いる。
MainプログラムではSupervisingActorのaskからFutureを生成し、Await.resultで実行する。
各Actorの挙動は以下の通り。

  • SupervisingActorが子ActorとしてThrowExceptionActorを生成する。

    • SupervisingActorは受け取ったメッセージをそのまま子Actor (ThrowExceptionActor) に伝える。
    • SupervisingActorは、子アクターからメッセージを受け取ったら"finished"というメッセージをFutureの値として返す。
    • もし子アクターが例外を発生させた場合、その例外をFutureの値 (Failure) として返す。
  • ThrowExceptionActorはメッセージとして4を受け取ったら例外(RuntimeException)を発生させ、それ以外の整数ならそのままメッセージとして送り返す。

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, Status, Terminated}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps;


class SupervisingActor extends Actor {
  implicit val timeout = Timeout(5 seconds)
  private val childActor = context.actorOf(Props[ThrowExceptionActor], "throwException-actor")
  // context.watch(childActor)
  private var lastSender = sender()
  override def preStart() = {
    println("preStart() in SupervisorExceptionActor")
    println("SupervisingActor is " + self) // akka://system/user/supervising-actor#{address}
  }
  
  // handle case of childActor throwing Exception
  override def supervisorStrategy = OneForOneStrategy() {
    case e =>
      println("sender on supervisorStrategy is " + sender()) // akka://system/user/supervising-actor/throwException-actor#{address}
      println("sending Failure message to " + lastSender) // akka://system/temp/supervising-actor$a
      lastSender ! Status.Failure(e) // send failure status
      Stop // stop childActor
  }

  def receive = {
    case x: Int => {
      lastSender = sender() // keep internal (temp) actor as variable
      println("sender of signal to SupervisingActor is " + lastSender) // akka://system/temp/supervising-actor$a

      (childActor ? x).onComplete(
        (_) => lastSender ! "finished") // send result back to akka://system/temp/supervising-actor$a
    }
    case _: Terminated =>
      println(s"$childActor is terminated.")
  }
}

class ThrowExceptionActor extends Actor {
  override def preStart() = {
    println("preStart() in ThrowExceptionActor")
    println("ThrowExceptionActor is " + self) // akka://system/user/supervising-actor/throwException-actor#{address}
  }

  def receive = {
    case 4 => throw new RuntimeException("Unexpected number")
    case x: Int => {
      println(s"Received int is $x")
      sender() ! x
    }
  }

  override def postStop(): Unit = {
    println(s"ThrowExceptionActor: $self is stopped")
  }
}

object Main extends App {
  implicit val timeout = Timeout(5 minutes)
  val system = ActorSystem("system")
  try {
    val actor = system.actorOf(Props[SupervisingActor], "supervising-actor") // akka://system/user/supervising-actor
//    val future = actor ? 4 // throw exception
    val future = actor ? 3 // success

    try {
      val res = Await.result(future, 5 minutes)
      println(s"Result is $res")
    }
    catch {
      case e => throw e
    }
    finally {
      system.terminate()
    }
  }
}

解説

まずは例外が発生しない場合のシーケンス図は以下のようになる (間違ってたらすみません) 。
AkkaActor-Page-1.jpg

ここで重要なのは、Mainスレッドで明示的に作成したSupervisingActorのインスタンス (user/supervising-actor)とは別に、
askを実行する際にはtemp/supervising-actorという別のインスタンスが内部的に生成されることだ。 1

公式サイト には以下のように書いてある。

"/temp" は短命のシステムが作成したすべてのアクターに対するガーディアンです。例えば、 ActorRef.ask の実装などに使われます。

ActorRef.askでFutureを作成した場合、この内部Actor (temp/supervising-actor)に対して返したメッセージが、Futureの値としてMainスレッドに返されることになる。
上のシーケンス図からわかるように、user/supervising-actorにaskからのメッセージを送るのはtemp/supervising-actorであるから、このときのsender()を変数lastSenderとして保持しておき、子Actorからのメッセージを受け取ったらlastSenderに対して終了メッセージを返せば良い。

// in SupervisingActor
def receive = {
    case x: Int => {
      lastSender = sender() // sender is temp/supervising-actor
      (childActor ? x).onComplete(
        (_) => lastSender ! "finished") // send finish msg to temp/supervising-actor, which is returned as Future value
    }
    ...
}
    

次に、子Actorが例外を発生させる場合のシーケンス図は以下のようになる。
AkkaActor-Page-2.jpg
user/supervising-actorから子アクター(user/supervising-actor/throwException-actor)にmsgを送るところまでは同じだが、ここでは子アクターがメッセージの処理中にRuntimeExceptionを発生させる。
このとき親ActorのsupervisorStrategyが呼び出されるが、Futureの結果として例外を返すためには、temp/supervising-actorにメッセージとしてakka.actor.Status.Failureを送る必要がある。
これについては公式サイトに次のように書かれている。

To complete the Future with an exception you need to send an akka.actor.Status.Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.
Please note that Scala’s Try sub types scala.util.Failure and scala.util.Success are not treated specially, and would complete the ask Future with the given value - only the akka.actor.Status messages are treated specially by the ask pattern.

従って親アクターのsupervisorStrategy内では、子Actorが発生させた例外をakka.actor.Status.Failureで包んでlastSenderに送れば良い。

// in SupervisingActor
  override def supervisorStrategy = OneForOneStrategy() {
    case e =>
      lastSender ! Status.Failure(e) // send failure status to temp/supervising-actor
      Stop // stop childActor
  }

補足: context.watch について

親アクター内でcontext.watch(子アクター)とすると、その子アクターが停止したときにTerminatedという信号を受け取れる。ただし親アクターがこのTerminatedを処理しない場合、親アクター自身もDeathPactExceptionを発生させてしまうことに注意。公式サイトにもあるように、子アクターの例外を伝播させたいときなどに役立つ。

まとめ

  • askでFutureを生成した場合には、内部的に生成されたアクター (temp/hogehoge) に対して戻り値を送り返す必要がある。またFutureから例外を返したい場合は、temp/hogehogeakka.actor.Status.Failureを返す必要がある。

  • メッセージの最初の送り手 (actor ? msg のmsgを送るアクター) が内部アクター (temp/hogehoge) なので、実装上は最初にメッセージを受け取ったときのsender()を変数として保持しておけば良い。

  1. インスタンス生成がFutureを生成したタイミングで行われるのか、Futureを実行したときに行われるのかは定かでないです (Timeoutのことを考えると実行時に生成されると思うのが自然?)。詳しい人教えてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?