はじめに
この記事は Kotlin Advent Calendar 2018 21日目の記事です。
これまでExperimental APIであったCoroutinesがKotlin1.3.0よりstableになりました。
「Experimentalだったので見送っていたがこれを機にCoroutinesを導入してみたい」と考えた方も多くいらっしゃるのではないでしょうか?
自分のAndroidチームではCoroutinesの導入について話し合った結果、layerやmoduleの境界となるI/FはRxで統一し、それ以外に関してはRxとCoroutinesとで使いやすい方を使うという方針で進めてみています。
そのため「RxとCoroutinesとの相互変換」が必要になるのですが、kotlinx.coroutinesにあるreacrive module下に既に用意されており、現在進行形でこちらを使って開発を進めています。
本稿では、Coroutines導入時に注意した点や内部処理に触れたりしつつ、このreactive module内の「kotlinx-coroutines-reactive」「kotlinx-coroutines-rx2」についてご紹介していきたいと思います。
※kotlinx-coroutines-reactorについては触れていません。ご注意ください。
環境
- Android Studio : 3.2.1
- Kotlin : 1.3.0
- kotlinx-coroutines-rx2 : 1.0.0
- kotlinx-coroutines-reactive : 1.0.0
ご紹介の前に(設定)
今回ご紹介するAPIの中にはExperimental APIおよびObsolete APIが含まれます。そのため利用の際には必要に応じて以下の設定が必要になります。
- Experimental API/Obsolete API利用メソッド/クラスにannotationを付与する。
- compile optionとしてExperimental APIを使用することを宣言する。
Experimental API/Obsolete API利用メソッド/クラスにannotationを付与する
Experimental API/Obsolete APIを利用する際には該当APIを呼び出すクラスもしくはメソッドに対して以下のいずれかのannotationを付与する必要があります。
-
@ExperimentalCoroutinesApi
/@ObsoleteCoroutinesApi
-
@UseExperimental(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
(@UseExperimental
の引数はvararg
になっています)
compile optionとしてExperimental APIを使用することを宣言する
先述のannotationを付与するとコンパイル時に以下の警告が出てくるようになります。
This class can only be used with the compiler argument '-Xuse-experimental=kotlin.Experimental'
これを解消するためには先述のAPIを利用しているモジュールのbuild.gradle.kts(もしくはbuild.gradle)のcompile optionへExperimental APIを使うことを追記する必要があります。
tasks.withType(KotlinCompile::class).all {
kotlinOptions {
freeCompilerArgs = listOf("-Xuse-experimental=kotlin.Experimental")
}
}
ご紹介の前に(章立て)
各モジュールのREADMEでは以下の構成で説明されています。本稿ではそちらに倣ってご紹介していきます。
1. コルーチンビルダー(Coroutine builders)
各種Observableを返却する機能を持つコルーチンビルダーについて触れています。
2. 中断関数(Suspending extension functions and suspending iteration)
各種Observableクラスを拡張した中断関数(suspend関数)について触れています。
3. 変換関数(Conversion functions)
各種Observableへ変換する関数やSchedulerをCoroutineDispatcherへ変換するための関数について触れています。
kotlinx-coroutines-reactive
コルーチンビルダー(Coroutine builders)
publish
文字通り、Publisher
を返却するCoroutine Builderです。
ブロック内はProducerScope
になっており、send()/offer()
でonNext()
、close()
をThrowableなしで呼び出した場合はonComplete()
、Throwableつきで呼び出した場合はonError()
が通知されます。
また、publishはExperimentalCoroutinesApiのため、先述の設定を忘れないように注意してください。
@ExperimentalCoroutinesApi fun <T> CoroutineScope.publish(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>
val publisher: Publisher<Int> = publish(coroutineContext) {
// onNext通知
send(element)
offer(element)
// onComplete通知
close()
// onError通知
close(Throwable)
}
中断関数(Suspending extension functions and suspending iteration)
Publisher await
Publisherの拡張関数として6種類のawaitが用意されています。
Name | Description |
---|---|
Publisher.awaitFirst | Publisherからの一つ目の値を待ち、その値を返却 |
Publisher.awaitFirstOrDefault | Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければデフォルト値を返却 |
Publisher.awaitFirstOrElse | Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければ引数のfunctionで返却されるデフォルト値を返却 |
Publisher.awaitFirstOrNull | Publisherから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければnullを返却 |
Publisher.awaitLast | Publisherからの最後の値を待ち、その値を返却 |
Publisher.awaitSingle | Publisherからの最後の値を待ち、その値を一度だけ返却 |
awaitSingle() を補足すると、既にいずれかのawaitを用いて値を取得している状態で呼び出した場合はIllegalArgumentException が投げられるので注意してください。 |
また余談になりますが、これらのawait関数は内部ではMode
というenum classとNullableなデフォルト値を引数に取るawaitOne()
という関数でまとめられており、この辺りを見ると各awaitについての理解が深めやすいかなと思いました。
例えば、awaitFirstOrNull()
はawaitFirstOrDefault()
のデフォルト値がデフォルト引数のnullを指定したものになっていることが分かります。
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) // awaitFirstOrDefaultとModeは同じだが,nullを指定している.
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
private suspend fun <T> Publisher<T>.awaitOne(mode: Mode, default: T? = null): T
Publisher openSubscription
await以外にもPublisherから発行されたイベントをChannelで扱うための拡張関数、openSubscription()
が提供されています。
このopenSubscription()
は呼び出すとsubscribeを開始し、イベントを受信するためのReceiveChannel
を返却します。
こうして受け取ったReceiveChannel
のAPI(例えばconsumeEach()
など)を用いることでPublisherで発行したイベントを受信することが出来ます。
@ObsoleteCoroutinesApi
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>(request)
subscribe(channel)
return channel
}
val publisher: Publisher<Int> = publish(coroutineContext) {
:
}
publisher.openSubscription()
.consumeEach {
println(it)
}
ただし、このopenSubscription()
はObsoleteCoroutinesApiとなっていますので、こちらも先述の設定が必要になります。
※Channelの詳細については公式ページを参照ください。
参考: Kotlin Reference : Channels
変換関数(Conversion functions)
ReceiveChannel.asPublisher
先ほどご紹介したPublisher.openSubscription()
の逆パターンです。ReceiveChannelをPublisherへと変換することが出来ます。
こちらもObsoleteCoroutinesApiになっています。
@ObsoleteCoroutinesApi
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = GlobalScope.publish(context) {
for (t in this@asPublisher)
send(t)
}
val publisher = receiveChannel.asPublisher()
publisher.subscribe(object: Subscriber<Int?> {
override fun onComplete() {}
override fun onSubscribe(s: Subscription?) {}
override fun onNext(t: Int?) {}
override fun onError(t: Throwable?) {}
})
kotlinx-coroutines-rx2
コルーチンビルダー(Coroutine builders)
kotlinx-coroutines-rx2ではCompletable
、Maybe
、Single
、Observable
、Flowable
を返却するCoroutine Builderが用意されています。
Name | Result | Scope |
---|---|---|
rxCompletable | Completable | CoroutineScope |
rxMaybe | Maybe | CoroutineScope |
rxSingle | Single | CoroutineScope |
rxObservable | Observable | ProducerScope |
rxFlowable | Flowable | ProducerScope |
rxCompletable
Completableを返却するCoroutine Builderです。Coroutine Scope内の処理が正常に完了した場合にonComplete()
、例外やキャンセルがされた場合にはonError()
へ通知されます。
public fun CoroutineScope.rxCompletable(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxCompletableCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
val completable = rxCompletable {
// 処理が正常に成功すればonComplete
// Exception or unsubscribe, cancelされた場合はonError
}
この関数の処理中で生成しているprivate class RxCompletableCoroutine
のsuper class AbstractCoroutine
の関数onCompletionInternal()
を覗いてみると上記の処理を確認することが出来ます。
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally) // CompletedExceptionally : Jobがキャンセルされたときの状態を表す内部クラス
onCompletedExceptionally(state.cause)
else
onCompleted(state as T)
}
rxMaybe
Maybeを返却するCoroutine Builderです。Coroutine ScopeブロックでNonNullな値を返すとonSuccess()
、nullを返却するとonComplete()
、rxCompleteと同様に例外発生もしくはキャンセルされるとonError()
へ通知されます。
public fun <T> CoroutineScope.rxMaybe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxMaybeCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
val maybe = rxMaybe<Int> {
// NonNullな値を返却するとonSuccess
return@rxMaybe 100
// nullを返却するとonComplete
return@rxMaybe null
// Exception or unsubscribe, cancelされた場合はonError
}
また、rxMaybeの内部で生成されるRxMaybeCoroutine
では定義の通りvalueの値によって呼び出す関数を切り替えている処理になっているのが分かります。
private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
) : AbstractCoroutine<T>(parentContext, true) {
override val cancelsParent: Boolean get() = true
override fun onCompleted(value: T) {
if (!subscriber.isDisposed) {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
}
}
override fun onCompletedExceptionally(exception: Throwable) {
if (!subscriber.isDisposed) subscriber.onError(exception)
}
}
rxSingle
Singleを返却するCoroutine Builderです。Coroutine Scopeブロック内で値を返すとonSuccess()
、先述の2つのCoroutine Builderと同様に例外発生もしくはキャンセルされた際にonError()
へと通知されます。
public fun <T : Any> CoroutineScope.rxSingle(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxSingleCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
val single = rxSingle<Int> {
// 値を返却するとonSuccess
return@rxSingle 100
// Exception or unsubscribe, cancelされた場合はonError
}
また、<T : Any>
とジェネネリクスが宣言されているため、rxSingleのCoroutine Scopeでは返り値はNonNullな値でなければいけないことが分かります。
rxObservable
Observableを返却するCoroutine Builderです。これまではCoroutineScopeがスコープとなっていましたが、rxObservableはProducerScope
になっています。send()/offer()
を呼び出すとonNext()
、引数なしのclose()
の様な正常終了の場合にはonComplete()
、例外発生もしくは引数ありでclose()
を呼び出した際にはonError()
が呼ばれます。
また、ExperimentalCoroutinesApiとなっている点にも注意してください。
@ExperimentalCoroutinesApi
public fun <T : Any> CoroutineScope.rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = RxObservableCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
val observable = rxObservable<Int> {
// 値の発行でonNext
offer(100)
send(200)
// 通常終了でonComplete
return@rxObservable
close()
// Exception or unsubscribe, cancelされた場合はonError
close(Throwable)
}
Publisher同様KDocにはsend()
の記載しかありませんが、rxObservable内で生成されるRxObservableCoroutine
の中を見ると、send()/offer()
両方でdoLockedNext()
を呼び出し、その中でonNext()
が呼び出されているのが分かります。
override fun offer(element: T): Boolean {
if (!mutex.tryLock()) return false
doLockedNext(element)
return true
}
public override suspend fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
// slow-path does suspend
return sendSuspend(element)
}
private suspend fun sendSuspend(element: T) {
mutex.lock()
doLockedNext(element)
}
private fun doLockedNext(elem: T) {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted()
throw getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
try {
if (!cancel(e))
handleCoroutineException(context, e, this)
} finally {
doLockedSignalCompleted()
}
throw getCancellationException()
}
/*
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
happen after this check and before `unlock` (see `onCancellation` that does not do anything
if it fails to acquire the lock that we are still holding).
We have to recheck `isCompleted` after `unlock` anyway.
*/
mutex.unlock()
// recheck isActive
if (!isActive && mutex.tryLock())
doLockedSignalCompleted()
}
今のチームではObservableかSingleで上位layerへ公開している機能がほとんどなこともあり、rxObservableと先述のrxSingleは使う機会が多くなっています。
rxFlowable
Flowableを返却するCoroutine Builderです。rxObservableと同様にScopeはProducerScope、各イベント関数の呼び出し条件は、send()/offer()
を呼び出しでonNext()
、引数なしのclose()
の様な正常終了でonComplete()
、例外発生もしくは引数ありでのclose()
呼び出しでonError()
となっています。
val flowable = rxFlowable<Int> {
// send/offerでonNext
offer(100)
send(200)
// 通常終了でonComplete
return@rxFlowable
close()
// Exception or unsubscribe, cancelされた場合はonError
close(Throwable)
}
中断関数(Suspending extension functions and suspending iteration)
CompletableSource await
Completableの処理が完了するまで待ちます。
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
})
}
suspendCancellableCoroutineではCancellableContinuation
が渡され、CompletableObserverの各イベントでinternal関数disposeOnCancellation()
などが呼ばれています。
MaybeSource await / openSubscription
MaybeSourceの拡張関数として2種類のawaitとopenSubscriptionが用意されています。
Name | Description |
---|---|
MaybeSource.await | Maybeの値もしくはnullの返却を待つ |
MaybeSource.awaitOrDefault | Maybeの値もしくはデフォルト値の返却を待つ |
MaybeSource.openSubscription | Maybeのsubscribeを開始し、Maybeからのイベントを受け取るためのReceiveChannelを受け取る |
SingleSource await
SingleSourceでは拡張関数でawaitが用意されています。Singleの処理が完了するまで待ち、値を受け取ります。
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}
CompletableSourceのawaitとほぼ同じですね。
ObservableSource await / openSubscription
ObservableSourceにはPublisherで用意されているのと同じだけの種類のawait関数とopenSubscription関数が用意されています。
Name | Description |
---|---|
ObservableSource.awaitFirst | Observableからの一つ目の値を待ち、その値を返却 |
ObservableSource.awaitFirstOrDefault | Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければデフォルト値を返却 |
ObservableSource.awaitFirstOrElse | Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければ引数のfunctionで返却されるデフォルト値を返却 |
ObservableSource.awaitFirstOrNull | Observableから値が取得されるまで待ち、一つ目の値があればその値、何も発行されていなければnullを返却 |
ObservableSource.awaitLast | Observableからの最後の値を待ち、その値を返却 |
ObservableSource.awaitSingle | Observableからの最後の値を待ち、その値を一度だけ返却 |
awaitに関しては内部の処理についてもPublisherの処理と類似した構成になっており、ObservableSource向けのawaitOne()
やMode
によって使い分けされています。
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
private suspend fun <T> ObservableSource<T>.awaitOne(mode: Mode, default: T? = null): T
また、openSubscription()
はPublisherのものと同様にObsoleteCoroutinesApiとなっています。
FlowableSource.await
章立てをしてはいますが、Flowableは先述のPublisherを継承しているため、await関数についてもPublisherのものを利用できます。そのため、取り立てて追記する事項はありません。
変換関数(Conversion functions)
変換関数として、Observable、Completable、Singleへの変換関数以外にもSchedulerをCoroutineDispatcherに変換する関数も用意されています。
Name | Description |
---|---|
Job.asCompletable | JobをCompletableへ変換する |
Deferred.asSingle | DeferredをSingleへ変換する |
ReceiveChannel.asObservable | ReceiveChannelをObservableへ変換する |
Scheduler.asCoroutineDispatcher | SchedulerをCoroutineDispatcherへ変換する |
Job.asCompletable
JobからCompletableへ変換します。内部ではGlobalScope
が用いられています。
ExperimentalCoroutinesApiになっているので先述の設定を忘れない様にご注意ください。
@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) {
this@asCompletable.join()
}
Deferred.asSingle
DeferredからSingleへ変換します。こちらも内部ではGlobalScope
が用いられています。
こちらもExperimentalCoroutinesApiなので注意してください。
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
this@asSingle.await()
}
ReceiveChannel.asObservable
ReceiveChannelからObservableへ変換します。先述の2つと同様にGlobalScope
が用いられています。
こちらもObsoleteCoroutinesApiになっています。
@ObsoleteCoroutinesApi
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = GlobalScope.rxObservable(context) {
for (t in this@asObservable)
send(t)
}
Scheduler.asCoroutineDispatcher
SchedulerをCoroutineDispatcherへ変換します。
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
終わりに
kotlinx-coroutines-reactive、kotlinx-coroutines-rx2について一通りご紹介しました。
Rx導入済みの環境に対してCoroutinesを導入していくにあたって十分な機能が提供されていると思っていますが、私もまだまだ理解が及んでいないところが多いです。
それでも便利な機能が多く、Coroutinesへの置き換えをサポートしてくれる印象を受けましたので、まだCoroutinesに触れられていないという方はRxの置き換えからトライしてみるのは如何でしょうか?