24
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KotlinAdvent Calendar 2018

Day 21

Coroutines reactive moduleで今日からKotlin Coroutinesに入門する

Last updated at Posted at 2018-12-20

はじめに

この記事は Kotlin Advent Calendar 2018 21日目の記事です。

これまでExperimental APIであったCoroutinesがKotlin1.3.0よりstableになりました。
「Experimentalだったので見送っていたがこれを機にCoroutinesを導入してみたい」と考えた方も多くいらっしゃるのではないでしょうか?
自分のAndroidチームではCoroutinesの導入について話し合った結果、layerやmoduleの境界となるI/FはRxで統一し、それ以外に関してはRxとCoroutinesとで使いやすい方を使うという方針で進めてみています。
そのため「RxとCoroutinesとの相互変換」が必要になるのですが、kotlinx.coroutinesにあるreacrive module下に既に用意されており、現在進行形でこちらを使って開発を進めています。
本稿では、Coroutines導入時に注意した点や内部処理に触れたりしつつ、このreactive module内の「kotlinx-coroutines-reactive」「kotlinx-coroutines-rx2」についてご紹介していきたいと思います。
※kotlinx-coroutines-reactorについては触れていません。ご注意ください。

環境

  • Android Studio : 3.2.1
  • Kotlin : 1.3.0
  • kotlinx-coroutines-rx2 : 1.0.0
  • kotlinx-coroutines-reactive : 1.0.0

ご紹介の前に(設定)

今回ご紹介するAPIの中にはExperimental APIおよびObsolete APIが含まれます。そのため利用の際には必要に応じて以下の設定が必要になります。

  1. Experimental API/Obsolete API利用メソッド/クラスにannotationを付与する。
  2. compile optionとしてExperimental APIを使用することを宣言する。

Experimental API/Obsolete API利用メソッド/クラスにannotationを付与する

Experimental API/Obsolete APIを利用する際には該当APIを呼び出すクラスもしくはメソッドに対して以下のいずれかのannotationを付与する必要があります。

  1. @ExperimentalCoroutinesApi / @ObsoleteCoroutinesApi
  2. @UseExperimental(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
    (@UseExperimentalの引数はvarargになっています)

compile optionとしてExperimental APIを使用することを宣言する

先述のannotationを付与するとコンパイル時に以下の警告が出てくるようになります。

コンパイル時の警告
This class can only be used with the compiler argument '-Xuse-experimental=kotlin.Experimental'

これを解消するためには先述のAPIを利用しているモジュールのbuild.gradle.kts(もしくはbuild.gradle)のcompile optionへExperimental APIを使うことを追記する必要があります。

module/build.gradle.kts
tasks.withType(KotlinCompile::class).all {
  kotlinOptions {
    freeCompilerArgs = listOf("-Xuse-experimental=kotlin.Experimental")
  }
}

ご紹介の前に(章立て)

各モジュールのREADMEでは以下の構成で説明されています。本稿ではそちらに倣ってご紹介していきます。
1. コルーチンビルダー(Coroutine builders)
 各種Observableを返却する機能を持つコルーチンビルダーについて触れています。
2. 中断関数(Suspending extension functions and suspending iteration)
 各種Observableクラスを拡張した中断関数(suspend関数)について触れています。
3. 変換関数(Conversion functions)
 各種Observableへ変換する関数やSchedulerをCoroutineDispatcherへ変換するための関数について触れています。

kotlinx-coroutines-reactive

コルーチンビルダー(Coroutine builders)

publish

文字通り、Publisherを返却するCoroutine Builderです。
ブロック内はProducerScopeになっており、send()/offer()onNext()close()をThrowableなしで呼び出した場合はonComplete()、Throwableつきで呼び出した場合はonError()が通知されます。
また、publishはExperimentalCoroutinesApiのため、先述の設定を忘れないように注意してください。

Publish.kt/publish
@ExperimentalCoroutinesApi fun <T> CoroutineScope.publish(
    context: CoroutineContext = EmptyCoroutineContext, 
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>
publishサンプル
val publisher: Publisher<Int> = publish(coroutineContext) {
  // onNext通知
  send(element)
  offer(element)
  // onComplete通知
  close()
  // onError通知
  close(Throwable)
}

中断関数(Suspending extension functions and suspending iteration)

Publisher await

Publisherの拡張関数として6種類のawaitが用意されています。

Name Description
Publisher.awaitFirst Publisherからの一つ目の値を待ち、その値を返却
Publisher.awaitFirstOrDefault Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければデフォルト値を返却
Publisher.awaitFirstOrElse Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければ引数のfunctionで返却されるデフォルト値を返却
Publisher.awaitFirstOrNull Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければnullを返却
Publisher.awaitLast Publisherからの最後の値を待ち、その値を返却
Publisher.awaitSingle Publisherからの最後の値を待ち、その値を一度だけ返却
awaitSingle()を補足すると、既にいずれかのawaitを用いて値を取得している状態で呼び出した場合はIllegalArgumentExceptionが投げられるので注意してください。

また余談になりますが、これらのawait関数は内部ではModeというenum classとNullableなデフォルト値を引数に取るawaitOne()という関数でまとめられており、この辺りを見ると各awaitについての理解が深めやすいかなと思いました。

Publishser.awaitOne

例えば、awaitFirstOrNull()awaitFirstOrDefault()のデフォルト値がデフォルト引数のnullを指定したものになっていることが分かります。

Await.kt
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)

public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)

