浅い理解で書き散らかした自分用メモ。
基本
cats.effect.IO と fs2.Stream の関係
fs2 の内部で cats-effect を利用しているため、シームレスな連携が実現できている(らしい)。
依存など
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-core" % "2.12.0",
"org.typelevel" %% "cats-effect" % "3.5.4",
"co.fs2" %% "fs2-core" % "3.11.0"
),
initialCommands := s"""
import cats.syntax.all.*, cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.*
import scala.concurrent.duration.*, java.util.concurrent.*
"""
IO
IO とは
「副作用があり得る計算」を表す「値」である。ただの値なので、定義されただけでは実コードは実行(計算)されない。明示的に実行すると計算され、成功すると結果が返る(失敗すると例えば例外がスローされる)。遅延評価される Future と捉えてもよい。
- 同期計算あるいは非同期計算を記述できる
- 内部処理をキャンセル可能にすることもできる
次の2種類の値(状態)を持つ。概念としては Option 型が Some と None から成るのに近い。
Pure: 純粋関数
IO.pure(5 * 2) // IO(10)
与えたコード(のブロック)は即時実行される。ただし返される値は IO 型であり、まだ「結果」は出ていないことに注意。ここでは、5 * 2
自体は即時計算されたが、結果の IO.pure(10)
はただの「副作用があり得る計算」を表す「値」に過ぎない。
Delay: 遅延実行する関数
IO.delay(println("hello")) // IO(...)
IO.delay(5 / 0) // IO(...) まだ実行されていないのでゼロ除算例外は発生しない
与えたコード(のブロック)は、あとで「実行」されるまで評価されない。
IO を定義(生成)する
IO.pure()
や IO.delay()
を直接呼んでもいいが、実運用上は、あえて Pure であることを明示したい場合でなければ、一貫して IO()
や IO {}
で生成してもよいだろう。
import cats.effect.IO
val io: IO[Int] = IO(10 * 2) // IO(...)
val io: IO[String] = IO { "hello" } // IO(...)
val io: IO[Unit] = IO { println("hello") } // IO(...)
ちなみに IO(10 * 2)
と IO.pure(10 * 2)
の結果型は違う(実用上の差違はほぼないだろうが)。
IO を実行する
実行コンテキストが必要なのでインポートしておく。
unsafeRunSync()
を呼ぶと、IO が実行され計算された値が返る。
import cats.effect.unsafe.implicits.global
IO(5 * 2).unsafeRunSync() // Int = 10
IO { Seq(1, 2, 3).map(_ * 2) }.unsafeRunSync() // Seq[Int] = List(2, 4, 6)
ほかにもいくつか unsafeRunXXX
関数がある。
IO 実行時に発生する例外を処理する
しかし unsafeRunSync()
を生で実行すると、コードを実行中に発生した例外は普通にリスローされてしまう。
IO(5 / 0).unsafeRunSync() // java.lang.ArithmeticException: / by zero
そこで attempt()
を通し結果を Either に包むと便利。
IO(5 / 1).attempt.unsafeRunSync() // Right(5)
IO(5 / 0).attempt.unsafeRunSync() // Left(java.lang.ArithmeticException: / by zero)
エラーハンドリング
handleErrorWith()
が使える。副作用にしか興味がない場合に便利かもしれない。
IO(5 / 0)
.handleErrorWith { error =>
IO.pure(error.getMessage)
}
.unsafeRunSync() // Int | String = / by zero
複数の IO を合成する
IO 同士を flatMap()
等を使って合成することができる。
val io: IO[Int] = IO(5).flatMap { n => IO(n * 2) }
通常は、複数の IO を for 文で組み上げていくことになるだろう。
val io: IO[Int] =
for
n <- IO(5)
_ <- IO.println("calculating..")
result <- IO(n * 2)
yield result
複数の IO がある場合は mapN()
が使える。
import cats.syntax.all.*
val io: IO[Int] = (IO(10), IO(20), IO(30)).mapN { (a, b, c) => a + b + c }
io.unsafeRunSync() // Int = 60
次の例は、順に実行するが、結果は最後の IO のものだけを返す。
import cats.syntax.all.*
val io: IO[Int] = IO.println("hello") *> IO(42)
io.unsafeRunSync() // Int = 42
なお、途中の計算結果は(副作用を除いて)捨てられる。
val io: IO[Int] = IO(10) *> IO(20) *> IO(30)
io.unsafeRunSync() // Int = 30
リストを順次実行し、結果のリストを返す。
val io: IO[List[Int]] = List(IO(10), IO(20), IO(30)).sequence
io.unsafeRunSync() // List[Int] = List(10, 20, 30)
IO に対しては >> も *> とほぼ同様に動作する。ただし、IO に留まらない広範な型を扱う際には、より(型的に)広い意味を持つ *> が望ましいようだ。とはいえ、IO くらいしか利用しないケースでは可読性を優先して >> で統一してもいいかもしれない。
方針
- 処理を小さく分割し IO に包む
- 複数の IO を合成して1つの IO を作る
- メイン処理(ビジネスロジック)の最後に1回だけ
unsafeRunSync()
を実行する
入れ子になった IO をまとめる
val doubleIO: IO[IO[String]] = IO { IO("hello") } // 入れ子
val io: IO[String] = doubleIO.flatten
flatMap()
も使える。
val io: IO[String] = doubleIO.flatMap(identity)
Future から変換する
既存の Future を IO に変換し遅延実行できる。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
def sum(list: List[Int])(implicit ec: ExecutionContext): Future[Long] =
println("calc sum!")
Thread.sleep(500)
Future(list.foldLeft(0L)(_ + _))
val io: IO[Long] = IO.fromFuture(IO { sum(List(1, 2, 3, 4, 5)) })
先に完了した IO の結果だけ欲しい (race)
race()
が使える。先に終わらなかったタスク(敗者)はキャンセルされる(Stream などで更に計算が続く場合)。
val task1 = IO.sleep(100.millis) *> IO(10)
val task2 = IO.sleep(200.millis) *> IO(20)
val io: IO[Either[Int, Int]] = IO.race(task1, task2)
io.flatMap {
case Left(value) => IO.println(s"Left: $value")
case Right(value) => IO.println(s"Right: $value")
}.unsafeRunSync() // Left(10)
並列実行させる (parSequence)
parSequence()
が使える。それぞれの IO を並列実行し、すべてが終了するまで待機する(結果を返す)。固定長のタプルには parTupled()
が使える。
def buildTask(i: Int): IO[Int] =
IO.println(s"task $i") *> IO.sleep(2.seconds) *> IO { i * 10 }
Seq(1, 2, 3)
.map(buildTask)
.parSequence
.unsafeRunSync() // List(10, 20, 30)
並列数を指定したい場合は Stream.parEvalMap()
等を使う(後述)。
後処理を確実に実行したい (bracket)
bracket()
を使えば try-finally 的な処理を書けて便利。
val io: IO[Int] =
IO(10).bracket { in =>
// try ブロックと見なせる
IO.println("started") *> IO { in + 5 }
} { _in =>
// finally ブロックと見なせる
IO.println("ended") *> IO.sleep(200.millis)
}
より細かく制御したいなら Rsource 型というのがある。
タイムアウトさせる (timeoutTo)
timeout()
もあるが、タイムアウト時の処理を IO で指定できる timeoutTo()
が便利そうだ。
val slowIO: IO[Int] = IO.sleep(5000.millis) *> IO { 42 }
val io = slowIO.timeoutTo(2.seconds,
IO.raiseError(new RuntimeException("Operation timed out!")))
Thread.sleep()
のようにスレッドをブロックしてしまう処理がある場合は Fiber を使ったより複雑な手法が必要。
細かく制御したい場合は次の手法もある。
import cats.effect.Temporal
val io = Temporal[IO].timeout(slowTask, 2.seconds)
SyncIO
同期処理に限定された IO 型。
import cats.effect.SyncIO
val io: SyncIO[Int] = SyncIO { 42 }
SyncIO
は IO
と合成できず、しかも unsafeRunSync()
でしか実行できない(unsafeRunAsync()
等は使えない)。短い処理で、パフォーマンスを出したい時に使うようだ。
IO と混ぜるには、IO に変換する必要がある。
val io: IO[Int] = SyncIO(42).to[IO]
参照 (Ref)
IO.ref[A]
は、型 A
を保持する可変な「参照(Ref)」を IO
の中で作成する。参照はスレッドセーフで、安全に更新できる。つまり、スレッド間で共有可能な「状態」を純粋に扱える。
import cats.effect.Ref
val ref: IO[Ref[IO, Int]] = IO.ref[Int](10)
ちなみに、Ref
は型パラメータを持ち、エフェクト型は IO
に限定されない。
import cats.effect.SyncIO
val io: IO[Ref[IO, Int]] = Ref[IO].of(10)
val io: SyncIO[Ref[SyncIO, Int]] = Ref[SyncIO].of(10)
get()
で現在の値を取得する。update()
で値を更新する。
val io =
for
ref <- IO.ref(10)
_ <- ref.update(_ + 1)
value <- ref.get
_ <- IO.println(value) // 11
yield ()
modify()
は、現在の値に基づいて新しい値を計算し、同時に他の副作用を発生させることができる。
val io =
for
ref <- IO.ref(10)
result <- ref.modify { oldValue =>
val newValue = oldValue * 2
// 副作用の例
println(s"Updating counter from $oldValue to $newValue")
(newValue, oldValue) // 新しい値と戻り値を返す
}
_ <- IO.println(result) // 元の値 (10)
newValue <- ref.get
_ <- IO.println(newValue) // 更新後の値 (20)
yield ()
複数のスレッド(ファイバー)で並列更新する例。並列でも正しい結果が得られる。
val io =
for
ref <- IO.ref(0)
_ <- List.fill(1000)(ref.update(_ + 1)).parSequence
value <- ref.get
_ <- IO.println(value) // 1000
yield ()
安全なカウンターやフラグ(例:Stream を打ち切る)として使われることが多い。
Stream
以下は fs2.Stream について。Scala 組み込みの旧 Stream (現 LazyList) とは別モノなので注意。
一般化した型として Stream[F, O]
と書ける。F
はエフェクト型、O
はアウトプット型である。具体的には、Stream[Pure, Int]
や Stream[IO, Int]
といった形で表現することになる。
基本
作成する
val s: Stream[Pure, String] = Stream.emit("hello") // 要素は1つのみ
val s: Stream[Pure, String] = Stream.constant("hello") // 無限に続く。結果は List[String]
val s: Stream[Pure, Int] = Stream(1, 2, 3) // 要素は3つのみ
val s: Stream[Pure, String] = Stream("apple", "banana", "chocolate") // 同上
val s: Stream[Pure, Int] = Stream(1, 2, 3).repeat // 無限に続く。結果は List[Int]
val s: Stream[Pure, Int] = Stream(1, 2, 3).repeatN(5) // 1, 2, 3 が5回繰り返される
val numbers: Stream[Pure, Int] = Stream(1, 2, 3, 4, 5)
val oddNumbers: Stream[Pure, Int] = numbers.filter(_ % 2 != 0)
もちろんファイルや標準入力などからも作成できるが省略。
評価する
Stream は無限に値を生成し続ける(可能性がある)ので、実際に計算する際には take()
等で有限の値に落とし込んでから処理するケースが多いだろう。
val result: List[Int] = Stream(1, 2, 3).repeat.take(5).toList // List(1, 2, 3, 1, 2)
合成する
// 結合する
(Stream(1, 2, 3) ++ Stream(4, 5, 6) ++ Stream(7, 8, 9)).take(20).toList
(oddNumbers ++ Stream(0).repeat).take(10).toList
Stream(1, 2, 3).map(_ * 2).toList // List[Int] = List(2, 4, 6)
IO[A] のストリームを作成して実行する
Stream を使い、連続する IO の処理を、単一の IO に「まとめる」ことができる。
1. IO に包まれた値を Stream に変換する
// すこし時間のかかる API があるとする
def apiCall: IO[Int] = IO.println("api call") *> IO.sleep(500.millis) *> IO { 10 }
val stream: Stream[IO, Int] = Stream.eval(apiCall).repeat
つまり、eval()
は F[A]
を Stream[F, A]
へ変換する。
2. さらに Stream を単一の IO へ変換する
val io: IO[List[Int]] = stream.take(5).compile.toList
compile()
は Stream[F, A]
を Stream.CompileOps[F, F, A]
へ変換する。
toList()
はこれを F[List[A]]
へ変換する。
この例では、結果的に IO[Int]
を IO[List[Int]]
に変換し、さらに内部で Stream
が使われていることを(型の上で)隠蔽できている。
実行する
約 2.5 秒かかる。
scala> io.unsafeRunSync()
api call
api call
api call
api call
api call
val res5: List[Int] = List(10, 10, 10, 10, 10)
最終的に実行するのは単一の IO なので、要素中のどこかの API 呼び出しが失敗すればその時点で例外が発生する。
// 一定の割合で失敗する API
def unstableApiCall: IO[Int] =
if scala.util.Random.nextInt(10) == 0 then
IO.raiseError(new Exception("something's wrong"))
else
apiCall
Stream.eval(IO.defer(unstableApiCall)).repeat.take(5).compile.toList.attempt.unsafeRunSync()
なお、ここでは unstableApiCall()
が即時実行(され結果をキャッシュ)されると困るので IO.defer()
で包み遅延実行させる必要がある。
リトライさせる (retry)
.repeat.take(n)
するよりスマート。
val stream: Stream[IO, Int] = Stream.retry(
IO.defer { unstableApiCall },
delay = 1.second, // リトライ間隔
nextDelay = _ * 2, // リトライごとに遅延を倍増
maxAttempts = 5, // 最大リトライ回数
retriable = _ => true // 常にリトライする
)
副作用しか必要ない場合の実行 (drain)
drain()
を呼べば、返値の型は IO[Unit]
となる。
def task: IO[Unit] = IO.println("hello")
Stream.eval(task).repeat.take(3).compile.drain.unsafeRunSync()
Stream の各要素を「値を IO へ変換する関数」でラップする (evalMap)
val wordLen: String => IO[Int] = (s: String) => IO { s.length }
val stream: Stream[IO, Int] = Stream("hello", "world").evalMap(wordLen)
stream.compile.toList.unsafeRunSync() // List[Int] = List(5, 5)
Either から作成する (fromEither)
val rightStream: Stream[IO, String] = Stream.fromEither[IO]("hello".asRight)
val leftStream: Stream[IO, String] = Stream.fromEither[IO](Exception("woa!").asLeft)
Pipe で Stream を変換する (through)
Pipe を定義する。ここでは IO だけではなく他の型も渡せるよう F[*]
で定義している。
def doublePipe[F[*]]: Pipe[F, Int, Int] =
(inStream: Stream[F, Int]) => inStream.map(_ * 2)
Stream の例として Stream[IO, Int]
を用意する。.covary[IO]
は Stream[Pure, A]
を Stream[IO, A]
へ変換してくれる。
val input: Stream[IO, Int] = Stream.emits(List(1, 2, 3)).covary[IO]
パイプを適用するには through()
を使う。
val result: Stream[IO, Int] = input.through(doublePipe)
Option から成る Stream から None を取り除く (unNone)
Stream(Some(10), None, Some(20)).unNone.toList // List[Int] = List(10, 20)
// IO に変換する例
Stream(Some(10), None, Some(20)).covary[IO].unNone.compile.toList.unsafeRunSync()
None
が現れたら Stream を打ち切る unNoneTerminate()
もある。
途中のエラーをリカバリする (handleErrorWith)
handleErrorWith()
を挟むことができる。
Stream(1, 2, 3, 4)
.append(Stream.emit(5 / 0))
.handleErrorWith { _e =>
Stream.eval(IO(999))
}
.append(Stream(6, 7, 8))
.compile
.toList
.unsafeRunSync() // List(1, 2, 3, 4, 999, 6, 7, 8)
終了時に処理を実行する (onFinalize)
Stream(1, 2, 3)
.covary[IO]
.onFinalize(IO.println("stream finalized!"))
処理中にキャンセルさせる (interruptWhen)
Stream.interruptWhen()
に条件を設定すれば処理中にストリーム全体をキャンセルできる。
import cats.effect.Temporal
import fs2.concurrent.SignallingRef
val task = Stream.eval(IO.println("run") *> IO.sleep(1.second)).repeat
val io =
for
cancel <- SignallingRef[IO, Boolean](false)
s <- task.interruptWhen(cancel).compile.drain.start
_ <- Temporal[IO].sleep(5.seconds) *> cancel.set(true)
yield s
io.unsafeRunSync()
定期実行させる (awakeEvery)
awakeEvery()
は指定した間隔ごとに正確にイベントが発生する。また、経過時間を出力してくれる。
def showTime(elapsed: FiniteDuration) = IO.println(elapsed)
Stream.awakeEvery[IO](1.second)
.evalMap(showTime)
.take(5).compile.drain.unsafeRunSync()
遅延させる (fixedDelay)
awakeEvery()
と似ているが、fixedDelay()
は各要素が発行された後に遅延が始まるのでズレが発生(累積)する可能性がある。
val greet = IO.println("hello")
Stream.fixedDelay[IO](1.second)
.evalMap(_ => greet)
.take(5).compile.drain.unsafeRunSync()
スライドしながら出力する (sliding)
指定したウィンドウサイズで「スライド」しながら出力する。
val stream: Stream[Pure, Int] = Stream.iterate(1)(_ + 1) // 1, 2, 3, 4, 5, ... の無限ストリーム
stream.sliding(3).map(chunk => chunk.toList.sum).take(5).toList
// List(6, 9, 12, 15, 18)
// つまり: List((1 + 2 + 3), (2 + 3 + 4), (3 + 4 + 5), (4 + 5 + 6), (5 + 6 + 7))
この例では3つずつの Chunk
にまとめられる。Chunk
は Stream 内部で効率的に値を扱うための型で、ここでは toList()
でリストに変換してから処理している。
移動平均やパターン検出、サンプルの分割などに便利。
Stream の相互作用
交互に配置する (interleave)
interleave()
は2つのストリームの要素を交互に取り出して1つのストリームに結合する。同期的。
val s1: Stream[IO, Int] = Stream(1, 2, 3, 4, 5).covary[IO]
val s2: Stream[IO, Int] = Stream(100, 200, 300).covary[IO]
(s1 interleave s2).compile.toList.unsafeRunSync() // List(1, 100, 2, 200, 3, 300)
短い方のストリームに合わせて終了する(余ったほうの値は捨てられる)。基本的に、2つのストリームが同じ長さであることが前提。余ったほうのストリームも出力したければ interleaveAll()
を使う。
(s1 interleaveAll s2).compile.toList.unsafeRunSync() // List(1, 100, 2, 200, 3, 300, 4, 5)
到着順(=非同期)に出力したい場合は merge()
を使うこと。
マージする (merge)
2つのストリームを並行して処理し、出力可能な要素があればどちらからでもすぐに取り出す。
val s1: Stream[IO, String] = Stream.awakeEvery[IO](1.second).map(_ => "s1")
val s2: Stream[IO, String] = Stream.awakeEvery[IO](500.millis).map(_ => "s2")
(s1 merge s2).take(5).compile.toList.unsafeRunSync() // List(s2, s1, s2, s2, s1)
片方のストリームの速度が遅くても、もう片方のストリームが要素を出力できるので非同期処理に向いている。終了条件をコントロールできる mergeHaltBoth()
や mergeHaltL()
、mergeHaltR()
などもある。
かならず交互(=同期的)に出力したい場合は interleave()
を使うこと。
結合する (zip)
2つのストリームを並行して進め、それぞれのストリームの要素をタプルにして結合する。
val s1: Stream[IO, Int] = Stream(1, 2, 3, 4, 5).covary[IO]
val s2: Stream[IO, Int] = Stream(100, 200, 300).covary[IO]
(s1 zip s2).compile.toList.unsafeRunSync() // List((1,100), (2,200), (3,300))
実際には、さらに値を加工するケースが多いだろう。
val stream: Stream[IO, Int] = (s1 zip s2).evalMap { case (a, b) => IO(a + b) }
stream.compile.toList.unsafeRunSync() // List(101, 202, 303)
短い方のストリームに合わせて終了する。基本的に、2つのストリームが同じ長さであることが前提。余ったほうのストリームも出力したければ zipAll()
を使う。デフォルト値を指定する必要がある。
s1.zipAll(s2)(-1, -1).compile.toList.unsafeRunSync() // List((1,100), (2,200), (3,300), (4,-1), (5,-1))
左結合する (zipLeft)
zip()
した結果から、左側のストリームの要素だけを取り出す。
val s1: Stream[IO, Int] = Stream(1, 2, 3, 4, 5).covary[IO]
val s2: Stream[IO, Unit] = Stream.eval(IO.println("waiting") *> IO.sleep(500.millis))
.repeatN(10)
(s1 zipLeft s2).compile.toList.unsafeRunSync() // List(1, 2, 3, 4, 5)
右側のストリームは消費されるが結果には反映されない。つまり、副作用を実行するが結果は必要ない場合に使える。この例のようにストリームの速度を調整するのに便利。ウェイトを入れるなら次のようなコードでもいいかもしれない。
val s2: Stream[IO, Unit] = Stream.fixedDelay[IO](500.millis)
.evalMap(_ => IO.println("waiting")).take(10)
右側のストリームの要素だけを取り出す zipRight()
もある。
並列実行する (parJoin)
複数の非同期タスクの出力を1つのストリームにまとめる。効率よく並列処理ができる。
val s1: Stream[IO, Int] = Stream.eval(IO.sleep(100.millis) *> IO { 10 }).repeatN(5)
val s2: Stream[IO, Int] = Stream.eval(IO.sleep(200.millis) *> IO { 20 }).repeatN(5)
val s3: Stream[IO, Int] = Stream.eval(IO.sleep(300.millis) *> IO { 30 }).repeatN(5)
Stream(s1, s2, s3).parJoin(2).compile.toList.unsafeRunSync()
// List(10, 20, 10, 10, 20, 10, 10, 20, 20, 30, 20, 30, 30, 30, 30)
ストリームのストリーム Stream[F, Stream[F, O]]
に対して適用される。内部のストリームを最大 n 個まで並列実行し、結果を1つのストリームとして出力する。
完了順は、各ストリームが要素を生成・完了するタイミングに依存し、一般に不規則な順序となる。
並列実行する (parEvalMap)
parEvalMap()
を使って、各 IO の完了を待たずに、並列数を指定して逐次処理する。
def buildTask(i: Int): IO[Int] =
IO.println(s"task $i") *> IO.sleep(1.second) *> IO { i * 10 }
Stream
.emits(Seq(1, 2, 3, 4, 5))
.parEvalMap(maxConcurrent = 2)(buildTask)
.compile
.toList
.unsafeRunSync() // List(10, 20, 30, 40, 50)
parEvalMapUnordered()
もある。なお、並列数を指定する必要がないなら IO.parSequence()
でよい。
Scala Native で使う
2024 年現在、Cats Effect は Scala Native 0.4 系で動く。ただし、シングルスレッド上で動作する関係上、 unsafeRunSync()
は存在しない。unsafeRunAsync()
あたりを代わりに使うことになるだろう。Either を受け取るコールバック関数を渡す。
IO(42).unsafeRunAsync((result) => println(result)) // Right(42)
マルチスレッド化した Scala Native 0.5 系に対応してくれれば、おそらく unsafeRunSync()
も提供されると期待している。が、だいぶ先になる予定だ。
TODO
- キャンセル可能な IO (Cancelable)