この記事は何?
scalazのTaskを使う上で知っておきたいtipsとか注意点とか。
以前、こういう記事を書いた。
[Scala]scalazのTaskは何が嬉しいのか - Qiita
ScalaのFutureと同じようなこと出来る + 便利なAPIが生えているというイメージ。
と書いたが、実際に使っていく上ではもう少し知っておくべきことがあるのでまとめておく。
Taskは遅延評価される
改めて、Taskは遅延評価されるもの。
Task.now
を使うと名前の通り即時評価されるが、apply
, delay
, point
などの別のファクトリを使用すると遅延評価される。
作成したTaskをmap
やflatMap
を使って合成していくことになると思うが、それらも遅延評価されて逐次実行される。
scala標準のFutureと比較してみる。
import scala.concurrent.Future
{
def future(i: Int) = Future {
println(s"future sleeping: $i")
Thread.sleep(i * 10)
println(s"future wakeup: $i")
i
}
val i = future(10)
val j = future(20)
val k = future(30)
(for {
_i <- i
_j <- j
_k <- k
} yield { _i * _j * _k }).onComplete(println)
}
これを実行すると以下のような出力が得られる。
future sleeping: 30
future sleeping: 10
future sleeping: 20
future wakeup: 10
future wakeup: 20
future wakeup: 30
Success(6000)
Thread.sleep
する前のコードが先に評価されているのがわかる。
↑のFutureをTaskに置き換えてみると以下のようなコードになる。ほぼ同じ。
import scalaz.concurrent.Task
{
def task(i: Int) = Task {
println(s"task sleeping: $i")
Thread.sleep(i * 10)
println(s"task wakeup: $i")
i
}
val i = task(10)
val j = task(20)
val k = task(30)
(for {
_i <- i
_j <- j
_k <- k
} yield {_i * _j * _k }).unsafePerformAsync(println)
}
これを実行すると以下のような出力が得られる。
task sleeping: 10
task wakeup: 10
task sleeping: 20
task wakeup: 20
task sleeping: 30
task wakeup: 30
/-(6000)
先ほどと違ってThread.sleep
前後のコードが順番に評価されている。
これが遅延評価されていて逐次実行されるということ。
Futureならfor式の中でFuture.apply
を呼ばないようにしておけ並行実行されるが、Taskの場合はfor式で書くと逐次実行されてしまう。
Taskを並行実行したい
遅延評価されて逐次実行されると困る、という場合にどうするか。
選択肢は以下の2つ。
-
Task.gatherUnordered
かTask.reduceUnordered
を使う -
Task.unsafeStart
を使う -
Nondeterminism
を使うものについては@xuwei_kさんのコメントを参照して下さい
Nondeterminism
を使うのもありだが、ここでは触れない。
Task.gatherUnorderedかTask.reduceUnorderedを使う
複数のTaskをまとめて並行実行するためのAPIとなっている。
val i = task(10)
val j = task(20)
val k = task(30)
val reducer = {
val adder = (i1: Int) => (i2: Int) => i1 * i2
Reducer.apply[Int, Int](identity, adder, adder)(Scalaz.intInstance)
}
// 順番入れ替え
Task.reduceUnordered(List(k, i, j))(reducer).unsafePerformAsync(println)
これを実行すると以下のようになる。
task sleeping: 30
task sleeping: 10
task sleeping: 20
task wakeup: 10
task wakeup: 20
task wakeup: 30
/-(6000)
Thread.sleep
する前のコードが先に評価されているのがわかる。
Reducer
を用意するのが面倒なので、gatherUnorderd
を使った方がコード的には楽。
// 同じ結果になる
Task.gatherUnordered(Seq(i, j, k)).map { _.foldLeft(1) { _ * _ } }.unsafePerformAsync(println)
ここで問題点があって、シグネチャを見ると型パラメータが1つしかない。
def gatherUnordered[A](tasks: Seq[Task[A]], exceptionCancels: Boolean = false): Task[List[A]] = ...
def reduceUnordered[A, M](tasks: Seq[Task[A]], exceptionCancels: Boolean = false)(implicit R: Reducer[A, M]): Task[M] = ...
たとえば複数のテーブルから別々にデータを取得したい場合に共通の型があればいいが、無ければAny
とかになってしまう。
shapelessのHListを使うとか...。
あるいは諦めてTask.unsafeStart
を使ったほうが良い。
Task.unsafeStartを使う
これはScala標準のFutureっぽい使い方が出来るTaskを生成するためのAPIになっている。
def task(i: Int) = Task.unsafeStart {
println(s"task sleeping: $i")
Thread.sleep(i * 10)
println(s"task wakeup: $i")
i
}
val k = task(3)
val i = task(1)
val j = task(2)
(for {
_i <- i
_k <- k
_j <- j
} yield {_i + _j + _k }).unsafePerformAsync(println)
これを実行すると以下のようになる。
task sleeping: 3
task sleeping: 2
task sleeping: 1
task wakeup: 1
task wakeup: 2
task wakeup: 3
/-(6)
Futureを使った時とほぼ同じような結果が得られている。
並行実行まとめ
やり方は以下の2通り。
-
Task.gatherUnordered
かTask.reduceUnordered
を使う -
Task.unsafeStart
を使う
MapReduceのようなイメージで使う場合は前者、Scala標準のFutureのように使う場合は後者、という使い分けだろうか。
通常のアプリケーション的なコードだとunsafeStart
を使うことも多くなりそうだが、それならScala標準のFutureを使えば十分だと思う。
トランザクションを抜けてしまう問題
Taskは遅延評価されるもの。
その弊害(?)として、例えばRDBのトランザクション内での処理をしたい場合にTaskを使うとトランザクションが閉じてしまっていたりする問題が発生する。
擬似コードとしては以下のような感じ。
class Transaction(var status: String = "OPEN") {
def close() = status = "CLOSED"
}
def withTransaction[A](f: Transaction => Task[A]): Task[A] = {
val ctx = new Transaction("OPEN")
try {
f(ctx)
} finally { ctx.close() } // 本当はcommit/rollback的な処理
}
val t = withTransaction { ctx =>
Task(s"transaction status: ${ctx.status}")
}
t.unsafePerformASync(println)
実行すると
/-(transaction status: CLOSED)
と出力される。
この例だとTask#unsafePerformAsync
が実行されてTaskの中身が評価されるタイミングではwithTransaction
のブロックを抜けてしまっていることになる。
なので、withTransaction
の中でTaskを実行してしまわないといけないことになる。
val t = withTransaction { ctx =>
Task.fromDisjunction {
\/.fromTryCatchNonFatal {
Task(s"transaction status: ${ctx.status }").unsafePerformSync
}
}
}
t.unsafePerformAsync(println)
unsafePerformSync
を使って同期的に実行してトランザクションを確定してからブロックを抜けるようなコードになってしまった。
トランザクションのようなローンパターンとTaskは気をつけて組み合わせないといけない。