LoginSignup
6
2

More than 5 years have passed since last update.

Concurrency with ZIO

Posted at

並行プログラミング

今回はZIOで用意されている並行プログラミングのためのツールを見ていこうと思います。

ZIOで提供されている並行プログラミングのツールは、並行度を制御するメソッド、実行コンテキストを制御するためのメソッドの2種類に分類できます。ZIOのランタイムはコンビネータによって指定された並行度やコンテキストでタスクを実行していきます。タスクは指定されない限り軽量スレッド(Fiber)で実行されます。

並行制御のメソッド(zip/zipPar&race/raceEither)

並行制御の基本的なメソッドはraceとzipです。

race/raceEither

raceとそのバリエーションのraceEitherは2つのタスクを並行に実行し、先に終了したタスクの結果を返します。以下の例ではracefastApiServiceの結果を返します。

  "race" should {
    "return faster result of two effects" in {
      unsafeRun(for {
        slow <- slowApiService
        fast <- fastApiService
        slowOrFast <- slow.request.race(fast.request)
      } yield slowOrFast) shouldBe a[Response]
    }
  }

raceEitherも同様の動作をします。raceとの違いは2つのタスクの結果の型が異なっても良いことで、Either型で結果を返します。

以下のコードでは、結果がQueryResultのDBアクセスとResponseのAPIサービスの呼び出しタスクをraceEitherで合成しています。raceEitherの合成結果はEither[QueryResult, Response]になります。合成したタスクを実行すると早く完了したタスクの結果を返します。

  "raceEither" should {
    "return the faster result among two effects" in {
      unsafeRun(for {
        slow <- slowDBAccess
        fast <- fastApiService
        slowOrFast <- slow.query.raceEither(fast.request)
      } yield slowOrFast) shouldBe a[Right[_, _]]
    }
  }

zip/zipPar

zipzipParは2つのタスクを実行し両方の結果をジップしてペアを返します。

zipは2つのタスクを直列で実行します。以下の例ではslowApiServiceがレスポンスを返してから、fastApiServiceへリクエストを投げます。

  "zip" should {
    "return both results" in {
      unsafeRun(for {
        slow <- slowApiService
        fast <- fastApiService
        both <- slow.request.zip(fast.request)
      } yield both) shouldBe a[(_, _)]
    }
  }

zipParzipの並行版です。zipParは2つのタスクを並行に実行します。

以下のコードでは、zipzipParの違いを確かめます。100ミリ秒かかるAPIサービスの呼び出しがあるとします。2回このサービスを呼びだすとき、直列で呼びだすと200ミリ秒、並行で呼びだすと100ミリ秒かかる計算になります。120ミリ秒のdedlineとレースさせると、直列の呼び出しではdeadlineが勝ち、並行の呼び出しではdeadlineが負けます。

  "zip/zipPar" should {
    "execute effects in sequence/parallel" in {
      val serviceIO = UIO(MockApiService(Duration(100, TimeUnit.MILLISECONDS)))
      val deadline = UIO(()).delay(Duration(120, TimeUnit.MILLISECONDS))

      unsafeRun(
        for {
          service <- serviceIO
          serviceCallTwiceInSequence = service.request.zip(service.request)
          result <- serviceCallTwiceInSequence.raceEither(deadline)
        } yield result
      ) shouldBe a[Right[_, _]] // service call twice in sequence should be executed in about 200 msec.

      unsafeRun(
        for {
          service <- serviceIO
          serviceCallTwiceInParallel = service.request.zipPar(service.request)
          result <- serviceCallTwiceInParallel.raceEither(deadline)
        } yield result
      ) shouldBe a[Left[_, _]] // service call twice in parallel should be executed in about 100 msec.
    }
  }

実行コンテキストの制御

「UIスレッド上でUIを更新する」、「IO用スレッド上でデータベースへのアクセスする」、などタスクを指定したコンテキストで行いたいときに使用するのがlockblockinginterruptibleです。

lock

