並行プログラミング
今回はZIOで用意されている並行プログラミングのためのツールを見ていこうと思います。
ZIOで提供されている並行プログラミングのツールは、並行度を制御するメソッド、実行コンテキストを制御するためのメソッドの2種類に分類できます。ZIOのランタイムはコンビネータによって指定された並行度やコンテキストでタスクを実行していきます。タスクは指定されない限り軽量スレッド(Fiber)で実行されます。
並行制御のメソッド(zip/zipPar&race/raceEither)
並行制御の基本的なメソッドはraceとzipです。
race/raceEither
race
とそのバリエーションのraceEitherは2つのタスクを並行に実行し、先に終了したタスクの結果を返します。以下の例ではrace
はfastApiService
の結果を返します。
"race" should {
"return faster result of two effects" in {
unsafeRun(for {
slow <- slowApiService
fast <- fastApiService
slowOrFast <- slow.request.race(fast.request)
} yield slowOrFast) shouldBe a[Response]
}
}
raceEither
も同様の動作をします。race
との違いは2つのタスクの結果の型が異なっても良いことで、Either
型で結果を返します。
以下のコードでは、結果がQueryResult
のDBアクセスとResponse
のAPIサービスの呼び出しタスクをraceEither
で合成しています。raceEither
の合成結果はEither[QueryResult, Response]
になります。合成したタスクを実行すると早く完了したタスクの結果を返します。
"raceEither" should {
"return the faster result among two effects" in {
unsafeRun(for {
slow <- slowDBAccess
fast <- fastApiService
slowOrFast <- slow.query.raceEither(fast.request)
} yield slowOrFast) shouldBe a[Right[_, _]]
}
}
zip/zipPar
zip
とzipPar
は2つのタスクを実行し両方の結果をジップしてペアを返します。
zip
は2つのタスクを直列で実行します。以下の例ではslowApiService
がレスポンスを返してから、fastApiService
へリクエストを投げます。
"zip" should {
"return both results" in {
unsafeRun(for {
slow <- slowApiService
fast <- fastApiService
both <- slow.request.zip(fast.request)
} yield both) shouldBe a[(_, _)]
}
}
zipPar
はzip
の並行版です。zipPar
は2つのタスクを並行に実行します。
以下のコードでは、zip
とzipPar
の違いを確かめます。100ミリ秒かかるAPIサービスの呼び出しがあるとします。2回このサービスを呼びだすとき、直列で呼びだすと200ミリ秒、並行で呼びだすと100ミリ秒かかる計算になります。120ミリ秒のdedline
とレースさせると、直列の呼び出しではdeadline
が勝ち、並行の呼び出しではdeadline
が負けます。
"zip/zipPar" should {
"execute effects in sequence/parallel" in {
val serviceIO = UIO(MockApiService(Duration(100, TimeUnit.MILLISECONDS)))
val deadline = UIO(()).delay(Duration(120, TimeUnit.MILLISECONDS))
unsafeRun(
for {
service <- serviceIO
serviceCallTwiceInSequence = service.request.zip(service.request)
result <- serviceCallTwiceInSequence.raceEither(deadline)
} yield result
) shouldBe a[Right[_, _]] // service call twice in sequence should be executed in about 200 msec.
unsafeRun(
for {
service <- serviceIO
serviceCallTwiceInParallel = service.request.zipPar(service.request)
result <- serviceCallTwiceInParallel.raceEither(deadline)
} yield result
) shouldBe a[Left[_, _]] // service call twice in parallel should be executed in about 100 msec.
}
}
実行コンテキストの制御
「UIスレッド上でUIを更新する」、「IO用スレッド上でデータベースへのアクセスする」、などタスクを指定したコンテキストで行いたいときに使用するのがlock
、blocking
、interruptible
です。
lock
lock
はコンテキストを制御するメソッドのなかで最も原始的なメソッドです。タスクに対してこのメソッドを呼び出すと、そのタスクは指定されたコンテキストExecutor
で実行されます。Executor
は、Scala
のExecutionContext
から生成することができます。
以下は、自分で定義したMyExecutionContext
上でタスクを実行するサンプルコードです。
"lock" should {
"lock an effect on specified execution context" in {
class MyExecutionContext extends ExecutionContext {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
override def execute(runnable: Runnable): Unit = executor.submit(runnable)
override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}
val thisThreadId = Thread.currentThread().getId
unsafeRun(
for {
ec <- UIO.succeed(new MyExecutionContext)
executedIn <- UIO.effectTotal({
Thread.currentThread().getId
})
.lock(Executor.fromExecutionContext(Int.MaxValue) {
ec
})
} yield executedIn
) should not be thisThreadId
}
}
blocking
ZIO
が提供するブロッキング処理用のスレッドプール上でタスクを実行するためのメソッドがblocking
です。
ZIO
のスレッド管理戦略については、ZIO
作者のブログThread Pool Best Practices with ZIOで説明されています。
"blocking" should {
"execute an effect on the blocking thread pool" in {
unsafeRun(for {
_ <- blocking(ZIO.effect(java.lang.Thread.sleep(10000L))) // java.lang.Thread.sleepはブロッキング
} yield ()) shouldBe ()
}
}
interruptible
ブロッキング・タスクを中断可能な状態で実行するのがinterruptible
です。使用方法はblocking
と似ていますが、blocking
はZIO
型でラップされたタスクを引数で受け取るのに対してinterruptible
はタスクそのものを受け取ります。
"interruptible" should {
"make a blocking task interruptible" in {
unsafeRun(for {
_ <- interruptible(java.lang.Thread.sleep(10000L))
} yield ()) shouldBe ()
}
}
以下のコードでblocking
とinterruptible
の中断可能性の違いを確認します。
java.lang.Thread.sleep
はブロッキングの処理です。この処理をblocking
で実行すると完了するまでタスクは中断されません。interruptible
で実行すると完了前に中断が可能です。
withBlocking
は10ミリ秒のデッドラインと10秒のスリープをレースさせています。結果は10ミリ秒のデッドラインがレースに勝ちます。この結果は10秒スリープの処理が完了するまで保留されます。
withInterruptible
は100ミリ秒のデッドラインと15秒のスリープをレースさせています。結果は100ミリ秒のデッドラインがレースに勝ちます。100ミリ秒のデッドラインがレースに勝利した時点で15秒のスリープは中断されます。
withBlocking
とwithInterruptible
をレースさせるとwithInterruptible
が100ミリ秒で完了し勝利します。この結果raceResult
は、withBlocking
が完了する時点(10秒スリープが完了する時点)まで保留されます。
"interruptible" should {
"make a blocking task interruptible while blocking does not" in {
val withBlocking = UIO(()).delay(Duration(10, TimeUnit.MILLISECONDS)).raceEither(blocking(ZIO.effect(java.lang.Thread.sleep(10000L)))).void
val withInterruptible = UIO(()).delay(Duration(100, TimeUnit.MILLISECONDS)).raceEither(interruptible(java.lang.Thread.sleep(15000L))).void
unsafeRun(for {
raceResult <- withBlocking.raceEither(withInterruptible)
} yield raceResult) shouldBe a[Right[_, _]]
}
}
最後に
この記事ではZIO
が提供する並行プログラミングのためのツール(zip
、zipPar
、race
、raceEither
)を紹介しました。ZIO
には、この記事で紹介していないfork
やjoin
などの低レベルなツールも豊富に取り揃えられており、1.0リリースではSoftware Transactional Memoryもサポートするようです。
ATOMICALLY { DELETE YOUR ACTORS } - John A De Goes & Wiem Zine Elabadine | Scalar 2019