RxJSでWebSocketクライアント

  • 20
    Like
  • 0
    Comment
More than 1 year has passed since last update.

RxJSでWebSocketクライアントを扱うコードです。
探してもよさそうなものが見つからなかったので自分で書いてみました。

observablewebsocket.js
/**
 * WebSocketクライアントをRxJSで扱える形に変換して返す関数
 * @param {string} WebSocketの接続URL
 * @returns {Rx.Observable<Rx.Observable<any>>} データ受信用Observableを返すObservable
 */
function webSocketAsObservable(url) {
    var connector = new Rx.AsyncSubject();

    var Socket = WebSocket || MozWebSocket;
    var socket = new Socket(url);

    socket.onerror = function (ee) {
        connector.onError(ee);
    };
    socket.onclose = function (ce) {
        connector.onCompleted();
    };

    // 接続に成功したら、データ受信用Observable(receiver)を作って通知する
    socket.onopen = function (e) {
        var receiver =
            Rx.Observable.createWithDisposable(function (observer) {
                socket.onmessage = function (msg) {
                    observer.onNext(msg);
                };
                socket.onerror = function (ee) {
                    observer.onError(ee);
                };
                socket.onclose = function (ce) {
                    observer.onCompleted();
                };
                return Rx.Disposable.create(function () {
                    socket.close();
                });
            });
        connector.onNext(receiver);
        connector.onCompleted();
    };

    return connector.asObservable();
}

利用するにはrx.jsが必要になるので、下記からダウンロードするかbowerなどでインストールしてください。

使い方

webSocketAsObservableを呼ぶと、接続に成功した時に「データ受信用Observable」を返すObservableが得られます。
Observableが入れ子になっているのでちょっとややこしいですね。

入れ子になっているときはRxJSのselectManyを使います。
すると「データ受信用Observable」を取り出して、受信したデータを後続に流すことができます。

エラー処理などを考えなければこんな感じで使えます。

sample1.js
observableWebSocket("ws://localhost:9090/")
  .selectMany(function (receiver) {
    // 接続に成功したので、データ受信用のObservableを取り出して、後続に受信データを流す
    return receiver;
  })
  .subscribe(function (val) {
    // WebSocketで受け取ったデータを使う処理
    console.log(val);
  });

エラー処理も含めるとこんな感じで書けます。

sample2.js
var sock = observableWebSocket("ws://localhost:9090/");

var disposable = sock.catch(function (ex) {
    // 接続に失敗した時の処理
    return Rx.Observable.empty();
}).selectMany(function (receiver) {
    // 接続に成功した時の処理
    return receiver;
}).finally(function () {
    // 通信中にエラーが発生したり、接続が切れたりした後の処理
}).subscribe(function (val) {
    // WebSocketで受け取ったデータを使う処理
    console.log(val);
});

// 切断したい場合はdisposeを呼ぶ
disposable.dispose();