LoginSignup
2
0

サーバーサイドKotlinにおけるCoroutinesとReactor(Reactive Streams)の相互運用ざっくりまとめ

Last updated at Posted at 2024-03-09

CoroutinesReactor(Reactive Streams)の相互運用について、Javaベースのノンブロッキングなフレームワークを使ってサーバーサイドKotlinをやる上でまず最初に必要かなと思っている部分をまとめます。
誤っている部分が有りましたらご指摘下さい。

頻出の技術

Spring WebFluxを前提に、以下3つの技術をざっくり解説します。

  • Reactive Streams
  • Project Reactor
  • Coroutines

Reactive Streams

Reactive Streamは、非同期ストリーム処理を実現するためのオープンな仕様です。
Java 9からはjava.util.concurrent.FlowとしてJDKに取り込まれました(これは1対1対応するAPIとしてであって、クラス名等は後述する内容と異なっています)。

頻出のクラス

サーバーサイドKotlinをやっていてよく登場するクラスはPublisherです。
ただし、サーバーサイドKotlinをやる場合はReactorの実装やcoroutineに変換することが多く、経験上これらを直接触ることは少なかったです。

また、Project Reactorが導入されている場合、後述するMonoFluxの方がより具体的な表現になるため、アプリケーションのコードではなるべくPublisherを書かない方が良いと思っています。

Project Reactor

Project Reactorは、Reactive Streamsの実装の1つです。
有名どころとして、Spring WebFluxProject Reactorをベースに開発されています。

頻出のクラス

Project Reactorを使っていてよく登場するクラスはMonoFluxです。
両者ともに前述したPublisherの実装で、値を1つ返す場合はMono、複数返す場合はFluxを利用します。

誤解を招きそうな表現ですが、Monoは単なる値、FluxListStreamのようなものです(どちらかと言えばStreamの方が近いかも)。

頻出の記法

Kotlinを使う場合coroutineに変換してしまうのが簡単ですが、基盤部分の実装ではMonoFluxに触れなければならない場面も有ります。
詳しい部分はクラスのJavadocから探すべきですが、以下の操作を覚えておけば大体の部分は読み書きできると思います。

  • Mono#map: Mono内の値を別の値に変換する
  • Mono#flatMap: 単にMono#mapを使ってしまうとMono<Mono<T>>になってしまうような場合に使う
  • Mono#flatMapMany: 値が複数になる(Fluxに変換したい)ような場合に使う
  • Mono.just: 値からMonoを作る

ここではMonoを例に紹介しましたが、Fluxに関しても大体同様の操作が有ります。

Reactor Context

サーバーサイド開発をする際には、層を跨いでリクエスト単位の値を参照したい場合が有ります。

従来のフレームワークでは、このような用途にThreadLocalが使われていました。
例えば、Spring Web MVCにおけるTransactionalアノテーションの制御なども、内部的にはThreadLocalが利用されています。

一方、ノンブロッキングフレームワークでは複数リクエスト間で限られたスレッドを使い回すため、ThreadLocalを利用することができません。
そこで、Project Reactorを利用したフレームワークでは、ThreadLocalの代わりにContextという機能が利用されます。
実際、Spring WebFluxにおけるTransactionalアノテーションの制御は、Contextを利用して実装されています。

なお、後述するCoroutinesにも同じような機能かつ同名のContextが登場するため、この記事ではReactorContextReactor Contextと呼んで区別します。
また、Reactor ContextCoroutine Contextには完全な互換性が有り、後述するような相互変換を行った場合も値が引き継がれます。

注意点

Reactive Streamsに対応しているライブラリであっても、Project Reactorの機能は使えない(例えばそのライブラリを介するとReactor Contextが消えてしまう)場合が有ります。
自分の知っている例だと、jOOQにはこの問題が有ります。

Coroutines

Coroutinesは非同期・ノンブロッキングなプログラムを簡単に書けるKotlinの言語機能です。

前述した2つの技術とは別物ですが、Kotlinと周辺ライブラリによって完全な相互変換が可能です。

頻出のクラス

Coroutinesを利用する上で固有の頻出クラスはFlowです1
これは前述したFluxと同様、複数の値を返す時に用いるListSequenceのようなものです。

Monoに対応するクラスは無く、後述するsuspend関数がその役割を担います。

頻出の記法

基本的に以下3点を覚えるだけで、アプリケーション上の大半の処理は通常と同様に書けます。

  • 単一の値を返す場合、関数にsuspendと付ける
  • 複数の値を返す場合、戻り値をFlowにし、suspendは付けない
    • Flowを返す非suspend関数からsuspend関数を呼びたい場合、flow関数を使う

相互運用について

次に、CoroutinesReactor(Reactive Streams)の相互運用についてざっくりまとめます。
前提として、org.jetbrains.kotlinx:kotlinx-coroutines-reactorが導入されているものとします。

suspend関数の呼び出しをMonoにする

suspend関数の呼び出し結果をMonoとして返す場合、mono関数が利用できます。

これを利用した場合、suspend関数の呼び出しは以下のように書けます。

import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono

suspend fun foo(): Foo = TODO()
fun bar(): Mono<Foo> = mono { foo() } // ブロック内ではsuspend関数が呼び出せる

Fluxに関しても同様にflux関数が用意されていますが、後述するFlowからの変換を利用する機会の方が多いため、こちらを利用する機会は少なかったです。

FluxFlowの相互変換

FluxFlowに関しては、相互に変換することが出来ます。

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import reactor.core.publisher.Flux

val flux: Flux<Int> = flowOf(1, 2, 3).asFlux()
val flow: Flow<Int> = Flux.just(1, 2, 3).asFlow()

Flowに変換した後であれば、KotlinCollection同様に表現力豊かな拡張関数を利用することが出来ます。

suspend関数でPublisherから値を取り出す

suspend関数では、ブロック無しにPublisherから値を取り出すことが出来ます。

Monoの場合

Monoから値を取り出す場合、awaitSingleまたはawaitSingleOrNull関数を使います。

import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import reactor.core.publisher.Mono

suspend fun foo(mono: Mono<Any>) {
    val v1 = mono.awaitSingle()
    val v2 = mono.awaitSingleOrNull()
}

Flux(Publisher)の場合

Fluxの場合、asFlow().toList()といった操作で通常のCollectionが得られます。
PublisherFluxと同様の操作で値を取り出すことが出来ます。

他にも、await系の関数を使って1つずつ値を取り出すこともできます。

その他

その他Javaで実装された非同期・ノンブロッキング実装に関しても、これまで説明したようなCoroutineとの相互運用関数が提供されています。
suspend関数呼び出しは対象名に由来するトップレベル関数、値の取り出しはawait~という名前の拡張関数になっている場合が多いです。

自分の経験では、例えばAWS SDKS3AsyncClientJavaCompletableFutureを返すため、これをawaitするためにkotlinx-coroutines-jdk8を導入したことがありました2

利用可能なモジュールに関しては以下をご覧下さい。

また、reactor-kotlin-extensionsのように、kotlinx.coroutines以外からもサポートライブラリが提供されている場合が有ります。

  1. これは前述のjava.util.concurrent.Flowとは異なるクラスです

  2. 今新規で開発する場合aws-sdk-kotlinを使うべきかもしれません(自分はまだ手を出せておらず必要性を理解し切れていませんが……)

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0