4
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【Android / Kotlin】Rx を理解する(ストリーム編)

Last updated at Posted at 2021-07-22

はじめに

今回は今までプロジェクトでなんとなくで読んで使っていたRxを学び直しつつ、理解する。
読んでいただければ、Rx入門者の方でもプロジェクトのコードをある程度読めるレベルになるのではないか、と勝手に思っています。

誤った言い回し、解釈、記述などあればコメントいただければ幸いです。

Rx の概要(主な機能)

Rxの主な機能としてはざっくり以下だと認識している。

  1. 「値を流して処理する」(ストリーム)
  2. 「値の監視 / イベントを検知して処理する」(イベントバス的な)
  3. 「スレッド操作(切り替え・使い分け)」

今回は

  1. 「値を流して処理する」(ストリーム)

のイメージを掴んで、知識を整理しながら理解したいと思う。

参考リンク

処理の流れのイメージをざっくり理解する

最初はざっくり大体こんな感じに処理が流れることをイメージできればいいと思う。
複雑なコードであってもほとんどやっていることは同じで、こういった構造になっているはず。

  1. 値の流れを作る
  2. 値を操作する
  3. 流れてきた値を受け取って処理を実行する

※ 「2. 値を操作する」がない場合もある。また値を流さないこともある(後述するCompletableなど)。

// コードベースで見てみる
Observable.just(1, 2, 3, 4, 5, 6, 7) // just で数字を7個流す
      .filter { it % 2 == 0 } // 7個の数字が流れてくる
      .subscribe { println("偶数: $it") } // フィルタリングされた偶数の 2, 4, 6 を受け取って処理
①値の流れを作る
    .②値を操作する
    .③流れてきた値を受け取って処理を実行する
出力結果
System.out: 偶数: 2
System.out: 偶数: 4
System.out: 偶数: 6

登場人物

① 値の流れを作る(監視する):Observable(他Singleなど)
② 値を操作する:Operator(filterなど)
③ 流れてきた値を受け取って処理を実行する:Subscribe

① 値の流れを作る(監視する):Observable(他Singleなど)

流すことができる値 利用できるメソッド(Subscribeにて) 使い所
Observable 複数 onNext(a: Any)/onError(e: Throwable)/ onComplete() GUIでのイベント/複数回値を実行したいとき
Single 1つ onSuccess(a: Any)/onError(e: Throwable) 値を流す or エラーかのどちらかの場合/httpのGETなどでAPIからレスポンスを取得するときなど
Complete なし onComplete()/onEroor(e: Throwable) 返り値がない非同期処理/httpのPOST処理など

他にも、MaybeとかFutureとかあるようです。しかしとりあえず上記3つを使いこなせれば良さそう。

参考リンク

生成するときの関数

  • create
  • just
  • from
    などなど

② 値を操作する:Operator(filterなど)

collectionがもつような関数がたくさんある。
個々を詳しく解説はしないですが、Operator のメソッドはかなりたくさんあるので、出くわした都度調べて使えるようになるしかないのかな??と思いました。

Operator関数 (ごく一部)

  • filter
  • take
  • first
  • last
  • map
  • flatMap

などなど

Operatorに関して、有益でわかりやすい参考記事がたくさんありました。
これは出会した都度確認して、感覚を養いつつ使いこなせるようにするしかなさそう。

参考リンク

③ 流れてきた値を受け取って処理を実行する:Subscribe

Subscribe
実際に値が流れてきたときにどのような処理をするのかを定義しておく。

  • onNext(a: Any)
    • 値が流れてきたことを通知する。
    • 値を受け取って処理することができる。
    • onNext()の処理中にエラーが起きればonError()が呼ばれ、onComplete()は呼ばれない。
  • onError(e: Throwable)
    • エラー発生によるストリーム処理の終了を伝える。
  • onComplete()
    • ストリームの処理が正常に終了したことを伝える。
  • onSuccess(a: Any)
    • ストリームの処理が正常に終了したことを伝える。
    • 値を受け取って処理することができる。
    • onSuccess()の処理中にエラーが起きてもonError()が呼ばれるわけでは無い。(クラッシュしたりする)

この中で唯一onNext()だけがストリーム処理の途中で実行されるもの。

参考リンク

コードで動作を確認してみる

Observable

おさらい

  • 流すことができる値の数: 複数可能
  • 利用できるメソッド: onError() onComplete() onNext()
Observable.just(1, 2, 3, 4, 5, 6, 7)
    .filter { it % 2 == 0 }
    .subscribeBy(
    	onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
    	onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
    	onNext = { println("onNext: $it") } // onNext() では値を受け取れる
    )
出力結果
System.out: onNext: 2
System.out: onNext: 4
System.out: onNext: 6
System.out: onComplete: 正常に処理終了!

subscribe に流れてきた値の数だけonNext()が呼ばれ、onComplete()は値が全て流れ終わり、処理が終了したタイミングで最後に1回だけ呼ばれる。

エラーを起こしてみる

Operator処理のタイミングでエラーを起こす

