Coroutines
とReactor
(Reactive Streams
)の相互運用について、Java
ベースのノンブロッキングなフレームワークを使ってサーバーサイドKotlin
をやる上でまず最初に必要かなと思っている部分をまとめます。
誤っている部分が有りましたらご指摘下さい。
頻出の技術
Spring WebFlux
を前提に、以下3つの技術をざっくり解説します。
Reactive Streams
Project Reactor
Coroutines
Reactive Streams
Reactive Stream
は、非同期ストリーム処理を実現するためのオープンな仕様です。
Java 9
からはjava.util.concurrent.Flow
としてJDK
に取り込まれました(これは1対1対応するAPI
としてであって、クラス名等は後述する内容と異なっています)。
頻出のクラス
サーバーサイドKotlin
をやっていてよく登場するクラスはPublisher
です。
ただし、サーバーサイドKotlin
をやる場合はReactor
の実装やcoroutine
に変換することが多く、経験上これらを直接触ることは少なかったです。
また、Project Reactor
が導入されている場合、後述するMono
やFlux
の方がより具体的な表現になるため、アプリケーションのコードではなるべくPublisher
を書かない方が良いと思っています。
Project Reactor
Project Reactor
は、Reactive Streams
の実装の1つです。
有名どころとして、Spring WebFlux
はProject Reactor
をベースに開発されています。
頻出のクラス
Project Reactor
を使っていてよく登場するクラスはMono
とFlux
です。
両者ともに前述したPublisher
の実装で、値を1つ返す場合はMono
、複数返す場合はFlux
を利用します。
誤解を招きそうな表現ですが、Mono
は単なる値、Flux
はList
やStream
のようなものです(どちらかと言えばStream
の方が近いかも)。
頻出の記法
Kotlin
を使う場合coroutine
に変換してしまうのが簡単ですが、基盤部分の実装ではMono
・Flux
に触れなければならない場面も有ります。
詳しい部分はクラスのJavadoc
から探すべきですが、以下の操作を覚えておけば大体の部分は読み書きできると思います。
-
Mono#map
:Mono
内の値を別の値に変換する -
Mono#flatMap
: 単にMono#map
を使ってしまうとMono<Mono<T>>
になってしまうような場合に使う -
Mono#flatMapMany
: 値が複数になる(Flux
に変換したい)ような場合に使う -
Mono.just
: 値からMono
を作る
ここではMono
を例に紹介しましたが、Flux
に関しても大体同様の操作が有ります。
Reactor Context
サーバーサイド開発をする際には、層を跨いでリクエスト単位の値を参照したい場合が有ります。
従来のフレームワークでは、このような用途にThreadLocal
が使われていました。
例えば、Spring Web MVC
におけるTransactional
アノテーションの制御なども、内部的にはThreadLocal
が利用されています。
一方、ノンブロッキングフレームワークでは複数リクエスト間で限られたスレッドを使い回すため、ThreadLocal
を利用することができません。
そこで、Project Reactor
を利用したフレームワークでは、ThreadLocal
の代わりにContext
という機能が利用されます。
実際、Spring WebFlux
におけるTransactional
アノテーションの制御は、Context
を利用して実装されています。
なお、後述するCoroutines
にも同じような機能かつ同名のContext
が登場するため、この記事ではReactor
のContext
をReactor Context
と呼んで区別します。
また、Reactor Context
とCoroutine Context
には完全な互換性が有り、後述するような相互変換を行った場合も値が引き継がれます。
注意点
Reactive Streams
に対応しているライブラリであっても、Project Reactor
の機能は使えない(例えばそのライブラリを介するとReactor Context
が消えてしまう)場合が有ります。
自分の知っている例だと、jOOQ
にはこの問題が有ります。
Coroutines
Coroutines
は非同期・ノンブロッキングなプログラムを簡単に書けるKotlin
の言語機能です。
前述した2つの技術とは別物ですが、Kotlin
と周辺ライブラリによって完全な相互変換が可能です。
頻出のクラス
Coroutines
を利用する上で固有の頻出クラスはFlow
です1。
これは前述したFlux
と同様、複数の値を返す時に用いるList
やSequence
のようなものです。
Mono
に対応するクラスは無く、後述するsuspend
関数がその役割を担います。
頻出の記法
基本的に以下3点を覚えるだけで、アプリケーション上の大半の処理は通常と同様に書けます。
- 単一の値を返す場合、関数に
suspend
と付ける - 複数の値を返す場合、戻り値を
Flow
にし、suspend
は付けない-
Flow
を返す非suspend
関数からsuspend
関数を呼びたい場合、flow
関数を使う
-
相互運用について
次に、Coroutines
とReactor
(Reactive Streams
)の相互運用についてざっくりまとめます。
前提として、org.jetbrains.kotlinx:kotlinx-coroutines-reactor
が導入されているものとします。
suspend
関数の呼び出しをMono
にする
suspend
関数の呼び出し結果をMono
として返す場合、mono
関数が利用できます。
これを利用した場合、suspend
関数の呼び出しは以下のように書けます。
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono
suspend fun foo(): Foo = TODO()
fun bar(): Mono<Foo> = mono { foo() } // ブロック内ではsuspend関数が呼び出せる
Flux
に関しても同様にflux
関数が用意されていますが、後述するFlow
からの変換を利用する機会の方が多いため、こちらを利用する機会は少なかったです。
Flux
・Flow
の相互変換
Flux
とFlow
に関しては、相互に変換することが出来ます。
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import reactor.core.publisher.Flux
val flux: Flux<Int> = flowOf(1, 2, 3).asFlux()
val flow: Flow<Int> = Flux.just(1, 2, 3).asFlow()
Flow
に変換した後であれば、Kotlin
のCollection
同様に表現力豊かな拡張関数を利用することが出来ます。
suspend
関数でPublisher
から値を取り出す
suspend
関数では、ブロック無しにPublisher
から値を取り出すことが出来ます。
Mono
の場合
Mono
から値を取り出す場合、awaitSingle
またはawaitSingleOrNull
関数を使います。
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import reactor.core.publisher.Mono
suspend fun foo(mono: Mono<Any>) {
val v1 = mono.awaitSingle()
val v2 = mono.awaitSingleOrNull()
}
Flux
(Publisher
)の場合
Flux
の場合、asFlow().toList()
といった操作で通常のCollection
が得られます。
Publisher
もFlux
と同様の操作で値を取り出すことが出来ます。
他にも、await
系の関数を使って1つずつ値を取り出すこともできます。
その他
その他Java
で実装された非同期・ノンブロッキング実装に関しても、これまで説明したようなCoroutine
との相互運用関数が提供されています。
suspend
関数呼び出しは対象名に由来するトップレベル関数、値の取り出しはawait~
という名前の拡張関数になっている場合が多いです。
自分の経験では、例えばAWS SDK
のS3AsyncClient
はJava
のCompletableFuture
を返すため、これをawait
するためにkotlinx-coroutines-jdk8
を導入したことがありました2。
利用可能なモジュールに関しては以下をご覧下さい。
また、reactor-kotlin-extensions
のように、kotlinx.coroutines
以外からもサポートライブラリが提供されている場合が有ります。
-
これは前述の
java.util.concurrent.Flow
とは異なるクラスです ↩ -
今新規で開発する場合aws-sdk-kotlinを使うべきかもしれません(自分はまだ手を出せておらず必要性を理解し切れていませんが……) ↩