はじめに
RxJavaのリトライがいい感じそうだったので、RxJavaスタイルのAndroid向けWebSocketClientのライブラリを作ってみたお話
リポジトリ
作成の背景
最近RxJavaを導入しはじめていて、非常に使い勝手が良くて見通しが良くなるパターンもあれば、無理にRxにこだわって意味不明なコードになることもあるので、適材適所が重要だとつくづく思う。
そんな中RxJavaのエラーハンドリングを読んで、RxJavaのエラー処理はなかなか興味深いなと思った。
例えば、ずーっと接続しっぱなしにしたい環境において、途中何らかのエラーが起こった場合、再度接続をし直すコードを書こうと思うと意外と煩雑になる。(ひと昔前ならgoto文でGO!と飛ばすのだろうけど。。。)
先に紹介したエラーハンドリングのページによれば、抜粋ではあるが処理をリトライさせたい場合、概ね以下のようなコードになる。
Observable
.create(new Observable.OnSubscribe<String>() {
})
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer count, Throwable throwable) {
// エラー時のリトライ処理(trueならば、createからやり直し、最大5回まで)
if(count < 5){
return true;
} else {
return false;
}
}
})
.subscribe(new Subscriber<String>() {
// メイン処理
});
重要なのは、retryメソッドに明示的な処理を渡すことで、リトライを実現していることにあると思う。結果的に、本来やるべき処理がsubscribe、リトライ処理はretry(retryWhen)と綺麗にまとまっているし、リトライの繰り返しもretry(retryWhen)にまとめて記述できている。
リトライに渡すFuncの引数が分かりづらいといのは指摘として十分にあると思うのだけど。。。
そんなことを考えていたところ、常時接続、切れたら再接続のような要件の通信プログラムにRxJavaは相性がいいのではないかということで、常時接続が期待されるWebSocketのClientをRxJavaのライブラリしてみようと思ったというのが、ことの背景。
使用例
以下は、WebSocketのEventが流れるストリームをsubscribeで処理して、通信断を伴う通信エラーが発生した場合は、retryWhenにて1秒後に再接続を行う参考実装だ。
retryWhenの中でflatMapしてるあたりお世辞にも綺麗なコードかと言われると怪しい。だけど、RxJavaを使わず遅延ロジックを含んだリトライ処理を実装しようと思うと、タイマーを併用しなければならず、往々にして不具合の温床となりそうなコードなのは安易に想像できるわけで、好き嫌いは分かれるにせよ、このすっきり感は悪くない。(lambdaを使えばもう少しすっきり)
この例では1秒の遅延を入れてリトライするのでretryWhenを使っているけど、すぐにリトライをしたければ、前述のretryを使えばよい
mSocketClient = new RxWebSocketClient();
mSubscription = mSocketClient.connect(Uri.parse("ws://hogehoge"))
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
// 通信エラーの場合、1秒後に再接続
if (throwable instanceof ConnectionException) {
Log.d(TAG, "retry with delay");
return Observable.timer(1, TimeUnit.SECONDS);
}else {
// 通信エラー以外はonErrorに回す
return Observable.error(throwable);
}
}
});
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<RxWebSocketClient.Event>() {
@Override
public void call(RxWebSocketClient.Event event) {
// WebSocketで行いたいメインの処理
Log.d(TAG, "== onNext ==");
switch (event.getType()) {
case CONNECT:
Log.d(TAG, " CONNECT");
mSocketClient.send("test");
break;
case DISCONNECT:
Log.d(TAG, " DISCONNECT");
break;
case MESSAGE_BINARY:
Log.d(TAG, " MESSAGE_BINARY : bytes = " + event.getBytes().length);
break;
case MESSAGE_STRING:
Log.d(TAG, " MESSAGE_STRING = " + event.getString());
break;
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// 通信エラー以外のエラー処理
Log.d(TAG, "== onError ==");
throwable.printStackTrace();
}
});
ということで、宣伝ちっくになってしまったけど、簡易のWebSocketサーバ同梱のサンプルもあるので、詳しくはRxWebSocketClientへ。
最後に
まだRxJava歴は短いので、Rxの使い方間違ってるよ!とか指摘お待ちしています!