1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Scala3: IO と Stream に関する覚書

Last updated at Posted at 2024-10-29

浅い理解で書き散らかした自分用メモ。

基本

cats.effect.IO と fs2.Stream の関係

fs2 の内部で cats-effect を利用しているため、シームレスな連携が実現できている(らしい)。

依存など

    libraryDependencies ++= Seq(
      "org.typelevel" %% "cats-core" % "2.12.0",
      "org.typelevel" %% "cats-effect" % "3.5.4",
      "co.fs2" %% "fs2-core" % "3.11.0"
    ),
    initialCommands := s"""
      import cats.syntax.all.*, cats.effect.IO
      import cats.effect.unsafe.implicits.global
      import fs2.*
      import scala.concurrent.duration.*, java.util.concurrent.*
    """

IO

IO とは

「副作用があり得る計算」を表す「値」である。ただの値なので、定義されただけでは実コードは実行(計算)されない。明示的に実行すると計算され、成功すると結果が返る(失敗すると例えば例外がスローされる)。遅延評価される Future と捉えてもよい。

  • 同期計算あるいは非同期計算を記述できる
  • 内部処理をキャンセル可能にすることもできる

次の2種類の値(状態)を持つ。概念としては Option 型が Some と None から成るのに近い。

Pure: 純粋関数

IO.pure(5 * 2)    // IO(10)

与えたコード(のブロック)は即時実行される。ただし返される値は IO 型であり、まだ「結果」は出ていないことに注意。ここでは、5 * 2 自体は即時計算されたが、結果の IO.pure(10) はただの「副作用があり得る計算」を表す「値」に過ぎない。

Delay: 遅延実行する関数

IO.delay(println("hello"))    // IO(...)
IO.delay(5 / 0)    // IO(...)    まだ実行されていないのでゼロ除算例外は発生しない

与えたコード(のブロック)は、あとで「実行」されるまで評価されない。

IO を定義(生成)する

IO.pure()IO.delay() を直接呼んでもいいが、実運用上は、あえて Pure であることを明示したい場合でなければ、一貫して IO()IO {} で生成してもよいだろう。

import cats.effect.IO

val io: IO[Int] = IO(10 * 2)    // IO(...)
val io: IO[String] = IO { "hello" }    // IO(...)
val io: IO[Unit] = IO { println("hello") }    // IO(...)

ちなみに IO(10 * 2)IO.pure(10 * 2) の結果型は違う(実用上の差違はほぼないだろうが)。

IO を実行する

実行コンテキストが必要なのでインポートしておく。

unsafeRunSync() を呼ぶと、IO が実行され計算された値が返る。

import cats.effect.unsafe.implicits.global

IO(5 * 2).unsafeRunSync()    // Int = 10
IO { Seq(1, 2, 3).map(_ * 2) }.unsafeRunSync()    // Seq[Int] = List(2, 4, 6)

ほかにもいくつか unsafeRunXXX 関数がある

IO 実行時に発生する例外を処理する

しかし unsafeRunSync() を生で実行すると、コードを実行中に発生した例外は普通にリスローされてしまう。

IO(5 / 0).unsafeRunSync()    // java.lang.ArithmeticException: / by zero

そこで attempt() を通し結果を Either に包むと便利。

IO(5 / 1).attempt.unsafeRunSync()    // Right(5)
IO(5 / 0).attempt.unsafeRunSync()    // Left(java.lang.ArithmeticException: / by zero)

エラーハンドリング

handleErrorWith() が使える。副作用にしか興味がない場合に便利かもしれない。

IO(5 / 0)
.handleErrorWith { error =>
  IO.pure(error.getMessage)
}
.unsafeRunSync()    // Int | String = / by zero

複数の IO を合成する

IO 同士を flatMap() 等を使って合成することができる。

val io: IO[Int] = IO(5).flatMap { n => IO(n * 2) }

通常は、複数の IO を for 文で組み上げていくことになるだろう。

val io: IO[Int] =
  for
    n <- IO(5)
    _ <- IO.println("calculating..")
    result <- IO(n * 2)
  yield result

複数の IO がある場合は mapN() が使える。

