Android
Kotlin
RxJava
coroutine
KotlinDay 21

Coroutines reactive moduleで今日からKotlin Coroutinesに入門する


はじめに

この記事は Kotlin Advent Calendar 2018 21日目の記事です。

これまでExperimental APIであったCoroutinesがKotlin1.3.0よりstableになりました。

「Experimentalだったので見送っていたがこれを機にCoroutinesを導入してみたい」と考えた方も多くいらっしゃるのではないでしょうか?

自分のAndroidチームではCoroutinesの導入について話し合った結果、layerやmoduleの境界となるI/FはRxで統一し、それ以外に関してはRxとCoroutinesとで使いやすい方を使うという方針で進めてみています。

そのため「RxとCoroutinesとの相互変換」が必要になるのですが、kotlinx.coroutinesにあるreacrive module下に既に用意されており、現在進行形でこちらを使って開発を進めています。

本稿では、Coroutines導入時に注意した点や内部処理に触れたりしつつ、このreactive module内の「kotlinx-coroutines-reactive」「kotlinx-coroutines-rx2」についてご紹介していきたいと思います。

※kotlinx-coroutines-reactorについては触れていません。ご注意ください。


環境


  • Android Studio : 3.2.1

  • Kotlin : 1.3.0

  • kotlinx-coroutines-rx2 : 1.0.0

  • kotlinx-coroutines-reactive : 1.0.0


ご紹介の前に(設定)

今回ご紹介するAPIの中にはExperimental APIおよびObsolete APIが含まれます。そのため利用の際には必要に応じて以下の設定が必要になります。


  1. Experimental API/Obsolete API利用メソッド/クラスにannotationを付与する。

  2. compile optionとしてExperimental APIを使用することを宣言する。


Experimental API/Obsolete API利用メソッド/クラスにannotationを付与する

Experimental API/Obsolete APIを利用する際には該当APIを呼び出すクラスもしくはメソッドに対して以下のいずれかのannotationを付与する必要があります。

1. @ExperimentalCoroutinesApi / @ObsoleteCoroutinesApi

2. @UseExperimental(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)

(@UseExperimentalの引数はvarargになっています)


compile optionとしてExperimental APIを使用することを宣言する

先述のannotationを付与するとコンパイル時に以下の警告が出てくるようになります。


コンパイル時の警告

This class can only be used with the compiler argument '-Xuse-experimental=kotlin.Experimental'


これを解消するためには先述のAPIを利用しているモジュールのbuild.gradle.kts(もしくはbuild.gradle)のcompile optionへExperimental APIを使うことを追記する必要があります。


module/build.gradle.kts

tasks.withType(KotlinCompile::class).all {

kotlinOptions {
freeCompilerArgs = listOf("-Xuse-experimental=kotlin.Experimental")
}
}


ご紹介の前に(章立て)

各モジュールのREADMEでは以下の構成で説明されています。本稿ではそちらに倣ってご紹介していきます。

1. コルーチンビルダー(Coroutine builders)

 各種Observableを返却する機能を持つコルーチンビルダーについて触れています。

2. 中断関数(Suspending extension functions and suspending iteration)

 各種Observableクラスを拡張した中断関数(suspend関数)について触れています。

3. 変換関数(Conversion functions)

 各種Observableへ変換する関数やSchedulerをCoroutineDispatcherへ変換するための関数について触れています。


kotlinx-coroutines-reactive


コルーチンビルダー(Coroutine builders)


publish

文字通り、Publisherを返却するCoroutine Builderです。

ブロック内はProducerScopeになっており、send()/offer()onNext()close()をThrowableなしで呼び出した場合はonComplete()、Throwableつきで呼び出した場合はonError()が通知されます。

また、publishはExperimentalCoroutinesApiのため、先述の設定を忘れないように注意してください。


Publish.kt/publish

