RxJavaでは非同期処理などで非常に便利なライブラリですが、そのエラーハンドリングもとてもスマートに行うことができます。ここではRxJavaのエラーハンドリングの詳細について説明したいと思います。
subscribeでのエラー処理
通常はsubscribe()
でエラーを受け取ってハンドリングすることとなると思います。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
// エラー時の処理をここで受け取る
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable
注意が必要なのは下記のようにonNext()のハンドリングだけを行うようにしていると、onError()発火時にクラッシュします。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
Crash!!
by rx.exceptions.OnErrorNotImplementedException
のでエラーが発生する可能性がある場合はきちんとハンドリングしましょう。
Errorをキャッチする
onErrorReturn
Observableチェインのなかで発生したErrorをキャッチして、大体のObjectに変換することでsubscriberにErrorが渡されるのを防ぐことができます。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "return";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> on next: return
> completed
エラー時に無事return文字列に変換されました。
onErrorResumeNext
Observableチェインのなかで発生したErrorをキャッチして、その中で再度Observableを呼び出すことで、エラー時に代替のStreamを返すことができます。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Throwable throwable) {
return Observable.from(new String[]{ "resume 1", "resume 2"});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> on next: resume 1
> on next: resume 2
> completed
エラー時にresume 1
,resume 2
のストリームに変換されました。
エラー時にリトライする
retry
retry()
メソッドはErrorが起こった時に、自動的にsubscribeしなおしてくれる、非常に便利なメソッドです。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retry()
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
...成功するまでリトライし続ける。。。
引数なしのretry()
メソッドは、それまでの処理を成功するまでリトライし続けます。
これでは復旧できない問題が起こった時に無限ループになりかねないので、リトライの回数を制限するメソッドもあります。
// count: リトライ回数
public final Observable<T> retry(long count);
また、Errorの状況をみてリトライするかどうかも判断できます。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer count, Throwable throwable) {
// 2回までは無条件にリトライする
if(count < 3){
return true;
}
// それ以降はIllegalStateExceptionの場合だけリトライする
return throwable instanceof IllegalStateException;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable
retryWhen
retryWhen()
メソッドはちょっと複雑でわかりにくいですが、より細かくリトライ処理を制御するためのメソッドです。retryWhen()
のコールバックは初回からリトライ後に起こるErrorをObservableとして受け取り、それを変換して何らかのObservableとして返します。そしてこのコールバックで返したObservableで何らかの値がonNext()
されると、そのタイミングでリトライされます。また、このObservableでonComplete()
もしくはonError()
を呼び出されると、もとのObservableがonComplete()
もしくはonError()
終了します。
ちょっとわかりにくいので簡単に例を示します。
3秒後にリトライ
3秒後にリトライするには以下のように記述します。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.timer(3, TimeUnit.SECONDS);
}
});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2
// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2
// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2
// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2
....繰り返し
Errorのまま終了
Errorのまま終了するには
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.error(throwable);
}
});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> error:java.lang.Throwable
このコードはちょうどretryWhenメソッドを適用せずに、Errorを受け取ったのと同じ動作となります。
ErrorをハンドリングせずにComplete
Errorをハンドリングせずにその場でCompleteするには一見以下のコードで良さそうですが、これでは実は正しく動作しません。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.empty();
}
});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
詳しくはここのやり取りにもあるのですが、ここでflatMapしてempty()で返しても、もともとObservable<Throwable>
の流れをcompleteするわけではないので、ここでは無視されてしまいます。
実際、Errorが起こったらいきなりcomplete
したい処理を書きたい場合には、実際には以下のように、errorのstreamを最初の一つだけ受け取って、それを無視する、というコードを書く必要があります。難しいですね。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.take(1).ignoreElements();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
> completed
ちゃんとCompleteできた!!
ちなみにtake(0)
だとerrorが起こる前にいきなり全体のObservableが終了してしまいます。
余談ですが、エラー時に何もせずcompletedにしたい場合はonErrorResumeNext()
を使ったほうがスマートかも知れません。
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Throwable throwable) {
return Observable.empty();
}
})
3回リトライしたら終了
上記の例をもとに3回リトライする場合は、
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.take(3);
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
のように書きます。
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> subscribe
> on next: emit 1
> on next: emit 2
> completed
ちゃんと3回リトライして終了しました。
この場合は先程のretry(count)
のメソッドとの違いは、retry(count)
ではリトライ回数が上限に達した後にerrorで終了するが、このケースではcompletedで終了する、という点にあります。
3秒後にリトライ、を3回試して終了
上記の例と、retryWhenの一番最初の例を組み合わせることで、3秒後にリトライ、を3回試して終了、という処理は以下のように書けます。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
log("subscribe");
subscriber.onNext("emit 1");
subscriber.onNext("emit 2");
subscriber.onError(new Throwable());
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.take(3).flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.timer(3, TimeUnit.SECONDS);
}
});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("completed");
}
@Override
public void onError(Throwable e) {
log("error:" + e);
}
@Override
public void onNext(String s) {
log("on next: " + s);
}
});
> subscribe
> on next: emit 1
> on next: emit 2
// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2
// 3秒後
> subscribe
> on next: emit 1
> on next: emit 2
> completed
以上です。retryWhen()
を駆使すればもっと細かなリトライ処理を書けそうですね!
あとがき
Advent Calendarの4日目としてエントリさせて頂いたのですが、いきなり期日に間に合わず、穴を開ける形となってしまいました。大変申し訳ありません。
このエントリを書く際に非常に困ったことがありました。初めはこのエントリのサンプルコードはすべてKotlinで書く予定だったんです。しかし、実際にためしてみるとKotlinではretryWhenのコードがコンパイル通らない!
以下のコードがコンパイルがとおらない
Observable
.just("hoge")
.retryWhen { it.flatMap{ Observable.timer(3, TimeUnit.SECONDS) } }
いろいろ調べたのですが、結局解決至らず、Kotlinでサンプルコードを書くのは諦めました。 orz