import cats.syntax.all.*

val io: IO[Int] = (IO(10), IO(20), IO(30)).mapN { (a, b, c) => a + b + c }
io.unsafeRunSync()    // Int = 60

次の例は、順に実行するが、結果は最後の IO のものだけを返す。

import cats.syntax.all.*

val io: IO[Int] = IO.println("hello") *> IO(42)
io.unsafeRunSync()    // Int = 42

なお、途中の計算結果は(副作用を除いて)捨てられる。

val io: IO[Int] = IO(10) *> IO(20) *> IO(30)
io.unsafeRunSync()    // Int = 30

リストを順次実行し、結果のリストを返す。

val io: IO[List[Int]] = List(IO(10), IO(20), IO(30)).sequence
io.unsafeRunSync()    // List[Int] = List(10, 20, 30)

IO に対しては >> も *> とほぼ同様に動作する。ただし、IO に留まらない広範な型を扱う際には、より(型的に)広い意味を持つ *> が望ましいようだ。とはいえ、IO くらいしか利用しないケースでは可読性を優先して >> で統一してもいいかもしれない。

方針

  • 処理を小さく分割し IO に包む
  • 複数の IO を合成して1つの IO を作る
  • メイン処理(ビジネスロジック)の最後に1回だけ unsafeRunSync() を実行する

入れ子になった IO をまとめる

val doubleIO: IO[IO[String]] = IO { IO("hello") }    // 入れ子
val io: IO[String] = doubleIO.flatten

flatMap() も使える。

val io: IO[String] = doubleIO.flatMap(identity)

Future から変換する

既存の Future を IO に変換し遅延実行できる。

import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

def sum(list: List[Int])(implicit ec: ExecutionContext): Future[Long] =
  println("calc sum!")
  Thread.sleep(500)
  Future(list.foldLeft(0L)(_ + _))

val io: IO[Long] = IO.fromFuture(IO { sum(List(1, 2, 3, 4, 5)) })

先に完了した IO の結果だけ欲しい (race)

race() が使える。先に終わらなかったタスク(敗者)はキャンセルされる(Stream などで更に計算が続く場合)。

val task1 = IO.sleep(100.millis) *> IO(10)
val task2 = IO.sleep(200.millis) *> IO(20)

val io: IO[Either[Int, Int]] = IO.race(task1, task2)

io.flatMap {
  case Left(value)  => IO.println(s"Left: $value")
  case Right(value) => IO.println(s"Right: $value")
}.unsafeRunSync()    // Left(10)

並列実行させる (parSequence)

parSequence() が使える。それぞれの IO を並列実行し、すべてが終了するまで待機する(結果を返す)。固定長のタプルには parTupled() が使える。

def buildTask(i: Int): IO[Int] =
  IO.println(s"task $i") *> IO.sleep(2.seconds) *> IO { i * 10 }

Seq(1, 2, 3)
  .map(buildTask)
  .parSequence
  .unsafeRunSync()    // List(10, 20, 30)

並列数を指定したい場合は Stream.parEvalMap() 等を使う(後述)。

後処理を確実に実行したい (bracket)

bracket() を使えば try-finally 的な処理を書けて便利。

val io: IO[Int] =
  IO(10).bracket { in =>
    // try ブロックと見なせる
    IO.println("started") *> IO { in + 5 }
  } { _in =>
    // finally ブロックと見なせる
    IO.println("ended") *> IO.sleep(200.millis)
  }

より細かく制御したいなら Rsource 型というのがある。

タイムアウトさせる (timeoutTo)

timeout() もあるが、タイムアウト時の処理を IO で指定できる timeoutTo() が便利そうだ。

val slowIO: IO[Int] = IO.sleep(5000.millis) *> IO { 42 }
val io = slowIO.timeoutTo(2.seconds,
  IO.raiseError(new RuntimeException("Operation timed out!")))

Thread.sleep() のようにスレッドをブロックしてしまう処理がある場合は Fiber を使ったより複雑な手法が必要。

細かく制御したい場合は次の手法もある。

import cats.effect.Temporal

val io = Temporal[IO].timeout(slowTask, 2.seconds)

