こんにちは。
AnyMind Groupという会社でCastingAsiaというプロダクトのテックリードをしています。以前はScala中心でしたが、今はPython,Kotlinを使ってDDDに取り組んでいます。
本記事では、Kotlin Web Applicationでの非同期処理をどう書くかをお話できればと思います。
環境
- SpringBoot WebFlux 2.2
- Kotlin 1.3
なお、本記事では非同期プログラミングライブラリとしてKotlinのCoroutine( https://github.com/Kotlin/kotlinx.coroutines )を使いますが、ReactorやRxJavaなどでも代用出来るかと思います。
なぜ非同期処理を書きたいのか
用途に応じてThread Poolを分け、パフォーマンスを向上させたい
IO(HTTP通信やSQLの発行、ファイル操作)は、近年ではR2DBCやNonBlockingなライブラリも出てきましたが、ThreadをBlockしてしまう処理もまだあるかと思います。
このとき、SpringBoot WebFluxのデフォルトのThread Pool(reactor-http-nio
)はnon-
blockingを前提としてthread数が少なく作られているため、IO系などBlockingする処理を入れると全てのthreadがsleepしてしまいパフォーマンスが急激に落ちます。
(ちなみにreactor-http-nio
のデフォルトのthread数はこちら)
https://github.com/reactor/reactor-netty/blob/0.9.x/src/main/java/reactor/netty/resources/LoopResources.java#L45
そのため、Blockingの有無や優先度に応じて別のthreadPoolを使い分けたくなります。
並行・並列処理を明示的に記すことで、よりドメインを表現したい
例えばListを処理をする際に、各要素の処理を並列で実行出来ると速度の改善が見込めます。
また、RDBへデータを保存すると同時に(並行して)、外部サーバーにHTTPリクエストを送る、というコードを組めると、「この処理はそれぞれ独立しているんだな(前の処理の結果に応じて次の処理内容が変わるわけではない)」と理解することが出来ます。
これらから、ドメイン理解(とパフォーマンス改善)に役立つと期待できます。
DDDに非同期処理を導入する
使いこなせると便利なのがわかって頂けたかと思いますが、非同期処理を書く時に問題になるのが、**『どのExecutor(ThreadPool)上で実行するか』**を明示してあげないといけないことです。
しかし、どのThreadPool上で実行するかという情報はドメイン層には書きたくありません。ドメイン層では、ただ『ここの処理は平行・直列に処理するべき』ということだけを書きたいのです。
一方でApplication層では、どのExecutorを使うかを明示する必要があります。
例えばCoroutineのドキュメントにあるこのようなコード
val a = async {
log("I'm computing a piece of the answer")
6
}
val b = async {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
DDDでのイメージとしては、このコードはDomain層にあり2つの結果を使う業務ロジックが記されていて、asyncの中のコードはInfrastructure層で別のContextで実行されるイメージです。
まずこのコード自体がCoroutineScope
の中にいないと実行できないので、Domain層にも何らかの方法で『何かは明示しないContext』を使いたいです。またasyncの中では『IO用といった明示されたContext』を使いたいですね。
それぞれ以下のように書けます。
Domain層
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
suspend fun calcSomeLogic(): Result {
coroutineScope {
val a = async { someRepository.call() }
val b = async { someExternalApiAccessor.call() }
log("The answer is ${a.await() * b.await()}")
}
}
ここではcoroutineScope
を使うことで、calcSomeLogicの呼び出し元のCoroutineContextをそのまま使えます。これによりコード上では並行処理を明示しつつ、どのContextを使うかは呼び出し側で決められるようになります。
Infrastructure層
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext
import java.util.concurrent.Executor
class SomeRepositoryImpl: SomeRepository {
private val ioExecutor: Executor = ???
suspend fun call():Int {
withContext(ioExecutor.asCoroutineDispatcher()) {
doSomeBlockingTask()
}
}
}
Infrastructure層では、具体的にContextを明示することでBlockingな処理を他の処理に影響与えることなく実行出来るようになります(ここでは別途用意したioExecutor
を作っています)。
まとめ
以上、簡単にですがKotlinを例に上げてDDDで非同期プログラミングをしてみました。非同期処理は便利ですが、JDBCのConnectionなどThreadSafeでないものを扱い場合にはご注意ください。
ちなみにScalaだと、Domain層ではcats.effect.Async
, Application層ではscala.concurrent.Future
を使うと同じような事ができます。(scala.concurrent.Future
は実行にExecutionContextが必要なためDomain層では使いにくいため。AsyncとFutureの相互変換が肝)