Edited at

[Scala]scalazのTaskは何が嬉しいのか

More than 1 year has passed since last update.


Task?

scalaz/scalazにあるクラス。


scalaz/Task.scala

これはscalaz/Future.scalaをwrapしたクラスとなっている。


Future?

scala.concurrent.Futureと同じく非同期処理を表現するクラスになっているが、


違いとしてはmapflatMapExecutionContextを必要とせず新しいタスクを生成しないようになっている点。


これによってスレッドの再利用性が向上しているとのこと。

こんな感じ。

(for {

i <- Future.apply(100)
j <- Future.now(200)
k <- Future.delay(300)
} yield i + j + k).unsafePerformAsync(println)
// 600

monadicなAPIを備えているのでfor式が書ける。


Future自体は非同期タスクを定義・記述だけとなっていて、実行するにはunsafePerformAsyncunsafePerformSyncを呼び出す必要がある。

Futureのインスタンスを作成するのにapply, now, delayを使っているが、applyだと別スレッドでの評価、nowは正格評価でdelayは遅延評価、みたいなイメージで使い分ける。

Scala標準のFutureapplyは遅延評価となっているが、Scalazのものは異なる。


さらに大きな違いとして例外処理機構は持っていない。


そのため、nowでもdelayでも評価して例外が発生するとそれは例外になる。

Future.now { sys.error("error"); 1 }.unsafePerformAsync(println)

// Exception in thread "main" java.lang.RuntimeException: error
Future.delay { sys.error("error"); 1 }.unsafePerformAsync(println)
// Exception in thread "main" java.lang.RuntimeException: error

ちなみにapplyだと闇に消える...。

Future.apply { sys.error("error"); 300 }.unsafePerformAsync(println)

// 何も表示されない

Futureの実装クラスはこの辺にあって、例外っぽいものを表現する型はない。

じゃあ例外処理はどうするの...?となった時にTaskが登場する。


Task?

改めて。


scalaz.concurrent.Futureに例外処理を組み込んだもの。


といいつつScala標準Futureのようなものではなく、Future[Throwable \/ A]として例外を保持するようになっている。

class Task[+A](val get: Future[Throwable \/ A]) {

...

実装はこのあたり

さっきと同じようなプログラムをTaskで書いてみると以下のようになる

(for {

i <- Task.apply(100)
j <- Task.now(200)
k <- Task.delay(300)
} yield i + j + k).unsafePerformAsync(println)
// \/-(600)

Futureと同じように書けるが、結果は\/-になっているのが分かる。


Futureと違う点として、apply, delayにおいて例外が起きるようにしてみると、-\/として結果を受け取る事もできるが、nowだと引数を正格評価するのでその時点で例外が発生する。

Task.apply { sys.error("error"); 100 }.unsafePerformAsync(println)

// -\/(java.lang.RuntimeException: error)

Task.delay { sys.error("error"); 200 }.unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: error)

Task.now { sys.error("error"); 300 }.unsafePerformAsync(println)
// Exception in thread "main" java.lang.RuntimeException: error


Taskの嬉しいところ


Future的操作

ScalaのFutureのようにコールバックや例外処理の設定を付与できる。

Task.delay { sys.error("error") }.onFinish { errOpt: Option[Throwable] =>

Task.now {
errOpt match {
case Some(err) => println(s"fail! ${err.getMessage}")
case None => println("success!")
}
}
}.unsafePerformAsync(println)
// fail! error
// -\/(java.lang.RuntimeException: error)

Task { sys.error("fail") }.handle { case t: Throwable => "success" }.unsafePerformAsync(println)
// \/-(success)

FutureonCompleteと違ってコールバックが設定されたTaskが返ってくるのが地味に嬉しい。


実行方法の種類

すでに上でunsafePerformAsyncというメソッドを叩いているが、名前の通りasyncかつ危険な実行という感じ。


\/で返ってくるのだからunsafeじゃないような気もするが...。

Taskの実行方法はざっくり以下。



  • unsafePerformAsync


    • 非同期実行で結果は\/に包まれる




  • unsafePerformSync


    • 同期実行




  • unsafePerformSyncAttempt


    • 同期実行で結果は\/に包まれる




  • unsafePerformSyncFor


    • 同期実行でタイムアウト設定可能




  • unsafePerformSyncAttemptFor


    • 同期実行でタイムアウト設定可能で結果は\/に包まれる



Task.apply { 100 }.unsafePerformAsync(println)

// \/-(100)
Task.apply { sys.error("fail"); 100 }.unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: fail)

println(Task.apply { 100 }.unsafePerformSync)
// 100
println(Task.apply { sys.error("fail"); 100 }.unsafePerformSync)
// Exception in thread "main" java.lang.RuntimeException: fail

println(Task.apply { 100 }.unsafePerformSyncAttempt)
// \/-(100)
println(Task.apply { sys.error("fail"); 100 }.unsafePerformSyncAttempt)
// -\/(java.lang.RuntimeException: fail)

import scala.concurrent.duration._
val d = 10.millis
println(Task { 100 }.unsafePerformSyncFor(d))
// 100
println(Task { Thread.sleep(1000); 100 }.unsafePerformSyncFor(d))
// Exception in thread "main" java.util.concurrent.TimeoutException: Timed out after 10 milliseconds

println(Task { 100 }.unsafePerformSyncAttemptFor(d))
// \/-(100)
println(Task { Thread.sleep(1000); 100 }.unsafePerformSyncAttemptFor(d))
// -\/(java.util.concurrent.TimeoutException: Timed out after 10 milliseconds)

安全に使うなら、unsafePerformAsyncunsafePerformSyncAttemptForのどちらかを使うことになりそう。


ちなみにタイムアウトはtimedメソッドでも設定できる。

Task { Thread.sleep(100); 100 }.timed(10.millis).unsafePerformAsync(println)

// -\/(java.util.concurrent.TimeoutException: Timed out after 10 milliseconds)


スケジューリングやリトライ出来る

retryTask.scheduleで設定可能。

Task { 100 }.retry(List.fill(2)(10.millis)).unsafePerformAsync(println)

// \/-(100)
Task { sys.error("fail"); 100 }.retry(List.fill(2)(10.millis)).unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: fail)

Task.schedule(100, 10.millis).unsafePerformAsync(println)
// \/-(100)

豊富なAPIが便利。


ScalaのFutureに変換する

実際問題、開発の基盤としてScalazいれてTaskを全面で使っていくにしても他のライブラリとの兼ね合いでscala.concurrent.Futureを使いたいシーンも多いはず。


そういう時にはVerizon/deloreanみたいな変換ライブラリもあるので、合わせて入れておくと便利かもしれない。delorean/package.scala at master · Verizon/delorean


まとめ

ScalazのTask便利。

ScalaのFutureと同じようなこと出来る + 便利なAPIが生えているというイメージ。

ただ、関数型っぽく明示的な実行をキックしない限り走り出さないのでそこは少し慣れが必要かもしれない。

あわせて読みたい

ScalazのTaskを使う上で知っておきたいこと - Qiita