SyncIO

同期処理に限定された IO 型。

import cats.effect.SyncIO

val io: SyncIO[Int] = SyncIO { 42 }

SyncIOIO と合成できず、しかも unsafeRunSync() でしか実行できない(unsafeRunAsync() 等は使えない)。短い処理で、パフォーマンスを出したい時に使うようだ。

IO と混ぜるには、IO に変換する必要がある。

val io: IO[Int] = SyncIO(42).to[IO]

参照 (Ref)

IO.ref[A] は、型 A を保持する可変な「参照(Ref)」を IO の中で作成する。参照はスレッドセーフで、安全に更新できる。つまり、スレッド間で共有可能な「状態」を純粋に扱える。

import cats.effect.Ref

val ref: IO[Ref[IO, Int]] = IO.ref[Int](10)

ちなみに、Ref は型パラメータを持ち、エフェクト型は IO に限定されない。

import cats.effect.SyncIO

val io: IO[Ref[IO, Int]] = Ref[IO].of(10)
val io: SyncIO[Ref[SyncIO, Int]] = Ref[SyncIO].of(10)

get() で現在の値を取得する。update() で値を更新する。

val io =
  for
    ref <- IO.ref(10)
    _ <- ref.update(_ + 1)
    value <- ref.get
    _ <- IO.println(value)    // 11
  yield ()

modify() は、現在の値に基づいて新しい値を計算し、同時に他の副作用を発生させることができる。

val io =
  for
    ref <- IO.ref(10)
    result <- ref.modify { oldValue =>
      val newValue = oldValue * 2
      
      // 副作用の例
      println(s"Updating counter from $oldValue to $newValue")

      (newValue, oldValue)       // 新しい値と戻り値を返す
    }
    _ <- IO.println(result)      // 元の値 (10)
    newValue <- ref.get
    _ <- IO.println(newValue)    // 更新後の値 (20)
  yield ()

複数のスレッド(ファイバー)で並列更新する例。並列でも正しい結果が得られる。

val io =
  for
    ref <- IO.ref(0)
    _ <- List.fill(1000)(ref.update(_ + 1)).parSequence
    value <- ref.get
    _ <- IO.println(value)    // 1000
  yield ()

安全なカウンターやフラグ(例:Stream を打ち切る)として使われることが多い。

Stream

以下は fs2.Stream について。Scala 組み込みの旧 Stream (現 LazyList) とは別モノなので注意。

一般化した型として Stream[F, O] と書ける。F はエフェクト型、O はアウトプット型である。具体的には、Stream[Pure, Int]Stream[IO, Int] といった形で表現することになる。

基本

作成する

val s: Stream[Pure, String] = Stream.emit("hello")    // 要素は1つのみ
val s: Stream[Pure, String] = Stream.constant("hello")    // 無限に続く。結果は List[String]

val s: Stream[Pure, Int] = Stream(1, 2, 3)    // 要素は3つのみ
val s: Stream[Pure, String] = Stream("apple", "banana", "chocolate")    // 同上
val s: Stream[Pure, Int] = Stream(1, 2, 3).repeat    // 無限に続く。結果は List[Int]
val s: Stream[Pure, Int] = Stream(1, 2, 3).repeatN(5)    // 1, 2, 3 が5回繰り返される

val numbers: Stream[Pure, Int] = Stream(1, 2, 3, 4, 5)
val oddNumbers: Stream[Pure, Int] = numbers.filter(_ % 2 != 0)

もちろんファイルや標準入力などからも作成できるが省略。

評価する

Stream は無限に値を生成し続ける(可能性がある)ので、実際に計算する際には take() 等で有限の値に落とし込んでから処理するケースが多いだろう。

val result: List[Int] = Stream(1, 2, 3).repeat.take(5).toList    // List(1, 2, 3, 1, 2)

合成する

// 結合する
(Stream(1, 2, 3) ++ Stream(4, 5, 6) ++ Stream(7, 8, 9)).take(20).toList
(oddNumbers ++ Stream(0).repeat).take(10).toList

