52
54

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Node.jsのオートスケールをFRPで管理する

Last updated at Posted at 2015-10-20

東京Node学園祭2015事前まとめその2。

大規模Node.jsを支えるロードバランスとオートスケールの独自実装(FRPもあるよ) - Qiita

でインフラ制御にFRPを適用した話の深掘りです。

前提知識

Reactive Programming in JavaScript

【翻訳】あなたが求めていたリアクティブプログラミング入門

個人的には、FRPは

  • あらゆるイベント/データを時間軸上に連続する一つのリスト(実際はストリーム)と捉えることで、

    • 関数型のパラダイムを適用出来る
    • 強力なオペレータ群が使える・コードの見通しが向上する
  • 「時間軸上に連続したイベント」という概念が本質的にAsynchronous

    • 非同期処理(エラーハンドリング含む)を抽象度高くスマートに書ける

といったメリットがあると理解しています。

今回実現したかったこと

Redis内で刻々と更新されていくインスタンス毎のコネクション数をもとに、オートスケールの発火を管理する

→ 分かりやすいイベント駆動
→ インフラ制御にもFRP使えるのでは?

設計

具体的なオートスケールの発火条件は以下の3点。

  • 負荷(※今回は接続数)に応じてトリガー
  • 事前に設定したクラスタごとの最低稼働台数を下回った際トリガー
  • 指定時刻にトリガー

以下イメージです。

Inmemory_DB_Cache_Pattern_mm_-_Cacoo.png

最初にredisに保管されているインスタンス毎のコネクション数(['ec2-xx-xx-xx', '100', 'ec2-yy-yy-yy', '200', 'ec2-zz-zz-zz', '300'])を取得して、それをObservableとしてストリームに流し、各オペレータで任意の処理を行っています。

※ redisからデータを取得する箇所がfromPromiseとなっているのは、もともとredisに対するI/OをPromiseでラップするようにしていたため。既存の非同期処理に繋げて書ける点は良いですね。

これをRxJSを用いて実装に落としたのが以下です。

実装

※ Autoscaling管理クラスの一部抜粋

class Autoscale {

  //・・・

  // オートスケールの実行判断/HotObservableの生成と制御
  checkAndApply() {
    let published = this._mainStream().publish();
    this._checkByHostNumStream(published);
    this._checkBySpecifiedTimeStream(published);
    this._checkByConnNumAverageStream(published);
    published.connect();
  }

  // Redisから各インスタンスのコネクション情報を取得、整形
  _mainStream() {
    return Rx.Observable.fromPromise(this.redis.zrange([key, 0, -1, 'withscores']))
      .flatMap((ipAndConnNumStr) => {
        return this.oddElemFrom(ipAndConnNumStr)
          .map((connNumStr) => {
            return parseInt(connNumStr); })
          .filter((connNum) => {
            return connNum !== config.get('connNumForRedHost'); })
          .toArray();
      });
  }

  // 実行判断ストリームその1(有効なインスタンス数から判断)
  _checkByHostNumStream(parentStream) {
    parentStream.subscribe((connNumsOfGreenHost) => {
      return Rx.Observable.fromArray(connNumsOfGreenHost)
        .count()
        .filter((countOfGreenHosts) => {
          return countOfGreenHosts <= config.get('minimumGreenHostsNum'); })
        .subscribe(() => {
          this._scaleOut();
        }, (err) => {
          console.log(err);
        });
    });
  }

  // 実行判断ストリームその2(任意の指定時刻から判断)
  _checkBySpecifiedTimeStream(parentStream) => {
    parentStream.subscribe((connNumsOfGreenHost) => {
      return Rx.Observable.fromArray(connNumsOfGreenHost)
        .count()
        .filter((countOfGreenHosts) => {
          return countOfGreenHosts * config.get('serverTypeNum') < config.get('specifiedTimeGreenHostsNum'); })
        .filter(() => {
          return this._isBetween(config.get('forceScaleOutStartAt'), config.get('forceScaleOutEndAt')); })
        .subscribe(() => {
          this._applyCloudformation(config.get('specifiedTimeGreenHostsNum'));
        }, (err) => {
          log.autoscale.error(err);
        });
    });
  }

  // 実行判断ストリームその3(クラスタ内インスタンス群の平均コネクション数から判断)
  _checkByConnNumAverageStream(parentStream) => {
    parentStream.subscribe((connNumsOfGreenHost) => {
      let published = Rx.Observable.fromArray(connNumsOfGreenHost)
        .average()
        .map(ave => { return parseInt(ave); })
        .publish();
      this._notifySlackStream(published);
      this._scaleOutStream(published);
      this._scaleInStream(published);
      published.connect();
    });
  }

