Future scalaで非同期演算を扱いたい時に使う
未完了、失敗、成功の状態がある
#基本
Future(Thread.sleep(1000 * 10)) // Future(<not completed>) 未完了
Future(1) // Future(Success(1)) 成功
Future(1/0) // Future(Failure(java.lang.ArithmeticException: / by zero)) 失敗、例外を保持
Future(throw new OutOfMemoryError)
//java.lang.OutOfMemoryError 致命的な例外 ExecutionContextのreportFailureが呼ばれその後
//Thread.uncaughtexceptionhandlerが呼ばれる. Futureは完了しない
※ Fatal(致命的)な例外とは以下の例外のもの
- VirtualMachineError,
- OutOfMemoryとか
- ThreadDeath,
- スレッドが死んだらどうしようもない
- InterruptedException,
- 割り込み要求されたら中断して投げるやつ
- LinkageError,
- AbstractMethodErrorなど
- ControlThrowable,
-
Breakable.break
とか
-
- NotImplementedError
trait Future[+T] extends Awaitable[T]
Future traitに定義されているメソッドはAwaitableにあるreadyとresultメソッド以外は全部ノンブロッキング
Futureのメソッドたち
成功, 失敗したら副作用を実行
成功時
Future(1).foreach(println) // 1
失敗時
Future(1/0).failed.foreach(println) // java.lang.ArithmeticException: / by zero
* onSuccess, onFailureはScala 2.12から非奨励
成功も失敗も両方扱いたい(戻り値はUnit)
Future(1 / 0).andThen {
case Success(x) => println(x)
case Failure(_: ArithmeticException) => println("/ by zero")
}
計算結果を使って処理したい
Future(1).map(_ + 1) // Future(Success(2))
途中で失敗したら後続の処理はskipしてそのまま結果が帰る
Future(1).map(_ / 0).map(_ + 1) // Future(Failure(java.lang.ArithmeticException: / by zero))
計算結果をFutureを返す関数に渡して合成したい
def futureFunc(i: Int): Future[Int] = ???
Future(1).flatMap(futureFunc)
Futureを返す関数が複数合成したい時はfor式を使える
for {
x1 <- futureFunc1()
x2 <- futureFunc2(x1)
x3 <- futureFunc3(x2)
} yield x3
- ただしfutureFunc2はfutureFunc1が完了したあとに実行
futureFunc3はfutureFunc2が完了した後に実行される事に注意(並列に実行される訳ではない)
Futureを並列に実行して合成したい
合成したいものが二つの場合
(Future(1) zip Future(2)).map { case (x, y) => x + y } // Future(Success(3))
scala 2.12では
(Future(1) zipWith Future(2))((x, y) => x + y) // Future(Success(3))
と書けるようになった
二つより多い時
scalazの|@|を使うと便利
import scalaz._
(Future(1) |@| Future(2) |@| Future(3))(_ + _ + _) // Future(Success(6))
catsではこう書く
import cats._
import cats.implicits._
(Future(1), Future(2), Future(3)).mapN(_ + _ + _) // Future(Success(6))
個数が決まってないものはFuture.sequenceを使う
//Seq[Future[A]]を Future[Seq[A]]に変換
Future.sequence(Seq(Future(1), Future(2), Future(3)))// Future(Success(List(1, 2, 3))
特定の例外を回復したい時
recover
Future(1 / 0).recover {
case _: ArithmeticException => 0
}.map(_ + 1) //Future(Success(1))
成功も失敗も両方扱いたい(戻り値あり)
Future(1).transform(_ + 1, ex => ex)
このtransform 成功は成功 失敗は失敗にしか変換できず使いづらかったが
2.12以降は
transform[T](f: Try[U] => Try[T])
とTryを受け取りTryを返せるようになったので
Future(1).transform {
case Success(i) => Success(i + 1)
case Failure(e) => Success(0)
}
失敗 成功両方を扱えみたいに書けるようになり実用的になった
Futureのコンパニオンオブジェクトに生えている便利メソッド達
Future.successful
//Futureを実行したくないが型は合わせたい時
futureFunc1().flatMap { i =>
if (i > 0)
futureFunc2(i)
else
Future.successful(0)
}
Future.sequence
//Seq[Future[A]]を Future[Seq[A]]に変換
Future.sequence(Seq(Future(1), Future(2), Future(3)))// Future(Success(List(1, 2, 3))
//失敗したものあった時には失敗が帰る
Future.sequence(Seq(Future(1), Future(throw new Exception), Future(3)))// Future(Failure(java.lang.Exception))
Future.firstCompletedOf
//最初に完了したものを返す
Future.firstCompletedOf(Seq(Future { Thread.sleep(1000); 1 }, Future(2))) //Future(Success(2))
//最初に完了したものが失敗だったら失敗を返す
Future.firstCompletedOf(Seq(Future { Thread.sleep(1000); 1 }, Future(throw new Exception()))) //Future(Failure(java.lang.Exception))
Future.find
//条件に合うものを返す
Future.find(List(Future { Thread.sleep(1000); 2 }, Future(4)))(_ % 2 == 0) //Future(Success(Some(2)))
//失敗したものは無視する
Future.find(List(Future { Thread.sleep(1000); 2 }, Future(throw new Exception)))(_ % 2 == 0) //Future(Success(Some(2)))
scala 2.12で これまでのfind(以前は失敗を無視しないで報告していていた)は非奨励になった
Future.traverse
//Seq(1, 2).map(Future(_)).sequnceと同じ
Future.traverse(Seq(1, 2))(Future(_)) // Future(Success(List(1, 2, 3))
scala 2.12から
fold, reduceが非奨励になり代わりにfoldLeft, reduceLeftが増えた。
Future.unit
Future.successfull(())
を
Future.unit
と書けるようになった
また永遠に計算を完了しない
Future.never
もできた
ExecutionContext
- Futureの各メソッドでimplicit parameterとして要求されるもの
- Futureは何を計算するかのWhatを表すがExecutionContextはどのように並列実行するかのHowを表す
- 基本スレッドプールのようなものでデフォルトはjavaのFork/Joinフレームワーク
import scala.concurrent.ExecutionContext.Implicits.global
- akka actorのデフォルトもForkJoinPool
ExecutionContextの定義はシンプル
executeに渡されるRunnableがFutureに渡されていた計算
def execute(runnable: Runnable) = runnable.runとかすると同期Futureとか作れる
*prepareは無視していいです
trait ExecutionContext {
def execute(runnable: Runnable): Unit
def reportFailure(@deprecatedName('t) cause: Throwable): Unit
def prepare(): ExecutionContext = this
}
#代表的なスレッドプール
固定スレッドプールの概念図
キューに溜まったtaskを待機スレッドに渡し実行する わかり易いモデル
しかし問題点も。。
- 並列数(=スレッド)を増やすとtaskキューへのロックが多発して性能が劣化する
- スレッドを増やしても並列数はcpuのcore数分
- スレッドを増やしすぎるとコンテキストスイッチが多発して性能劣化(=C10K問題)
つまり 従来の1リクエスト=1スレッドでは同時接続数に上限がある
そこでfork-join-pool
- スレッド数がコア数×N個ほどなのでコンテキストスイッチが少ない
- 各スレッドがキューを持っているのでロック獲得待ちが少ない
- 暇なスレッドが出来たら他のスレッドからjobを奪う(work-stealing)のするのでバランスよくjobが振り分けられる
- 1リクエスト=1スレッドではないのでスレッド数の上限が同時接続数の上限ではなくなる!
無敵に見えるが向き不向きがある。。
問題
import scala.concurrent.ExecutionContext.Implicits.global
(1 to 8).map(_ => Future {Thread.sleep(1000 * 30)})
Future(1)
論理コアが8のcpuで最後のFuture(1)
の計算は最低で何秒かかるでしょうか
答え 最低でも30秒
- ForkJoinPoolのデフォルトのスレッド数が論理コア数なのでスレッドを論理コア数分ブロックしてしまうと
全体が止まる - ForkJoinが得意なものはスレッドをブロックしない処理、 重い計算やブロッキングIOは基本的にやるべきではない
(例えばakka-actorだと全actorが止まってメッセージを処理出来なくってしまう) - ここでいうブロッキング処理とはブロッキングIO、高価な計算、 Thread.sleep, synchronizedなどによるスレッドの資源獲得待ちのこと
といっても全てのIOがノンブロッキングではないし全ての計算をノンブロッキングな粒度まで分解できる訳ではない
ではForkJoinPoolでブロッキングする処理を扱いたい時にはどうするか
方法1
ForkJoinPool限定だがblockingメソッドでスレッドをブロックする処理を囲めばForkJoinPoolが必要と判断した場合に
Threadを新規に作りブロックしないようにしてくれる
import scala.concurrent._
(1 to 8).map(_ => Future(blocking {Thread.sleep(1000 * 30)}))
が新規にスレッドを作るのはコストが高いし必ずしもスレッドを作ってくれる訳ではない
方法2
王道
固定スレッドプールのExecutionContextを作ってそこでブロックする処理を書く
val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(num))
(1 to 8).map(_ => Future {Thread.sleep(1000 * 30)}(ec))
これでForkJoinを使用しているFutureはブロックされないようになる
しかし結局 固定スレッドプールをコンテキストスイッチが多発して性能は劣化する
(そもそも論としてマシンあたりの同時接続数をそこまで要求しない普通のwebアプリの場合固定スレッドプールの方が扱いやすいのかもしれない)
まとめ
- FutureはmapやflatMapを使って合成できる
- Future objectには便利なメソッドが生えている
- デフォルトのfork-joinスレッドプールをブロックしてはいけない
- ブロッキングをしたい時は使用するスレッドプールを吟味する
- Futureは用法を守って正しく服用する