public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) // awaitFirstOrDefaultとModeは同じだが,nullを指定している.

public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()

public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)

public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)

private suspend fun <T> Publisher<T>.awaitOne(mode: Mode, default: T? = null): T

Publisher openSubscription

await以外にもPublisherから発行されたイベントをChannelで扱うための拡張関数、openSubscription()が提供されています。
このopenSubscription()は呼び出すとsubscribeを開始し、イベントを受信するためのReceiveChannelを返却します。
こうして受け取ったReceiveChannelのAPI(例えばconsumeEach()など)を用いることでPublisherで発行したイベントを受信することが出来ます。

Channel.kt/openSubscription
@ObsoleteCoroutinesApi
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>(request)
    subscribe(channel)
    return channel
}
openSubscriptionサンプル
val publisher: Publisher<Int> = publish(coroutineContext) {
    :
}
publisher.openSubscription()
  .consumeEach {
    println(it)
  }

ただし、このopenSubscription()ObsoleteCoroutinesApiとなっていますので、こちらも先述の設定が必要になります。
※Channelの詳細については公式ページを参照ください。

参考: Kotlin Reference : Channels

変換関数(Conversion functions)

ReceiveChannel.asPublisher

先ほどご紹介したPublisher.openSubscription()の逆パターンです。ReceiveChannelをPublisherへと変換することが出来ます。
こちらもObsoleteCoroutinesApiになっています。

Convert.kt/asPublisher
@ObsoleteCoroutinesApi
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = GlobalScope.publish(context) {
    for (t in this@asPublisher)
        send(t)
}
asPublisherサンプル
val publisher = receiveChannel.asPublisher()
publisher.subscribe(object: Subscriber<Int?> {
    override fun onComplete() {}

    override fun onSubscribe(s: Subscription?) {}

    override fun onNext(t: Int?) {}

    override fun onError(t: Throwable?) {}
  })

kotlinx-coroutines-rx2

コルーチンビルダー(Coroutine builders)

kotlinx-coroutines-rx2ではCompletableMaybeSingleObservableFlowableを返却するCoroutine Builderが用意されています。

Name Result Scope
rxCompletable Completable CoroutineScope
rxMaybe Maybe CoroutineScope
rxSingle Single CoroutineScope
rxObservable Observable ProducerScope
rxFlowable Flowable ProducerScope

