はじめに
今回は今までプロジェクトでなんとなくで読んで使っていたRxを学び直しつつ、理解する。
読んでいただければ、Rx入門者の方でもプロジェクトのコードをある程度読めるレベルになるのではないか、と勝手に思っています。
誤った言い回し、解釈、記述などあればコメントいただければ幸いです。
Rx の概要(主な機能)
Rxの主な機能としてはざっくり以下だと認識している。
- 「値を流して処理する」(ストリーム)
- 「値の監視 / イベントを検知して処理する」(イベントバス的な)
- 「スレッド操作(切り替え・使い分け)」
今回は
- 「値を流して処理する」(ストリーム)
のイメージを掴んで、知識を整理しながら理解したいと思う。
参考リンク
- https://qiita.com/disc99/items/1b2e44a1105008ec3ac9#merge
- https://ichi.pro/android-de-rxjava-o-shiyosuru-riyu-139590064902698
- https://tech.ga-tech.co.jp/entry/2020/01/rxjava-beginner
- https://qiita.com/k-mats/items/4d374460a3f6284dd09f
- https://qiita.com/k-mats/items/3844a08b8958f77c45d0
- https://qiita.com/pirorirori_n712/items/3f01aaa2876f3121d43a
- https://qiita.com/Yoshi25/items/200e253a9d61df9bb21b
- https://qiita.com/toastkidjp/items/581e89559f05952fbdb6
処理の流れのイメージをざっくり理解する
最初はざっくり大体こんな感じに処理が流れることをイメージできればいいと思う。
複雑なコードであってもほとんどやっていることは同じで、こういった構造になっているはず。
- 値の流れを作る
- 値を操作する
- 流れてきた値を受け取って処理を実行する
※ 「2. 値を操作する」がない場合もある。また値を流さないこともある(後述するCompletable
など)。
// コードベースで見てみる
Observable.just(1, 2, 3, 4, 5, 6, 7) // just で数字を7個流す
.filter { it % 2 == 0 } // 7個の数字が流れてくる
.subscribe { println("偶数: $it") } // フィルタリングされた偶数の 2, 4, 6 を受け取って処理
①値の流れを作る
.②値を操作する
.③流れてきた値を受け取って処理を実行する
出力結果
System.out: 偶数: 2
System.out: 偶数: 4
System.out: 偶数: 6
登場人物
① 値の流れを作る(監視する):Observable(他Singleなど)
② 値を操作する:Operator(filterなど)
③ 流れてきた値を受け取って処理を実行する:Subscribe
① 値の流れを作る(監視する):Observable(他Singleなど)
流すことができる値 | 利用できるメソッド(Subscribeにて) | 使い所 | |
---|---|---|---|
Observable |
複数 |
onNext(a: Any) /onError(e: Throwable) / onComplete()
|
GUIでのイベント/複数回値を実行したいとき |
Single |
1つ |
onSuccess(a: Any) /onError(e: Throwable)
|
値を流す or エラーかのどちらかの場合/httpのGET などでAPIからレスポンスを取得するときなど |
Complete |
なし |
onComplete() /onEroor(e: Throwable)
|
返り値がない非同期処理/httpのPOST 処理など |
他にも、Maybe
とかFuture
とかあるようです。しかしとりあえず上記3つを使いこなせれば良さそう。
参考リンク
- https://rxmarbles.com/
- https://qiita.com/yu-aiki/items/f94ad9822dadc6a6ec62
- https://qiita.com/takahirom/items/f3e576e91b219c7239e7
生成するときの関数
create
just
-
from
などなど
② 値を操作する:Operator(filterなど)
collection
がもつような関数がたくさんある。
個々を詳しく解説はしないですが、Operator のメソッドはかなりたくさんあるので、出くわした都度調べて使えるようになるしかないのかな??と思いました。
Operator関数 (ごく一部)
filter
take
first
last
map
flatMap
などなど
Operatorに関して、有益でわかりやすい参考記事がたくさんありました。
これは出会した都度確認して、感覚を養いつつ使いこなせるようにするしかなさそう。
参考リンク
- https://qiita.com/yukiyamadajp/items/627c4bc25956d421cb6a#%E7%B5%84%E3%81%BF%E5%90%88%E3%82%8F%E3%81%9B
- https://qiita.com/yukiyamadajp/items/a0855189988539c18b8f#%E3%83%AA%E3%82%A2%E3%82%AF%E3%83%86%E3%82%A3%E3%83%96%E3%83%97%E3%83%AD%E3%82%B0%E3%83%A9%E3%83%9F%E3%83%B3%E3%82%B0%E3%81%A8%E3%81%AF
③ 流れてきた値を受け取って処理を実行する:Subscribe
Subscribe
実際に値が流れてきたときにどのような処理をするのかを定義しておく。
-
onNext(a: Any)
- 値が流れてきたことを通知する。
- 値を受け取って処理することができる。
-
onNext()
の処理中にエラーが起きればonError()
が呼ばれ、onComplete()
は呼ばれない。
-
onError(e: Throwable)
- エラー発生によるストリーム処理の終了を伝える。
-
onComplete()
- ストリームの処理が正常に終了したことを伝える。
-
onSuccess(a: Any)
- ストリームの処理が正常に終了したことを伝える。
- 値を受け取って処理することができる。
-
onSuccess()
の処理中にエラーが起きてもonError()
が呼ばれるわけでは無い。(クラッシュしたりする)
この中で唯一onNext()
だけがストリーム処理の途中で実行されるもの。
参考リンク
コードで動作を確認してみる
Observable
おさらい
- 流すことができる値の数: 複数可能
- 利用できるメソッド:
onError()
onComplete()
onNext()
Observable.just(1, 2, 3, 4, 5, 6, 7)
.filter { it % 2 == 0 }
.subscribeBy(
onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
onNext = { println("onNext: $it") } // onNext() では値を受け取れる
)
出力結果
System.out: onNext: 2
System.out: onNext: 4
System.out: onNext: 6
System.out: onComplete: 正常に処理終了!
subscribe に流れてきた値の数だけonNext()
が呼ばれ、onComplete()
は値が全て流れ終わり、処理が終了したタイミングで最後に1回だけ呼ばれる。
エラーを起こしてみる
Operator処理のタイミングでエラーを起こす
Observable.just(1, 2, 3, 4, 5, 6, 7)
.filter {
// 流れてきた値が 4 のときにエラーが起きるようにする
if (it == 4) {
throw IllegalStateException("値が $it でエラー")
}
it % 2 == 0
}
.subscribeBy(
onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
onNext = { println("onNext: $it") } // onNext() では値を受け取れる
)
出力結果
System.out: onNext: 2
System.out: onError: エラーーーー java.lang.IllegalStateException: 値が 4 でエラー
流れてきた値が2のときは通常通りonNext()
が呼ばれ、値が4のときにエラーを起こしているため例外処理以降はonError()
が実行される。
そして、onError()
でエラー内容を受け取ることができて、ストリーム処理が終了する。(5以降は流れてこない)
onNext()のタイミングでエラーを起こす
Observable.just(1, 2, 3, 4, 5, 6, 7)
.filter { it % 2 == 0 }
.subscribeBy(
onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
onNext = {
// 流れてきた値が 4 のときにエラーが起きるようにする
if (it == 4) {
throw IllegalStateException("onNext: 値が $it でエラー")
}
println("onNext: $it")
} // onNext() では値を受け取れる
)
System.out: onNext: 2
System.out: onError: エラーーーー java.lang.IllegalStateException: onNext: 値が 4 でエラー
流れてきた値が2のときは通常通りonNext()
が呼ばれ、値が4のときにエラーを起こしているためonNext()
の例外処理以降は実行されず、onError()
が実行される。
そして、onError()
でエラー内容を受け取ることができて、ストリーム処理が終了する。(5以降は流れてこない)
さらに doOnNext() で本当にエラーが起きた時点で値が流れてきてないか確かめてみる
先ほどのコードでjust
の直後 .doOnNext { println("doOnNext: $it") }
を追加しただけです。
ちなみにdoOnNext()
はストリームの流れの途中で値を受け取って処理ができたりするものです。(ここら辺も解説する記事とか書きたい...)
Observable.just(1, 2, 3, 4, 5, 6, 7)
.doOnNext { println("doOnNext: $it") }
.filter { it % 2 == 0 }
.subscribeBy(
onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
onNext = {
// 流れてきた値が 4 のときにエラーが起きるようにする
if (it == 4) {
throw IllegalStateException("onNext: 値が $it でエラー")
}
println("onNext: $it")
} // onNext() では値を受け取れる
)
System.out: doOnNext: 1
System.out: doOnNext: 2
System.out: onNext: 2
System.out: doOnNext: 3
System.out: doOnNext: 4
System.out: onError: エラーーーー java.lang.IllegalStateException: onNext: 値が 4 でエラー
確かにdoOnNext()
では4までしか実行されておらず、onNext()
でエラーが起きるとそれ以降の値5からは流れてきていないようである。
Single
おさらい
- 流すことができる値の数: 1つだけ
- 利用できるメソッド:
onError()
onSuccess()
// 流せる値は1個だけなので、リストを一つの値として渡している
Single.just(arrayListOf(1, 2, 3, 4, 5, 6, 7))
.map {
it.filter { i ->
i % 2 == 0
}
}
.subscribeBy(
onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
onSuccess = { println("onSuccess: $it") } // onSuccess() では流れてきた値を受け取れる
)
出力結果
System.out: onSuccess: [2, 4, 6]
1回だけonSuccess()
が呼ばれ、値を受け取れる。
エラーを起こしてみる
Operator処理のタイミングでエラーを起こす
// 流せる値は1個だけなので、リストを一つの値として渡している
Single.just(arrayListOf(1, 2, 3, 4, 5, 6, 7))
.map {
it.filter { i ->
// 処理の値が 4 のときにエラーが起きるようにする
if (i == 4) {
throw IllegalStateException("値が $i でエラー")
}
i % 2 == 0
}
}
.subscribeBy(
onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
onSuccess = { println("onSuccess: $it") } // onSuccess() では流れてきた値を受け取れる
)
出力結果
System.out: onError: エラーーーー java.lang.IllegalStateException: 値が 4 でエラー
onSuccess()
は呼ばれず、onError()
だけ呼ばれる。
onSuccess()のタイミングでエラーを起こす
// 流せる値は1個だけなので、リストを一つの値として渡している
Single.just(arrayListOf(1, 2, 3, 4, 5, 6, 7))
.map {
it.filter { i ->
i % 2 == 0
}
}
.subscribeBy(
onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
onSuccess = {
throw IllegalStateException("onSuccess でエラー")
println("onSuccess: $it")
} // onSuccess() では流れてきた値を受け取れる
)
出力結果(エラーログ)
2021-07-xx xx:xx:xx.xx 19076-19076/com.android.sample E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.android.rxsample, PID: 19076
java.lang.IllegalStateException: onSuccess でエラー
at
~~ 省略 ~~
普通にエラーが吐き出される(クラッシュする)。
Observable
のonNext()
でエラーになったらonError()
が呼ばれる、というような動きになるわけではない。
当然と言えば当然なのですが、onSuccess()
とonError()
は両方ともストリーム処理が終了されたときに呼ばれるので、どちらかしか呼ばれない。
Completable
おさらい
- 流すことができる値の数: 0 (値は流せない)
- 利用できるメソッド:
onError()
onComplete()
// 値は渡せない
Completable.create { emitter: CompletableEmitter ->
try {
// 何らかの処理
emitter.onComplete()
} catch (ex: Exception) {
emitter.onError(ex)
}
}
.subscribeBy(
onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = { println("onComplete: 正常に終了!") } // onComplete() では値は受け取れない
)
出力結果
System.out: onComplete: 正常に終了!
基本的に処理が「正常に終了した」もしくは「エラーになったか」を通知する(受け取る)だけ。
正常に処理が終了すれば、onComplete()
が呼ばれる。
当然onError()
は呼ばれない。
エラーを起こしてみる
create 処理のタイミングでエラーを起こす
// 値は渡せない
Completable.create { emitter: CompletableEmitter ->
try {
// ここでエラーを起こす
throw IllegalStateException("create で エラーーー")
emitter.onComplete()
} catch (ex: Exception) {
emitter.onError(ex)
}
}
.subscribeBy(
onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = { println("onComplete: 正常に終了!") } // onComplete() では値を受け取れない
)
出力結果
System.out: onError: エラーーーー java.lang.IllegalStateException: create で エラーーー
create
の処理中にエラーを起こす。
onError()
が呼ばれ、当然onComplete()
は呼ばれない。
onComplete() のタイミングでエラーを起こす
// 値は渡せない
Completable.create { emitter: CompletableEmitter ->
try {
emitter.onComplete()
} catch (ex: Exception) {
emitter.onError(ex)
}
}
.subscribeBy(
onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
onComplete = {
throw IllegalStateException("onComplete でエラー")
println("onComplete: 正常に終了!")
} // onComplete() では値は受け取れない
)
出力結果(エラーログ)
2021-07-xx xx:xx:xx.xx 19076-19076/com.android.sample E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.android.rxsample, PID: 19076
java.lang.IllegalStateException: onSuccess でエラー
at
~~ 省略 ~~
普通にエラーが吐き出される(クラッシュする)。
(SingleのonSuccess()
とonError()
の関係と同様に)onComplete()
とonError()
は両方ともストリーム処理が終了されたときに呼ばれるので、どちらかしか呼ばれない。
蛇足ですが、 subscribeBy() と subscribeBy{ }の話
subscribeBy()
と subscribeBy{ }
ってどう違うの??って気になったので調べた。
● 結論
Kotlin
では最後の引数を { }
(ラムダ式)で書けるって言うだけ。
subscribeBy{ }
では、引数を省略して最後の引数だけをスコープの処理として渡している。
実際Observable
のsubscribeBy()
に定義ジャンプしてみると、Observable<T>
の拡張関数としてこのように定義されている。↓↓
fun <T : Any> Observable<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
つまり、Observable
の場合はonError
とonComplete
を省略して最後の引数onNext
の処理を{ }
に書いて渡しているだけ。
それぞれこんな感じ
// Observable の場合
.subscribeBy(
onError = {エラー処理},
onComplete = {完了処理},
onNext = {値が流れてきたときの処理}
)
.subscribeBy {
// onNext の処理を書く(onError, onComplete を引数に渡すのを省略している)
}
// Single の場合
.subscribeBy(
onError = {エラー処理},
onSuccess = {値を受け取りつつ、完了処理}
)
.subscribeBy {
// onSuccess の処理を書く(onError を引数に渡すのを省略している)
}
// Completable の場合
.subscribeBy(
onError = {エラー処理},
onComplete = {完了処理}
)
.subscribeBy {
// onComplete の処理を書く(onError を引数に渡すのを省略している)
}
最後に
Rxに関する有益な記事はたくさんありますが、断片的な情報の記事だったり前提知識がかなり必要であったりと、初学者にとってはかなりとっつきづらい技術な気がします。
今後もいろいろ頑張ります。笑
ありがとうございました。