はじめに
Actorにメッセージを送って戻り値を受け取る場合はask(?)を使う。
askの戻り値はFuture型となっており(Future[Any]なので必要に応じてmapToを!)、非同期に実行される。
なお、注意点としてはActor内で例外が発生するとFailureになりそうだがそうではないということは最低限おさえておきたいところ。
じゃ、どういう時にFailureになるの?というのをソースコードなどを見つつ、確認。
サンプルプログラム
ask(?)を使う場合はTimeout時間を設定する必要がある。以下は100msで設定している。
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}
object Main extends App {
val system = ActorSystem()
try {
implicit val timeout = Timeout(100 milliseconds)
val actor = system.actorOf(Props[EchoActor])
val f: Future[String] = (actor ? 1).mapTo[String]
Await.ready(f, Duration.Inf)
f.value.get match {
case Success(num) => println("Success: " + num)
case Failure(t) => println("Fail: " + t.getMessage())
}
} finally {
system.shutdown()
}
}
このサンプルで使うActorはやってきたメッセージをちょっと加工して返すだけ。
class EchoActor extends Actor {
def receive = {
case x => sender ! "return: " + x
}
}
実行すると、 Success: return: 1
と表示される。
次にActorをこのように1s待ってからエコーするよう変えて実行してみると。。。
class EchoActor extends Actor {
def receive = {
case x =>
Thread.sleep(1000)
sender ! "return: " + x
}
}
結果は Fail: Ask timed out on [Actor[akka://default/user/$a#1476087568]] after [100 ms]
となった。
さらに次はActorから常に例外発生するように変えて実行してみると。。。
def receive = {
case _ => sys.error("Exception in EchoActor")
}
結果は Fail: Ask timed out on [Actor[akka://default/user/$a#1476087568]] after [100 ms]
となり、上記同様タイムアウト。
Actor内のExceptionがFailureにくるまれているわけではない。
ソースコードをのぞく(Akka)
/*
* Implementation class of the “ask” pattern enrichment of ActorRef
*/
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated ⇒
actorRef ! message
Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
case ref: InternalActorRef ⇒
if (timeout.duration.length <= 0)
Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
actorRef.tell(message, a)
a.result.future
}
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
}
ここを見ると、3つFailureになるコードが書かれている。
この中で一番手っ取り早く試せそうなのはTimeoutに負の値を設定すること。やってみる。
implicit val timeout = Timeout(-100 milliseconds)
実行結果は Fail: Timeout length must not be negative, question not sent to [Actor[akka://default/user/$a#-414563364]]
となる。よしよし期待通り。
通常は以下の部分が実行される。
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
actorRef.tell(message, a)
PromiseActorRef#applyを見てみる。
https://github.com/akka/akka/blob/release-2.3/akka-actor/src/main/scala/akka/pattern/AskSupport.scala#L328
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): PromiseActorRef = {
val result = Promise[Any]()
val scheduler = provider.guardian.underlying.system.scheduler
val a = new PromiseActorRef(provider, result)
implicit val ec = a.internalCallingThreadExecutionContext
val f = scheduler.scheduleOnce(timeout.duration) {
result tryComplete Failure(new AskTimeoutException(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]"))
}
result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
a
}
「タイムアウト設定時間を超えたらFailureになる」というのはここでscheduleを使うことで実現していた。
でもって、Success時はちゃんと登録したスケジュール処理をcancelしていることも見て取れる。
次にtellの実行だが、これはEchoActorにメッセージを届ける処理で、最終的にはEchoActor#receiveに到達する。
EchoActor内ではから送り元に!でメッセージを返している。
返ってきたメッセージを受けるのがPromiseActorRef#!で以下。
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message
case _ ⇒
if (message == null) throw new InvalidMessageException("Message is null")
if (!(result.tryComplete(
message match {
case Status.Success(r) ⇒ Success(r)
case Status.Failure(f) ⇒ Failure(f)
case other ⇒ Success(other)
}))) provider.deadLetters ! message
}
手元のdebug実行環境では引数のmessageに return: 1
が来ていることが確認できた。
で、 case other => Success(other)
を通り、tryCompleteによってFuture(Promise)の値はSuccessに確定した。
Actor内で例外が発生するとどうなるんだっけ?
については、ポストをわけます。