Observable.just(1, 2, 3, 4, 5, 6, 7)
    .filter {
    	// 流れてきた値が 4 のときにエラーが起きるようにする
    	if (it == 4) {
    	    throw IllegalStateException("値が $it でエラー")
    	}
    	it % 2 == 0
    }
    .subscribeBy(
    	onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
    	onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
    	onNext = { println("onNext: $it") } // onNext() では値を受け取れる
    )
出力結果
System.out: onNext: 2
System.out: onError: エラーーーー java.lang.IllegalStateException: 値が 4 でエラー

流れてきた値が2のときは通常通りonNext()が呼ばれ、値が4のときにエラーを起こしているため例外処理以降はonError()が実行される。
そして、onError()でエラー内容を受け取ることができて、ストリーム処理が終了する。(5以降は流れてこない)

onNext()のタイミングでエラーを起こす

Observable.just(1, 2, 3, 4, 5, 6, 7)
    .filter { it % 2 == 0 }
    .subscribeBy(
    	onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
    	onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
    	onNext = {
    	    // 流れてきた値が 4 のときにエラーが起きるようにする
    	    if (it == 4) {
    	    	throw IllegalStateException("onNext: 値が $it でエラー")
    	    }
    	    println("onNext: $it")
    	} // onNext() では値を受け取れる
    )
System.out: onNext: 2
System.out: onError: エラーーーー java.lang.IllegalStateException: onNext: 値が 4 でエラー

流れてきた値が2のときは通常通りonNext()が呼ばれ、値が4のときにエラーを起こしているためonNext()の例外処理以降は実行されず、onError()が実行される。
そして、onError()でエラー内容を受け取ることができて、ストリーム処理が終了する。(5以降は流れてこない)

さらに doOnNext() で本当にエラーが起きた時点で値が流れてきてないか確かめてみる

先ほどのコードでjust の直後 .doOnNext { println("doOnNext: $it") }を追加しただけです。

ちなみにdoOnNext()はストリームの流れの途中で値を受け取って処理ができたりするものです。(ここら辺も解説する記事とか書きたい...)

Observable.just(1, 2, 3, 4, 5, 6, 7)
    .doOnNext { println("doOnNext: $it") }
    .filter { it % 2 == 0 }
    .subscribeBy(
    	onError = { println("onError: エラーーーー $it")}, // onError() ではエラーの内容(Throwable型)を受け取れる
    	onComplete = { println("onComplete: 正常に処理終了!") }, // onComplete() では値は受け取れない
    	onNext = {
    	    // 流れてきた値が 4 のときにエラーが起きるようにする
    	    if (it == 4) {
        		throw IllegalStateException("onNext: 値が $it でエラー")
    	    }
    	    println("onNext: $it")
    	} // onNext() では値を受け取れる
    )
System.out: doOnNext: 1
System.out: doOnNext: 2
System.out: onNext: 2
System.out: doOnNext: 3
System.out: doOnNext: 4
System.out: onError: エラーーーー java.lang.IllegalStateException: onNext: 値が 4 でエラー

確かにdoOnNext()では4までしか実行されておらず、onNext()でエラーが起きるとそれ以降の値5からは流れてきていないようである。

Single

おさらい

  • 流すことができる値の数: 1つだけ
  • 利用できるメソッド: onError() onSuccess()
// 流せる値は1個だけなので、リストを一つの値として渡している
Single.just(arrayListOf(1, 2, 3, 4, 5, 6, 7))
    .map {
    	it.filter { i ->
    	    i % 2 == 0
    	}
    }
    .subscribeBy(
    	onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
    	onSuccess = { println("onSuccess: $it") } // onSuccess() では流れてきた値を受け取れる
    )
出力結果
System.out: onSuccess: [2, 4, 6]

1回だけonSuccess()が呼ばれ、値を受け取れる。

エラーを起こしてみる

Operator処理のタイミングでエラーを起こす

// 流せる値は1個だけなので、リストを一つの値として渡している
Single.just(arrayListOf(1, 2, 3, 4, 5, 6, 7))
    .map {
        it.filter { i ->
            // 処理の値が 4 のときにエラーが起きるようにする
            if (i == 4) {
                throw IllegalStateException("値が $i でエラー")
            }
            i % 2 == 0
        }
    }
    .subscribeBy(
        onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
        onSuccess = { println("onSuccess: $it") } // onSuccess() では流れてきた値を受け取れる
    )
出力結果
System.out: onError: エラーーーー java.lang.IllegalStateException: 値が 4 でエラー

onSuccess()は呼ばれず、onError()だけ呼ばれる。

onSuccess()のタイミングでエラーを起こす

// 流せる値は1個だけなので、リストを一つの値として渡している
Single.just(arrayListOf(1, 2, 3, 4, 5, 6, 7))
    .map {
        it.filter { i ->
            i % 2 == 0
        }
     }
     .subscribeBy(
         onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
         onSuccess = {
            throw IllegalStateException("onSuccess でエラー")
            println("onSuccess: $it")
         } // onSuccess() では流れてきた値を受け取れる
    )
