はじめに
2023年12月21日分のアドベントカレンダーの記事になります。
以下記事の続きです。
【L1、L2】ブラウザでWorkerを動かしてみる(基本編)
【L1、L2】ブラウザでWorkerを動かしてみる(番外編)
前回、前々回では、ブラウザにてWorkerを実行してみましたが、今回はサーバーでWorkerを実行してみようと思います。javascriptの実行基盤であるNode.js上することで確認できそうですね。
せっかくですから、前回、前々回で構築した環境であるfastifyを利用しちゃいましょう。
この記事で分かること
- Node.js上で専用ワーカーを使いやすくするwokerpoolモジュールを利用方法。
準備
確認環境は前回、前々回で構築したfastify+typescriptを利用します。workerpoolもインストール済だと思いますので特にモジュールのインストール、設定変更は不要です。
確認コードの作成
特定リクエストにワーカーを実施して時間のかかる処理を実行できるようにしましょう。
時間のかかる処理はフィボナッチ数列を計算する処理です。
毎回シードの値が特定数値範囲内でランダムに与えられ計算されますので、それに伴って計算時間も変化します。
サーバーにて実施するワーカーコードを作成しましょう。
src/myWorker.tsを以下内容で追加します。
※こちらはworkerpoolサイトにて掲載されているコードを少しだけ修正してTypescriptのコードとして変更しています。
import workerpool from "workerpool";
// a deliberately inefficient implementation of the fibonacci sequence
function fibonacci(n: number): number {
if (n < 2) return n;
return fibonacci(n - 2) + fibonacci(n - 1);
}
// create a worker and register public functions
workerpool.worker({
fibonacci: fibonacci,
});
src/server.tsを以下内容で追加します。
import fastify from "fastify";
import fastifyStatic from "@fastify/static";
import path, { resolve } from "path";
import * as workerpool from "workerpool";
// 追加1
const PROMISE_TIMEOUT = 1000;
// 追加2
// TypescriptからJSにコンパイルされた時のパスを指定
// 手をぬいて「dist/src」配下に公開されてます。すんません。
const pool = workerpool.pool(path.join(process.cwd(), "dist/src/myWorker.js"), {
// これによって並列処理のタイプが変わる。
// Node.jsの時にしか指定できない値があるのでworkerpool のドキュメントをみてね。
nodeWorker: "thread",
// nodeWorker: "process",
maxQueueSize: 10,
maxWorkers: 2,
} as workerpool.WorkerPoolOptions);
// 追加3
// 指定範囲をランダムな値を取得する関数
const randRange = (min: number, max: number) =>
Math.floor(Math.random() * (max - min + 1) + min);
// 追加4
// WorkerPoolのエラーをフォーマットする
const formatError = (err: Error) => {
if (err instanceof workerpool.Promise.TimeoutError) {
return (
"TimeoutError: Evaluation exceeded maximum duration of " +
PROMISE_TIMEOUT / 1000 +
" seconds"
);
} else {
return err.toString();
}
};
const server = fastify();
server.register(fastifyStatic, {
root: path.join(process.cwd(), "public"),
});
server.register(fastifyStatic, {
root: path.join(process.cwd(), "node_modules/workerpool/dist"),
prefix: "/workerpool/dist",
decorateReply: false,
});
server.get("/ping", async (request, reply) => {
return "Hello world\n";
});
// 追加5
// 特定のURLにアクセスすると、workerpoolを使って計算を実行する
server.get("/take_much_time", async (request, reply) => {
try {
const seedNum = randRange(1, 50);
console.log(`seedNum: ${seedNum}`);
const result = await pool
// myWorkerスクリプト内のfibonacciを実行する
.exec("fibonacci", [seedNum])
// workerpool独自のPromiseメソッドでタイムアウトを設定する
.timeout(PROMISE_TIMEOUT)
// myWorkerスクリプト内のfibonacciの正常終了した場合の処理
.then((res) => {
console.log(res);
return { calculation: res, status: "success" };
})
// myWorkerスクリプト内のfibonacciの異常終了(例外発生含む)した場合の処理
// fibonacciがタイムアウトした場合もここに入る
.catch((err) => {
console.error("error!");
console.error(err);
console.error(formatError(err));
// 例外補足しても処理は継続
return { calculation: null, status: "failed" };
})
// myWorkerスクリプト内のfibonacciの結果に関わらず実行される処理
.then((res) => {
console.log("finish!");
console.log(res);
return res;
});
return result;
} catch (error) {
// workerpoolで実行前に既に指定したキューの上限に達している場合に発生する
// メッセージ:Max queue size of {maxQueueSize} reached
console.error(error);
throw error;
}
});
server.listen({ port: 18080 }, (err, address) => {
if (err) {
console.error(err);
process.exit(1);
}
console.log(`server is listening at ${address}`);
});
実行結果
ボタン連打等で、maxQueueSizeの指定サイズを超えたら以下のエラー
まとめ
いかがだったでしょうか。
workerはjavascriptの実行基盤であれば、動作することがわかりますね。
但し、実行基盤の違いによりコードはことなります。
専用ワーカーに関してはworkerpoolのようなモジュールを利用するとかなり似た感じで実装できるので便利です。
また、node.jsでマルチスレッド、マルチプロセスのコードを記載するときに、まともに素のNode.js機能だけで記載しようとするとそれなりに時間もかかりますので積極的に利用していければと思っています。
それでは~