はじめに
node.jsでマルチスレッドを実現するworker_threadsについて、実例を交えて説明します。
参考
- ハンズオンNode.js
worker_threadsモジュール
例として、http://localhost:3000/10へのアクセスに対して10番目のフィボナッチ数を返すような簡易Webアプリケーションを構築していきます。
パフォーマンスを担保するために生成するスレッドをプールして使い回すような仕組みにしたいです。
以下の3ファイルで実現できます。
thread-pool.js
'use strict'
const { Worker } = require('worker_threads')
module.exports = class ThreadPool {
// 空きスレッド、キューを初期化
availableWorkers = []
queue = []
constructor(size, filePath, options) {
// 引数で指定されたとおりにスレッドを生成してプール
for (let i = 0; i < size; i++) {
this.availableWorkers.push(new Worker(filePath, options))
}
}
// 外部からの処理要求を受け付けるメソッド
executeInThread(arg) {
return new Promise(resolve => {
const request = { resolve, arg }
// 空きスレッドがあればリクエストを処理し、なければキューに積む
const worker = this.availableWorkers.pop()
worker ? this.#process(worker, request) : this.queue.push(request)
})
}
// 実際にスレッドで処理を実行するprivateメソッド
#process(worker, { resolve, arg }) {
worker.once('message', result => {
// リクエスト元に結果を返す
resolve(result)
// キューに積まれたリクエストがあれば処理し、なければ空きスレッドに戻す
const request = this.queue.shift()
request
? this.#process(worker, request)
: this.availableWorkers.push(worker)
})
worker.postMessage(arg)
}
}
fibonacci.js
'use strict'
const fibonacci = require('../fibonacci')
const { parentPort } = require('worker_threads')
// messageイベントの監視によりメインスレッドからのメッセージの受信を待機、
// 受信したらフィボナッチ数を計算して結果をメインスレッドに送信
parentPort.on('message', n => parentPort.postMessage(fibonacci(n)))
web-app.js
'use strict'
const http = require('http')
const cpuCount = require('os').cpus().length
const ThreadPool = require('../thread-pool')
// CPUコア数と同じサイズのスレッドプールを生成
const threadPool = new ThreadPool(cpuCount, `${__dirname}/fibonacci.js`)
http.createServer(async (req, res) => {
const n = Number(req.url.substr(1))
if (Number.isNaN(n)) {
return res.end()
}
const result = await threadPool.executeInThread(n)
res.end(result.toString())
}).listen(3000)
処理の流れ
詳しく説明していきます
-
まず、
node web-app
でWebサーバーを起動- web-app.jsより、スレッドプールが作成され、
webapp.jsconst threadPool = new ThreadPool(cpuCount, `${__dirname}/fibonacci.js`)
- CPUのコア数だけフィボナッチ計算処理を行うスレッド(fibonacci.js)がプールされる。このスレッドではmessageイベントの監視によりメインスレッドからのメッセージの受信を待機するリスナーが登録されている。
(受信したらフィボナッチ数を計算して結果をメインスレッドに送信:後述)
fibonacci.jsparentPort.on('message', n => parentPort.postMessage(fibonacci(n)))
-
http://localhost:3000/10へリクエストを送信
-
Webサーバーがhttpリクエストを受け取ると、スレッドプールにフィボナッチ計算依頼のための処理が非同期(Promiseを生成)で走る。これは
threadPool.executeInThread(n)
の部分。nはリクエストした数値(=10)
web-app.js
const n = Number(req.url.substr(1))
if (Number.isNaN(n)) {
return res.end()
}
const result = await threadPool.executeInThread(n)
thread-pool.js
executeInThread(arg) {
return new Promise(resolve => {
const request = { resolve, arg }
// 空きスレッドがあればリクエストを処理し、なければキューに積む
const worker = this.availableWorkers.pop()
worker ? this.#process(worker, request) : this.queue.push(request)
})
}
- exucuteInThreadが返すPromiseの最後
this.#process(worker, request)
で空いているスレッド(availableWorkers)に対して、計算依頼(postMessage)をする。
thread-pool.js
#process(worker, { resolve, arg }) {
// 省略
worker.postMessage(arg)
}
- messageをキャッチしたスレッド(サブスレッド)がスレッドプール(メインスレッド)に結果をpostMessageで返す
fibonacci.js
parentPort.on('message', n => parentPort.postMessage(fibonacci(n)))
- メインスレッドはonceでmessageをキャッチし、先程生成したPromiseを計算結果でresolveする。サブスレッドは
queue
にたまっている処理を引き続き行うか、空スレッドに戻る。ここのmessageイベントリスナーはonceで登録されているので、この時点で消える
thread-pool.js
#process(worker, { resolve, arg }) {
worker.once('message', result => {
// リクエスト元に結果を返す
resolve(result)
// キューに積まれたリクエストがあれば処理し、なければ空きスレッドに戻す
const request = this.queue.shift()
request
? this.#process(worker, request)
: this.availableWorkers.push(worker)
})
...
- Promiseがresolveされたことにより、await処理が完了し、Webサーバーは
res.end(result.toString())
で計算結果を返す
web-app.js
const result = await threadPool.executeInThread(n)
res.end(result.toString())
終わり
ハンズオンNode.jsを読んでいて、スレッドプールの部分が少々分かりづらかったので、忘備録も兼ねてまとめてみました。一つずつ処理を紐解いていくとスレッドプールがどのように実現しているのかわかるかと思います。