RxJava2初心者です。
正確には、subscribeOnで別スレッドで処理中の場合という条件付き。でも、同じスレッドを使う人はいないと思うので大半は引っかかるはず。(AndroidだとUIスレッド等の兼ね合いもありますし。)
発端
subscribeしたけどやっぱキャンセルしよーっと!って思ってdispose(unsubscribe)したら、Androidのアプリがクラッシュした。安直にdisposeするのは良くなかったらしい。
調べてみると、disposeした瞬間に処理を中断するわけではなくて、まずInterruptが行われsleepしていればInterruptExceptionが発生する模様。もちろん、(interrupted状態をチェックしないような)普通の処理中であれば処理が続く。そのままonErrorまで処理が到達すると、本件の例外が発生し、待ち受けているdoOnError等の処理が行われないまま、アプリが死ぬという感じ。
要するに、onErrorを書いてるのに何も考えずにdisposeするとクラッシュする場合がある。
対応
色々悩んだ挙句、ドキュメントに書いてあった。ちゃんとドキュメント読みましょう。
大まかには、グローバルなエラーハンドラーがあるのでそれでキャッチして処理するか、OnErrorする前にdispose済みであるかどうかで処理を書き分けるしかなさそう。
-
2.0以上であれば、グローバルなエラーハンドラを設定する
RxJavaPlugins.setErrorHandler { println("${Thread.currentThread().name}: catch GlobalErrorHandler ${it.message}") }
ただ、今回のケースで最適なのか良くわかってない。
ObservableEmitter#onError
使う前にObservableEmitter#isDiposed
で破棄済みかどうか確認してから行う。2.1.1以降であれば、
ObservableEmitter#onError
ではなくObservableEmitter#tryOnError
を使う。
disposeしたならもう処理結果に期待していないのだから、クラッシュしない程度に適当に処理しましょう、という感じに見える(雑)
蛇足
最初、ドキュメントのエラーハンドリングの項目を見つけるまでは悩んでて、でもtryOnErrorを実装していることから、tryOnErrorの実装した雰囲気から頑張ってチェック処理かけってことなのかな?って思っていたが、そうじゃなかった。
https://github.com/ReactiveX/RxJava/blob/2.x/CHANGES.md#version-211---june-21-2017-maven
https://github.com/ReactiveX/RxJava/pull/5344
再現コード
エラーだけ見たいのであればこんな感じに書いておけば起きます。
上記からグローバルなエラーハンドラーを登録するコードをコピペしたり、onErrorをtryOnErrorにすると落ち着くことが分かります。
fun sleep(time: Long ) = try{ TimeUnit.MILLISECONDS.sleep(time) }catch(e:InterruptedException){ println("Ignore Exception ${e.message}") }
@Test
fun test1() {
println("start")
val disposable = Observable.create<String> { emitter ->
// 別スレッドで重い処理
println("observer start")
sleep(1000)
println("observer end")
emitter.onError(Exception("FOOOOO")) // NG. isDisposedで確認してから使う。もしくはtryOnErrorを使う。
}.onErrorReturn {
println("onErrorReturn ${it.message}")
"onErrorReturn"
}.doOnError {
println("doOnError ${it.message}")
}.doOnDispose {
println("Dispose")
}.subscribeOn(
Schedulers.newThread() // Observerを別スレッドで処理する
).subscribe({
println("next")
}, {
println("error")
println(it)
}, {
println("compleate")
})
sleep(200)
// 気が変わったのでキャンセル
disposable.dispose()
sleep(1000)
println("end")
}
エラーハンドリングしないと以下のようなエラーになります。(sleepのtry-catchを削除すればInterruptException)
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: FOOOOO