概要
並列に実行される複数の演算を取り扱うのに便利な scala の標準の Future ですが、「Future 内で実行した処理が遅すぎるので timeout させて kill / stop したい」 みたいなことはないでしょうか。
結論からいうと、 Future には停止させる機能はありません。
なぜ無いのか、そしてよく使われる timeout のコードではダメなのか、 みたいなのを調べていきます。
よく使われる Future の timeout
akka.pattern.after
と Future.firstCompletedOf
で Future のタイムアウトを設定するのがよく使われるアレだと思います。
// なんか重い処理
val nannkaOmoiSyori = Future { Thread.sleep(1000); println("重い処理 Done") }
val timeout = akka.pattern.after(500.milliseconds, using = system.scheduler) {
Future.failed(new Exception("timeout されたよ!"))
}
val f = Future.firstCompletedOf(Seq(nannkaOmoiSyori, timeout))
Await.result(f, 2.seconds)
上記実行せずとも Future.firstCompletedOf
と書いてあること変わる通り、 Seq
の中の Future
の中の最も早く終わった値を f
の中に入れているだけなので、 f
に値が格納された後もnannkaOmoiSyori
は完了するまで実行され続けます。
実行結果です。
Exception in thread "main" java.lang.Exception: timeout されたよ!
at Example$.$anonfun$timeout$1(Example.scala:15)
at akka.pattern.FutureTimeoutSupport.liftedTree1$1(FutureTimeoutSupport.scala:50)
at akka.pattern.FutureTimeoutSupport.$anonfun$after$1(FutureTimeoutSupport.scala:50)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:475)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
重い処理 Done
nannkaOmoiSyori
の println("重い処理 Done")
が Exception
の後に実行されているのが分かりますね。
下記のようにしてもダメです。
// なんか重い処理
val nannkaOmoiSyori = Future { Thread.sleep(1000); println("重い処理 Done") }
// 重い処理より短い timeout
Await.result(f, 500.milliseconds)
Exception in thread "main" java.util.concurrent.TimeoutException: Future timed out after [500 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
...
重い処理 Done
Future を止めることはできないのか..?
Future に キル・停止 の機能がなぜないのか
調べた感じ https://stackoverflow.com/questions/16615750/future-with-timeout-in-scala や https://viktorklang.com/blog/Futures-in-Scala-protips-6.html が参考になったのでまとめると
- cancelを呼び出してから、実際に対象のFutureの処理を停止するまでの間、何かが起こるかもしれない(未来のボディは実行を開始するかもしれない)
- cancelableによって返された未来はキャンセルされたと言っていますが、対象のFutureの処理は実際に実行されました。(これは、Futureの処理が何らかの副作用を実行すると問題になる)
- Futureは共有の読み取り専用ハンドルであり、他のリーダと干渉するメソッドを提供すべきではない
- Futureは共有された読み取り専用ハンドルであるため、他のリーダに干渉するメソッドを提供してはいけない
とのこと。
上記を踏まえてそれでも Future を止めたい場合、上記サイト のコードを参考にすると多分止めることができました。(重い処理 Done
が表示されない、すなわち Future は停止された..?)
val (resultF, cancel) = Future.interruptibly { Thread.sleep(1000); println("重い処理 Done") }
val timeoutF = akka.pattern.after(500.milliseconds, system.scheduler)({
cancel()
Future.failed(new Exception("timeout してキャンセルされたよ"))
})
val f = Future.firstCompletedOf(Seq(resultF, timeoutF))
Await.result(f, 2.seconds)
Exception in thread "main" java.lang.Exception: timeout してキャンセルされたよ
at Example$.$anonfun$timeoutF$1(Example.scala:23)
at akka.pattern.FutureTimeoutSupport.liftedTree1$1(FutureTimeoutSupport.scala:50)
at akka.pattern.FutureTimeoutSupport.$anonfun$after$1(FutureTimeoutSupport.scala:50)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:475)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
まとめ
scala の標準の Future は cancelable ではないので、基本的には止めることはできないし、拡張すればできるけどしない方が良さそう。
もし特定の時間に負荷が高くなり、 Future をたくさん作られて、その中の特定の何かに時間のかかる処理があって timeout したものは kill しないとスレッドが枯渇してしまう..! などがある場合、Future を cancelable に拡張することを検討するよりも、該当部分の処理の Thread Pool を分けたり、あるいは 処理の内容を再検討するべきです。
また、 Scala には Monix という 非同期プログラミングをするためのライブラリがあり、そちらでは cancelable な Future を提供しているので、そちらの導入を検討してみるなどした方が良さそうですね。