@ExperimentalCoroutinesApi fun <T> CoroutineScope.publish(

context: CoroutineContext = EmptyCoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>


publishサンプル

val publisher: Publisher<Int> = publish(coroutineContext) {

// onNext通知
send(element)
offer(element)
// onComplete通知
close()
// onError通知
close(Throwable)
}


中断関数(Suspending extension functions and suspending iteration)


Publisher await

Publisherの拡張関数として6種類のawaitが用意されています。

Name
Description

Publisher.awaitFirst
Publisherからの一つ目の値を待ち、その値を返却

Publisher.awaitFirstOrDefault
Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければデフォルト値を返却

Publisher.awaitFirstOrElse
Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければ引数のfunctionで返却されるデフォルト値を返却

Publisher.awaitFirstOrNull
Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければnullを返却

Publisher.awaitLast
Publisherからの最後の値を待ち、その値を返却

Publisher.awaitSingle
Publisherからの最後の値を待ち、その値を一度だけ返却

awaitSingle()を補足すると、既にいずれかのawaitを用いて値を取得している状態で呼び出した場合はIllegalArgumentExceptionが投げられるので注意してください。

また余談になりますが、これらのawait関数は内部ではModeというenum classとNullableなデフォルト値を引数に取るawaitOne()という関数でまとめられており、この辺りを見ると各awaitについての理解が深めやすいかなと思いました。

Publishser.awaitOne

例えば、awaitFirstOrNull()awaitFirstOrDefault()のデフォルト値がデフォルト引数のnullを指定したものになっていることが分かります。


Await.kt

public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)

public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)

public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) // awaitFirstOrDefaultとModeは同じだが,nullを指定している.

public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()

public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)

public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)

private suspend fun <T> Publisher<T>.awaitOne(mode: Mode, default: T? = null): T



Publisher openSubscription

await以外にもPublisherから発行されたイベントをChannelで扱うための拡張関数、openSubscription()が提供されています。

このopenSubscription()は呼び出すとsubscribeを開始し、イベントを受信するためのReceiveChannelを返却します。

こうして受け取ったReceiveChannelのAPI(例えばconsumeEach()など)を用いることでPublisherで発行したイベントを受信することが出来ます。


Channel.kt/openSubscription

@ObsoleteCoroutinesApi

@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>(request)
subscribe(channel)
return channel
}


openSubscriptionサンプル

val publisher: Publisher<Int> = publish(coroutineContext) {

:
}
publisher.openSubscription()
.consumeEach {
println(it)
}

ただし、このopenSubscription()ObsoleteCoroutinesApiとなっていますので、こちらも先述の設定が必要になります。

※Channelの詳細については公式ページを参照ください。

参考: Kotlin Reference : Channels


変換関数(Conversion functions)


ReceiveChannel.asPublisher

先ほどご紹介したPublisher.openSubscription()の逆パターンです。ReceiveChannelをPublisherへと変換することが出来ます。

こちらもObsoleteCoroutinesApiになっています。


Convert.kt/asPublisher

@ObsoleteCoroutinesApi

public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = GlobalScope.publish(context) {
for (t in this@asPublisher)
send(t)
}


asPublisherサンプル

val publisher = receiveChannel.asPublisher()

publisher.subscribe(object: Subscriber<Int?> {
override fun onComplete() {}

override fun onSubscribe(s: Subscription?) {}

override fun onNext(t: Int?) {}

override fun onError(t: Throwable?) {}
})



kotlinx-coroutines-rx2


コルーチンビルダー(Coroutine builders)

kotlinx-coroutines-rx2ではCompletableMaybeSingleObservableFlowableを返却するCoroutine Builderが用意されています。

Name
Result
Scope

rxCompletable
Completable
CoroutineScope

rxMaybe
Maybe
CoroutineScope

rxSingle
Single
CoroutineScope

rxObservable
Observable
ProducerScope

rxFlowable
Flowable
ProducerScope


rxCompletable

Completableを返却するCoroutine Builderです。Coroutine Scope内の処理が正常に完了した場合にonComplete()、例外やキャンセルがされた場合にはonError()へ通知されます。


RxCompletable.kt/rxCompletable

public fun CoroutineScope.rxCompletable(

context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxCompletableCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}


rxCoompletableサンプル

val completable = rxCompletable {

// 処理が正常に成功すればonComplete
// Exception or unsubscribe, cancelされた場合はonError
}

この関数の処理中で生成しているprivate class RxCompletableCoroutineのsuper class AbstractCoroutineの関数onCompletionInternal()を覗いてみると上記の処理を確認することが出来ます。


AbstractCoroutine.kt

internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {

if (state is CompletedExceptionally) // CompletedExceptionally : Jobがキャンセルされたときの状態を表す内部クラス
onCompletedExceptionally(state.cause)
else
onCompleted(state as T)
}


rxMaybe

Maybeを返却するCoroutine Builderです。Coroutine ScopeブロックでNonNullな値を返すとonSuccess()、nullを返却するとonComplete()、rxCompleteと同様に例外発生もしくはキャンセルされるとonError()へ通知されます。


