JavaScriptでgoroutine(WaitGroup)っぽい動きを実装してみる
背景
卒業研究の検証を行う際、常に一定数の非同期処理を同時に実行するプログラムが必要になりました。Promise.all
を用いて並行実行することはできるのですが、ある程度の処理をまとめて実行する関係上、処理の数を常に一定にすることができませんでした。他に情報も見当たらなかった(記事書いてる時に発見したやつは最後に)ので、並列処理が扱いやすいと言われているGoのWaitGroupを元にして実装してみました。
環境
- TypeScript Playground
- TypeScript 4.9.4
使い方
- インスタンスを生成
- 非同期処理の実行前に
addメソッド
を実行 - 非同期処理の実行
- 非同期処理の終了時に
doneメソッド
を実行 - 2~4を繰り返す
- 一通り実行したら、
waitメソッド
で全ての処理が終了するまで待機
基本的にはGolangにおける使い方(公式ドキュメント)と変わりませんが、waitメソッド
を拡張することで、実行中の処理が全て完了するまでという条件から、実行数が一定値より少なくなるまで待機という条件に変更しています。
wait
メソッドのデフォルト引数では、同時実行数が1より少なくなる(要は全ての処理が完了する)まで待機する仕様になっています。
同時実行の対象となる非同期処理は、実行時にawait
を付けないという点に注意してください。await
を付けてしまうと繰り返すたびに、非同期処理でブロッキングされるため、同時実行にならず、逐次実行になります。
const asyncFunction = (n: number) => new Promise(resolve => {
setTimeout(() => {
resolve(undefined);
}, n * 10);
});
// 同時実行数の上限
const maxConcurrent = 3;
// main関数
(async () => {
// 1. インスタンスの生成
const wg = new WaitGroup();
// 5. 2~4を繰り返す
for (const i of Array(10).keys()) {
// 同時実行数が設定値を下回るまで待機
await wg.wait(maxConcurrent);
// 2. 非同期処理の実行前にaddメソッドを実行
wg.add();
console.log(`同時実行数:${wg.getExecutingNumber()} インデックス:${i}`)
// 3. 非同期処理の実行
//
asyncFunction(wg, i)
.then(() => console.log(`${i}番目の処理が終了しました`))
.then(() => wg.done()); // 4. 非同期処理の終了時にdoneメソッドを実行
}
// 6. 一通り実行したら、`waitメソッド`で全ての処理が終了するまで待機
await wg.wait();
})();
実行結果(Playgroundのログ)
ターミナルで実行すれば[Log]
やログのダブルクオテーションは消えるはずです。
同時実行数を常に一定にしながら非同期処理を行えていることがわかると思います。
[LOG]: "同時実行数:1 インデックス:0"
[LOG]: "同時実行数:2 インデックス:1"
[LOG]: "同時実行数:3 インデックス:2"
[LOG]: "0番目の処理が終了しました"
[LOG]: "1番目の処理が終了しました"
[LOG]: "同時実行数:2 インデックス:3"
[LOG]: "同時実行数:3 インデックス:4"
[LOG]: "2番目の処理が終了しました"
[LOG]: "同時実行数:3 インデックス:5"
[LOG]: "3番目の処理が終了しました"
[LOG]: "同時実行数:3 インデックス:6"
[LOG]: "4番目の処理が終了しました"
[LOG]: "同時実行数:3 インデックス:7"
[LOG]: "5番目の処理が終了しました"
[LOG]: "同時実行数:3 インデックス:8"
[LOG]: "6番目の処理が終了しました"
[LOG]: "同時実行数:3 インデックス:9"
[LOG]: "7番目の処理が終了しました"
[LOG]: "8番目の処理が終了しました"
[LOG]: "9番目の処理が終了しました
実装
waitは元々whileによる無限ループで実装していたのですが、イベントループのキューに投げられた非同期処理が実行されないので、非同期のsleepメソッドを用いて一定間隔でイベントループを回すようにしています。この間隔はインスタンスの生成時に指定可能です。
待機間隔はデフォルトで10msに設定されています(初期化時に変更可)。
type WaitGroupConstructorParameter = { waitIntervalMilli?: number };
class WaitGroup {
private executingNumber = 0;
private waitIntervalMilli = 10;
constructor(parameter?: WaitGroupConstructorParameter) {
if (parameter?.waitIntervalMilli) this.setWaitInterval(parameter.waitIntervalMilli);
}
public add() {
this.setExecutingNumber(this.executingNumber + 1);
}
public done() {
this.setExecutingNumber(this.executingNumber - 1);
}
public async wait(limit=1) {
if (limit < 1) {
Error("1以上の値を設定してください");
}
while (this.executingNumber > limit - 1) {
await this.sleep(this.waitIntervalMilli);
}
}
public getExecutingNumber() {
return this.executingNumber;
}
private setExecutingNumber(value: number) {
if (value < 0) {
throw Error('すでに全ての処理が終了しています')
}
this.executingNumber = value;
}
private setWaitInterval(value: number) {
if (value < 1) {
throw Error("1以上の値を設定してください(単位ms)")
}
this.waitIntervalMilli = value;
}
private sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
注意点
Goに倣って実装していますが、非同期処理の部分はJavaScriptなので、gorutineのような並列処理にはなりません。並列処理が必要な場合はWeb Worker等を利用する必要があります。今回の手法は、通信やファイルの読み込みなどJavaScriptの処理外で時間がかかる非同期関数に対して有効な手段となります。
最後に
ここまで実装して記事を書いてる時に以下の記事を知りました(もう少し調べるべきだった…)。JavaScriptの流儀に則った実装も可能なようです。ジェネレータや、プロミスの中で複数のプロミスを捌くなど、非同期処理に関して非常に勉強になる記事でした。
参考資料