LoginSignup
5
4

More than 5 years have passed since last update.

cluster モジュールと WebSocket サーバの graceful な reload

Last updated at Posted at 2017-01-29

やりたいこと

NodeJS の cluster モジュール は、親プロセス (Master) から 子プロセス (Worker) を fork し, コネクションを Worker に分散させることができます. 子プロセスごとに別々の port を listen しなくても、 ある単一の port へのコネクションが Worker へと自動で割り振られます.

このあたりの仕組みに関しては、この記事がわかりやすいかと思います.

Node.jsのClusterをセットアップして、処理を並列化・高速化する

cluster を使う大きなモチベーションとしては、 CPU のリソースをうまく使うことだと思いますが、
他には、ソースコードの変更があった場合に、 Worker プロセスを入れ替えることによってダウンタイムなしに反映が行えることがあると思います. 既存 Worker の処理中のリクエストを中断することなく、新しいリクエストは新しく上がってきた Worker に割り振るということができます.

これは、通常の HTTP のサーバでも有効ですし、比較的長い接続を維持する WebSocket サーバでうまく使えたら嬉しいと思います.

では、 cluster モジュールの使い方について、みていこうと思います.

cluster.fork で Worker をつくってみる

cluster.fork で WebSocket サーバを Worker として生成する基本的なコードを作ってみます.
また、そのサーバに接続を行なってみます.

WebSocket のサーバとクライアントには、 ws を使用しました.

サーバ側のコード

server.1.js
let cluster = require('cluster');

const WORKER_NUM = 2;

if (cluster.isMaster) {
  runMaster();
} else {
  runWorker();
}

function runMaster() {
  for (let i=0; i<WORKER_NUM; ++i) {
    cluster.fork();
  }
  console.log('master start (PID: %d)', process.pid);
}

function runWorker() {
  let WebSocketServer = require('ws').Server
  let wss = new WebSocketServer({ port: 8080 });

  console.log('worker start (workerId: %d, PID: %d)', cluster.worker.id, process.pid);

  wss.on('connection', (socket) => {
    let connectionId = getConnectionId();

    console.log('socket open (workerId: %d, connId: %d)', cluster.worker.id, connectionId);

    socket.on('message', (message) => {
      console.log('server received (workerId: %d, connId: %d): %s', cluster.worker.id, connectionId, message);
    });

    socket.on('close', () => {
      console.log('socket close (workerId: %d, connId: %d)', cluster.worker.id, connectionId);
    });
  });


  let _connectionId = 0;
  function getConnectionId() {
    return ++ _connectionId;
  }
}

特に指定しなければ、 Worker プロセスは、 Master プロセスと同じ js ファイルがエントリポイントになります. なので、 cluster.isMaster で 今実行中のプロセスが Master なのか、 Worker なのかを調べます.

Master は cluster.fork で Worker を生成します.

Worker は、 普通にサーバを立ち上げて listen すればよいだけです. 上の例では、 8080 ポートで WebSocket の接続を待ち受けます. Worker は 2つ生成され、ラウンドロビンでコネクションが分散されます.

このサーバに接続を行うクライアント側のコードです.

client.1.js
let WebSocket = require('ws');
let ws = new WebSocket('ws://localhost:8080/');

let intervalId;
ws.on('open', () => {
  intervalId = setInterval(() => {
    let message = 'hello';
    ws.send(message);
    console.log('client sent: %s', message);
  }, 1000);
});

ws.on('close', () => {
  clearInterval(intervalId);
});

クライアント側は、WebSocket の接続を1つ開き、1秒ごとにメッセージを送り続けるコードです.

では、これを実行してみます.

サーバ側の実行コマンドは node server.1.js 、クライアント側の実行コマンドは node client.1.js です. ターミナルを複数立ち上げて、例えば、クライアントを 3つ実行してみましょう. すると サーバ側の出力は次のようになりました.

$ node server.1.js 
master start (PID: 28600)
worker start (workerId: 1, PID: 28601)
worker start (workerId: 2, PID: 28602)
socket open (workerId: 1, connId: 1)
server received (workerId: 1, connId: 1): hello
server received (workerId: 1, connId: 1): hello
...
socket open (workerId: 2, connId: 1)
server received (workerId: 1, connId: 1): hello
server received (workerId: 2, connId: 1): hello
server received (workerId: 1, connId: 1): hello
server received (workerId: 2, connId: 1): hello
...
socket open (workerId: 1, connId: 2)
server received (workerId: 1, connId: 1): hello
server received (workerId: 2, connId: 1): hello
server received (workerId: 1, connId: 2): hello
server received (workerId: 1, connId: 1): hello
server received (workerId: 2, connId: 1): hello
server received (workerId: 1, connId: 2): hello
...

まず、 Master が起動後、 Workerが2つ立ち上がっているのがわかると思います.
その後、クライアントから接続を行うと、 Worker 側で、Websocket のコネクションを確立できており、さらに接続を増やしていくと、Worker 間でコネクションが分散できているのがわかります.

worker.disconnect を使って Worker を入れ替える

Worker オブジェクトには、 disconnect というメソッドがあります.
これは何かというと、その Worker 配下の net.Serverclose が呼ばれ、新規のコネクションを受け付けなくなります. ポイントは、既存のコネクションが強制的にすぐ閉じられるということはなく、正常に閉じられるのが待たれるところです. 全てのコネクションが閉じられると、 Worker はプロセスを終了します.

ということで、 forkで新しい Worker を生成しつつ、 disconnect で古い Worker がコネクションを受け付けないようにすれば、ダウンタイムなしに Worker を入れ替えることができます.