RxMaybe.kt/rxMaybe

public fun <T> CoroutineScope.rxMaybe(

context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxMaybeCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}


rxMaybeサンプル

val maybe = rxMaybe<Int> {

// NonNullな値を返却するとonSuccess
return@rxMaybe 100
// nullを返却するとonComplete
return@rxMaybe null
// Exception or unsubscribe, cancelされた場合はonError
}

また、rxMaybeの内部で生成されるRxMaybeCoroutineでは定義の通りvalueの値によって呼び出す関数を切り替えている処理になっているのが分かります。


RxMaybeCoroutine

private class RxMaybeCoroutine<T>(

parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
) : AbstractCoroutine<T>(parentContext, true) {
override val cancelsParent: Boolean get() = true
override fun onCompleted(value: T) {
if (!subscriber.isDisposed) {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
}
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!subscriber.isDisposed) subscriber.onError(exception)
}
}



rxSingle

Singleを返却するCoroutine Builderです。Coroutine Scopeブロック内で値を返すとonSuccess()、先述の2つのCoroutine Builderと同様に例外発生もしくはキャンセルされた際にonError()へと通知されます。


RxSingle.kt/rxSingle

public fun <T : Any> CoroutineScope.rxSingle(

context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxSingleCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}


rxSingleサンプル

val single = rxSingle<Int> {

// 値を返却するとonSuccess
return@rxSingle 100
// Exception or unsubscribe, cancelされた場合はonError
}

また、<T : Any>とジェネネリクスが宣言されているため、rxSingleのCoroutine Scopeでは返り値はNonNullな値でなければいけないことが分かります。


rxObservable

Observableを返却するCoroutine Builderです。これまではCoroutineScopeがスコープとなっていましたが、rxObservableはProducerScopeになっています。send()/offer()を呼び出すとonNext()、引数なしのclose()の様な正常終了の場合にはonComplete()、例外発生もしくは引数ありでclose()を呼び出した際にはonError()が呼ばれます。

また、ExperimentalCoroutinesApiとなっている点にも注意してください。


RxObservabe.kt/rxObservable

@ExperimentalCoroutinesApi

public fun <T : Any> CoroutineScope.rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxObservableCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}


rxObservableサンプル

val observable = rxObservable<Int> {

// 値の発行でonNext
offer(100)
send(200)

// 通常終了でonComplete
return@rxObservable
close()

// Exception or unsubscribe, cancelされた場合はonError
close(Throwable)
}


Publisher同様KDocにはsend()の記載しかありませんが、rxObservable内で生成されるRxObservableCoroutineの中を見ると、send()/offer()両方でdoLockedNext()を呼び出し、その中でonNext()が呼び出されているのが分かります。


RxObservable.kt/RxObservableCoroutine

override fun offer(element: T): Boolean {

if (!mutex.tryLock()) return false
doLockedNext(element)
return true
}

public override suspend fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
// slow-path does suspend
return sendSuspend(element)
}

private suspend fun sendSuspend(element: T) {
mutex.lock()
doLockedNext(element)
}

private fun doLockedNext(elem: T) {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted()
throw getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
try {
if (!cancel(e))
handleCoroutineException(context, e, this)
} finally {
doLockedSignalCompleted()
}
throw getCancellationException()
}
/*
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
happen after this check and before `unlock` (see `onCancellation` that does not do anything
if it fails to acquire the lock that we are still holding).
We have to recheck `isCompleted` after `unlock` anyway.
*/

mutex.unlock()
// recheck isActive
if (!isActive && mutex.tryLock())
doLockedSignalCompleted()
}


今のチームではObservableかSingleで上位layerへ公開している機能がほとんどなこともあり、rxObservableと先述のrxSingleは使う機会が多くなっています。


rxFlowable

Flowableを返却するCoroutine Builderです。rxObservableと同様にScopeはProducerScope、各イベント関数の呼び出し条件は、send()/offer()を呼び出しでonNext()、引数なしのclose()の様な正常終了でonComplete()、例外発生もしくは引数ありでのclose()呼び出しでonError()となっています。


rxFlowableサンプル

val flowable = rxFlowable<Int> {

// send/offerでonNext
offer(100)
send(200)

// 通常終了でonComplete
return@rxFlowable
close()

// Exception or unsubscribe, cancelされた場合はonError
close(Throwable)
}



中断関数(Suspending extension functions and suspending iteration)


CompletableSource await

Completableの処理が完了するまで待ちます。


