83
62

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

NSSOLAdvent Calendar 2019

Day 21

Rx使いに送るKotlin Coroutine入門

Last updated at Posted at 2019-12-20

Everything is a stream

みなさん良いStreamライフを送っていますか?

私は最高のStreamライフを送っています!
特に最近Kotlin Coroutineを使い始め、新たな風が流れてきています。(Streamだけに)
寒いギャグのStreamはonCompleteするとして・・・

昨今のAndroid開発において、RxJava(RxKotlin)とKotlin Coroutineのどっちを使うか問題があると思います。
RxJavaはもちろん素晴らしいフレームワークですが、Kotlin Coroutineは公式が出しているフレームワークなので、こちらを使う方が安心感があるようにも感じます。

Kotlin Coroutineを使ってみて、こちらの方が良いと感じる部分も多数あったので

  • RxJava使いがKotlin Coroutineを使い始めるための基本知識
  • RxJavaに比べてKotlin Coroutineが良いと私が思った部分

の二つを話していきたいと思います。

対象読者

  • Kotlinが読める
  • RxJava(RxKotlin)を使ったことがある
  • kotlin Coroutineを使いたいと思っている
  • (Kotlin/MPPを信じてiOSをKotlinで書こうと思っているRxSwift使い)

まずはKotlin Coroutineについてざっくりと

と言っても、QiitaにKotlin Coroutineについての記事はたくさんあるので、改めて私が書くことはありません。
なので、私が参考にした記事をひたすら貼っていきます!
つまり、巨人の肩に乗るということです。

まずはKotlin Coroutineの概要をざっくりと

実は、CoroutineはRxと違いStreamだけを扱うものではないのです。
async/awaitも一時中断関数(suspend fun)もKotlin Coroutineの一部です。
以下の記事はこの辺りがわかりやすくまとめられています。
【Kotlin】Coroutineを理解する (AtsushiUemuraさま)

Coroutineで動くアプリを作ってみる

Coroutineについてなんとなく理解したら、簡単なアプリを作ってみたくなりますよね?
以下の記事ではHTTP通信をCoroutineを使って行う例が示されています。
Android + Kotlin 1.3のcoroutines(Async, Await)でHTTP通信を非同期処理(jonghyoさま)

Coroutineを効果的に運用するために

Coroutineは非同期で動くため、下手に使うとメモリリークします。
効率的に運用するためにはCoroutineContextについて知る必要があります。

Coroutineは動かすContextがあり,Coroutineは親子関係を作ります。
そして、親が破棄されたら子は自動的に破棄されます。
なので、AndroidでしたらActivityCoroutineContextを作成し、onDestroy()で破棄するように設定しておくと、そこから動かした子Coroutineは自動的に破棄されて安全に扱えます!

この辺りの親子関係について以下の記事がとても分かりやすいです!
図で理解する Kotlin Coroutine(kawmraさま)

だいたい分かってきた時に押さえておきたい

チュートリアルや紹介では頻繁に出てくるGlobalScope.launch()ですが、実はアンチパターンです。
Globalなので、ライフサイクルとして最長なためです。

この辺りのアンチパターンがまとまっているので、ある程度理解してきたら一読しておく価値ありです!
Kotlin Coroutinesパターン&アンチパターン (ikemura23さま)

RxJavaの「あれ」ってKotlin Coroutineではどうするの?[基本編]

ここからが本題です。
Kotlin Coroutineをだいたい理解したところで、Rxとの対応が知りたくなります。
「CoroutineはRxの代替では無い。」と書きましたが、Flowの登場により、Rxの代替としても使えます。
そして、Flow型にはmapfilterscanretryなどRxで見慣れたオペレーターが揃っています。
なので、ObservableFlowはほとんど互換といって差し支えありません。
しかも、RxのSingleCompletableはCoroutineでは、もっと簡単に書けます。

Rxの「あれ」をKotlin Coroutineでやりたい!という気持ちに答えていきます。

RxとCoroutineの違いを大雑把に考察

CoroutineはRxに比べて型が厳しいです。
例えば、RxのHot Stream, Cold Streamを型で分類できません。一つ一つ覚える必要があります。
逆にRxはObservableのオペレーターが全てなので、SubjectだろうがSingleだろうが同じように扱えます。

一方で、CoroutineはHot StreamChannelクラス、Cold StreamFlowクラスといったように、型が厳密です。
ChannelFlowに生えているオペレーターはまるで別物です。
さらにsuspend funまで加わってきて、初見では複雑に感じると思います。
型が多く、厳密ゆえに書くのが難しかったりします。

一方で、型が厳密なので、「Cold Streamだと思っていたら、Hot Streamでリークしていた。」なんてことは起こりにくいです。

その辺りも交えながら比較していきます!

Single

