Node.jsのCluster APIをラップするコードが書けたのでメモを残します。
-
easy-cluster-utils
をインストールする。 - マスタースレッド用の
master.ts
を書く。-
isMaster
とClusterMaster
をimportする。
-
- ワーカースレッド用の
worker.ts
を書く。-
isWorker
とsendToMaster
とCommandMessage
をimportする。
-
- エントリーポイントとなる
index.ts
を書く。-
master.ts
で定義した関数を使ってPromiseチェーンで実行処理を書く。
-
npm install
$ npm install easy-cluster-utils rxjs
使い方
master.ts
master.ts
import { isMaster, ClusterMaster } from 'easy-cluster-utils';
let master: ClusterMaster<number>;
if (isMaster) {
master = new ClusterMaster({ debug: true });
}
export function concurrentMap(list: number[]): Promise<number[]> {
return master.executeList(list, 'map');
}
export function concurrentReduce(list: number[]): Promise<number[]> {
return master.executeList(list, 'reduce');
}
worker.ts
worker.ts
import { isWorker, sendToMaster, CommandMessage } from 'easy-cluster-utils';
type Message = CommandMessage<number[]>;
if (isWorker) {
process.on('message', (message: Message) => {
if (message.type === 'map') {
workerMap(message);
} else if (message.type === 'reduce') {
workerReduce(message);
}
});
}
function workerMap(message: Message): void {
const result = message.payload.map(v => v * 2);
sendToMaster({ result });
}
function workerReduce(message: Message): void {
const result = message.payload.reduce((p, v) => p + v);
sendToMaster({ result });
}
index.ts
index.ts
import { range } from 'lodash';
import { terminate } from 'easy-cluster-utils';
import { concurrentMap, concurrentReduce } from './master';
function main(): void {
const list = range(1, 10);
console.log('list:', list);
concurrentMap(list)
.then(list => concurrentReduce(list))
.then(list => list.reduce((p, v) => p + v))
.then(value => {
console.log('result:', value);
terminate();
})
.catch(err => {
throw err;
});
}
// エントリーポイント。
main();
package.json
package.json
にworker
プロパティを書き足して、Workerスレッドが読み込むJSファイルを指定します。これを書かないとエラーになります。
{
...(省略),
"worker": "build/worker.js"
}
この記述によってWorkerスレッドが読み込むJSファイルはこれだけに限定されるので、実際はisMaster
とかisWorker
とか書いて処理を分岐させる必要はないのですが、「書いてある方が読みやすい」という人はいるかもしれないです。
出力
list: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
worker:97028 started.
worker:97029 started.
command-message to worker:97028. { type: 'map', payload: [ 1, 2, 3, 4, 5 ] }
command-message to worker:97029. { type: 'map', payload: [ 6, 7, 8, 9 ] }
result-message from worker:97028. { result: [ 2, 4, 6, 8, 10 ] }
result-message from worker:97029. { result: [ 12, 14, 16, 18 ] }
command-message to worker:97028. { type: 'reduce', payload: [ 2, 4, 6, 8, 10 ] }
command-message to worker:97029. { type: 'reduce', payload: [ 12, 14, 16, 18 ] }
result-message from worker:97028. { result: 30 }
result-message from worker:97029. { result: 60 }
result: 90
worker:97029 died.
worker:97028 died.