RxAwait.kt/CompletableSource.await

public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->

subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
})
}

suspendCancellableCoroutineではCancellableContinuationが渡され、CompletableObserverの各イベントでinternal関数disposeOnCancellation()などが呼ばれています。


MaybeSource await / openSubscription

MaybeSourceのの拡張関数として2種類のawaitとopenSubscriptionが用意されています。

Name
Description

MaybeSource.await
Maybeの値もしくはnullの返却を待つ

MaybeSource.awaitOrDefault
Maybeの値もしくはデフォルト値の返却を待つ

MaybeSource.openSubscription
Maybeのsubscribeを開始し、Maybeからのイベントを受け取るためのReceiveChannelを受け取る


SingleSource await

SingleSourceでは拡張関数でawaitが用意されています。Singleの処理が完了するまで待ち、値を受け取ります。


RxAwait.kt/SingleSource.await

public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->

subscribe(object : SingleObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}

CompletableSourceのawaitとほぼ同じですね。


ObservableSource await / openSubscription

ObservableSourceにはPublisherで用意されているのと同じだけの種類のawait関数とopenSubscription関数が用意されています。

Name
Description

ObservableSource.awaitFirst
Observableからの一つ目の値を待ち、その値を返却

ObservableSource.awaitFirstOrDefault
Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければデフォルト値を返却

ObservableSource.awaitFirstOrElse
Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければ引数のfunctionで返却されるデフォルト値を返却

ObservableSource.awaitFirstOrNull
Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければnullを返却

ObservableSource.awaitLast
Observableからの最後の値を待ち、その値を返却

ObservableSource.awaitSingle
Observableからの最後の値を待ち、その値を一度だけ返却

awaitに関しては内部の処理についてもPublisherの処理と類似した構成になっており、ObservableSource向けのawaitOne()Modeによって使い分けされています。


RxAwait.kt

public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)

public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)

private suspend fun <T> ObservableSource<T>.awaitOne(mode: Mode, default: T? = null): T


また、openSubscription()はPublisherのものと同様にObsoleteCoroutinesApiとなっています。


FlowableSource.await

章立てをしてはいますが、Flowableは先述のPublisherを継承しているため、await関数についてもPublisherのものを利用できます。そのため、取り立てて追記する事項はありません。


変換関数(Conversion functions)

変換関数として、Observable、Completable、Singleへの変換関数以外にもSchedulerをCoroutineDispatcherに変換する関数も用意されています。

Name
Description

Job.asCompletable
JobをCompletableへ変換する

Deferred.asSingle
DeferredをSingleへ変換する

ReceiveChannel.asObservable
ReceiveChannelをObservableへ変換する

Scheduler.asCoroutineDispatcher
SchedulerをCoroutineDispatcherへ変換する


Job.asCompletable

JobからCompletableへ変換します。内部ではGlobalScopeが用いられています。

ExperimentalCoroutinesApiになっているので先述の設定を忘れない様にご注意ください。


RxConvert.kt/Job.asCompletable

@ExperimentalCoroutinesApi

public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) {
this@asCompletable.join()
}


Deferred.asSingle

DeferredからSingleへ変換します。こちらも内部ではGlobalScopeが用いられています。

こちらもExperimentalCoroutinesApiなので注意してください。


RxConvert.kt/Deferred.asSingle

@ExperimentalCoroutinesApi

public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
this@asSingle.await()
}


ReceiveChannel.asObservable

ReceiveChannelからObservableへ変換します。先述の2つと同様にGlobalScopeが用いられています。

こちらもObsoleteCoroutinesApiになっています。


RxConvert.kt/ReceiveChannel.asObservable

@ObsoleteCoroutinesApi

public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = GlobalScope.rxObservable(context) {
for (t in this@asObservable)
send(t)
}


Scheduler.asCoroutineDispatcher

SchedulerをCoroutineDispatcherへ変換します。


RxScheduler.kt/Scheduler.asCoroutineDispatcher

public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)



終わりに

kotlinx-coroutines-reactive、kotlinx-coroutines-rx2について一通りご紹介しました。

Rx導入済みの環境に対してCoroutinesを導入していくにあたって十分な機能が提供されていると思っていますが、私もまだまだ理解が及んでいないところが多いです。

それでも便利な機能が多く、Coroutinesへの置き換えをサポートしてくれる印象を受けましたので、まだCoroutinesに触れられていないという方はRxの置き換えからトライしてみるのは如何でしょうか?


参考