Stream(1, 2, 3).map(_ * 2).toList    // List[Int] = List(2, 4, 6)

IO[A] のストリームを作成して実行する

Stream を使い、連続する IO の処理を、単一の IO に「まとめる」ことができる。

1. IO に包まれた値を Stream に変換する

// すこし時間のかかる API があるとする
def apiCall: IO[Int] = IO.println("api call") *> IO.sleep(500.millis) *> IO { 10 }

val stream: Stream[IO, Int] = Stream.eval(apiCall).repeat

つまり、eval()F[A]Stream[F, A] へ変換する。

2. さらに Stream を単一の IO へ変換する

val io: IO[List[Int]] = stream.take(5).compile.toList

compile()Stream[F, A]Stream.CompileOps[F, F, A]へ変換する。
toList() はこれを F[List[A]] へ変換する。

この例では、結果的に IO[Int]IO[List[Int]] に変換し、さらに内部で Stream が使われていることを(型の上で)隠蔽できている。

実行する

約 2.5 秒かかる。

scala> io.unsafeRunSync()
api call
api call
api call
api call
api call
val res5: List[Int] = List(10, 10, 10, 10, 10)

最終的に実行するのは単一の IO なので、要素中のどこかの API 呼び出しが失敗すればその時点で例外が発生する。

// 一定の割合で失敗する API
def unstableApiCall: IO[Int] =
  if scala.util.Random.nextInt(10) == 0 then
    IO.raiseError(new Exception("something's wrong"))
  else
    apiCall

Stream.eval(IO.defer(unstableApiCall)).repeat.take(5).compile.toList.attempt.unsafeRunSync()

なお、ここでは unstableApiCall() が即時実行(され結果をキャッシュ)されると困るので IO.defer() で包み遅延実行させる必要がある。

リトライさせる (retry)

.repeat.take(n) するよりスマート。

val stream: Stream[IO, Int] = Stream.retry(
  IO.defer { unstableApiCall },
  delay = 1.second,        // リトライ間隔
  nextDelay = _ * 2,       // リトライごとに遅延を倍増
  maxAttempts = 5,         // 最大リトライ回数
  retriable = _ => true    // 常にリトライする
)

副作用しか必要ない場合の実行 (drain)

drain() を呼べば、返値の型は IO[Unit] となる。

def task: IO[Unit] = IO.println("hello") 

Stream.eval(task).repeat.take(3).compile.drain.unsafeRunSync()

Stream の各要素を「値を IO へ変換する関数」でラップする (evalMap)

val wordLen: String => IO[Int] = (s: String) => IO { s.length }

val stream: Stream[IO, Int] = Stream("hello", "world").evalMap(wordLen)
stream.compile.toList.unsafeRunSync()    // List[Int] = List(5, 5)

Either から作成する (fromEither)

val rightStream: Stream[IO, String] = Stream.fromEither[IO]("hello".asRight)
val leftStream: Stream[IO, String] = Stream.fromEither[IO](Exception("woa!").asLeft)

Pipe で Stream を変換する (through)

Pipe を定義する。ここでは IO だけではなく他の型も渡せるよう F[*] で定義している。

def doublePipe[F[*]]: Pipe[F, Int, Int] =
  (inStream: Stream[F, Int]) => inStream.map(_ * 2)

Stream の例として Stream[IO, Int] を用意する。.covary[IO]Stream[Pure, A]Stream[IO, A] へ変換してくれる。

val input: Stream[IO, Int] = Stream.emits(List(1, 2, 3)).covary[IO]

パイプを適用するには through() を使う。

val result: Stream[IO, Int] = input.through(doublePipe)

Option から成る Stream から None を取り除く (unNone)

Stream(Some(10), None, Some(20)).unNone.toList    // List[Int] = List(10, 20)

// IO に変換する例
Stream(Some(10), None, Some(20)).covary[IO].unNone.compile.toList.unsafeRunSync()

None が現れたら Stream を打ち切る unNoneTerminate() もある。

途中のエラーをリカバリする (handleErrorWith)

handleErrorWith() を挟むことができる。

