Everything is a stream
みなさん良いStreamライフを送っていますか?
私は最高のStreamライフを送っています!
特に最近Kotlin Coroutineを使い始め、新たな風が流れてきています。(Streamだけに)
寒いギャグのStreamはonCompleteするとして・・・
昨今のAndroid開発において、RxJava(RxKotlin)とKotlin Coroutineのどっちを使うか問題があると思います。
RxJavaはもちろん素晴らしいフレームワークですが、Kotlin Coroutineは公式が出しているフレームワークなので、こちらを使う方が安心感があるようにも感じます。
Kotlin Coroutineを使ってみて、こちらの方が良いと感じる部分も多数あったので
- RxJava使いがKotlin Coroutineを使い始めるための基本知識
- RxJavaに比べてKotlin Coroutineが良いと私が思った部分
の二つを話していきたいと思います。
対象読者
- Kotlinが読める
- RxJava(RxKotlin)を使ったことがある
- kotlin Coroutineを使いたいと思っている
- (Kotlin/MPPを信じてiOSをKotlinで書こうと思っているRxSwift使い)
まずはKotlin Coroutineについてざっくりと
と言っても、QiitaにKotlin Coroutineについての記事はたくさんあるので、改めて私が書くことはありません。
なので、私が参考にした記事をひたすら貼っていきます!
つまり、巨人の肩に乗るということです。
まずはKotlin Coroutineの概要をざっくりと
実は、CoroutineはRxと違いStreamだけを扱うものではないのです。
async/awaitも一時中断関数(suspend fun)もKotlin Coroutineの一部です。
以下の記事はこの辺りがわかりやすくまとめられています。
【Kotlin】Coroutineを理解する (AtsushiUemuraさま)
Coroutineで動くアプリを作ってみる
Coroutineについてなんとなく理解したら、簡単なアプリを作ってみたくなりますよね?
以下の記事ではHTTP通信をCoroutineを使って行う例が示されています。
Android + Kotlin 1.3のcoroutines(Async, Await)でHTTP通信を非同期処理(jonghyoさま)
Coroutineを効果的に運用するために
Coroutineは非同期で動くため、下手に使うとメモリリークします。
効率的に運用するためにはCoroutineContextについて知る必要があります。
Coroutineは動かすContextがあり,Coroutineは親子関係を作ります。
そして、親が破棄されたら子は自動的に破棄されます。
なので、AndroidでしたらActivityでCoroutineContextを作成し、onDestroy()で破棄するように設定しておくと、そこから動かした子Coroutineは自動的に破棄されて安全に扱えます!
この辺りの親子関係について以下の記事がとても分かりやすいです!
図で理解する Kotlin Coroutine(kawmraさま)
だいたい分かってきた時に押さえておきたい
チュートリアルや紹介では頻繁に出てくるGlobalScope.launch()ですが、実はアンチパターンです。
Globalなので、ライフサイクルとして最長なためです。
この辺りのアンチパターンがまとまっているので、ある程度理解してきたら一読しておく価値ありです!
Kotlin Coroutinesパターン&アンチパターン (ikemura23さま)
RxJavaの「あれ」ってKotlin Coroutineではどうするの?[基本編]
ここからが本題です。
Kotlin Coroutineをだいたい理解したところで、Rxとの対応が知りたくなります。
「CoroutineはRxの代替では無い。」と書きましたが、Flowの登場により、Rxの代替としても使えます。
そして、Flow型にはmapやfilter、scanやretryなどRxで見慣れたオペレーターが揃っています。
なので、ObservableとFlowはほとんど互換といって差し支えありません。
しかも、RxのSingleやCompletableはCoroutineでは、もっと簡単に書けます。
Rxの「あれ」をKotlin Coroutineでやりたい!という気持ちに答えていきます。
RxとCoroutineの違いを大雑把に考察
CoroutineはRxに比べて型が厳しいです。
例えば、RxのHot Stream, Cold Streamを型で分類できません。一つ一つ覚える必要があります。
逆にRxはObservableのオペレーターが全てなので、SubjectだろうがSingleだろうが同じように扱えます。
一方で、CoroutineはHot StreamはChannelクラス、Cold StreamはFlowクラスといったように、型が厳密です。
ChannelとFlowに生えているオペレーターはまるで別物です。
さらにsuspend funまで加わってきて、初見では複雑に感じると思います。
型が多く、厳密ゆえに書くのが難しかったりします。
一方で、型が厳密なので、「Cold Streamだと思っていたら、Hot Streamでリークしていた。」なんてことは起こりにくいです。
その辺りも交えながら比較していきます!
Single
Rxで値を1度だけ返すSingleです。DBへのinsertやHTTPアクセスなど、頻繁に使われている印象があります。
Rx
fun getRequest(): Single<String> {
return Single.create<String> { emitter ->
// 何らかの処理する
emitter.onSuccess(body.message)
}
}
Coroutine
注意: suspend funはCold Streamでは無いため、厳密にはSingleの互換ではありません。
しかし、似た扱いができるため、このように紹介しています。
suspend fun getRequest(): String{
// 何らかの処理する
return body.message
}
Single[callbackを使う場合]
Singleの内部でcallbackを使う場合はCoroutine側は少し変える必要があります。
Rx
fun getRequest(): Single<String> {
return Single.create<String> { emitter ->
val callback = object: Callback {
override fun onNextValue(value: String) {
emitter.onSuccess(value)
}
override fun onApiError(cause: Throwable) {
emitter.onFailure(Exception("API Error", cause))
}
}
}
}
Coroutine
suspendCoroutineは関数を抜けたところで一時停止し、resumeまたはresumeWithExceptionが呼ばれるまで待機します。
(resumeWithもありますが、説明を省略します)
suspend fun getMessages(): String {
return suspendCoroutine<String> { coroutine ->
val callback = object: Callback {
override fun onNextValue(value: String) {
coroutine.resume(value)
}
override fun onApiError(cause: Throwable) {
coroutine.resumeWithException(Exception("API Error", cause))
}
}
}
}
Completable
Rxで値を返さず完了したことだけ返すCompletableです。
Rx
fun updateRequest(): Completable {
return Completable.create { emitter ->
// 何らかの処理
emitter.onComplete()
}
}
Coroutine
suspend fun updateRequest() {
// 何らかの処理
return
}
Observable
Observable.just()
一度だけアイテムを流して終えるStreamです。実質Singleです。
Rx
fun getMessage(): Observable<String> {
return Observable.just("サンプル")
}
Coroutine
fun getMessage(): Flow<String> {
return flowOf("サンプル")
}
Observable.create()[固定回数onNextを呼ぶ場合]
任意の回数onNextを呼べる汎用Streamです。
Coroutineの場合は、固定回数onNextを呼ぶ場合と、callbackの場合で異なり、こちらは固定回数呼ぶ場合です。
Rx
fun getMessages(): Observable<String> {
return Observable.create { observer ->
for(/*何回か回す*/) {
observer.onNext("サンプル")
}
observer.onComplete()
}
}
Coroutine
Coroutineの場合はflowスコープを抜けると自動的にcompletedしたことになります。
fun getMessages(): Flow<String> {
return flow {
repeat(/*繰り返す回数*/) {
emit("サンプル")
}
}
}
Observable.create()[callbackからonNextを呼ぶ場合]
任意の回数onNextを呼べる汎用Streamです。
Coroutineの場合は、固定回数onNextを呼ぶ場合と、callbackの場合で異なり、こちらはcallbackバージョンです。
Rx
fun getMessages(): Observable<String> {
return Observable.create { observer ->
val callback = object: Callback {
override fun onNextValue(value: String) {
observer.onNext(value)
}
override fun onApiError(cause: Throwable) {
observer.onError(Exception("API Error", cause))
}
override fun onComplete() = observer.onComplete()
}
}.doFinally {/*streamが止まった時の後処理*/}
}
Coroutine
fun getMessages(): Flow<String> {
return callbackFlow {
val callback = object: Callback {
override fun onNextValue(value: String) {
offer(value)
}
override fun onApiError(cause: Throwable) {
cancel(Exception("API Error", cause))
}
override fun onComplete() = channel.close()
}
}
// awaitCloseのところで処理が止まるため、awaitClose{}の下にコードを書いてはいけない
awaitClose {/*streamが止まった時の後処理*/}
}
BehaviorSubject()
常に値を1個持っているHost Stream
Rx
class SampleClass {
private val behaviorSubject: BehaviorSubject<Int> = BehaviorSubject.create(0)
}
Coroutine
class SampleClass {
private val channel: Channel<Int> = Channel(Channel.CONFLATED)
}
ReplaySubject()
Rx
class SampleClass {
private val replaySubject: ReplaySubject<Int> = ReplaySubject.create()
}
Coroutine
class SampleClass {
private val channel: Channel<Int> = Channel(Channel.UNLIMTED)
}
Subscribe()
Coroutineの場合は、型によって呼ぶメソッドが変わるのが特徴。
Rx
Single.just(1).subscribeBy(
onNext = { v ->
// vがonNextした値
}
)
Coroutine[suspend funを呼ぶ場合]
suspend fun sample(): Int {
return 1
}
//// ここまで準備 ////
launch {
sample() // 普通に呼ぶ
}
Coroutine[flowを呼ぶ場合]
fun getFlow(): Flow<Int> = flowOf(1)
//// ここまで準備 ////
launch {
getFlow().collect{ v ->
// vがemitされてきた値
}
}
Coroutine[channelを呼ぶ場合]
fun getChannel(): ReceiveChannel<Int> {
// 予めchannelが作ってある前提
return this.channel
}
//// ここまで準備 ////
launch {
getChannel()
.consumeAsFlow() // ここで一度Cold Streamに変換する。その後はFlowと同様
.collect { v ->
// vがsendされてきた値
}
}
RxJavaの「あれ」ってKotlin Coroutineではどうするの?[応用編]
複数のStreamをシーケンシャルに処理する
複数のStreamを順番に処理したい場合
Rx
fun getMessages(): Observable<String> {
return Observable.create<String> { observer ->
val callback = object: Callback {
override fun onNextValue(value: String) {
observer.onNext(value)
}
override fun onApiError(cause: Throwable) {
observer.onError(Exception("API Error", cause))
}
override fun onComplete() = observer.onComplete()
}
}.doFinally {/*streamが止まった時の後処理*/}
}
fun getRequest(url: String): Single<String> {
return Single.create { emitter ->
// http通信をする
emitter.onSuccess(message.body)
}
}
fun insertDB(body: String): Completable {
return Completable.create { emitter ->
// DBに入れる
emitter.onComplete()
}
}
//// ここまで準備 ////
getMessages()
.flatMapSingle { getRequest(it) }
.flatMapCompletable { insertDB(it) }
.subscribeBy(onComplete = {})
Coroutine
onEachで後ろからStreamが流れてきたら処理をします。
今回は値を返さないCompletableだったので、onEachですが、flatMapMergeもあります。
fun getMessages(): Flow<String> {
return callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: String) {
offer(value)
}
override fun onApiError(cause: Throwable) {
cancel(Exception("API Error", cause))
}
override fun onComplete() = channel.close()
}
}
// awaitCloseのところで処理が止まるため、awaitClose{}の下にコードを書いてはいけない
awaitClose {/*streamが止まった時の後処理*/}
}
suspend fun getRequest(url: String): String {
// http通信をする
return message.body
}
suspend fun insertDB(body: String) {
// DBに入れる
return
}
//// ここまで準備 ////
launch {
getMessages().onEach { url ->
val body = getRequest(rul)
insertDB(body)
}.collect {}
}
並列に処理して待ち合わせる
2か所にHTTP Requestを送信して、両方が成功したら次に進む処理。
シーケンシャルではなく、同時に送信するのがポイント
Rx
fun getHogeRequest(): Single<String> {
return Single.create { emitter ->
// HTTPリクエストする
emitter.onSuccess(message.body)
}
}
fun getFugaRequest(): Single<String> {
return Single.create { emitter ->
// HTTPリクエストする
emitter.onSuccess(message.body)
}
}
//// ここまで準備 ////
Single.zip(getHogeRequest(), getFugaRequest()).subscribeBy(onNext = { v: Pair<String, String> ->
})
Coroutine
suspend funはasync{}で囲むことで、並列に処理ができるようになります。
全てが終了するまで待つときはList<Deffered<T>>にawaitAll()があるので、それを呼びます。
suspend fun getHogeRequest(): String {
// HTTPリクエストする
return message.body
}
suspend fun getFugaRequest(): String {
// HTTPリクエストする
return message.body
}
//// ここまで準備 ////
launch {
val resultList: List<String> = listOf(async{ getHogeRequest() }, async{ getFugaRequest() }).awaitAll()
}
Dispose
Rx
val disposable = hogeObservable().subscribeBy(onNext = {})
disposable.dispose()
Coroutine
val job = launch {
hogeFlow.collect {}
}
job.cancel()
まとめ
Rx使いがCoroutineを使いこなすためのポイントをまとめます。
- CoroutineScopeやCoroutineContextを何となく理解する
-
SingleやCompletableを使いたくなったらsuspend funを使う -
Observableを使いたくなったらFlowを使う -
BehaviorSubjectなどのSubject系を使いたくなったらChannelを使う
ここでは細かく書きませんでしたが、ObservableとFlowのオペレーターは似ていますが、異なる部分も多々あります。
最終的には公式ドキュメントが一番ですので、概要を理解したら公式ドキュメントを読んでみてください。
Rxのリファレンス、Coroutineのリファレンス