RxJavaのエラーハンドリング

  • 127
    いいね
  • 2
    コメント

RxJavaでは非同期処理などで非常に便利なライブラリですが、そのエラーハンドリングもとてもスマートに行うことができます。ここではRxJavaのエラーハンドリングの詳細について説明したいと思います。

subscribeでのエラー処理

通常はsubscribe()でエラーを受け取ってハンドリングすることとなると思います。

subscribeでのエラー処理
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        // エラー時の処理をここで受け取る
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable

注意が必要なのは下記のようにonNext()のハンドリングだけを行うようにしていると、onError()発火時にクラッシュします。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2

Crash!! 
by rx.exceptions.OnErrorNotImplementedException

のでエラーが発生する可能性がある場合はきちんとハンドリングしましょう。

Errorをキャッチする

onErrorReturn

Observableチェインのなかで発生したErrorをキャッチして、大体のObjectに変換することでsubscriberにErrorが渡されるのを防ぐことができます。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        return "return";
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> on next: return
> completed

エラー時に無事return文字列に変換されました。

onErrorResumeNext

Observableチェインのなかで発生したErrorをキャッチして、その中で再度Observableを呼び出すことで、エラー時に代替のStreamを返すことができます。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
                    @Override
                    public Observable<? extends String> call(Throwable throwable) {
                        return Observable.from(new String[]{ "resume 1", "resume 2"});
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> on next: resume 1
> on next: resume 2
> completed

エラー時にresume 1,resume 2のストリームに変換されました。

エラー時にリトライする

retry

retry()メソッドはErrorが起こった時に、自動的にsubscribeしなおしてくれる、非常に便利なメソッドです。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retry()
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2

...成功するまでリトライし続ける。。。

引数なしのretry()メソッドは、それまでの処理を成功するまでリトライし続けます。
これでは復旧できない問題が起こった時に無限ループになりかねないので、リトライの回数を制限するメソッドもあります。

// count: リトライ回数
public final Observable<T> retry(long count);

また、Errorの状況をみてリトライするかどうかも判断できます。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retry(new Func2<Integer, Throwable, Boolean>() {
                    @Override
                    public Boolean call(Integer count, Throwable throwable) {
                    // 2回までは無条件にリトライする
                        if(count < 3){
                            return true;
                        }
                    // それ以降はIllegalStateExceptionの場合だけリトライする
                        return throwable instanceof IllegalStateException;
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable

retryWhen

retryWhen()メソッドはちょっと複雑でわかりにくいですが、より細かくリトライ処理を制御するためのメソッドです。retryWhen()のコールバックは初回からリトライ後に起こるErrorをObservableとして受け取り、それを変換して何らかのObservableとして返します。そしてこのコールバックで返したObservableで何らかの値がonNext()されると、そのタイミングでリトライされます。また、このObservableでonComplete()もしくはonError()を呼び出されると、もとのObservableがonComplete()もしくはonError()終了します。

ちょっとわかりにくいので簡単に例を示します。

3秒後にリトライ

3秒後にリトライするには以下のように記述します。

エラー発生後3秒待ってからリトライする
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.timer(3, TimeUnit.SECONDS);
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後

> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2

....繰り返し

Errorのまま終了

Errorのまま終了するには

Errorのまま終了する
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.error(throwable);
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable

このコードはちょうどretryWhenメソッドを適用せずに、Errorを受け取ったのと同じ動作となります。

ErrorをハンドリングせずにComplete

Errorをハンドリングせずにその場でCompleteするには一見以下のコードで良さそうですが、これでは実は正しく動作しません。

Errorが来たらempty()でcompleteしたい
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.empty();
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2

詳しくはここのやり取りにもあるのですが、ここでflatMapしてempty()で返しても、もともとObservable<Throwable>の流れをcompleteするわけではないので、ここでは無視されてしまいます。

実際、Errorが起こったらいきなりcompleteしたい処理を書きたい場合には、実際には以下のように、errorのstreamを最初の一つだけ受け取って、それを無視する、というコードを書く必要があります。難しいですね。

Errorが来たらempty()でcompleteしたい
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.take(1).ignoreElements();
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> completed

ちゃんとCompleteできた!!

ちなみにtake(0)だとerrorが起こる前にいきなり全体のObservableが終了してしまいます。

余談ですが、エラー時に何もせずcompletedにしたい場合はonErrorResumeNext()を使ったほうがスマートかも知れません。

         .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
                    @Override
                    public Observable<? extends String> call(Throwable throwable) {
                        return Observable.empty();
                    }
                })

3回リトライしたら終了

上記の例をもとに3回リトライする場合は、

3回リトライ
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.take(3);
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

のように書きます。

> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> completed

ちゃんと3回リトライして終了しました。
この場合は先程のretry(count)のメソッドとの違いは、retry(count)ではリトライ回数が上限に達した後にerrorで終了するが、このケースではcompletedで終了する、という点にあります。

3秒後にリトライ、を3回試して終了

上記の例と、retryWhenの一番最初の例を組み合わせることで、3秒後にリトライ、を3回試して終了、という処理は以下のように書けます。

3秒後にリトライ、を3回試して終了
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.take(3).flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.timer(3, TimeUnit.SECONDS);
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });


> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後

> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後

> subscribe
> on next: emit 1
> on next: emit 2
> completed

以上です。retryWhen()を駆使すればもっと細かなリトライ処理を書けそうですね!

あとがき

Advent Calendarの4日目としてエントリさせて頂いたのですが、いきなり期日に間に合わず、穴を開ける形となってしまいました。大変申し訳ありません。

このエントリを書く際に非常に困ったことがありました。初めはこのエントリのサンプルコードはすべてKotlinで書く予定だったんです。しかし、実際にためしてみるとKotlinではretryWhenのコードがコンパイル通らない!

以下のコードがコンパイルがとおらない

Observable
            .just("hoge")
            .retryWhen { it.flatMap{ Observable.timer(3, TimeUnit.SECONDS) } }

いろいろ調べたのですが、結局解決至らず、Kotlinでサンプルコードを書くのは諦めました。 orz