172
148

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJavaAdvent Calendar 2015

Day 4

RxJavaのエラーハンドリング

Last updated at Posted at 2015-12-07

RxJavaでは非同期処理などで非常に便利なライブラリですが、そのエラーハンドリングもとてもスマートに行うことができます。ここではRxJavaのエラーハンドリングの詳細について説明したいと思います。

subscribeでのエラー処理

通常はsubscribe()でエラーを受け取ってハンドリングすることとなると思います。

subscribeでのエラー処理
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        // エラー時の処理をここで受け取る
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable

注意が必要なのは下記のようにonNext()のハンドリングだけを行うようにしていると、onError()発火時にクラッシュします。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2

Crash!! 
by rx.exceptions.OnErrorNotImplementedException

のでエラーが発生する可能性がある場合はきちんとハンドリングしましょう。

Errorをキャッチする

onErrorReturn

Observableチェインのなかで発生したErrorをキャッチして、大体のObjectに変換することでsubscriberにErrorが渡されるのを防ぐことができます。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        return "return";
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> on next: return
> completed

エラー時に無事return文字列に変換されました。

onErrorResumeNext

Observableチェインのなかで発生したErrorをキャッチして、その中で再度Observableを呼び出すことで、エラー時に代替のStreamを返すことができます。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
                    @Override
                    public Observable<? extends String> call(Throwable throwable) {
                        return Observable.from(new String[]{ "resume 1", "resume 2"});
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> on next: resume 1
> on next: resume 2
> completed

エラー時にresume 1,resume 2のストリームに変換されました。

エラー時にリトライする

retry

retry()メソッドはErrorが起こった時に、自動的にsubscribeしなおしてくれる、非常に便利なメソッドです。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retry()
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2

...成功するまでリトライし続ける。。。

引数なしのretry()メソッドは、それまでの処理を成功するまでリトライし続けます。
これでは復旧できない問題が起こった時に無限ループになりかねないので、リトライの回数を制限するメソッドもあります。

// count: リトライ回数
public final Observable<T> retry(long count);

また、Errorの状況をみてリトライするかどうかも判断できます。

        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retry(new Func2<Integer, Throwable, Boolean>() {
                    @Override
                    public Boolean call(Integer count, Throwable throwable) {
                    // 2回までは無条件にリトライする
                        if(count < 3){
                            return true;
                        }
                    // それ以降はIllegalStateExceptionの場合だけリトライする
                        return throwable instanceof IllegalStateException;
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable

retryWhen

retryWhen()メソッドはちょっと複雑でわかりにくいですが、より細かくリトライ処理を制御するためのメソッドです。retryWhen()のコールバックは初回からリトライ後に起こるErrorをObservableとして受け取り、それを変換して何らかのObservableとして返します。そしてこのコールバックで返したObservableで何らかの値がonNext()されると、そのタイミングでリトライされます。また、このObservableでonComplete()もしくはonError()を呼び出されると、もとのObservableがonComplete()もしくはonError()終了します。

ちょっとわかりにくいので簡単に例を示します。

3秒後にリトライ

3秒後にリトライするには以下のように記述します。

エラー発生後3秒待ってからリトライする
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.timer(3, TimeUnit.SECONDS);
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後

> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2

....繰り返し

Errorのまま終了

Errorのまま終了するには

Errorのまま終了する
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.error(throwable);
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable

このコードはちょうどretryWhenメソッドを適用せずに、Errorを受け取ったのと同じ動作となります。

ErrorをハンドリングせずにComplete

Errorをハンドリングせずにその場でCompleteするには一見以下のコードで良さそうですが、これでは実は正しく動作しません。

Errorが来たらempty()でcompleteしたい
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.empty();
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });
> subscribe
> on next: emit 1
> on next: emit 2

詳しくはここのやり取りにもあるのですが、ここでflatMapしてempty()で返しても、もともとObservable<Throwable>の流れをcompleteするわけではないので、ここでは無視されてしまいます。

実際、Errorが起こったらいきなりcompleteしたい処理を書きたい場合には、実際には以下のように、errorのstreamを最初の一つだけ受け取って、それを無視する、というコードを書く必要があります。難しいですね。

Errorが来たらempty()でcompleteしたい
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.take(1).ignoreElements();
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

> subscribe
> on next: emit 1
> on next: emit 2
> completed

ちゃんとCompleteできた!!

ちなみにtake(0)だとerrorが起こる前にいきなり全体のObservableが終了してしまいます。

余談ですが、エラー時に何もせずcompletedにしたい場合はonErrorResumeNext()を使ったほうがスマートかも知れません。

         .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
                    @Override
                    public Observable<? extends String> call(Throwable throwable) {
                        return Observable.empty();
                    }
                })

3回リトライしたら終了

上記の例をもとに3回リトライする場合は、

3回リトライ
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.take(3);
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });

のように書きます。

> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> completed

ちゃんと3回リトライして終了しました。
この場合は先程のretry(count)のメソッドとの違いは、retry(count)ではリトライ回数が上限に達した後にerrorで終了するが、このケースではcompletedで終了する、という点にあります。

3秒後にリトライ、を3回試して終了

上記の例と、retryWhenの一番最初の例を組み合わせることで、3秒後にリトライ、を3回試して終了、という処理は以下のように書けます。

3秒後にリトライ、を3回試して終了
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        log("subscribe");
                        subscriber.onNext("emit 1");
                        subscriber.onNext("emit 2");
                        subscriber.onError(new Throwable());
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final Observable<? extends Throwable> observable) {
                        return observable.take(3).flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                return Observable.timer(3, TimeUnit.SECONDS);
                            }
                        });
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        log("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("error:" + e);
                    }

                    @Override
                    public void onNext(String s) {
                        log("on next: " + s);
                    }
                });


> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後

> subscribe
> on next: emit 1
> on next: emit 2

// 3秒後

> subscribe
> on next: emit 1
> on next: emit 2
> completed

以上です。retryWhen()を駆使すればもっと細かなリトライ処理を書けそうですね!

あとがき

Advent Calendarの4日目としてエントリさせて頂いたのですが、いきなり期日に間に合わず、穴を開ける形となってしまいました。大変申し訳ありません。

このエントリを書く際に非常に困ったことがありました。初めはこのエントリのサンプルコードはすべてKotlinで書く予定だったんです。しかし、実際にためしてみるとKotlinではretryWhenのコードがコンパイル通らない!

以下のコードがコンパイルがとおらない

Observable
            .just("hoge")
            .retryWhen { it.flatMap{ Observable.timer(3, TimeUnit.SECONDS) } }

いろいろ調べたのですが、結局解決至らず、Kotlinでサンプルコードを書くのは諦めました。 orz

172
148
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
172
148

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?