Cats Effect: IOというCats Effectが提供する公式ドキュメントを参考にCats EffectのIOモナドを触ってみたので、その内容をベースにIOモナドについて書いてみました。
Cats Effectとは
Scalaのcatsが提供するIOモナドとIOをサポートするいくつかのタイプクラスを提供するライブラリです。
Cats EffectはMonixやFinchなどの最近Scala界隈で話題の(?)ライブラリでもサポートされています。
IOモナドとは
IOモナドはhaskellにあるIOモナドのCats Effect版です。ドキュメントには以下のように書いてあります。詳しくは知らないのですが、scalaz.effect.IOも同じようなものだと思われます。
IO is more similar to common Task implementations than it is to the classic scalaz.effect.IO or even Haskell's IO
今回はIOモナドによる
- タスクの遅延評価
- エラーハンドリング
- 外部リソースの扱い
- 非同期実行
について紹介していきます。
タスクの遅延評価
IOのapplyメソッドのbodyに定義したタスクは遅延評価されます。つまり、IO自体はタスクを定義するだけになっています。
IOを実行するunsafeRunSync()メソッドで実行すると、中身が評価されていることがわかります。
def apply[A](body: => A): IO[A] =
delay(body)
// この時点ではIOのbodyは評価されていない
val ioa = IO { println("hey!") }
ioa.unsafeRunAsync()
// hey!
IOはモナドなので、for式で書くことができます。
// この段階ではbodyは評価されない
val program: IO[Unit] =
for {
_ <- ioa
_ <- ioa
} yield ()
program.unsafeRunSync()
// hey!
// hey!
エラーハンドリング
上で述べたunsafeRunSync()メソッドはただ中身を評価するだけなので、名前の通り安全ではありません。
試しにIOの中で例外を投げて、unsafeRunSync()メソッドで評価するとそのまま例外が投げられプログラムが終了してしまうのがわかります。
IO内で、ファイル参照やデータベース接続、ネットワーク接続などの外部との例外が起きる可能性がある処理をする場合には有効ではありません。
val error = IO { throw new RuntimeException("error") }
println(error.unsafeRunSync())
// Exception in thread "main" java.lang.RuntimeException: error
そこで、IOには例外をハンドリングするメソッドが用意されています。ここでは以下の2つのメソッドを紹介します。
- attempt
タスクの実行結果がEitherに包まれる - handleErrorWith
Throwableをキャッチして例外処理をかける(FutureのrecoverWithみたいなやつ)
val hello = IO { "Hello" }
val error = IO { throw new RuntimeException("error") }
// -------------attempt-------------
println(hello.attempt.unsafeRunSync())
// Right(Hello)
println(error.attempt.unsafeRunSync())
// Left(java.lang.RuntimeException: error)
// -------------handleErrorWith-------------
println(hello.handleErrorWith { err =>
IO.pure(err.getMessage)
}.unsafeRunSync())
// Hello
println(error.handleErrorWith { err =>
IO.pure(err.getMessage)
}.unsafeRunSync())
// error
外部リソースの扱い
ファイル参照やデータベースセッション、httpコネクションなどを行った際には、処理完了後にファイルやコネクションをクローズする必要があります。
迂闊にクローズする作業を忘れてしまうと無駄にメモリを消費してしまう可能性があります。
IOのbracketメソッドは、リソースを扱うタスクを実行する処理とリソースを解放する処理の両方を扱って、リソースの解放を確実にすることを可能にします。
bracketの第一引数にはリソースを受け取って実行するタスク実行する関数を渡し、
第二引数にはリソースを受け取ってリソースの解放を行う関数を渡します。
第二引数は第一引数に渡された処理がどのように終了するか(正常終了、エラー、キャンセル)に関わらず処理が終了したら呼ばれます。
このように実行する処理と解放する処理を一緒に渡すことによって、無駄なメモリの消費などを防ぐことができます。
下の例では、bracketの第一引数でファイルを受け取ってファイルの中身を読み込処理を行い、第二引数でファイルをクローズする処理を行っています。
def bracket[B](use: A => IO[B])(release: A => IO[Unit]): IO[B]
def readFirstLine(file: File): IO[String] =
IO(new BufferedReader(new FileReader((file)))).bracket { in =>
IO(in.readLine())
} { in =>
IO(in.close())
}
非同期処理
IOはタスクを同期にも非同期にも実行することができます。ここまで紹介した処理は全て同期実行でした。
ここからは非同期にタスクを実行する方法を紹介します。
タスクを非同期に実行するためには、Cats Effectに定義されているContextShiftを用いて実行します。このContextShiftはFutureなどで使われているExecutionContextのようなものです。
IOのshiftメソッドは、タスクを実行するスレッドプールを切り替えるもので、切り替えるContextShiftをshiftメソッドに渡すことによって別スレッドで非同期にタスクを実行することができます。以下のコードは、タスクの同期実行と非同期実行を行っています。shiftによってタスクを実行するスレッドが切り替えられていることがわかりますね。
def shift(implicit cs: ContextShift[IO]): IO[Unit]
val task = IO {
println(s"Thread name: ${Thread.currentThread().getName}")
println("task")
}
// -------------同期実行-------------
task.unsafeRunSync()
// Thread name: main
// task
// -------------非同期実行-------------
val contextShift = IO.contextShift(ExecutionContext.global)
IO.shift(contextShift).flatMap(_ => task).unsafeRunSync()
// Thread name: scala-execution-context-global-11
// task
shiftメソッドの使い方としては、スレッドプールを指定することによって、タスク別で実行するスレッドを分割するなどの用途があります。
以下のコードは処理ごとにスレッドプールを使い分けています。
val cachedThreadPool = Executors.newCachedThreadPool()
val executionContextForBlockingFileIO = ExecutionContext.fromExecutor(cachedThreadPool)
val mainExecutionContext = ExecutionContext.global
val ioa: IO[Unit] =
for {
_ <- IO(println("Enter your name: "))
_ <- IO.shift(executionContextForBlockingFileIO)
name <- IO(scala.io.StdIn.readLine()) // Thread name: pool-1-thread-2で実行
_ <- IO.shift(mainExecutionContext)
_ <- IO(println(s"Welcome $name!")) // Thread name: scala-execution-context-global-11で実行
_ <- IO(cachedThreadPool.shutdown())
} yield ()
非同期処理はstartメソッドを用いて実行することも可能です。startをメソッドを実行すると、非同期にタスクが実行されブロッキングが起きません。その代わりにすぐFiberというオブジェクトが返されます。
このFiberオブジェクトは以下の定義のように、joinとcancelの二つのメソッドを持ちます。
JavaのForkJoinTaskの
fork <=> start
join <=> join
cancel <=> cancel
のように解釈することができます。
つまり、startはタスクを指定したスレッドプール内で実行するように調整します。joinは非同期で実行したタスクが完了したあと、結果を取得します。
cancelはタスクのキャンセルを試みます。
Fiberの実装を詳しくみてはいないので、間違えていたらご指摘をお願いします
final def start(implicit cs: ContextShift[IO]): IO[Fiber[IO, A @uncheckedVariance]]
trait Fiber[F[_], A] {
def cancel: CancelToken[F]
def join: F[A]
}
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val fiber: IO[Fiber[IO, Unit]] = ioa.start
for {
f1 <- ioa.start // 処理がブロックされない
f2 <- ioa.start
r1 <- f1.join // 処理結果を取得する
r2 <- f2.join
} yield ()
cancelを使うコード例を以下に示しました。
validationとheavyTaskを非同期実行しています。さきにvalidationが終わり失敗した時に、heavyTaskをキャンセルし、例外を投げています。
例外が投げられるとheavyTaskの結果をjoinで待つことなく、その時点でfor式内のモナドチェーンの処理が終了し、不要になったタスクの完了を待つ必要がなくなります。
ただ、このケースではheavyTaskの実行は始まっているので、heavyTaskの処理はcancelされることなく動き続けています。仮にheavyTaskの実行が始まる前に、validationの処理が完了していれば、heavyTaskは実行されることなくcancelされます。
import cats.syntax.apply._
def validation(data: Data): IO[Boolean]
def heavyTask(data: Data): IO[Data] = IO {
println("start task")
// 重い処理
Thread.sleep(10000)
println("finish task")
...
}
val result = for {
f1 <- validation(data).start
f2 <- heavyTask(data).start
valid <- f1.join
_ <- if(!valid) f2.cancel *> IO.raiseError(new RuntimeException("validation error"))
else IO.unit
res <- f2.join
} yield res
// validationが失敗した際のサンプル
// heavyTaskがfinishする前にresultがLeftで返ってきている
println(result.attempt.unsafeRunSync())
// start task
// Left(java.lang.RuntimeException: validation error)
まとめ
Cats EffectのIOについて簡単に紹介しました。
またSync、Async、EffectなどのタイプクラスがまだCats Effectにはありますが、それはまた別の機会にでも書けたら書こうと思います。
今回紹介した非同期処理やscalaz.effect.io、monixのTaskとの違いなども深掘っていきたいです。