10
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJavaスタイルのAndroid向けWebSocketClient作ってみた

Last updated at Posted at 2016-10-24

はじめに

RxJavaのリトライがいい感じそうだったので、RxJavaスタイルのAndroid向けWebSocketClientのライブラリを作ってみたお話

リポジトリ

RxWebSocketClient

作成の背景

最近RxJavaを導入しはじめていて、非常に使い勝手が良くて見通しが良くなるパターンもあれば、無理にRxにこだわって意味不明なコードになることもあるので、適材適所が重要だとつくづく思う。
そんな中RxJavaのエラーハンドリングを読んで、RxJavaのエラー処理はなかなか興味深いなと思った。

例えば、ずーっと接続しっぱなしにしたい環境において、途中何らかのエラーが起こった場合、再度接続をし直すコードを書こうと思うと意外と煩雑になる。(ひと昔前ならgoto文でGO!と飛ばすのだろうけど。。。)

先に紹介したエラーハンドリングのページによれば、抜粋ではあるが処理をリトライさせたい場合、概ね以下のようなコードになる。

Observable
    .create(new Observable.OnSubscribe<String>() {
    })
    .retry(new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer count, Throwable throwable) {
            // エラー時のリトライ処理(trueならば、createからやり直し、最大5回まで)
            if(count < 5){
                return true;
            } else {
                return false;
            }
        }
     })
     .subscribe(new Subscriber<String>() {
         // メイン処理
      });

重要なのは、retryメソッドに明示的な処理を渡すことで、リトライを実現していることにあると思う。結果的に、本来やるべき処理がsubscribe、リトライ処理はretry(retryWhen)と綺麗にまとまっているし、リトライの繰り返しもretry(retryWhen)にまとめて記述できている。

リトライに渡すFuncの引数が分かりづらいといのは指摘として十分にあると思うのだけど。。。

そんなことを考えていたところ、常時接続、切れたら再接続のような要件の通信プログラムにRxJavaは相性がいいのではないかということで、常時接続が期待されるWebSocketのClientをRxJavaのライブラリしてみようと思ったというのが、ことの背景。

使用例

以下は、WebSocketのEventが流れるストリームをsubscribeで処理して、通信断を伴う通信エラーが発生した場合は、retryWhenにて1秒後に再接続を行う参考実装だ。
retryWhenの中でflatMapしてるあたりお世辞にも綺麗なコードかと言われると怪しい。だけど、RxJavaを使わず遅延ロジックを含んだリトライ処理を実装しようと思うと、タイマーを併用しなければならず、往々にして不具合の温床となりそうなコードなのは安易に想像できるわけで、好き嫌いは分かれるにせよ、このすっきり感は悪くない。(lambdaを使えばもう少しすっきり)

この例では1秒の遅延を入れてリトライするのでretryWhenを使っているけど、すぐにリトライをしたければ、前述のretryを使えばよい

mSocketClient = new RxWebSocketClient();
mSubscription = mSocketClient.connect(Uri.parse("ws://hogehoge"))
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(final Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        // 通信エラーの場合、1秒後に再接続
                        if (throwable instanceof ConnectionException) {
                            Log.d(TAG, "retry with delay");
                            return Observable.timer(1, TimeUnit.SECONDS);
                        }else {
                            // 通信エラー以外はonErrorに回す
                            return Observable.error(throwable);
                        }
                    }
                });
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<RxWebSocketClient.Event>() {
            @Override
            public void call(RxWebSocketClient.Event event) {
                // WebSocketで行いたいメインの処理
                Log.d(TAG, "== onNext ==");
                switch (event.getType()) {
                    case CONNECT:
                        Log.d(TAG, "  CONNECT");
                        mSocketClient.send("test");
                        break;
                    case DISCONNECT:
                        Log.d(TAG, "  DISCONNECT");
                        break;
                    case MESSAGE_BINARY:
                        Log.d(TAG, "  MESSAGE_BINARY : bytes = " + event.getBytes().length);
                        break;
                    case MESSAGE_STRING:
                        Log.d(TAG, "  MESSAGE_STRING = " + event.getString());
                        break;
                }
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                // 通信エラー以外のエラー処理
                Log.d(TAG, "== onError ==");
                throwable.printStackTrace();
            }
        });

ということで、宣伝ちっくになってしまったけど、簡易のWebSocketサーバ同梱のサンプルもあるので、詳しくはRxWebSocketClientへ。

最後に

まだRxJava歴は短いので、Rxの使い方間違ってるよ!とか指摘お待ちしています!

10
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?