12
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Node.jsのworker_threadsで並列処理

Last updated at Posted at 2021-12-27

Node.jsでworker_threadsを使うとき、非同期処理の使い方にコツがあったのでメモ。

並列処理の基本

Node.jsでworker_threadsを使うと並列処理が可能です。
以下の二つの記事がとても分かりやすいです。

上記を一口で言うと、以下のようになります:

main.js
// ワーカーを作成する
const {Worker} = require('worker_threads');
const worker = new Worker('./worker.js');
// ワーカーからの結果を受ける
worker.on('message', msg => {
    console.log(msg);
});
// ワーカーに仕事を依頼
worker.postMessage({action:'mul', args:[2, 3]});
worker.js
const {parentPort} = require('worker_threads');
// 親からのメッセージを受ける
parentPort.on('message', async msg => {
    console.log('worker received: %o', msg);
    const {action, args} = msg;
    // 依頼に応じて処理を分ける
    if (action === 'add') {
        // 処理を行う
        const v = args[0] + args[1];
        // 親に処理結果を送信
        parentPort.postMessage({'result': v});
    }
    else if (action === 'mul') {
        // 処理を行う
        const v = args[0] * args[1];
        // 親にメッセージを送信
        parentPort.postMessage({'result': v});
    }
    else {
        throw new Error('Unknown action');
    }
    process.exit(); // ワーカーを終了
});

ワーカースレッドと非同期処理を組み合わせる

例えば、指定ミリ秒だけ休むだけの以下のようなワーカーを定義。

worker_wait.js
const {parentPort} = require('worker_threads');
// 親からのメッセージを受ける
parentPort.on('message', async msg => {
    console.log('worker received: %o', msg);
    const {action, args} = msg;
    if (action === 'sleep') {
        await sleep(args[0]); // 非同期で仕事を実行
        parentPort.postMessage('ok'); // 仕事完了をメインスレッドに通知
    }
    process.exit();
});

// 非同期で行う仕事をpromiseで定義
function sleep(msec) {
    return new Promise((resolve, reject) => {
        let remain = msec;
        let timerId = setInterval(() => {
            console.log(remain);
            remain -= 300;
            if (remain < 0) {
                clearTimeout(timerId);
                resolve();
            }
        },300);
    });
}

上記ワーカーを実行するコードは以下。

main_wait.js
// ワーカーを開始
const {Worker} = require('worker_threads');
const worker = new Worker('./worker_wait.js');
// ワーカーからの結果を受ける
worker.on('message', msg => {
    console.log(msg);
});
// ワーカーに仕事を依頼
worker.postMessage({action:'sleep', args:[3000]});

実行すると、以下のような結果が表示されます。

% node main_wait.js
worker received: { action: 'sleep', args: [ 3000, [length]: 1 ] }
3000
2700
...省略...
600
300
0
ok

worker_threadsの処理を途中で中止する

ワーカーの動作を途中で中止したい場合があります。その場合、適当なstopなどのアクションを追加すればできます。動作を停止するかを判断する変数flagStopを導入します。

worker_wait2.js
const {parentPort} = require('worker_threads');
let flagStop = false;

// 親からのメッセージを受ける
parentPort.on('message', async msg => {
    console.log('worker received: %o', msg);
    const {action, args} = msg;
    if (action === 'sleep') {
        await sleep(args[0]); // 処理1を実行
        await sleep(args[0]); // 処理2を実行
        await sleep(args[0]); // 処理3を実行
        parentPort.postMessage('ok'); // 親に通知
        process.exit();
    }
    else if (action === 'stop') {
        flagStop = true;
        console.log('STOP!!!');
    }
});

function sleep(msec) {
    return new Promise((resolve, reject) => {
        let remain = msec;
        if (flagStop) {
            resolve();
            return;
        }
        let timerId = setInterval(() => {
            remain -= 300;
            console.log(remain);
            // flagStopを確認して処理を中断
            if (remain < 0 || flagStop) {
                clearTimeout(timerId);
                console.log('sleep.stop');
                resolve();
            }
        },300);
    });
}

ワーカーを開始した後、1000ミリ秒後にstopメッセージを送信します。

main_wait2.js
// ワーカーを開始
const {Worker} = require('worker_threads');
const worker = new Worker('./worker_wait2.js');
// ワーカーからの結果を受ける
worker.on('message', msg => {
    console.log(msg);
});
// ワーカーに仕事を依頼
worker.postMessage({action:'sleep', args:[3000]});
// 1000ミリ秒後中断
setTimeout(()=>{
    worker.postMessage({action:'stop'});
}, 1000);

実行してみます。無事に処理を途中で停止することができました。

% node main_wait2.js
worker received: { action: 'sleep', args: [ 3000, [length]: 1 ] }
2700
2400
2100
worker received: { action: 'stop' }
STOP!!!
1800
ok

中止するタイミングを変更しても、正しく動くことが分かるでしょう。

まとめ

ワーカースレッド、かなり便利です。Node.jsでちょっとしたサーバー作った時でも、それなりに時間がかかる処理があれば、ワーカースレッドにして実行すれば、効率的にマシンリソースを使えそうと思いました。

12
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?