5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Scala の Future を 停止させたい

Last updated at Posted at 2020-11-02

概要

並列に実行される複数の演算を取り扱うのに便利な scala の標準の Future ですが、「Future 内で実行した処理が遅すぎるので timeout させて kill / stop したい」 みたいなことはないでしょうか。

結論からいうと、 Future には停止させる機能はありません。

なぜ無いのか、そしてよく使われる timeout のコードではダメなのか、 みたいなのを調べていきます。

よく使われる Future の timeout

akka.pattern.afterFuture.firstCompletedOf で Future のタイムアウトを設定するのがよく使われるアレだと思います。

// なんか重い処理
val nannkaOmoiSyori = Future { Thread.sleep(1000); println("重い処理 Done") }

val timeout = akka.pattern.after(500.milliseconds, using = system.scheduler) {
  Future.failed(new Exception("timeout されたよ!"))
}

val f = Future.firstCompletedOf(Seq(nannkaOmoiSyori, timeout))

Await.result(f, 2.seconds)

上記実行せずとも Future.firstCompletedOf と書いてあること変わる通り、 Seq の中の Future の中の最も早く終わった値を f の中に入れているだけなので、 f に値が格納された後もnannkaOmoiSyoriは完了するまで実行され続けます。

実行結果です。

Exception in thread "main" java.lang.Exception: timeout されたよ!
	at Example$.$anonfun$timeout$1(Example.scala:15)
	at akka.pattern.FutureTimeoutSupport.liftedTree1$1(FutureTimeoutSupport.scala:50)
	at akka.pattern.FutureTimeoutSupport.$anonfun$after$1(FutureTimeoutSupport.scala:50)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:475)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
重い処理 Done

nannkaOmoiSyoriprintln("重い処理 Done")Exception の後に実行されているのが分かりますね。

下記のようにしてもダメです。

// なんか重い処理
val nannkaOmoiSyori = Future { Thread.sleep(1000); println("重い処理 Done") }

// 重い処理より短い timeout
Await.result(f, 500.milliseconds)
Exception in thread "main" java.util.concurrent.TimeoutException: Future timed out after [500 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:212)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
	...
重い処理 Done

Future を止めることはできないのか..?

Future に キル・停止 の機能がなぜないのか

調べた感じ https://stackoverflow.com/questions/16615750/future-with-timeout-in-scalahttps://viktorklang.com/blog/Futures-in-Scala-protips-6.html が参考になったのでまとめると

  • cancelを呼び出してから、実際に対象のFutureの処理を停止するまでの間、何かが起こるかもしれない(未来のボディは実行を開始するかもしれない)
  • cancelableによって返された未来はキャンセルされたと言っていますが、対象のFutureの処理は実際に実行されました。(これは、Futureの処理が何らかの副作用を実行すると問題になる)
  • Futureは共有の読み取り専用ハンドルであり、他のリーダと干渉するメソッドを提供すべきではない
  • Futureは共有された読み取り専用ハンドルであるため、他のリーダに干渉するメソッドを提供してはいけない

とのこと。

上記を踏まえてそれでも Future を止めたい場合、上記サイト のコードを参考にすると多分止めることができました。(重い処理 Done が表示されない、すなわち Future は停止された..?)

val (resultF, cancel) = Future.interruptibly { Thread.sleep(1000); println("重い処理 Done") }

val timeoutF = akka.pattern.after(500.milliseconds, system.scheduler)({
  cancel()
  Future.failed(new Exception("timeout してキャンセルされたよ"))
})

val f = Future.firstCompletedOf(Seq(resultF, timeoutF))

Await.result(f, 2.seconds)
Exception in thread "main" java.lang.Exception: timeout してキャンセルされたよ
	at Example$.$anonfun$timeoutF$1(Example.scala:23)
	at akka.pattern.FutureTimeoutSupport.liftedTree1$1(FutureTimeoutSupport.scala:50)
	at akka.pattern.FutureTimeoutSupport.$anonfun$after$1(FutureTimeoutSupport.scala:50)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:475)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

まとめ

scala の標準の Future は cancelable ではないので、基本的には止めることはできないし、拡張すればできるけどしない方が良さそう。

もし特定の時間に負荷が高くなり、 Future をたくさん作られて、その中の特定の何かに時間のかかる処理があって timeout したものは kill しないとスレッドが枯渇してしまう..! などがある場合、Future を cancelable に拡張することを検討するよりも、該当部分の処理の Thread Pool を分けたり、あるいは 処理の内容を再検討するべきです。

また、 Scala には Monix という 非同期プログラミングをするためのライブラリがあり、そちらでは cancelable な Future を提供しているので、そちらの導入を検討してみるなどした方が良さそうですね。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?