出力結果(エラーログ)
2021-07-xx xx:xx:xx.xx 19076-19076/com.android.sample E/AndroidRuntime: FATAL EXCEPTION: main
    Process: com.android.rxsample, PID: 19076
    java.lang.IllegalStateException: onSuccess でエラー
        at
~~ 省略 ~~

普通にエラーが吐き出される(クラッシュする)。
ObservableonNext()でエラーになったらonError()が呼ばれる、というような動きになるわけではない。
当然と言えば当然なのですが、onSuccess()onError()は両方ともストリーム処理が終了されたときに呼ばれるので、どちらかしか呼ばれない。

Completable

おさらい

  • 流すことができる値の数: 0 (値は流せない)
  • 利用できるメソッド: onError() onComplete()
// 値は渡せない
Completable.create { emitter: CompletableEmitter ->
        try {
            // 何らかの処理
            emitter.onComplete()
        } catch (ex: Exception) {
            emitter.onError(ex)
        }
    }
    .subscribeBy(
        onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
        onComplete = { println("onComplete: 正常に終了!") } // onComplete() では値は受け取れない
    )
出力結果
System.out: onComplete: 正常に終了!

基本的に処理が「正常に終了した」もしくは「エラーになったか」を通知する(受け取る)だけ。
正常に処理が終了すれば、onComplete()が呼ばれる。
当然onError()は呼ばれない。

エラーを起こしてみる

create 処理のタイミングでエラーを起こす

// 値は渡せない
Completable.create { emitter: CompletableEmitter ->
        try {
            // ここでエラーを起こす
            throw IllegalStateException("create で エラーーー")
            emitter.onComplete()
        } catch (ex: Exception) {
            emitter.onError(ex)
        }
    }
    .subscribeBy(
        onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
        onComplete = { println("onComplete: 正常に終了!") } // onComplete() では値を受け取れない
    ) 
出力結果
System.out: onError: エラーーーー java.lang.IllegalStateException: create で エラーーー

createの処理中にエラーを起こす。
onError()が呼ばれ、当然onComplete()は呼ばれない。

onComplete() のタイミングでエラーを起こす

// 値は渡せない
Completable.create { emitter: CompletableEmitter ->
        try {
            emitter.onComplete()
	    } catch (ex: Exception) {
	    	emitter.onError(ex)
     	}
    }
    .subscribeBy(
        onError = { println("onError: エラーーーー $it") }, // onError() ではエラーの内容(Throwable型)を受け取れる
        onComplete = {
            throw IllegalStateException("onComplete でエラー")
            println("onComplete: 正常に終了!")
        } // onComplete() では値は受け取れない
    )
出力結果(エラーログ)
2021-07-xx xx:xx:xx.xx 19076-19076/com.android.sample E/AndroidRuntime: FATAL EXCEPTION: main
    Process: com.android.rxsample, PID: 19076
    java.lang.IllegalStateException: onSuccess でエラー
        at
~~ 省略 ~~

普通にエラーが吐き出される(クラッシュする)。
(SingleのonSuccess()onError()の関係と同様に)onComplete()onError()は両方ともストリーム処理が終了されたときに呼ばれるので、どちらかしか呼ばれない。

蛇足ですが、 subscribeBy() と subscribeBy{ }の話

subscribeBy()subscribeBy{ } ってどう違うの??って気になったので調べた。

● 結論
Kotlin では最後の引数を { }(ラムダ式)で書けるって言うだけ。
subscribeBy{ }では、引数を省略して最後の引数だけをスコープの処理として渡している。
実際ObservablesubscribeBy()に定義ジャンプしてみると、Observable<T>の拡張関数としてこのように定義されている。↓↓

subscribers.kt
fun <T : Any> Observable<T>.subscribeBy(
        onError: (Throwable) -> Unit = onErrorStub,
        onComplete: () -> Unit = onCompleteStub,
        onNext: (T) -> Unit = onNextStub
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())

つまり、Observableの場合はonErroronCompleteを省略して最後の引数onNextの処理を{ }に書いて渡しているだけ。

それぞれこんな感じ
// Observable の場合
.subscribeBy(
  onError = {エラ処理},
  onComplete = {完了処理},
  onNext = {値が流れてきたときの処理}
)

.subscribeBy {
  // onNext の処理を書く(onError, onComplete を引数に渡すのを省略している)
}
// Single の場合
.subscribeBy(
  onError = {エラ処理},
  onSuccess = {値を受け取りつつ、完了処理}
)

.subscribeBy {
  // onSuccess の処理を書く(onError を引数に渡すのを省略している)
}
// Completable の場合
.subscribeBy(
  onError = {エラ処理},
  onComplete = {完了処理}
)

.subscribeBy {
  // onComplete の処理を書く(onError を引数に渡すのを省略している)
}

最後に

Rxに関する有益な記事はたくさんありますが、断片的な情報の記事だったり前提知識がかなり必要であったりと、初学者にとってはかなりとっつきづらい技術な気がします。
今後もいろいろ頑張ります。笑

ありがとうございました。

4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?