LoginSignup
12
9

More than 5 years have passed since last update.

Akka Actorのask(?)まわりをソースコードリーディング

Last updated at Posted at 2015-07-13

はじめに

Actorにメッセージを送って戻り値を受け取る場合はask(?)を使う。
askの戻り値はFuture型となっており(Future[Any]なので必要に応じてmapToを!)、非同期に実行される。
なお、注意点としてはActor内で例外が発生するとFailureになりそうだがそうではないということは最低限おさえておきたいところ。
じゃ、どういう時にFailureになるの?というのをソースコードなどを見つつ、確認。

サンプルプログラム

ask(?)を使う場合はTimeout時間を設定する必要がある。以下は100msで設定している。

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

import scala.concurrent.{Await, Future}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

object Main extends App {
  val system = ActorSystem()
  try {
    implicit val timeout = Timeout(100 milliseconds)
    val actor = system.actorOf(Props[EchoActor])

    val f: Future[String] = (actor ? 1).mapTo[String]

    Await.ready(f, Duration.Inf)

    f.value.get match {
      case Success(num) => println("Success: " + num)
      case Failure(t) => println("Fail: " + t.getMessage())
    }
  } finally {
    system.shutdown()
  }
}

このサンプルで使うActorはやってきたメッセージをちょっと加工して返すだけ。

class EchoActor extends Actor {
  def receive = {
    case x => sender ! "return: " + x
  }
}

実行すると、 Success: return: 1 と表示される。

次にActorをこのように1s待ってからエコーするよう変えて実行してみると。。。

class EchoActor extends Actor {
  def receive = {
    case x =>
      Thread.sleep(1000)
      sender ! "return: " + x
  }
}

結果は Fail: Ask timed out on [Actor[akka://default/user/$a#1476087568]] after [100 ms] となった。

さらに次はActorから常に例外発生するように変えて実行してみると。。。

def receive = {
  case _ => sys.error("Exception in EchoActor")
}

結果は Fail: Ask timed out on [Actor[akka://default/user/$a#1476087568]] after [100 ms] となり、上記同様タイムアウト。
Actor内のExceptionがFailureにくるまれているわけではない。

ソースコードをのぞく(Akka)

akka.pattern.AskSupport.scala
/*
 * Implementation class of the “ask” pattern enrichment of ActorRef
 */
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
  def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
    case ref: InternalActorRef if ref.isTerminated 
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
    case ref: InternalActorRef 
      if (timeout.duration.length <= 0)
        Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
      else {
        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.future
      }
    case _  Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
  }

ここを見ると、3つFailureになるコードが書かれている。
この中で一番手っ取り早く試せそうなのはTimeoutに負の値を設定すること。やってみる。

    implicit val timeout = Timeout(-100 milliseconds)

実行結果は Fail: Timeout length must not be negative, question not sent to [Actor[akka://default/user/$a#-414563364]] となる。よしよし期待通り。

通常は以下の部分が実行される。

        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)

PromiseActorRef#applyを見てみる。
https://github.com/akka/akka/blob/release-2.3/akka-actor/src/main/scala/akka/pattern/AskSupport.scala#L328

akka.pattern.AskSupport.scala
  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): PromiseActorRef = {
    val result = Promise[Any]()
    val scheduler = provider.guardian.underlying.system.scheduler
    val a = new PromiseActorRef(provider, result)
    implicit val ec = a.internalCallingThreadExecutionContext
    val f = scheduler.scheduleOnce(timeout.duration) {
      result tryComplete Failure(new AskTimeoutException(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]"))
    }
    result.future onComplete { _  try a.stop() finally f.cancel() }
    a
  }

「タイムアウト設定時間を超えたらFailureになる」というのはここでscheduleを使うことで実現していた。
でもって、Success時はちゃんと登録したスケジュール処理をcancelしていることも見て取れる。

次にtellの実行だが、これはEchoActorにメッセージを届ける処理で、最終的にはEchoActor#receiveに到達する。
EchoActor内ではから送り元に!でメッセージを返している。
返ってきたメッセージを受けるのがPromiseActorRef#!で以下。

akka.pattern.AskSupport.scala
  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath  provider.deadLetters ! message
    case _ 
      if (message == null) throw new InvalidMessageException("Message is null")
      if (!(result.tryComplete(
        message match {
          case Status.Success(r)  Success(r)
          case Status.Failure(f)  Failure(f)
          case other              Success(other)
        }))) provider.deadLetters ! message
  }

手元のdebug実行環境では引数のmessageに return: 1 が来ていることが確認できた。
で、 case other => Success(other) を通り、tryCompleteによってFuture(Promise)の値はSuccessに確定した。

Actor内で例外が発生するとどうなるんだっけ?

については、ポストをわけます。

Actor内での例外発生とSupervisorStrategyの動作確認 書きました。

12
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
9