LoginSignup
6
0

More than 5 years have passed since last update.

Rx で大量のリクエストを捌... きたい

Posted at
1 / 23

Roppongi.js #7@Retty

2018年11月19日


自己紹介

ちきさん

GitHub/Twitter/Qiita: @ovrmrw

市ヶ谷のオプトという会社で働いています

3a2512bb-aa72-4515-af42-1f1721252f39.jpg


今日話すこと

  • わからないことだらけ
  • 大量リクエストを RxJS で倒す
  • 結局よくわからない

事の始まり

  • Cloud Pub/Sub の Topic へのメッセージ送信は一度に複数送れるらしい。
  • なるべく束ねて送ったらサーバーの負荷が低いのでは。
  • RxJS で簡単に作れるのでは!!

わからないことだらけ

  • GCE わからない
  • GCE で Node.js のアプリを起動する方法わからない
  • Nginx わからない
  • 負荷試験わからない
  • Tsung わからない (※負荷試験ツール)
  • 思ったほど負荷かからない
  • チューニングわからない

LT の資料を作ろうと思っていたら GCP に詳しくなった。


※ 詳しくなりたかったわけではない。


その過程で生まれた残骸

内容がしょぼいので Qiita 限定共有で投稿。主に自分用のメモ。


(ここから本編)


要件

  • 大量の計測リクエストを捌きたい。数万 QPS ぐらい。
  • データは最終的に BigQuery でいい感じに使いたい。

※ 実務で実際にあった要件ではありません ※


設計

  • RxJS で捌いてみる。
  • サーバー1台で受けきってみる。
  • Cloud Pub/Sub に漏れなくデータを送れたら勝ち。その後のことは後で考えれば良い。

詳細な設計

  • CPU コアの数だけ cluster を使って fastify のサーバーアプリを起動する。
  • 1 メッセージ毎にTopic に送るのではなく、複数のメッセージを束ねて送る。 (一度に 1000 メッセージまで送れる)
    • リクエストの発生回数を可能な限り減らす。
  • 並列で発生させるリクエストの数を制限する。 (1本でも十分速い)
    • GCP ネットワーク内では 1 リクエストが 数 ms 〜 数十 ms で完了する。

できあがった Rx まわりのコード


import { Subject, interval } from 'rxjs';
import { buffer, mergeMap, tap, filter } from 'rxjs/operators';

const INTERVAL = 500;
const NTH = 900;

class RxClient {
  private dispatcher$ = new Subject();
  private notifier$ = new Subject();
  private counter = 0;

  constructor() {
    this.setDispatcher();
  }

  send(data, attributes) {
    const message = { data, attributes };
    this.dispatcher$.next(message);
  }

  private setDispatcher() {
    this.dispatcher$
      .pipe(
        tap(() => this.notifyEveryNthMessage(NTH)),
        buffer(this.notifier$),
        filter(messages => messages.length > 0),
        mergeMap(messages => this.publish(messages), 1)
      )
      .subscribe({
        next: () => this.notifier$.next(),
        error: err => console.error(err)
      });

    interval(INTERVAL).subscribe(() => this.notifier$.next());
  }

  private publish(messages) {
    // Mocking Cloud Pub/Sub Publish logic
    return new Promise(resolve => {
      const startTime = Date.now();
      setTimeout(() => {
        console.log(`messageIds: ${messages.length}, processed: ${Date.now() - startTime}ms`);
        resolve(true);
      }, Math.random() * 200);
    });
  }

  private notifyEveryNthMessage(n) {
    this.counter++;
    if (this.counter % n === 0) {
      this.notifier$.next();
    }
    if (this.counter > n * 1000 * 1000 * 1000 * 1000 * 1000) {
      this.counter = 0;
    }
  }
}

少し解説


リクエストの入り口

外部から呼ばれるメソッド send()
message を組み立てて dispatcher$ に送る。


  send(data, attributes) {
    const message = { data, attributes };
    this.dispatcher$.next(message);
  }

心臓部 dispatcher$ から始まるストリーム

  • messagebuffer に溜め続ける。
  • ストリームが一周したら buffer に溜まっている message を配列にして次に流す。
  • buffer から流れてきた message の配列が要素数 0 だったら捨てる。
  • message の配列を Topic に送る。 (非同期処理)
  • buffuer に溜まっている message を配列にして次に流す。
  • (繰り返す)

    this.dispatcher$
      .pipe(
        tap(() => this.notifyEveryNthMessage(NTH)),
        buffer(this.notifier$),
        filter(messages => messages.length > 0),
        mergeMap(messages => this.publish(messages), 1)
      )
      .subscribe({
        next: () => this.notifier$.next(),
        error: err => console.error(err)
      });

ポーリングしてストリームを flush

  • 一定時間ごとに buffer から message を吐き出させる。 (いわゆる flush)
  • 例えば GAE は放っておくと勝手にシャットダウンするので「dispatcher$ から message が n 回流れてきた」という条件以外にこのようなポーリングが必要になる。

    interval(INTERVAL).subscribe(() => this.notifier$.next());

message が n 回流れてきたらストリームを flush

  • Topic への送信は一度に 1000 メッセージまでなので、 n (= 900) 回毎に buffer から message を吐き出させる。 (いわゆる flush)

  private notifyEveryNthMessage(n) {
    this.counter++;
    if (this.counter % n === 0) {
      this.notifier$.next();
    }
  }

実行時のイメージ

非同期処理はモック

Image from Gyazo


負荷試験をやってみたけど

  • 4,000 QPS ぐらいまでは確認できたが、それ以上はどこがボトルネックになっているのわからない。
    • (サーバースペックは 4 CPU CORE, 4 GB RAM)
    • (GCE => GCE で Tsung による負荷試験を実施)
  • CPU使用率が100%近くなっているので1台ではここらへんが限界なのか。。。

まとめ

  • Topic への送信はメッセージを束ねてもレスポンスタイムはほとんど変わらない。
    • (230 メッセージでも 30 ms くらい)
  • テスト目的なら GCE のプリエンプティブを使おう。なぜなら安い。
  • 実は GAE でやれば良かったのでは?

Thanks ! :raised_hands_tone1:

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0