Rxで値を1度だけ返すSingleです。DBへのinsertやHTTPアクセスなど、頻繁に使われている印象があります。

Rx

fun getRequest(): Single<String> {
    return Single.create<String> { emitter ->
        // 何らかの処理する
        emitter.onSuccess(body.message)
    }
}

Coroutine

注意: suspend funCold Streamでは無いため、厳密にはSingleの互換ではありません。
しかし、似た扱いができるため、このように紹介しています。

suspend fun getRequest(): String{
    // 何らかの処理する
    return body.message
}

Single[callbackを使う場合]

Singleの内部でcallbackを使う場合はCoroutine側は少し変える必要があります。

Rx

fun getRequest(): Single<String> {
    return Single.create<String> { emitter ->
        val callback = object: Callback {
            override fun onNextValue(value: String) {
                emitter.onSuccess(value)
            }
            override fun onApiError(cause: Throwable) {
                emitter.onFailure(Exception("API Error", cause))
            }
        }
    }
}

Coroutine

suspendCoroutineは関数を抜けたところで一時停止し、resumeまたはresumeWithExceptionが呼ばれるまで待機します。
(resumeWithもありますが、説明を省略します)

suspend fun getMessages(): String {
    return suspendCoroutine<String> { coroutine ->
        val callback = object: Callback {
            override fun onNextValue(value: String) {
                coroutine.resume(value)
            }
            override fun onApiError(cause: Throwable) {
                coroutine.resumeWithException(Exception("API Error", cause))
            }
        }
    }
}

Completable

Rxで値を返さず完了したことだけ返すCompletableです。

Rx

fun updateRequest(): Completable {
    return Completable.create { emitter ->
        // 何らかの処理
        emitter.onComplete()
    }
}

Coroutine

suspend fun updateRequest() {
    // 何らかの処理
    return
}

Observable

Observable.just()

一度だけアイテムを流して終えるStreamです。実質Singleです。

Rx

fun getMessage(): Observable<String> {
    return Observable.just("サンプル")
}

Coroutine

fun getMessage(): Flow<String> {
    return flowOf("サンプル")
}

Observable.create()[固定回数onNextを呼ぶ場合]

任意の回数onNextを呼べる汎用Streamです。
Coroutineの場合は、固定回数onNextを呼ぶ場合と、callbackの場合で異なり、こちらは固定回数呼ぶ場合です。

Rx

fun getMessages(): Observable<String> {
    return Observable.create { observer ->
        for(/*何回か回す*/) {
            observer.onNext("サンプル")
        }
        observer.onComplete()
    }
}

Coroutine

Coroutineの場合はflowスコープを抜けると自動的にcompletedしたことになります。

fun getMessages(): Flow<String> {
    return flow {
        repeat(/*繰り返す回数*/) {
            emit("サンプル")
        }
    }
}

Observable.create()[callbackからonNextを呼ぶ場合]

任意の回数onNextを呼べる汎用Streamです。
Coroutineの場合は、固定回数onNextを呼ぶ場合と、callbackの場合で異なり、こちらはcallbackバージョンです。

Rx

fun getMessages(): Observable<String> {
    return Observable.create { observer ->
        val callback = object: Callback {
            override fun onNextValue(value: String) {
                observer.onNext(value)
            }
            override fun onApiError(cause: Throwable) {
                observer.onError(Exception("API Error", cause))
            }
            override fun onComplete() = observer.onComplete()
        }
    }.doFinally {/*streamが止まった時の後処理*/}
}

Coroutine

fun getMessages(): Flow<String> {
    return callbackFlow {
        val callback = object: Callback {
            override fun onNextValue(value: String) {
                offer(value)
            }
            override fun onApiError(cause: Throwable) {
                cancel(Exception("API Error", cause))
            }
            override fun onComplete() = channel.close()
        }
    }
    // awaitCloseのところで処理が止まるため、awaitClose{}の下にコードを書いてはいけない
    awaitClose {/*streamが止まった時の後処理*/}
}

BehaviorSubject()

常に値を1個持っているHost Stream

Rx

class SampleClass {
    private val behaviorSubject: BehaviorSubject<Int> = BehaviorSubject.create(0)
}

Coroutine

class SampleClass {
    private val channel: Channel<Int> = Channel(Channel.CONFLATED)
}

ReplaySubject()

Rx

class SampleClass {
    private val replaySubject: ReplaySubject<Int> = ReplaySubject.create()
}

Coroutine

class SampleClass {
    private val channel: Channel<Int> = Channel(Channel.UNLIMTED)
}

Subscribe()

Coroutineの場合は、型によって呼ぶメソッドが変わるのが特徴。

Rx

Single.just(1).subscribeBy(
    onNext = { v ->
        // vがonNextした値
    }
)

Coroutine[suspend funを呼ぶ場合]

suspend fun sample(): Int {
    return 1
}

