22
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJSでWebSocketクライアント

Last updated at Posted at 2014-02-15

RxJSでWebSocketクライアントを扱うコードです。
探してもよさそうなものが見つからなかったので自分で書いてみました。

observablewebsocket.js
/**
 * WebSocketクライアントをRxJSで扱える形に変換して返す関数
 * @param {string} WebSocketの接続URL
 * @returns {Rx.Observable<Rx.Observable<any>>} データ受信用Observableを返すObservable
 */
function webSocketAsObservable(url) {
    var connector = new Rx.AsyncSubject();

    var Socket = WebSocket || MozWebSocket;
    var socket = new Socket(url);

    socket.onerror = function (ee) {
        connector.onError(ee);
    };
    socket.onclose = function (ce) {
        connector.onCompleted();
    };

    // 接続に成功したら、データ受信用Observable(receiver)を作って通知する
    socket.onopen = function (e) {
        var receiver =
            Rx.Observable.createWithDisposable(function (observer) {
                socket.onmessage = function (msg) {
                    observer.onNext(msg);
                };
                socket.onerror = function (ee) {
                    observer.onError(ee);
                };
                socket.onclose = function (ce) {
                    observer.onCompleted();
                };
                return Rx.Disposable.create(function () {
                    socket.close();
                });
            });
        connector.onNext(receiver);
        connector.onCompleted();
    };

    return connector.asObservable();
}

利用するにはrx.jsが必要になるので、下記からダウンロードするかbowerなどでインストールしてください。

使い方

webSocketAsObservableを呼ぶと、接続に成功した時に「データ受信用Observable」を返すObservableが得られます。
Observableが入れ子になっているのでちょっとややこしいですね。

入れ子になっているときはRxJSのselectManyを使います。
すると「データ受信用Observable」を取り出して、受信したデータを後続に流すことができます。

エラー処理などを考えなければこんな感じで使えます。

sample1.js
observableWebSocket("ws://localhost:9090/")
  .selectMany(function (receiver) {
    // 接続に成功したので、データ受信用のObservableを取り出して、後続に受信データを流す
    return receiver;
  })
  .subscribe(function (val) {
    // WebSocketで受け取ったデータを使う処理
    console.log(val);
  });

エラー処理も含めるとこんな感じで書けます。

sample2.js
var sock = observableWebSocket("ws://localhost:9090/");

var disposable = sock.catch(function (ex) {
    // 接続に失敗した時の処理
    return Rx.Observable.empty();
}).selectMany(function (receiver) {
    // 接続に成功した時の処理
    return receiver;
}).finally(function () {
    // 通信中にエラーが発生したり、接続が切れたりした後の処理
}).subscribe(function (val) {
    // WebSocketで受け取ったデータを使う処理
    console.log(val);
});

// 切断したい場合はdisposeを呼ぶ
disposable.dispose();
22
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?