Node.js

Node.jsのCluster APIをラップするコードが書けたのでメモ

Node.jsのCluster APIをラップするコードが書けたのでメモを残します。

  • easy-cluster-utilsをインストールする。
  • マスタースレッド用のmaster.tsを書く。
    • isMasterClusterMasterをimportする。
  • ワーカースレッド用のworker.tsを書く。
    • isWorkersendToMasterCommandMessageをimportする。
  • エントリーポイントとなるindex.tsを書く。
    • master.tsで定義した関数を使ってPromiseチェーンで実行処理を書く。

npm install

$ npm install easy-cluster-utils rxjs

使い方

master.ts

master.ts
import { isMaster, ClusterMaster } from 'easy-cluster-utils';

let master: ClusterMaster<number>;

if (isMaster) {
  master = new ClusterMaster({ debug: true });
}

export function concurrentMap(list: number[]): Promise<number[]> {
  return master.executeList(list, 'map');
}

export function concurrentReduce(list: number[]): Promise<number[]> {
  return master.executeList(list, 'reduce');
}

worker.ts

worker.ts
import { isWorker, sendToMaster, CommandMessage } from 'easy-cluster-utils';
type Message = CommandMessage<number[]>;

if (isWorker) {
  process.on('message', (message: Message) => {
    if (message.type === 'map') {
      workerMap(message);
    } else if (message.type === 'reduce') {
      workerReduce(message);
    }
  });
}

function workerMap(message: Message): void {
  const result = message.payload.map(v => v * 2);
  sendToMaster({ result });
}

function workerReduce(message: Message): void {
  const result = message.payload.reduce((p, v) => p + v);
  sendToMaster({ result });
}

index.ts

index.ts
import { range } from 'lodash';
import { terminate } from 'easy-cluster-utils';
import { concurrentMap, concurrentReduce } from './master';

function main(): void {
  const list = range(1, 10);
  console.log('list:', list);

  concurrentMap(list)
    .then(list => concurrentReduce(list))
    .then(list => list.reduce((p, v) => p + v))
    .then(value => {
      console.log('result:', value);
      terminate();
    })
    .catch(err => {
      throw err;
    });
}

// エントリーポイント。
main();

package.json

package.jsonworkerプロパティを書き足して、Workerスレッドが読み込むJSファイルを指定します。これを書かないとエラーになります。

{
  ...(省略),
  "worker": "build/worker.js"
}

この記述によってWorkerスレッドが読み込むJSファイルはこれだけに限定されるので、実際はisMasterとかisWorkerとか書いて処理を分岐させる必要はないのですが、「書いてある方が読みやすい」という人はいるかもしれないです。

出力

list: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
worker:97028 started.
worker:97029 started.
command-message to worker:97028. { type: 'map', payload: [ 1, 2, 3, 4, 5 ] }
command-message to worker:97029. { type: 'map', payload: [ 6, 7, 8, 9 ] }
result-message from worker:97028. { result: [ 2, 4, 6, 8, 10 ] }
result-message from worker:97029. { result: [ 12, 14, 16, 18 ] }
command-message to worker:97028. { type: 'reduce', payload: [ 2, 4, 6, 8, 10 ] }
command-message to worker:97029. { type: 'reduce', payload: [ 12, 14, 16, 18 ] }
result-message from worker:97028. { result: 30 }
result-message from worker:97029. { result: 60 }
result: 90
worker:97029 died.
worker:97028 died.