//// ここまで準備 ////

launch {
    sample() // 普通に呼ぶ
}

Coroutine[flowを呼ぶ場合]

fun getFlow(): Flow<Int> = flowOf(1)

//// ここまで準備 ////

launch {
    getFlow().collect{ v ->
        // vがemitされてきた値
    }
}

Coroutine[channelを呼ぶ場合]

fun getChannel(): ReceiveChannel<Int> {
    // 予めchannelが作ってある前提
    return this.channel
} 

//// ここまで準備 ////

launch {
    getChannel()
        .consumeAsFlow() // ここで一度Cold Streamに変換する。その後はFlowと同様
        .collect { v ->
            // vがsendされてきた値
        }
}

RxJavaの「あれ」ってKotlin Coroutineではどうするの?[応用編]

複数のStreamをシーケンシャルに処理する

複数のStreamを順番に処理したい場合

Rx

fun getMessages(): Observable<String> {
    return Observable.create<String> { observer ->
        val callback = object: Callback {
            override fun onNextValue(value: String) {
                observer.onNext(value)
            }
            override fun onApiError(cause: Throwable) {
                observer.onError(Exception("API Error", cause))
            }
            override fun onComplete() = observer.onComplete()
        }
    }.doFinally {/*streamが止まった時の後処理*/}
}

fun getRequest(url: String): Single<String> {
    return Single.create { emitter ->
        // http通信をする
        emitter.onSuccess(message.body)
    }
}

fun insertDB(body: String): Completable {
    return Completable.create { emitter ->
        // DBに入れる
        emitter.onComplete()
    }
}

//// ここまで準備 ////

getMessages()
    .flatMapSingle { getRequest(it) }
    .flatMapCompletable { insertDB(it) }
    .subscribeBy(onComplete = {})

Coroutine

onEachで後ろからStreamが流れてきたら処理をします。
今回は値を返さないCompletableだったので、onEachですが、flatMapMergeもあります。

fun getMessages(): Flow<String> {
    return callbackFlow {
        val callback = object : Callback {
            override fun onNextValue(value: String) {
                offer(value)
            }
            override fun onApiError(cause: Throwable) {
                cancel(Exception("API Error", cause))
            }
            override fun onComplete() = channel.close()
        }
    }
    // awaitCloseのところで処理が止まるため、awaitClose{}の下にコードを書いてはいけない
    awaitClose {/*streamが止まった時の後処理*/}
}

suspend fun getRequest(url: String): String {
    // http通信をする
    return message.body
}

suspend fun insertDB(body: String) {
    // DBに入れる
    return
}

//// ここまで準備 ////

launch {
    getMessages().onEach { url ->
        val body = getRequest(rul)
        insertDB(body)
    }.collect {}
}

並列に処理して待ち合わせる

2か所にHTTP Requestを送信して、両方が成功したら次に進む処理。
シーケンシャルではなく、同時に送信するのがポイント

Rx


fun getHogeRequest(): Single<String> {
    return Single.create { emitter ->
        // HTTPリクエストする
        emitter.onSuccess(message.body)
    }
}

fun getFugaRequest(): Single<String> {
    return Single.create { emitter ->
        // HTTPリクエストする
        emitter.onSuccess(message.body)
    }
}

//// ここまで準備 ////
Single.zip(getHogeRequest(), getFugaRequest()).subscribeBy(onNext = { v: Pair<String, String> ->

})

Coroutine

suspend funasync{}で囲むことで、並列に処理ができるようになります。
全てが終了するまで待つときはList<Deffered<T>>awaitAll()があるので、それを呼びます。

suspend fun getHogeRequest(): String {
    // HTTPリクエストする
    return message.body
}

suspend fun getFugaRequest(): String {
    // HTTPリクエストする
    return message.body
}

//// ここまで準備 ////

launch {
    val resultList: List<String> = listOf(async{ getHogeRequest() }, async{ getFugaRequest() }).awaitAll()
}

Dispose

Rx

val disposable = hogeObservable().subscribeBy(onNext = {})

disposable.dispose()

Coroutine


val job = launch {
    hogeFlow.collect {}
}

job.cancel()

まとめ

Rx使いがCoroutineを使いこなすためのポイントをまとめます。

  • CoroutineScopeやCoroutineContextを何となく理解する
  • SingleCompletableを使いたくなったらsuspend funを使う
  • Observableを使いたくなったらFlowを使う
  • BehaviorSubjectなどのSubject系を使いたくなったらChannelを使う

ここでは細かく書きませんでしたが、ObservableFlowのオペレーターは似ていますが、異なる部分も多々あります。
最終的には公式ドキュメントが一番ですので、概要を理解したら公式ドキュメントを読んでみてください。
RxのリファレンスCoroutineのリファレンス

83
62
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
83
62

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?