6
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Node.jsのworker_threadsモジュール(スレッドプール)

Last updated at Posted at 2022-02-20

はじめに

node.jsでマルチスレッドを実現するworker_threadsについて、実例を交えて説明します。

参考

  • ハンズオンNode.js

worker_threadsモジュール

例として、http://localhost:3000/10へのアクセスに対して10番目のフィボナッチ数を返すような簡易Webアプリケーションを構築していきます。
パフォーマンスを担保するために生成するスレッドをプールして使い回すような仕組みにしたいです。
以下の3ファイルで実現できます。

thread-pool.js
'use strict'
const { Worker } = require('worker_threads')

module.exports = class ThreadPool {
  // 空きスレッド、キューを初期化
  availableWorkers = []
  queue = []
  constructor(size, filePath, options) {
    // 引数で指定されたとおりにスレッドを生成してプール
    for (let i = 0; i < size; i++) {
      this.availableWorkers.push(new Worker(filePath, options))
    }
  }

  // 外部からの処理要求を受け付けるメソッド
  executeInThread(arg) {
    return new Promise(resolve => {
      const request = { resolve, arg }

      // 空きスレッドがあればリクエストを処理し、なければキューに積む
      const worker = this.availableWorkers.pop()
      worker ? this.#process(worker, request) : this.queue.push(request)
    })
  }

  // 実際にスレッドで処理を実行するprivateメソッド
  #process(worker, { resolve, arg }) {
    worker.once('message', result => {
      // リクエスト元に結果を返す
      resolve(result)

      // キューに積まれたリクエストがあれば処理し、なければ空きスレッドに戻す
      const request = this.queue.shift()
      request
        ? this.#process(worker, request)
        : this.availableWorkers.push(worker)
    })
    worker.postMessage(arg)
  }
}
fibonacci.js
'use strict'
const fibonacci = require('../fibonacci')
const { parentPort } = require('worker_threads')

// messageイベントの監視によりメインスレッドからのメッセージの受信を待機、
// 受信したらフィボナッチ数を計算して結果をメインスレッドに送信
parentPort.on('message', n => parentPort.postMessage(fibonacci(n)))
web-app.js
'use strict'
const http = require('http')
const cpuCount = require('os').cpus().length
const ThreadPool = require('../thread-pool')

// CPUコア数と同じサイズのスレッドプールを生成
const threadPool = new ThreadPool(cpuCount, `${__dirname}/fibonacci.js`)

http.createServer(async (req, res) => {
  const n = Number(req.url.substr(1))
  if (Number.isNaN(n)) {
    return res.end()
  }
  const result = await threadPool.executeInThread(n)
  res.end(result.toString())
}).listen(3000)

処理の流れ

詳しく説明していきます

  • まず、node web-appでWebサーバーを起動

    • web-app.jsより、スレッドプールが作成され、
    webapp.js
    const threadPool = new ThreadPool(cpuCount, `${__dirname}/fibonacci.js`)
    
    • CPUのコア数だけフィボナッチ計算処理を行うスレッド(fibonacci.js)がプールされる。このスレッドではmessageイベントの監視によりメインスレッドからのメッセージの受信を待機するリスナーが登録されている。
      (受信したらフィボナッチ数を計算して結果をメインスレッドに送信:後述)
    fibonacci.js
    parentPort.on('message', n => parentPort.postMessage(fibonacci(n)))
    
  • http://localhost:3000/10へリクエストを送信

  • Webサーバーがhttpリクエストを受け取ると、スレッドプールにフィボナッチ計算依頼のための処理が非同期(Promiseを生成)で走る。これはthreadPool.executeInThread(n)の部分。nはリクエストした数値(=10)

web-app.js
const n = Number(req.url.substr(1))
if (Number.isNaN(n)) {
  return res.end()
}
const result = await threadPool.executeInThread(n)
thread-pool.js
executeInThread(arg) {
 return new Promise(resolve => {
   const request = { resolve, arg }

   // 空きスレッドがあればリクエストを処理し、なければキューに積む
   const worker = this.availableWorkers.pop()
   worker ? this.#process(worker, request) : this.queue.push(request)
 })
}
  • exucuteInThreadが返すPromiseの最後this.#process(worker, request)で空いているスレッド(availableWorkers)に対して、計算依頼(postMessage)をする。
thread-pool.js
#process(worker, { resolve, arg }) {
    // 省略
  worker.postMessage(arg)
}
  • messageをキャッチしたスレッド(サブスレッド)がスレッドプール(メインスレッド)に結果をpostMessageで返す
fibonacci.js
parentPort.on('message', n => parentPort.postMessage(fibonacci(n)))
  • メインスレッドはonceでmessageをキャッチし、先程生成したPromiseを計算結果でresolveする。サブスレッドはqueueにたまっている処理を引き続き行うか、空スレッドに戻る。ここのmessageイベントリスナーはonceで登録されているので、この時点で消える
thread-pool.js
#process(worker, { resolve, arg }) {
  worker.once('message', result => {
    // リクエスト元に結果を返す
    resolve(result)

   // キューに積まれたリクエストがあれば処理し、なければ空きスレッドに戻す
    const request = this.queue.shift()
    request
      ? this.#process(worker, request)
      : this.availableWorkers.push(worker)
  })
  ...
  • Promiseがresolveされたことにより、await処理が完了し、Webサーバーはres.end(result.toString())で計算結果を返す
web-app.js
const result = await threadPool.executeInThread(n)
res.end(result.toString())

終わり

ハンズオンNode.jsを読んでいて、スレッドプールの部分が少々分かりづらかったので、忘備録も兼ねてまとめてみました。一つずつ処理を紐解いていくとスレッドプールがどのように実現しているのかわかるかと思います。

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?