rxCompletable

Completableを返却するCoroutine Builderです。Coroutine Scope内の処理が正常に完了した場合にonComplete()、例外やキャンセルがされた場合にはonError()へ通知されます。

RxCompletable.kt/rxCompletable
public fun CoroutineScope.rxCompletable(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { subscriber ->
    val newContext = newCoroutineContext(context)
    val coroutine = RxCompletableCoroutine(newContext, subscriber)
    subscriber.setCancellable(RxCancellable(coroutine))
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
rxCoompletableサンプル
val completable = rxCompletable {
  // 処理が正常に成功すればonComplete
  // Exception or unsubscribe, cancelされた場合はonError
}

この関数の処理中で生成しているprivate class RxCompletableCoroutineのsuper class AbstractCoroutineの関数onCompletionInternal()を覗いてみると上記の処理を確認することが出来ます。

AbstractCoroutine.kt
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
    if (state is CompletedExceptionally) // CompletedExceptionally : Jobがキャンセルされたときの状態を表す内部クラス
        onCompletedExceptionally(state.cause)
    else
        onCompleted(state as T)
}

rxMaybe

Maybeを返却するCoroutine Builderです。Coroutine ScopeブロックでNonNullな値を返すとonSuccess()、nullを返却するとonComplete()、rxCompleteと同様に例外発生もしくはキャンセルされるとonError()へ通知されます。

RxMaybe.kt/rxMaybe
public fun <T> CoroutineScope.rxMaybe(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
    val newContext = newCoroutineContext(context)
    val coroutine = RxMaybeCoroutine(newContext, subscriber)
    subscriber.setCancellable(RxCancellable(coroutine))
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
rxMaybeサンプル
val maybe = rxMaybe<Int> {
  // NonNullな値を返却するとonSuccess
  return@rxMaybe 100
  // nullを返却するとonComplete
  return@rxMaybe null
  // Exception or unsubscribe, cancelされた場合はonError
}

また、rxMaybeの内部で生成されるRxMaybeCoroutineでは定義の通りvalueの値によって呼び出す関数を切り替えている処理になっているのが分かります。

RxMaybeCoroutine
private class RxMaybeCoroutine<T>(
    parentContext: CoroutineContext,
    private val subscriber: MaybeEmitter<T>
) : AbstractCoroutine<T>(parentContext, true) {
    override val cancelsParent: Boolean get() = true
    override fun onCompleted(value: T) {
        if (!subscriber.isDisposed) {
            if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
        }
    }

    override fun onCompletedExceptionally(exception: Throwable) {
        if (!subscriber.isDisposed) subscriber.onError(exception)
    }
}

rxSingle

Singleを返却するCoroutine Builderです。Coroutine Scopeブロック内で値を返すとonSuccess()、先述の2つのCoroutine Builderと同様に例外発生もしくはキャンセルされた際にonError()へと通知されます。

RxSingle.kt/rxSingle
public fun <T : Any> CoroutineScope.rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
    val newContext = newCoroutineContext(context)
    val coroutine = RxSingleCoroutine(newContext, subscriber)
    subscriber.setCancellable(RxCancellable(coroutine))
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
rxSingleサンプル
val single = rxSingle<Int> {
  // 値を返却するとonSuccess
  return@rxSingle 100
  // Exception or unsubscribe, cancelされた場合はonError
}

また、<T : Any>とジェネネリクスが宣言されているため、rxSingleのCoroutine Scopeでは返り値はNonNullな値でなければいけないことが分かります。

rxObservable

Observableを返却するCoroutine Builderです。これまではCoroutineScopeがスコープとなっていましたが、rxObservableはProducerScopeになっています。send()/offer()を呼び出すとonNext()、引数なしのclose()の様な正常終了の場合にはonComplete()、例外発生もしくは引数ありでclose()を呼び出した際にはonError()が呼ばれます。
また、ExperimentalCoroutinesApiとなっている点にも注意してください。

RxObservabe.kt/rxObservable
@ExperimentalCoroutinesApi
public fun <T : Any> CoroutineScope.rxObservable(
    context: CoroutineContext = EmptyCoroutineContext,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
    val newContext = newCoroutineContext(context)
    val coroutine = RxObservableCoroutine(newContext, subscriber)
    subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
rxObservableサンプル
val observable = rxObservable<Int> {
  // 値の発行でonNext
  offer(100)
  send(200)

  // 通常終了でonComplete
  return@rxObservable
  close()

  // Exception or unsubscribe, cancelされた場合はonError
  close(Throwable)
}

Publisher同様KDocにはsend()の記載しかありませんが、rxObservable内で生成されるRxObservableCoroutineの中を見ると、send()/offer()両方でdoLockedNext()を呼び出し、その中でonNext()が呼び出されているのが分かります。

RxObservable.kt/RxObservableCoroutine
override fun offer(element: T): Boolean {
    if (!mutex.tryLock()) return false
    doLockedNext(element)
    return true
}

public override suspend fun send(element: T) {
    // fast-path -- try send without suspension
    if (offer(element)) return
    // slow-path does suspend
    return sendSuspend(element)
}

private suspend fun sendSuspend(element: T) {
    mutex.lock()
    doLockedNext(element)
}

private fun doLockedNext(elem: T) {
    // check if already closed for send
    if (!isActive) {
        doLockedSignalCompleted()
        throw getCancellationException()
    }
    // notify subscriber
    try {
        subscriber.onNext(elem)
    } catch (e: Throwable) {
        try {
            if (!cancel(e))
                handleCoroutineException(context, e, this)
        } finally {
            doLockedSignalCompleted()
        }
        throw getCancellationException()
    }
    /*
        There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
        happen after this check and before `unlock` (see `onCancellation` that does not do anything
        if it fails to acquire the lock that we are still holding).
        We have to recheck `isCompleted` after `unlock` anyway.
        */
    mutex.unlock()
    // recheck isActive
    if (!isActive && mutex.tryLock())
        doLockedSignalCompleted()
}

今のチームではObservableかSingleで上位layerへ公開している機能がほとんどなこともあり、rxObservableと先述のrxSingleは使う機会が多くなっています。

rxFlowable

Flowableを返却するCoroutine Builderです。rxObservableと同様にScopeはProducerScope、各イベント関数の呼び出し条件は、send()/offer()を呼び出しでonNext()、引数なしのclose()の様な正常終了でonComplete()、例外発生もしくは引数ありでのclose()呼び出しでonError()となっています。

rxFlowableサンプル
val flowable = rxFlowable<Int> {
  // send/offerでonNext
  offer(100)
  send(200)

  // 通常終了でonComplete
  return@rxFlowable
  close()

  // Exception or unsubscribe, cancelされた場合はonError
  close(Throwable)
}

中断関数(Suspending extension functions and suspending iteration)

CompletableSource await

Completableの処理が完了するまで待ちます。

RxAwait.kt/CompletableSource.await
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
    subscribe(object : CompletableObserver {
        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
        override fun onComplete() { cont.resume(Unit) }
        override fun onError(e: Throwable) { cont.resumeWithException(e) }
    })
}

suspendCancellableCoroutineではCancellableContinuationが渡され、CompletableObserverの各イベントでinternal関数disposeOnCancellation()などが呼ばれています。

MaybeSource await / openSubscription

MaybeSourceの拡張関数として2種類のawaitとopenSubscriptionが用意されています。

Name Description
MaybeSource.await Maybeの値もしくはnullの返却を待つ
MaybeSource.awaitOrDefault Maybeの値もしくはデフォルト値の返却を待つ
MaybeSource.openSubscription Maybeのsubscribeを開始し、Maybeからのイベントを受け取るためのReceiveChannelを受け取る

SingleSource await

SingleSourceでは拡張関数でawaitが用意されています。Singleの処理が完了するまで待ち、値を受け取ります。

RxAwait.kt/SingleSource.await
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
    subscribe(object : SingleObserver<T> {
        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
        override fun onSuccess(t: T) { cont.resume(t) }
        override fun onError(error: Throwable) { cont.resumeWithException(error) }
    })
}

CompletableSourceのawaitとほぼ同じですね。

ObservableSource await / openSubscription

ObservableSourceにはPublisherで用意されているのと同じだけの種類のawait関数とopenSubscription関数が用意されています。

Name Description
ObservableSource.awaitFirst Observableからの一つ目の値を待ち、その値を返却
ObservableSource.awaitFirstOrDefault Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければデフォルト値を返却
ObservableSource.awaitFirstOrElse Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければ引数のfunctionで返却されるデフォルト値を返却
ObservableSource.awaitFirstOrNull Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければnullを返却
ObservableSource.awaitLast Observableからの最後の値を待ち、その値を返却
ObservableSource.awaitSingle Observableからの最後の値を待ち、その値を一度だけ返却

awaitに関しては内部の処理についてもPublisherの処理と類似した構成になっており、ObservableSource向けのawaitOne()Modeによって使い分けされています。

RxAwait.kt
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)

