COROUTINEの並列実行の速度測定する為、
本来FLUX(reactor-core)の部分をCOROUTINEに変える時の発見なんです。
タスクはあるウェブサイトから画像264枚をダウンロードし、JPGファイルとして保存する。
私のノートパソコンはWINDOWS 10、AMD Ryzen 5 3500Uだから
以下の処理は8 Threadの並列実行になります。
###FLUX(reactor-core)
Flux.fromIterable(taskStream.map {
Mono.fromCallable { it.invoke() }.onErrorResume { Mono.empty() }
})
.parallel()
.runOn(Schedulers.parallel())
.flatMap { x: Mono<Boolean> -> x }
.subscribe(onProcess, whenError, whenFinish)
####結果
...
/galleries/1809504/262.jpg
/galleries/1809504/263.jpg
/galleries/1809504/264.jpg
Calculation of Flux time :53075 milliseconds
###Coroutine Async
Kotlin Coroutine 入門2: 並列実行と Structured Concurrency と例外 を参考して以下を作成。
taskStream.map { activeScope.async { it.invoke() } }.awaitAll()
####結果
...
/galleries/1809504/262.jpg
/galleries/1809504/263.jpg
/galleries/1809504/264.jpg
Calculation of Coroutine time :340325 milliseconds
###原因
え。。Fluxより6.41倍の時間も掛かった。これは間違いなく1 Threadの実行結果。
でも上の記事ではasyncを使って2秒のタスク二つを2秒で完成したはず。これは何のことでしょう!?
答えはここに見つけた:
CONCURRENT COROUTINES – CONCURRENCY IS NOT PARALLELISM
簡単に言うとKotlin Coroutine 入門2の例は実際はdelay(2000)を使って何にもしてないからだ。
Coroutineでは1ThreadでもdelayとかawaitとかSuspend関係のコードがあったら、
その暇を利用して他のタスクを実行する!だからdelay(2000)の二つは2.165秒で完成出来た。
しかし、実際CPUの計算を2秒掛かったタスクの場合は4秒以上になってしまう。
入門2の例は並行処理だ、並列処理ではない。
###並列処理
CONCURRENCY IS NOT PARALLELISMからは実際の並列実行(Parallel Coroutines)を必要の条件も書いてる
####1 Run in GlobalScope (GlobalScopeを使う)
val one = GlobalScope.async { doSomethingUsefulOne() }
val two = GlobalScope.async { doSomethingUsefulTwo() }
####2 Specify a coroutine dispatcher (dispatcherを指定する)
val one = async(Dispatchers.Default) { doSomethingUsefulOne() }
val two = async(Dispatchers.Default) { doSomethingUsefulTwo() }
###改正
早速改正して
taskStream.map { activeScope.async(Dispatchers.Default) { it.invoke() } }.awaitAll()
####結果
...
/galleries/1809504/262.jpg
/galleries/1809504/263.jpg
/galleries/1809504/264.jpg
Calculation of Coroutine time :56176 milliseconds
これはFluxと完全一致の並列実行速度。そしてこの簡潔な表現、Kotlinやっぱり素晴らしい!