Node.jsでworker_threadsを使うとき、非同期処理の使い方にコツがあったのでメモ。
並列処理の基本
Node.jsでworker_threadsを使うと並列処理が可能です。
以下の二つの記事がとても分かりやすいです。
- (参考1) Node.js: CPU負荷で3秒かかっていた処理を「Worker Threads」で1秒に時短する
- (参考2) Node.js Worker Threads: スレッド間でデータを送受信する方法
上記を一口で言うと、以下のようになります:
main.js
// ワーカーを作成する
const {Worker} = require('worker_threads');
const worker = new Worker('./worker.js');
// ワーカーからの結果を受ける
worker.on('message', msg => {
console.log(msg);
});
// ワーカーに仕事を依頼
worker.postMessage({action:'mul', args:[2, 3]});
worker.js
const {parentPort} = require('worker_threads');
// 親からのメッセージを受ける
parentPort.on('message', async msg => {
console.log('worker received: %o', msg);
const {action, args} = msg;
// 依頼に応じて処理を分ける
if (action === 'add') {
// 処理を行う
const v = args[0] + args[1];
// 親に処理結果を送信
parentPort.postMessage({'result': v});
}
else if (action === 'mul') {
// 処理を行う
const v = args[0] * args[1];
// 親にメッセージを送信
parentPort.postMessage({'result': v});
}
else {
throw new Error('Unknown action');
}
process.exit(); // ワーカーを終了
});
ワーカースレッドと非同期処理を組み合わせる
例えば、指定ミリ秒だけ休むだけの以下のようなワーカーを定義。
worker_wait.js
const {parentPort} = require('worker_threads');
// 親からのメッセージを受ける
parentPort.on('message', async msg => {
console.log('worker received: %o', msg);
const {action, args} = msg;
if (action === 'sleep') {
await sleep(args[0]); // 非同期で仕事を実行
parentPort.postMessage('ok'); // 仕事完了をメインスレッドに通知
}
process.exit();
});
// 非同期で行う仕事をpromiseで定義
function sleep(msec) {
return new Promise((resolve, reject) => {
let remain = msec;
let timerId = setInterval(() => {
console.log(remain);
remain -= 300;
if (remain < 0) {
clearTimeout(timerId);
resolve();
}
},300);
});
}
上記ワーカーを実行するコードは以下。
main_wait.js
// ワーカーを開始
const {Worker} = require('worker_threads');
const worker = new Worker('./worker_wait.js');
// ワーカーからの結果を受ける
worker.on('message', msg => {
console.log(msg);
});
// ワーカーに仕事を依頼
worker.postMessage({action:'sleep', args:[3000]});
実行すると、以下のような結果が表示されます。
% node main_wait.js
worker received: { action: 'sleep', args: [ 3000, [length]: 1 ] }
3000
2700
...省略...
600
300
0
ok
worker_threadsの処理を途中で中止する
ワーカーの動作を途中で中止したい場合があります。その場合、適当なstopなどのアクションを追加すればできます。動作を停止するかを判断する変数flagStopを導入します。
worker_wait2.js
const {parentPort} = require('worker_threads');
let flagStop = false;
// 親からのメッセージを受ける
parentPort.on('message', async msg => {
console.log('worker received: %o', msg);
const {action, args} = msg;
if (action === 'sleep') {
await sleep(args[0]); // 処理1を実行
await sleep(args[0]); // 処理2を実行
await sleep(args[0]); // 処理3を実行
parentPort.postMessage('ok'); // 親に通知
process.exit();
}
else if (action === 'stop') {
flagStop = true;
console.log('STOP!!!');
}
});
function sleep(msec) {
return new Promise((resolve, reject) => {
let remain = msec;
if (flagStop) {
resolve();
return;
}
let timerId = setInterval(() => {
remain -= 300;
console.log(remain);
// flagStopを確認して処理を中断
if (remain < 0 || flagStop) {
clearTimeout(timerId);
console.log('sleep.stop');
resolve();
}
},300);
});
}
ワーカーを開始した後、1000ミリ秒後にstopメッセージを送信します。
main_wait2.js
// ワーカーを開始
const {Worker} = require('worker_threads');
const worker = new Worker('./worker_wait2.js');
// ワーカーからの結果を受ける
worker.on('message', msg => {
console.log(msg);
});
// ワーカーに仕事を依頼
worker.postMessage({action:'sleep', args:[3000]});
// 1000ミリ秒後中断
setTimeout(()=>{
worker.postMessage({action:'stop'});
}, 1000);
実行してみます。無事に処理を途中で停止することができました。
% node main_wait2.js
worker received: { action: 'sleep', args: [ 3000, [length]: 1 ] }
2700
2400
2100
worker received: { action: 'stop' }
STOP!!!
1800
ok
中止するタイミングを変更しても、正しく動くことが分かるでしょう。
まとめ
ワーカースレッド、かなり便利です。Node.jsでちょっとしたサーバー作った時でも、それなりに時間がかかる処理があれば、ワーカースレッドにして実行すれば、効率的にマシンリソースを使えそうと思いました。