Scala
scalaz

[Scala]ScalazのTaskを使う上で知っておきたいこと

この記事は何?

scalazのTaskを使う上で知っておきたいtipsとか注意点とか。
以前、こういう記事を書いた。

[Scala]scalazのTaskは何が嬉しいのか - Qiita

ScalaのFutureと同じようなこと出来る + 便利なAPIが生えているというイメージ。

と書いたが、実際に使っていく上ではもう少し知っておくべきことがあるのでまとめておく。

Taskは遅延評価される

改めて、Taskは遅延評価されるもの。

Task.nowを使うと名前の通り即時評価されるが、apply, delay, pointなどの別のファクトリを使用すると遅延評価される。
作成したTaskをmapflatMapを使って合成していくことになると思うが、それらも遅延評価されて逐次実行される。

scala標準のFutureと比較してみる。

import scala.concurrent.Future

{
  def future(i: Int) = Future {
    println(s"future sleeping: $i")
    Thread.sleep(i * 10)
    println(s"future wakeup: $i")
    i
  }

  val i = future(10)
  val j = future(20)
  val k = future(30)

  (for {
    _i <- i
    _j <- j
    _k <- k
  } yield { _i * _j * _k }).onComplete(println)
}

これを実行すると以下のような出力が得られる。

future sleeping: 30
future sleeping: 10
future sleeping: 20
future wakeup: 10
future wakeup: 20
future wakeup: 30
Success(6000)

Thread.sleepする前のコードが先に評価されているのがわかる。

↑のFutureをTaskに置き換えてみると以下のようなコードになる。ほぼ同じ。

import scalaz.concurrent.Task
{
  def task(i: Int) = Task {
    println(s"task sleeping: $i")
    Thread.sleep(i * 10)
    println(s"task wakeup: $i")
    i
  }

  val i = task(10)
  val j = task(20)
  val k = task(30)
  (for {
    _i <- i
    _j <- j
    _k <- k
  } yield {_i * _j * _k }).unsafePerformAsync(println)
}

これを実行すると以下のような出力が得られる。

task sleeping: 10
task wakeup: 10
task sleeping: 20
task wakeup: 20
task sleeping: 30
task wakeup: 30
\/-(6000)

先ほどと違ってThread.sleep前後のコードが順番に評価されている。

これが遅延評価されていて逐次実行されるということ。
Futureならfor式の中でFuture.applyを呼ばないようにしておけ並行実行されるが、Taskの場合はfor式で書くと逐次実行されてしまう。

Taskを並行実行したい

遅延評価されて逐次実行されると困る、という場合にどうするか。
選択肢は以下の2つ。

  • Task.gatherUnorderedTask.reduceUnorderedを使う
  • Task.unsafeStartを使う
  • Nondeterminismを使うものについては@xuwei_kさんのコメントを参照して下さい

Nondeterminismを使うのもありだが、ここでは触れない。

Task.gatherUnorderedかTask.reduceUnorderedを使う

複数のTaskをまとめて並行実行するためのAPIとなっている。

val i = task(10)
val j = task(20)
val k = task(30)
val reducer = {
  val adder = (i1: Int) => (i2: Int) => i1 * i2
  Reducer.apply[Int, Int](identity, adder, adder)(Scalaz.intInstance)
}
// 順番入れ替え
Task.reduceUnordered(List(k, i, j))(reducer).unsafePerformAsync(println)

これを実行すると以下のようになる。

task sleeping: 30
task sleeping: 10
task sleeping: 20
task wakeup: 10
task wakeup: 20
task wakeup: 30
\/-(6000)

Thread.sleepする前のコードが先に評価されているのがわかる。

Reducerを用意するのが面倒なので、gatherUnorderdを使った方がコード的には楽。

// 同じ結果になる
Task.gatherUnordered(Seq(i, j, k)).map { _.foldLeft(1) { _ * _ } }.unsafePerformAsync(println)

ここで問題点があって、シグネチャを見ると型パラメータが1つしかない。

def gatherUnordered[A](tasks: Seq[Task[A]], exceptionCancels: Boolean = false): Task[List[A]] = ...
def reduceUnordered[A, M](tasks: Seq[Task[A]], exceptionCancels: Boolean = false)(implicit R: Reducer[A, M]): Task[M] = ...

たとえば複数のテーブルから別々にデータを取得したい場合に共通の型があればいいが、無ければAnyとかになってしまう。

shapelessのHListを使うとか...。

あるいは諦めてTask.unsafeStartを使ったほうが良い。

Task.unsafeStartを使う

これはScala標準のFutureっぽい使い方が出来るTaskを生成するためのAPIになっている。

def task(i: Int) = Task.unsafeStart {
  println(s"task sleeping: $i")
  Thread.sleep(i * 10)
  println(s"task wakeup: $i")
  i
}
val k = task(3)
val i = task(1)
val j = task(2)
(for {
  _i <- i
  _k <- k
  _j <- j
} yield {_i + _j + _k }).unsafePerformAsync(println)

これを実行すると以下のようになる。

task sleeping: 3
task sleeping: 2
task sleeping: 1
task wakeup: 1
task wakeup: 2
task wakeup: 3
\/-(6)

Futureを使った時とほぼ同じような結果が得られている。

並行実行まとめ

やり方は以下の2通り。

  • Task.gatherUnorderedTask.reduceUnorderedを使う
  • Task.unsafeStartを使う

MapReduceのようなイメージで使う場合は前者、Scala標準のFutureのように使う場合は後者、という使い分けだろうか。
通常のアプリケーション的なコードだとunsafeStartを使うことも多くなりそうだが、それならScala標準のFutureを使えば十分だと思う。

トランザクションを抜けてしまう問題

Taskは遅延評価されるもの。

その弊害(?)として、例えばRDBのトランザクション内での処理をしたい場合にTaskを使うとトランザクションが閉じてしまっていたりする問題が発生する。

擬似コードとしては以下のような感じ。

class Transaction(var status: String = "OPEN") {
  def close() = status = "CLOSED"
}
def withTransaction[A](f: Transaction => Task[A]): Task[A] = {
  val ctx = new Transaction("OPEN")
  try {
    f(ctx)
  } finally { ctx.close() } // 本当はcommit/rollback的な処理
}

val t = withTransaction { ctx =>
  Task(s"transaction status: ${ctx.status}")
}
t.unsafePerformASync(println)

実行すると

\/-(transaction status: CLOSED)

と出力される。

この例だとTask#unsafePerformAsyncが実行されてTaskの中身が評価されるタイミングではwithTransactionのブロックを抜けてしまっていることになる。
なので、withTransactionの中でTaskを実行してしまわないといけないことになる。

val t = withTransaction { ctx =>
  Task.fromDisjunction {
    \/.fromTryCatchNonFatal {
      Task(s"transaction status: ${ctx.status }").unsafePerformSync
    }
  }
}
t.unsafePerformAsync(println)

unsafePerformSyncを使って同期的に実行してトランザクションを確定してからブロックを抜けるようなコードになってしまった。

トランザクションのようなローンパターンとTaskは気をつけて組み合わせないといけない。