Everything is a stream
みなさん良いStreamライフを送っていますか?
私は最高のStreamライフを送っています!
特に最近Kotlin Coroutineを使い始め、新たな風が流れてきています。(Streamだけに)
寒いギャグのStreamはonCompleteするとして・・・
昨今のAndroid開発において、RxJava(RxKotlin)とKotlin Coroutineのどっちを使うか問題があると思います。
RxJavaはもちろん素晴らしいフレームワークですが、Kotlin Coroutineは公式が出しているフレームワークなので、こちらを使う方が安心感があるようにも感じます。
Kotlin Coroutineを使ってみて、こちらの方が良いと感じる部分も多数あったので
- RxJava使いがKotlin Coroutineを使い始めるための基本知識
- RxJavaに比べてKotlin Coroutineが良いと私が思った部分
の二つを話していきたいと思います。
対象読者
- Kotlinが読める
- RxJava(RxKotlin)を使ったことがある
- kotlin Coroutineを使いたいと思っている
- (Kotlin/MPPを信じてiOSをKotlinで書こうと思っているRxSwift使い)
まずはKotlin Coroutineについてざっくりと
と言っても、QiitaにKotlin Coroutineについての記事はたくさんあるので、改めて私が書くことはありません。
なので、私が参考にした記事をひたすら貼っていきます!
つまり、巨人の肩に乗るということです。
まずはKotlin Coroutineの概要をざっくりと
実は、CoroutineはRxと違いStreamだけを扱うものではないのです。
async/awaitも一時中断関数(suspend fun
)もKotlin Coroutineの一部です。
以下の記事はこの辺りがわかりやすくまとめられています。
【Kotlin】Coroutineを理解する (AtsushiUemuraさま)
Coroutineで動くアプリを作ってみる
Coroutineについてなんとなく理解したら、簡単なアプリを作ってみたくなりますよね?
以下の記事ではHTTP通信をCoroutineを使って行う例が示されています。
Android + Kotlin 1.3のcoroutines(Async, Await)でHTTP通信を非同期処理(jonghyoさま)
Coroutineを効果的に運用するために
Coroutineは非同期で動くため、下手に使うとメモリリークします。
効率的に運用するためにはCoroutineContext
について知る必要があります。
Coroutineは動かすContextがあり,Coroutineは親子関係を作ります。
そして、親が破棄されたら子は自動的に破棄されます。
なので、AndroidでしたらActivity
でCoroutineContext
を作成し、onDestroy()
で破棄するように設定しておくと、そこから動かした子Coroutineは自動的に破棄されて安全に扱えます!
この辺りの親子関係について以下の記事がとても分かりやすいです!
図で理解する Kotlin Coroutine(kawmraさま)
だいたい分かってきた時に押さえておきたい
チュートリアルや紹介では頻繁に出てくるGlobalScope.launch()
ですが、実はアンチパターンです。
Globalなので、ライフサイクルとして最長なためです。
この辺りのアンチパターンがまとまっているので、ある程度理解してきたら一読しておく価値ありです!
Kotlin Coroutinesパターン&アンチパターン (ikemura23さま)
RxJavaの「あれ」ってKotlin Coroutineではどうするの?[基本編]
ここからが本題です。
Kotlin Coroutineをだいたい理解したところで、Rxとの対応が知りたくなります。
「CoroutineはRxの代替では無い。」と書きましたが、Flow
の登場により、Rxの代替としても使えます。
そして、Flow
型にはmap
やfilter
、scan
やretry
などRxで見慣れたオペレーターが揃っています。
なので、Observable
とFlow
はほとんど互換といって差し支えありません。
しかも、RxのSingle
やCompletable
はCoroutineでは、もっと簡単に書けます。
Rxの「あれ」をKotlin Coroutineでやりたい!という気持ちに答えていきます。
RxとCoroutineの違いを大雑把に考察
CoroutineはRxに比べて型が厳しいです。
例えば、RxのHot Stream
, Cold Stream
を型で分類できません。一つ一つ覚える必要があります。
逆にRxはObservable
のオペレーターが全てなので、Subject
だろうがSingle
だろうが同じように扱えます。
一方で、CoroutineはHot Stream
はChannel
クラス、Cold Stream
はFlow
クラスといったように、型が厳密です。
Channel
とFlow
に生えているオペレーターはまるで別物です。
さらにsuspend fun
まで加わってきて、初見では複雑に感じると思います。
型が多く、厳密ゆえに書くのが難しかったりします。
一方で、型が厳密なので、「Cold Stream
だと思っていたら、Hot Stream
でリークしていた。」なんてことは起こりにくいです。
その辺りも交えながら比較していきます!
Single
Rxで値を1度だけ返すSingle
です。DBへのinsertやHTTPアクセスなど、頻繁に使われている印象があります。
Rx
fun getRequest(): Single<String> {
return Single.create<String> { emitter ->
// 何らかの処理する
emitter.onSuccess(body.message)
}
}
Coroutine
注意: suspend fun
はCold Stream
では無いため、厳密にはSingle
の互換ではありません。
しかし、似た扱いができるため、このように紹介しています。
suspend fun getRequest(): String{
// 何らかの処理する
return body.message
}
Single[callbackを使う場合]
Singleの内部でcallbackを使う場合はCoroutine側は少し変える必要があります。
Rx
fun getRequest(): Single<String> {
return Single.create<String> { emitter ->
val callback = object: Callback {
override fun onNextValue(value: String) {
emitter.onSuccess(value)
}
override fun onApiError(cause: Throwable) {
emitter.onFailure(Exception("API Error", cause))
}
}
}
}
Coroutine
suspendCoroutineは関数を抜けたところで一時停止し、resume
またはresumeWithException
が呼ばれるまで待機します。
(resumeWith
もありますが、説明を省略します)
suspend fun getMessages(): String {
return suspendCoroutine<String> { coroutine ->
val callback = object: Callback {
override fun onNextValue(value: String) {
coroutine.resume(value)
}
override fun onApiError(cause: Throwable) {
coroutine.resumeWithException(Exception("API Error", cause))
}
}
}
}
Completable
Rxで値を返さず完了したことだけ返すCompletable
です。
Rx
fun updateRequest(): Completable {
return Completable.create { emitter ->
// 何らかの処理
emitter.onComplete()
}
}
Coroutine
suspend fun updateRequest() {
// 何らかの処理
return
}
Observable
Observable.just()
一度だけアイテムを流して終えるStreamです。実質Single
です。
Rx
fun getMessage(): Observable<String> {
return Observable.just("サンプル")
}
Coroutine
fun getMessage(): Flow<String> {
return flowOf("サンプル")
}
Observable.create()[固定回数onNextを呼ぶ場合]
任意の回数onNextを呼べる汎用Streamです。
Coroutineの場合は、固定回数onNextを呼ぶ場合と、callbackの場合で異なり、こちらは固定回数呼ぶ場合です。
Rx
fun getMessages(): Observable<String> {
return Observable.create { observer ->
for(/*何回か回す*/) {
observer.onNext("サンプル")
}
observer.onComplete()
}
}
Coroutine
Coroutineの場合はflow
スコープを抜けると自動的にcompleted
したことになります。
fun getMessages(): Flow<String> {
return flow {
repeat(/*繰り返す回数*/) {
emit("サンプル")
}
}
}
Observable.create()[callbackからonNextを呼ぶ場合]
任意の回数onNextを呼べる汎用Streamです。
Coroutineの場合は、固定回数onNextを呼ぶ場合と、callbackの場合で異なり、こちらはcallbackバージョンです。
Rx
fun getMessages(): Observable<String> {
return Observable.create { observer ->
val callback = object: Callback {
override fun onNextValue(value: String) {
observer.onNext(value)
}
override fun onApiError(cause: Throwable) {
observer.onError(Exception("API Error", cause))
}
override fun onComplete() = observer.onComplete()
}
}.doFinally {/*streamが止まった時の後処理*/}
}
Coroutine
fun getMessages(): Flow<String> {
return callbackFlow {
val callback = object: Callback {
override fun onNextValue(value: String) {
offer(value)
}
override fun onApiError(cause: Throwable) {
cancel(Exception("API Error", cause))
}
override fun onComplete() = channel.close()
}
}
// awaitCloseのところで処理が止まるため、awaitClose{}の下にコードを書いてはいけない
awaitClose {/*streamが止まった時の後処理*/}
}
BehaviorSubject()
常に値を1個持っているHost Stream
Rx
class SampleClass {
private val behaviorSubject: BehaviorSubject<Int> = BehaviorSubject.create(0)
}
Coroutine
class SampleClass {
private val channel: Channel<Int> = Channel(Channel.CONFLATED)
}
ReplaySubject()
Rx
class SampleClass {
private val replaySubject: ReplaySubject<Int> = ReplaySubject.create()
}
Coroutine
class SampleClass {
private val channel: Channel<Int> = Channel(Channel.UNLIMTED)
}
Subscribe()
Coroutineの場合は、型によって呼ぶメソッドが変わるのが特徴。
Rx
Single.just(1).subscribeBy(
onNext = { v ->
// vがonNextした値
}
)
Coroutine[suspend funを呼ぶ場合]
suspend fun sample(): Int {
return 1
}
//// ここまで準備 ////
launch {
sample() // 普通に呼ぶ
}
Coroutine[flowを呼ぶ場合]
fun getFlow(): Flow<Int> = flowOf(1)
//// ここまで準備 ////
launch {
getFlow().collect{ v ->
// vがemitされてきた値
}
}
Coroutine[channelを呼ぶ場合]
fun getChannel(): ReceiveChannel<Int> {
// 予めchannelが作ってある前提
return this.channel
}
//// ここまで準備 ////
launch {
getChannel()
.consumeAsFlow() // ここで一度Cold Streamに変換する。その後はFlowと同様
.collect { v ->
// vがsendされてきた値
}
}
RxJavaの「あれ」ってKotlin Coroutineではどうするの?[応用編]
複数のStreamをシーケンシャルに処理する
複数のStreamを順番に処理したい場合
Rx
fun getMessages(): Observable<String> {
return Observable.create<String> { observer ->
val callback = object: Callback {
override fun onNextValue(value: String) {
observer.onNext(value)
}
override fun onApiError(cause: Throwable) {
observer.onError(Exception("API Error", cause))
}
override fun onComplete() = observer.onComplete()
}
}.doFinally {/*streamが止まった時の後処理*/}
}
fun getRequest(url: String): Single<String> {
return Single.create { emitter ->
// http通信をする
emitter.onSuccess(message.body)
}
}
fun insertDB(body: String): Completable {
return Completable.create { emitter ->
// DBに入れる
emitter.onComplete()
}
}
//// ここまで準備 ////
getMessages()
.flatMapSingle { getRequest(it) }
.flatMapCompletable { insertDB(it) }
.subscribeBy(onComplete = {})
Coroutine
onEach
で後ろからStreamが流れてきたら処理をします。
今回は値を返さないCompletable
だったので、onEach
ですが、flatMapMerge
もあります。
fun getMessages(): Flow<String> {
return callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: String) {
offer(value)
}
override fun onApiError(cause: Throwable) {
cancel(Exception("API Error", cause))
}
override fun onComplete() = channel.close()
}
}
// awaitCloseのところで処理が止まるため、awaitClose{}の下にコードを書いてはいけない
awaitClose {/*streamが止まった時の後処理*/}
}
suspend fun getRequest(url: String): String {
// http通信をする
return message.body
}
suspend fun insertDB(body: String) {
// DBに入れる
return
}
//// ここまで準備 ////
launch {
getMessages().onEach { url ->
val body = getRequest(rul)
insertDB(body)
}.collect {}
}
並列に処理して待ち合わせる
2か所にHTTP Requestを送信して、両方が成功したら次に進む処理。
シーケンシャルではなく、同時に送信するのがポイント
Rx
fun getHogeRequest(): Single<String> {
return Single.create { emitter ->
// HTTPリクエストする
emitter.onSuccess(message.body)
}
}
fun getFugaRequest(): Single<String> {
return Single.create { emitter ->
// HTTPリクエストする
emitter.onSuccess(message.body)
}
}
//// ここまで準備 ////
Single.zip(getHogeRequest(), getFugaRequest()).subscribeBy(onNext = { v: Pair<String, String> ->
})
Coroutine
suspend fun
はasync{}
で囲むことで、並列に処理ができるようになります。
全てが終了するまで待つときはList<Deffered<T>>
にawaitAll()
があるので、それを呼びます。
suspend fun getHogeRequest(): String {
// HTTPリクエストする
return message.body
}
suspend fun getFugaRequest(): String {
// HTTPリクエストする
return message.body
}
//// ここまで準備 ////
launch {
val resultList: List<String> = listOf(async{ getHogeRequest() }, async{ getFugaRequest() }).awaitAll()
}
Dispose
Rx
val disposable = hogeObservable().subscribeBy(onNext = {})
disposable.dispose()
Coroutine
val job = launch {
hogeFlow.collect {}
}
job.cancel()
まとめ
Rx使いがCoroutineを使いこなすためのポイントをまとめます。
- CoroutineScopeやCoroutineContextを何となく理解する
-
Single
やCompletable
を使いたくなったらsuspend fun
を使う -
Observable
を使いたくなったらFlow
を使う -
BehaviorSubject
などのSubject系を使いたくなったらChannel
を使う
ここでは細かく書きませんでしたが、Observable
とFlow
のオペレーターは似ていますが、異なる部分も多々あります。
最終的には公式ドキュメントが一番ですので、概要を理解したら公式ドキュメントを読んでみてください。
Rxのリファレンス、Coroutineのリファレンス