  //・・・
}

_mainStream()_checkByHostNumStream(), _checkBySpecifiedTimeStream()は設計が割と素直にコードに落ちているかと思います。
「宣言的に書くことで処理の本質に集中できる」という関数型的なプログラミングのメリットの一つではないでしょうか。

一方でcheckAndApply()_checkByConnNumAverageStream()は、後述の概念を知らないと意図が分かりづらいかもしれません。

その説明をする準備として、まずはRxJSでストリームを構築する際の注意点から見ていきます。

注意点

今回のような設計の際、深く考えずにいると、以下のように3つのストリームそれぞれを別物として構築してしまいがちです。

// 実行判断ストリームその1(有効なインスタンス数から判断)
_checkByHostNumStream(parentStream) {
  return Rx.Observable.fromPromise(this.redis.zrange([key, 0, -1, 'withscores']))
    .flatMap((ipAndConnNumStr) => {
      return this.oddElemFrom(ipAndConnNumStr)
        .map((connNumStr) => {
          // ・・・
}

// 実行判断ストリームその2(任意の指定時刻から判断)
_checkBySpecifiedTimeStream(parentStream) => {
  return Rx.Observable.fromPromise(this.redis.zrange([key, 0, -1, 'withscores']))
    .flatMap((ipAndConnNumStr) => {
      return this.oddElemFrom(ipAndConnNumStr)
        .map((connNumStr) => {
          // ・・・
}

// 実行判断ストリームその3(クラスタ内インスタンス群の平均コネクション数から判断)
_checkByConnNumAverageStream(parentStream) => {
  return Rx.Observable.fromPromise(this.redis.zrange([key, 0, -1, 'withscores']))
    .flatMap((ipAndConnNumStr) => {
      return this.oddElemFrom(ipAndConnNumStr)
        .map((connNumStr) => {
          // ・・・
}

図にするとこんな状態。

Inmemory_DB_Cache_Pattern_mm_-_Cacoo.png

この場合、Redisからのデータ取得を毎回3回分行ってしまうことになります。
コード的にDRYではないし、何より3倍のI/Oコストです。

大本のRedisデータの取得は一度で問題ないはず。
発想を変えて、ストリームを「途中で分岐」させてあげる必要があります。

ということで、RxにおけるHot/Coldの概念が必要です。

HotObservable / ColdObservable

RxにおけるObservableは、HotもしくはColdという特徴を持ちます。

主に「Observerとの関係」という観点に着目するのが良いと思いますが、今回の実装に対しては以下4点を抑えておけばよいかと思います。

  • Cold -> Observable : Observer = 1 : 1

  • Hot -> Observable : Observer = 1 : n

  • 特定のオペレータ(publish等)を使うことでCold→Hotに変換可能

  • 特定のオペレータ(connect等)を使うことでHotObservableの値がObserver達に一斉通知

※ 詳細については以下の記事がよくまとまっていますのでご一読ください。

RxのHotとColdについて - Qiita

Rx入門 (14) - Cold to Hot変換 - xin9le.net

最初のコードに戻る

以上のことが分かると、checkAndApply()で行っているのは

  • mainStreamをHotObservableに変換(publish)
  • 各observer(checkByXXX)に分岐した後にconnect

ということが分かります。

  // オートスケールの実行判断/HotObservableの生成と制御
  checkAndApply() {
    let published = this._mainStream().publish();
    this._checkByHostNumStream(published);
    this._checkBySpecifiedTimeStream(published);
    this._checkByConnNumAverageStream(published);
    published.connect();
  }

このようにしてmainStreamを一つに集約することが出来ました。

またあえて触れていませんでしたが、mainStreamだけでなく_checkByConnNumAverageStream()も複数に分岐していました。

Inmemory_DB_Cache_Pattern_mm_-_Cacoo.png

これも同様で、

  • 平均コネクション数(ここでは200)という値に対して行いたい制御が複数ある

ためにこのような形になっています。

以下コードを見て頂ければ分かるかと思いますが、平均コネクション数に応じてSlackに通知したり、また当然オートスケールの発火を制御したりしています。

  // 実行判断ストリームその3(クラスタ内インスタンス群の平均コネクション数から判断)
  _checkByConnNumAverageStream(parentStream) => {
    parentStream.subscribe((connNumsOfGreenHost) => {
      let published = Rx.Observable.fromArray(connNumsOfGreenHost)
        .average()
        .map(ave => { return parseInt(ave); })
        .publish();
      this._notifySlackStream(published);
      this._scaleOutStream(published);
      this._scaleInStream(published);
      published.connect();
    });
  }

以上で今回登場したコードの全貌を把握することが出来たと思います。

手続き型との比較

最後に、FRPで享受出来たメリットを把握するために、同様の処理を手続き的に書いた場合のコードを示します。

※ 少し前に書いたコードなのでES5ベースですがご容赦ください。

Autoscale.prototype.checkAndApply = function() {
  var _this = this;
  co(function *() {
    if (_this.serverType === 'lobby') var key = config.get('redisKeyForLobbyLoadbalancing');
    else var key = config.get('redisKeyForGameLoadbalancing');
    var ipAndConnNumsStr = yield _this.redis.zrange([key, 0, -1, 'withscores']);
    var connNumsStr = _this._getOddAttributes(ipAndConnNumsStr);
    var connNums = _this._parseInt4Arr(connNumsStr);
    var connNumsOfGreenHosts = _this._greenHostsFrom(connNums);
    _this.greenHostsNum = connNumsOfGreenHosts.length * config.get('serverTypeNum');
    if (_this._checkByMinimumHostNum(connNumsOfGreenHosts)) return;
    if (_this._checkBySpecifiedTime(connNumsOfGreenHosts)) return;
    _this._checkByConnNumAverage(connNumsOfGreenHosts);
  });
};

// private

Autoscale.prototype._checkByMinimumHostNum = function(connNumsOfGreenHosts) {
  if (connNumsOfGreenHosts.length > config.get('minimumGreenHostsNum')) return false;
  this._scaleOut();
  return true;
};

Autoscale.prototype._checkBySpecifiedTime = function(connNumsOfGreenHosts) {
  if (connNumsOfGreenHosts.length >= config.get('specifiedTimeGreenHostsNum')) return false;
  if (!this._isBetween(config.get('forceScaleOutStartAt'), config.get('forceScaleOutEndAt'))) return false;
  this._applyCloudformation(config.get('specifiedTimeGreenHostsNum'));
  return true;
};

Autoscale.prototype._checkByConnNumAverage = function(connNumsOfGreenHosts) {
  var connNumSum = this._sumFrom(connNumsOfGreenHosts);
  var connNumAverage = parseInt(connNumSum / connNumsOfGreenHosts.length);
  if (connNumAverage >= (CONN_NUM_BY_LB + 1)) this._notifyRoomNum(connNumsOfGreenHosts);
  if (connNumAverage >= config.get('scaleOutThreshold')) this._scaleOut();
  if (this._isBetween(config.get('scaleInAvailableStartAt'), config.get('scaleInAvailableEndAt'))
    && connNumAverage <= config.get('scaleInThreshold')) this._scaleIn();
};

// まだまだ続く・・

FRPverと比較してみると

  • プリミティブな制御構造(今回は主に条件分岐)が随所に登場し、全体の流れを俯瞰しにくい

という点が目立ちます。

逆に言えば、FRPの導入により

  • リストとして扱うことでオペレータ(filterやmapなど)を適用でき、制御構造が抽象化/隠蔽される
  • 非同期処理もストリームの一部として違和感なく扱える

結果「コード全体の見通し向上」、つまり「本質的な処理に集中できる」というメリットが享受出来たようです。

以上のように、クラスタの状態管理=インフラ制御の世界でもFRPを有効活用することが出来ました。

Tools

今回はRxJSを利用しました。

Rxは大抵のプラットフォームをカバーしていますので、まずは好みの言語の実装を触ってみると良いと思います。

ReactiveX - Languages

(JS縛りならBacon.jsでも。)

まとめ

  • FPRはコードの見通し↑でなかなか良い
  • インフラの制御はだいたいイベント駆動なので相性○
  • まずはRx眺めてみると良いかも

続きは学園祭で!

追記

先日同内容で若手エンジニア勉強会#10でLTさせて頂きました。

Node.jsのオートスケールをFRPで管理する
http://www.slideshare.net/kidach1/nodejsfrp-54029505

最も盛り上がったテーマが若手とは何か(LT無関係)という、楽しい会でした。
ありがとうございました!

52
54
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
52
54

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?