Akka実践バイブルをゆっくり読み解く企画の第5章です。
第5章 Future
目次を眺めてた時点で何か違和感は感じてたんです。Akka関係ないやん・・・
とはいえ、Future
もアクターも非同期処理という観点では類似の道具。ただし、それぞれ適したユースケースが異なるらしい。
Akkaとは直接関係ないこともあり、少し端折り目でいきます。
Futureのユースケース
- シンプルな非同期処理 → Future
- 複雑な非同期処理 → Akka
一言で言うとこういうことだと思う。
アクターの良いところは、非同期処理を実装するにあたって以下のような恩恵を受けられる点にある。
- 状態を保持することができる
- 処理パターンが多岐に渡っても可読性を維持できる
- きめ細かい監視と障害回復ができる
逆に言うと、これらの要素が必要でない場合はFuture
でも事足りる。
Futureの良いところ
Future
は「アクターを利用するべきか否か」の判断による消極的な選択肢として選択されるツールではない。Future
自体優れたツールであり、特にパイプライン処理においては便利なツールとなる。1つの処理から複数の関数を呼び出し、その結果群を合成して最終的に1つの結果を導き出す際に、複数の関数処理を容易に並列で実行することができる。
加えて、同一目的のサービス(気象情報の取得など)を複数呼び出して、先に返ってきた結果を採用して遅い方のサービスの無視するようなこともできる。
アクターでもこれらを実現することは可能だけど、実装量が多くなる。シンプルな要件に対して非同期処理を行いたい場合はFuture
を選択した方が楽である。
Futureの中では何もブロッキングしない
Future
のapply
は以下のように定義されている。
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] =
unit.map(_ => body)
body
がFuture
に名前渡しで渡されるため、body
は本当に必要なタイミングまで評価されない。そのため、メインスレッドで評価の完了を待つ必要がなくなる。Future
の中は別スレッドで動作するようになっているため、body
は別スレッドの方で評価が開始される。これにより、メインスレッドでの処理をブロッキングすることなく処理を並列で実施する仕組みが実現できる。
Futureの結果の取り扱い
非同期で処理されたFuture
の結果はforeach
やmap
で扱うことができる。
val futureEvent: Future[Event] = ...
futureEvent.foreach {event =>
// Futureの結果が使用可能になった時に呼び出される
// foreachはUnit型なので、値を返さない
}
val futureEvent: Future[Event] = ...
val futureResult = futureEvent.map {event =>
// Futureの結果が使用可能になった時に呼び出される
// ブロックの最後の評価内容がmapの結果として返される
...
}
また、Future[A]
を返すメソッド(getMethodA)とFuture[B]
を返すメソッド(getMethodB)が存在する場合に、それをチェーンする場合はflatMap
を使うこととなる。
val futureResult: Future[C] = getMethodA().flatMap{a =>
// mapにしてしまうと、futureResultの型がFuture[Future[C]]となってしまう
getMethodB(a)
}
並列実行の実現
scala.concurrent.Implicits.global
によってFuture
の並列処理を実現されている。このglobal
はデフォルトのExecutionContext
であり、自分で用意することもできる。
むしろ、他のプロセスとglobal
を共有してしまうとglobal
の設定(スレッドプール上限など)の兼ね合いでこちらで待ちが発生する可能性もあるため、global
よりもディスパッチャーを使う方が良い選択となる。
FutureとPromise
Future
とPromise
は対の関係にある。
-
Future
:読み込み用 -
Promise
:書き込み用
シンプルにFuture
を返すAPIを使うことができれば、Promise
を使う機会は頻繁には発生しない、とのこと。Future.apply(body)
の中でExecutionContext
が別スレッドを起こし、別スレッド側の方でPromise
による並列処理が実行される。並列処理の結果は元スレッドのFuture
の中にセットされることとなるため、Promise
を意識しなくても良いようになっている。
Futureとエラー
エラーが起きた際はエラーがFuture
にラップされる。Future
にラップされたエラー内容を確認する方法の1つとして、onComplete
が存在する。
val result = ...
result.onComplete {
case Success(value) => // 成功時の処理
case Failure(e) => // 失敗時の処理
}
ただし、Future
では致命的なエラーまで処理することはできない。OutOfMemeory
などの致命的なエラーについてはFuture
の中にラップすることができず、そのまま例外としてスローされる。
Futureの合成
複数のFuture
は合成することができる。合成の方法はいくつかあるが、とりあえずzip
とmap
による合成の例だけ挙げる。
val futureA: [Future[Option[A]] = ...
val futureB: [Future[Option[B]] = ...
futureA.zip(futureB).map {
case(resultA, resultB) => // resultA: Option[A] resultB: Option[B]
...
}
Futureとアクターの組み合わせ
第2章の時に見たコードの中に以下のようなコードがあった。
def getEvents = context.children.map { child =>
self.ask(GetEvent(child.path.name)).mapTo[Option[Event]]
}
このgetEvents
の型は以下のとおりIterable[Future[Option[Event]]]
となっている。
これはask
がFuture[Any]
を返すようになっているためである。ask
は、メッセージを送信した際に戻り値を返してもらうために使用するメソッドであった。ask
の型からは以下のことがわかる。
-
ask
は非同期的に実行され、その結果をFuture
にラップして返す -
ask
の結果は、合成することができる
まとめ
非同期処理を実現する方法として、アクターとFuture
の2つの手段があると冒頭に記述したが、結局のところアクター内で非同期処理の結果を受ける場合はFuture
を受けることもあり、アクターを使う上でFuture
は常に意識しないといけなさそう。ややこしそうと思う反面、非同期処理をScalaの言語仕様としてしっかりサポートしてくれるからアクターの仕組みにもそれを取り込んでわかりやすくなっているのかも。
Future
に限らず、そもそもScalaの言語仕様も真面目に勉強しないと・・・
参考資料
tototoshiの日記:アプリケーションに合ったExecutionContextを使う