LoginSignup
12
14

More than 5 years have passed since last update.

TypeScript + RxJS で WebSocket の自動再接続をする

Posted at

はじめに

  • TypeScript + RxJS で WebSocket 接続をして切断時に自動で再接続をする方法を記載する

RxJS 6 の場合

RxJS 5.5 の場合

websocket-subject.ts
import { Subject } from 'rxjs/Subject';
import { Observer } from 'rxjs/Observer';
import { Observable } from 'rxjs/Observable';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/observable/dom/WebSocketSubject';
import { share, distinctUntilChanged, takeWhile } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';

export class WebsocketSubject<T> extends Subject<T> {
  private reconnectionObservable: Observable<number>;
  private wsSubjectConfig: WebSocketSubjectConfig;
  private socket: WebSocketSubject<any>;
  private connectionObserver: Observer<boolean>;
  public connectionStatus: Observable<boolean>;

  constructor(
    private url: string,
    private reconnectInterval: number = 5000,  /// 接続状態を確認する間隔
    private reconnectAttempts: number = 10,  /// 再接続試行回数
  ) {
    super();

    /// 接続状態を提供する Observable
    /// connectionObserver を購読して変化があればイベントを出す
    this.connectionStatus = new Observable((observer: Observer<boolean>) => {
      this.connectionObserver = observer;
    }).pipe(
      share(),
      distinctUntilChanged()
    );

    /// RxJS の WebSocketSubject の設定クラス
    /// url は接続先
    /// closeObserver, openObserver で接続状態を connectionStatus に提供する
    /// resultSelector は WebSocket 接続で受け取ったデータの処理関数で WebSocketSubject の同名の関数を上書きする. ここでは何もしないでデータを返すだけとした.
    this.wsSubjectConfig = {
      url: this.url,
      closeObserver: {
        next: (_e: CloseEvent) => {
          this.socket = null;
          this.connectionObserver.next(false);
        }
      },
      openObserver: {
        next: (_e: Event) => {
          this.connectionObserver.next(true);
        }
      },
      resultSelector: (e: MessageEvent) => {
        return e.data;
      }
    };

    /// WebSocket 接続を開始する
    this.connect();

    /// 接続が切断されたら自動で再接続を行う
    this.connectionStatus.subscribe((isConnected) => {
      if (!this.reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) {
        this.reconnect();
      }
    });
  }

  connect(): void {
    this.socket = new WebSocketSubject(this.wsSubjectConfig);
    this.socket.subscribe(
      (m) => {
        // WebSocketSubject から受け取ったデータはそのまま Subject.next に渡す.
        // WebSocketSubject が受け取った時点で WebSocketSubject.resultSelector によって変換は行われている.
        this.next(m);
      },
      (_error: Event) => {
        if (!this.socket) {
          /// 接続エラーが発生したら自動で再接続を行う
          this.reconnect();
        }
      });
  }

  /// WebSocket 再接続
  reconnect(): void {
    this.reconnectionObservable = interval(this.reconnectInterval)
      .pipe(
        takeWhile((_v, index) => {
          return index < this.reconnectAttempts && !this.socket;
        })
      );
    this.reconnectionObservable.subscribe(
      () => {
        this.connect();
      },
      null,
      () => {
        /// 再接続に失敗した場合は complete とする
        this.reconnectionObservable = null;
        if (!this.socket) {
          this.complete();
          this.connectionObserver.complete();
        }
      });
  }

  /// メッセージ送信
  send(data: any): void {
    this.socket.next(data);
  }
}
  • jasmine のテストコードはこちら
websocket-subject.spec.ts
import {Server} from 'mock-socket';
import {WebsocketSubject} from './websocket-subject';

describe('WebsocketSubject', () => {
  const url = 'ws://localhost:8080';

  let originalTimeout;
  beforeEach(function() {
    originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL;
    // 30 秒まで延ばす
    jasmine.DEFAULT_TIMEOUT_INTERVAL = 30000;
  });

  it('reconnect test', (done) => {
    let mockServer = new Server(url);

    // 1秒間隔で再接続する
    const ws = new WebsocketSubject(url, 1000);
    const s = ws.subscribe((ret) => {
      expect(ret).toBe('"message":1');
    });

    // 一度目のメッセージを送信
    mockServer.send('"message":1');
    s.unsubscribe();

    // サーバーを一度停止して接続を切る
    mockServer.close();
    mockServer = new Server(url);

    // 再接続が終了しているであろう 2 秒後に二度目のメッセージの送信と終了
    setTimeout(() => {
      ws.subscribe((ret) => {
        expect(ret).toBe('"message":2');
      });
      mockServer.send('"message":2');
      mockServer.stop(done);
    }, 2000);
  });

  afterEach(function() {
    // テストが終わったら元に戻す
    jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout;
  });
});
12
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
14