はじめに
- TypeScript + RxJS で WebSocket 接続をして切断時に自動で再接続をする方法を記載する
RxJS 6 の場合
- retry を使って WebsocketSubject のエラーをハンドリングするだけでできるらしい。
RxJS 5.5 の場合
- Subject を拡張して自動再接続する WebSocket 接続を提供する Subject を作成する
websocket-subject.ts
import { Subject } from 'rxjs/Subject';
import { Observer } from 'rxjs/Observer';
import { Observable } from 'rxjs/Observable';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';
import { share, distinctUntilChanged, takeWhile } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
export class WebsocketSubject<T> extends Subject<T> {
private reconnectionObservable: Observable<number>;
private wsSubjectConfig: WebSocketSubjectConfig;
private socket: WebSocketSubject<any>;
private connectionObserver: Observer<boolean>;
public connectionStatus: Observable<boolean>;
constructor(
private url: string,
private reconnectInterval: number = 5000, /// 接続状態を確認する間隔
private reconnectAttempts: number = 10, /// 再接続試行回数
) {
super();
/// 接続状態を提供する Observable
/// connectionObserver を購読して変化があればイベントを出す
this.connectionStatus = new Observable((observer: Observer<boolean>) => {
this.connectionObserver = observer;
}).pipe(
share(),
distinctUntilChanged()
);
/// RxJS の WebSocketSubject の設定クラス
/// url は接続先
/// closeObserver, openObserver で接続状態を connectionStatus に提供する
/// resultSelector は WebSocket 接続で受け取ったデータの処理関数で WebSocketSubject の同名の関数を上書きする. ここでは何もしないでデータを返すだけとした.
this.wsSubjectConfig = {
url: this.url,
closeObserver: {
next: (_e: CloseEvent) => {
this.socket = null;
this.connectionObserver.next(false);
}
},
openObserver: {
next: (_e: Event) => {
this.connectionObserver.next(true);
}
},
resultSelector: (e: MessageEvent) => {
return e.data;
}
};
/// WebSocket 接続を開始する
this.connect();
/// 接続が切断されたら自動で再接続を行う
this.connectionStatus.subscribe((isConnected) => {
if (!this.reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) {
this.reconnect();
}
});
}
connect(): void {
this.socket = new WebSocketSubject(this.wsSubjectConfig);
this.socket.subscribe(
(m) => {
// WebSocketSubject から受け取ったデータはそのまま Subject.next に渡す.
// WebSocketSubject が受け取った時点で WebSocketSubject.resultSelector によって変換は行われている.
this.next(m);
},
(_error: Event) => {
if (!this.socket) {
/// 接続エラーが発生したら自動で再接続を行う
this.reconnect();
}
});
}
/// WebSocket 再接続
reconnect(): void {
this.reconnectionObservable = interval(this.reconnectInterval)
.pipe(
takeWhile((_v, index) => {
return index < this.reconnectAttempts && !this.socket;
})
);
this.reconnectionObservable.subscribe(
() => {
this.connect();
},
null,
() => {
/// 再接続に失敗した場合は complete とする
this.reconnectionObservable = null;
if (!this.socket) {
this.complete();
this.connectionObserver.complete();
}
});
}
/// メッセージ送信
send(data: any): void {
this.socket.next(data);
}
}
- jasmine のテストコードはこちら
websocket-subject.spec.ts
import {Server} from 'mock-socket';
import {WebsocketSubject} from './websocket-subject';
describe('WebsocketSubject', () => {
const url = 'ws://localhost:8080';
let originalTimeout;
beforeEach(function() {
originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL;
// 30 秒まで延ばす
jasmine.DEFAULT_TIMEOUT_INTERVAL = 30000;
});
it('reconnect test', (done) => {
let mockServer = new Server(url);
// 1秒間隔で再接続する
const ws = new WebsocketSubject(url, 1000);
const s = ws.subscribe((ret) => {
expect(ret).toBe('"message":1');
});
// 一度目のメッセージを送信
mockServer.send('"message":1');
s.unsubscribe();
// サーバーを一度停止して接続を切る
mockServer.close();
mockServer = new Server(url);
// 再接続が終了しているであろう 2 秒後に二度目のメッセージの送信と終了
setTimeout(() => {
ws.subscribe((ret) => {
expect(ret).toBe('"message":2');
});
mockServer.send('"message":2');
mockServer.stop(done);
}, 2000);
});
afterEach(function() {
// テストが終わったら元に戻す
jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout;
});
});