public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)

private suspend fun <T> ObservableSource<T>.awaitOne(mode: Mode, default: T? = null): T

また、openSubscription()はPublisherのものと同様にObsoleteCoroutinesApiとなっています。

FlowableSource.await

章立てをしてはいますが、Flowableは先述のPublisherを継承しているため、await関数についてもPublisherのものを利用できます。そのため、取り立てて追記する事項はありません。

変換関数(Conversion functions)

変換関数として、Observable、Completable、Singleへの変換関数以外にもSchedulerをCoroutineDispatcherに変換する関数も用意されています。

Name Description
Job.asCompletable JobをCompletableへ変換する
Deferred.asSingle DeferredをSingleへ変換する
ReceiveChannel.asObservable ReceiveChannelをObservableへ変換する
Scheduler.asCoroutineDispatcher SchedulerをCoroutineDispatcherへ変換する

Job.asCompletable

JobからCompletableへ変換します。内部ではGlobalScopeが用いられています。
ExperimentalCoroutinesApiになっているので先述の設定を忘れない様にご注意ください。

RxConvert.kt/Job.asCompletable
@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) {
    this@asCompletable.join()
}

Deferred.asSingle

DeferredからSingleへ変換します。こちらも内部ではGlobalScopeが用いられています。
こちらもExperimentalCoroutinesApiなので注意してください。

RxConvert.kt/Deferred.asSingle
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
    this@asSingle.await()
}

ReceiveChannel.asObservable

ReceiveChannelからObservableへ変換します。先述の2つと同様にGlobalScopeが用いられています。
こちらもObsoleteCoroutinesApiになっています。

RxConvert.kt/ReceiveChannel.asObservable
@ObsoleteCoroutinesApi
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = GlobalScope.rxObservable(context) {
    for (t in this@asObservable)
        send(t)
}

Scheduler.asCoroutineDispatcher

SchedulerをCoroutineDispatcherへ変換します。

RxScheduler.kt/Scheduler.asCoroutineDispatcher
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)

終わりに

kotlinx-coroutines-reactive、kotlinx-coroutines-rx2について一通りご紹介しました。
Rx導入済みの環境に対してCoroutinesを導入していくにあたって十分な機能が提供されていると思っていますが、私もまだまだ理解が及んでいないところが多いです。
それでも便利な機能が多く、Coroutinesへの置き換えをサポートしてくれる印象を受けましたので、まだCoroutinesに触れられていないという方はRxの置き換えからトライしてみるのは如何でしょうか?

参考

24
13
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?