lockはコンテキストを制御するメソッドのなかで最も原始的なメソッドです。タスクに対してこのメソッドを呼び出すと、そのタスクは指定されたコンテキストExecutorで実行されます。Executorは、ScalaExecutionContextから生成することができます。

以下は、自分で定義したMyExecutionContext上でタスクを実行するサンプルコードです。

  "lock" should {
    "lock an effect on specified execution context" in {
      class MyExecutionContext extends ExecutionContext {
        val executor: ExecutorService = Executors.newSingleThreadExecutor()

        override def execute(runnable: Runnable): Unit = executor.submit(runnable)

        override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
      }

      val thisThreadId = Thread.currentThread().getId

      unsafeRun(
        for {
          ec <- UIO.succeed(new MyExecutionContext)
          executedIn <- UIO.effectTotal({
            Thread.currentThread().getId
          })
            .lock(Executor.fromExecutionContext(Int.MaxValue) {
              ec
            })
        } yield executedIn
      ) should not be thisThreadId
    }
  }

blocking

ZIOが提供するブロッキング処理用のスレッドプール上でタスクを実行するためのメソッドがblockingです。

ZIOのスレッド管理戦略については、ZIO作者のブログThread Pool Best Practices with ZIOで説明されています。

  "blocking" should {
    "execute an effect on the blocking thread pool" in {
      unsafeRun(for {
        _ <- blocking(ZIO.effect(java.lang.Thread.sleep(10000L))) // java.lang.Thread.sleepはブロッキング
      } yield ()) shouldBe ()
    }
  }

interruptible

ブロッキング・タスクを中断可能な状態で実行するのがinterruptibleです。使用方法はblockingと似ていますが、blockingZIO型でラップされたタスクを引数で受け取るのに対してinterruptibleはタスクそのものを受け取ります。

  "interruptible" should {
    "make a blocking task interruptible" in {
      unsafeRun(for {
          _ <- interruptible(java.lang.Thread.sleep(10000L))
      } yield ()) shouldBe ()
    }
  }

以下のコードでblockinginterruptibleの中断可能性の違いを確認します。

java.lang.Thread.sleepはブロッキングの処理です。この処理をblockingで実行すると完了するまでタスクは中断されません。interruptibleで実行すると完了前に中断が可能です。

withBlockingは10ミリ秒のデッドラインと10秒のスリープをレースさせています。結果は10ミリ秒のデッドラインがレースに勝ちます。この結果は10秒スリープの処理が完了するまで保留されます。

withInterruptibleは100ミリ秒のデッドラインと15秒のスリープをレースさせています。結果は100ミリ秒のデッドラインがレースに勝ちます。100ミリ秒のデッドラインがレースに勝利した時点で15秒のスリープは中断されます。

withBlockingwithInterruptibleをレースさせるとwithInterruptibleが100ミリ秒で完了し勝利します。この結果raceResultは、withBlockingが完了する時点(10秒スリープが完了する時点)まで保留されます。

  "interruptible" should {
    "make a blocking task interruptible while blocking does not" in {
      val withBlocking = UIO(()).delay(Duration(10, TimeUnit.MILLISECONDS)).raceEither(blocking(ZIO.effect(java.lang.Thread.sleep(10000L)))).void
      val withInterruptible = UIO(()).delay(Duration(100, TimeUnit.MILLISECONDS)).raceEither(interruptible(java.lang.Thread.sleep(15000L))).void

      unsafeRun(for {
        raceResult <- withBlocking.raceEither(withInterruptible)
      } yield raceResult) shouldBe a[Right[_, _]]
    }
  }

最後に

この記事ではZIOが提供する並行プログラミングのためのツール(zipzipParraceraceEither)を紹介しました。ZIOには、この記事で紹介していないforkjoinなどの低レベルなツールも豊富に取り揃えられており、1.0リリースではSoftware Transactional Memoryもサポートするようです。

ATOMICALLY { DELETE YOUR ACTORS } - John A De Goes & Wiem Zine Elabadine | Scalar 2019

6
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
2