Stream(1, 2, 3, 4)
  .append(Stream.emit(5 / 0))
  .handleErrorWith { _e =>
    Stream.eval(IO(999))
  }
  .append(Stream(6, 7, 8))
  .compile
  .toList
  .unsafeRunSync()    // List(1, 2, 3, 4, 999, 6, 7, 8)

終了時に処理を実行する (onFinalize)

Stream(1, 2, 3)
  .covary[IO]
  .onFinalize(IO.println("stream finalized!"))

処理中にキャンセルさせる (interruptWhen)

Stream.interruptWhen() に条件を設定すれば処理中にストリーム全体をキャンセルできる。

import cats.effect.Temporal
import fs2.concurrent.SignallingRef

val task = Stream.eval(IO.println("run") *> IO.sleep(1.second)).repeat

val io =
  for
    cancel <- SignallingRef[IO, Boolean](false)
    s <- task.interruptWhen(cancel).compile.drain.start
    _ <- Temporal[IO].sleep(5.seconds) *> cancel.set(true)
  yield s

io.unsafeRunSync()

定期実行させる (awakeEvery)

awakeEvery() は指定した間隔ごとに正確にイベントが発生する。また、経過時間を出力してくれる。

def showTime(elapsed: FiniteDuration) = IO.println(elapsed)

Stream.awakeEvery[IO](1.second)
  .evalMap(showTime)
  .take(5).compile.drain.unsafeRunSync()

遅延させる (fixedDelay)

awakeEvery() と似ているが、fixedDelay() は各要素が発行された後に遅延が始まるのでズレが発生(累積)する可能性がある。

val greet = IO.println("hello")

Stream.fixedDelay[IO](1.second)
  .evalMap(_ => greet)
  .take(5).compile.drain.unsafeRunSync()

スライドしながら出力する (sliding)

指定したウィンドウサイズで「スライド」しながら出力する。

val stream: Stream[Pure, Int] = Stream.iterate(1)(_ + 1)    // 1, 2, 3, 4, 5, ... の無限ストリーム
stream.sliding(3).map(chunk => chunk.toList.sum).take(5).toList
// List(6, 9, 12, 15, 18)
// つまり: List((1 + 2 + 3), (2 + 3 + 4), (3 + 4 + 5), (4 + 5 + 6), (5 + 6 + 7))

この例では3つずつの Chunk にまとめられる。Chunk は Stream 内部で効率的に値を扱うための型で、ここでは toList() でリストに変換してから処理している。

移動平均やパターン検出、サンプルの分割などに便利。

Stream の相互作用

交互に配置する (interleave)

interleave() は2つのストリームの要素を交互に取り出して1つのストリームに結合する。同期的。

val s1: Stream[IO, Int] = Stream(1, 2, 3, 4, 5).covary[IO]
val s2: Stream[IO, Int] = Stream(100, 200, 300).covary[IO]

(s1 interleave s2).compile.toList.unsafeRunSync()    // List(1, 100, 2, 200, 3, 300)

短い方のストリームに合わせて終了する(余ったほうの値は捨てられる)。基本的に、2つのストリームが同じ長さであることが前提。余ったほうのストリームも出力したければ interleaveAll() を使う。

(s1 interleaveAll s2).compile.toList.unsafeRunSync()    // List(1, 100, 2, 200, 3, 300, 4, 5)

到着順(=非同期)に出力したい場合は merge() を使うこと。

マージする (merge)

2つのストリームを並行して処理し、出力可能な要素があればどちらからでもすぐに取り出す。

val s1: Stream[IO, String] = Stream.awakeEvery[IO](1.second).map(_ => "s1")
val s2: Stream[IO, String] = Stream.awakeEvery[IO](500.millis).map(_ => "s2")

(s1 merge s2).take(5).compile.toList.unsafeRunSync()    // List(s2, s1, s2, s2, s1)

片方のストリームの速度が遅くても、もう片方のストリームが要素を出力できるので非同期処理に向いている。終了条件をコントロールできる mergeHaltBoth()mergeHaltL()mergeHaltR() などもある。

かならず交互(=同期的)に出力したい場合は interleave() を使うこと。

結合する (zip)