サーバ側のコードに少し処理を加えます.

server.2.js
let cluster = require('cluster');

const WORKER_NUM = 2;

if (cluster.isMaster) {
  runMaster();
} else {
  runWorker();
}

function runMaster() {
  for (let i=0; i<WORKER_NUM; ++i) {
    cluster.fork();
  }

  cluster.on('disconnect', (worker) => {
    console.log('worker disconnected (workerId: %d)', worker.id);
  });

  cluster.on('exit', (worker) => {
    console.log('worker died (workerId: %d)', worker.id);
  });

  process.on('SIGUSR2', () => {
    for (let id in cluster.workers) {
      let worker = cluster.workers[id];
      worker.disconnect();
    }
    for (let i=0; i<WORKER_NUM; ++i) {
      cluster.fork();
    }
  });

  console.log('master start (PID: %d)', process.pid);
}

function runWorker() {
  let WebSocketServer = require('ws').Server
  let wss = new WebSocketServer({ port: 8080 });

  console.log('worker start (workerId: %d, PID: %d)', cluster.worker.id, process.pid);

  wss.on('connection', (socket) => {
    let connectionId = getConnectionId();

    console.log('socket open (workerId: %d, connId: %d)', cluster.worker.id, connectionId);

    socket.on('message', (message) => {
      console.log('server received (workerId: %d, connId: %d): %s', cluster.worker.id, connectionId, message);
    });

    socket.on('close', () => {
      console.log('socket close (workerId: %d, connId: %d)', cluster.worker.id, connectionId);
    });
  });


  let _connectionId = 0;
  function getConnectionId() {
    return ++ _connectionId;
  }
}

USR2 シグナルを受け取った際に、 Worker の 入れ替えをするようにしてみました. (別に USR2 シグナルでなくても構いません)
シグナルハンドラ内で、 全ての Worker に対して、 disconnect を呼び出し、 その後、新 Worker を fork しています.
(ダウンタイムがないように厳密にやるならば、 新 Worker で server listen がされたのを確認した後で 古 Worker を disconnect するべきです.)

では、動かしてみます.

まず、 サーバ側を動かした後、 クライアントを1つ接続してみます.

$ node server.2.js
master start (PID: 30167)
worker start (workerId: 2, PID: 30169)
worker start (workerId: 1, PID: 30168)
socket open (workerId: 2, connId: 1)
server received (workerId: 2, connId: 1): hello
server received (workerId: 2, connId: 1): hello
...

その後、 サーバー側の Master プロセスに対して USR2 シグナル を送ってみます. 次のコマンドを実行します.

kill -USR2 30167

すると、 Server 側の出力は次のようになりました.

...
worker died (workerId: 1)
worker disconnected (workerId: 1)
server received (workerId: 2, connId: 1): hello
worker start (workerId: 3, PID: 30211)
worker start (workerId: 4, PID: 30212)
server received (workerId: 2, connId: 1): hello
server received (workerId: 2, connId: 1): hello
...

とりあえず、 workerId: 1 の Worker は死んだようです. これは、クライアントからのコネクションを受けていない方のようです. workerId: 2 の Worker は引き続き クライアントとのコネクションを続けています.
そして、 新たに2つの Worker が生成されました.

ここで、 クライアントのコネクションを終了します. (クライアントを動かしてるターミナルでCtrl-Cとかで大丈夫です.)

すると、サーバ側の出力はこうなりました.

...
server received (workerId: 2, connId: 1): hello
socket close (workerId: 2, connId: 1)
worker disconnected (workerId: 2)
worker died (workerId: 2)

workerId: 2 のコネクションが閉じられ、プロセスも終了したようです.

以上が、シンプルな Worker 入れ替えの仕組みです.

graceful reload とデプロイ

少しまとまっていないことをつらつらと書きます. 知見不足もあるとおもいます.

上で説明した仕組みをもう少し洗練させていけば、継続的なデプロイに組み込んでいけそうです. 実際に自分で書き上げてもよいし、pm2 のような高機能なものを使うのも良いと思います.

私自身は、最近のプロジェクトで、同様の方法でデプロイを行う仕組みを作りましたが、
一応、その構成を説明すると、AWS上に構築しており、前段に ALB 、後ろのEC2インスタンス上に NodeJS のサーバが動いており、デプロイは同じインスタンス上に新しいコードを配布して reload するというようなものです. 小規模なものです.

単に、 WebSocket アプリケーションといっても、要求や規模感というのは異なっていて、どのようなアーキテクチャやデプロイ戦略をとるかというのは、それぞれ、という感じがしています.

負荷分散をどうするかという問題をみても、ロードバランサを置く方法と、サーバアプリケーション側でエンドポイントの一覧を持っていてクライアントにつなぐべきエンドポイントを返す方法があると思います。

デプロイも、稼働中のインスタンスにコードを配布して reload する方法もあれば、新しいコードをデプロイしたインスタンスをサービスインさせて古いインスタンスをサービスアウトさせる方法もあると思います. 時代的には後者な気がします. 流行っている Dockerコンテナをデプロイするような形であれば必然的にそうなりますしね. その場合は、コネクションがなくなるのをまってシャットダウンするにはどうしたらのかとか考える必要があります.

アプリケーションによって、コネクションが切れても再接続すれば問題ないものもあれば、コネクションが切れたら体験を致命的に悪くするものもあります. もちろん、再接続をちゃんと考慮してうまくやる、というのは重要なんですがなかなか全てを完璧にやるのは難しい場合も多いと思います.

WebSocket アプリケーションを設計する際は、どうやってデプロイするのか、という点も初めから考慮しておかないと後から苦労するかもしれません.

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4