2つのストリームを並行して進め、それぞれのストリームの要素をタプルにして結合する。

val s1: Stream[IO, Int] = Stream(1, 2, 3, 4, 5).covary[IO]
val s2: Stream[IO, Int] = Stream(100, 200, 300).covary[IO]

(s1 zip s2).compile.toList.unsafeRunSync()    // List((1,100), (2,200), (3,300))

実際には、さらに値を加工するケースが多いだろう。

val stream: Stream[IO, Int] = (s1 zip s2).evalMap { case (a, b) => IO(a + b) }
stream.compile.toList.unsafeRunSync()    // List(101, 202, 303)

短い方のストリームに合わせて終了する。基本的に、2つのストリームが同じ長さであることが前提。余ったほうのストリームも出力したければ zipAll() を使う。デフォルト値を指定する必要がある。

s1.zipAll(s2)(-1, -1).compile.toList.unsafeRunSync()    // List((1,100), (2,200), (3,300), (4,-1), (5,-1))

左結合する (zipLeft)

zip() した結果から、左側のストリームの要素だけを取り出す。

val s1: Stream[IO, Int] = Stream(1, 2, 3, 4, 5).covary[IO]
val s2: Stream[IO, Unit] = Stream.eval(IO.println("waiting") *> IO.sleep(500.millis))
  .repeatN(10)

(s1 zipLeft s2).compile.toList.unsafeRunSync()    // List(1, 2, 3, 4, 5)

右側のストリームは消費されるが結果には反映されない。つまり、副作用を実行するが結果は必要ない場合に使える。この例のようにストリームの速度を調整するのに便利。ウェイトを入れるなら次のようなコードでもいいかもしれない。

val s2: Stream[IO, Unit] = Stream.fixedDelay[IO](500.millis)
  .evalMap(_ => IO.println("waiting")).take(10)

右側のストリームの要素だけを取り出す zipRight() もある。

並列実行する (parJoin)

複数の非同期タスクの出力を1つのストリームにまとめる。効率よく並列処理ができる。

val s1: Stream[IO, Int] = Stream.eval(IO.sleep(100.millis) *> IO { 10 }).repeatN(5)
val s2: Stream[IO, Int] = Stream.eval(IO.sleep(200.millis) *> IO { 20 }).repeatN(5)
val s3: Stream[IO, Int] = Stream.eval(IO.sleep(300.millis) *> IO { 30 }).repeatN(5)

Stream(s1, s2, s3).parJoin(2).compile.toList.unsafeRunSync()
// List(10, 20, 10, 10, 20, 10, 10, 20, 20, 30, 20, 30, 30, 30, 30)

ストリームのストリーム Stream[F, Stream[F, O]] に対して適用される。内部のストリームを最大 n 個まで並列実行し、結果を1つのストリームとして出力する。

完了順は、各ストリームが要素を生成・完了するタイミングに依存し、一般に不規則な順序となる。

並列実行する (parEvalMap)

parEvalMap() を使って、各 IO の完了を待たずに、並列数を指定して逐次処理する。

def buildTask(i: Int): IO[Int] =
  IO.println(s"task $i") *> IO.sleep(1.second) *> IO { i * 10 }

Stream
  .emits(Seq(1, 2, 3, 4, 5))
  .parEvalMap(maxConcurrent = 2)(buildTask)
  .compile
  .toList
  .unsafeRunSync()    // List(10, 20, 30, 40, 50)

parEvalMapUnordered() もある。なお、並列数を指定する必要がないなら IO.parSequence() でよい。

Scala Native で使う

2024 年現在、Cats Effect は Scala Native 0.4 系で動く。ただし、シングルスレッド上で動作する関係上、 unsafeRunSync() は存在しない。unsafeRunAsync() あたりを代わりに使うことになるだろう。Either を受け取るコールバック関数を渡す。

IO(42).unsafeRunAsync((result) => println(result))    // Right(42)

マルチスレッド化した Scala Native 0.5 系に対応してくれれば、おそらく unsafeRunSync() も提供されると期待している。が、だいぶ先になる予定だ。

TODO

  • キャンセル可能な IO (Cancelable)
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?