はじめに
JavaScriptはシングルスレッドだけど、時々、並列処理をしたくなります。
RxJSを使うと綺麗に書けたので紹介します。
やり方
ホスト
import { Observable, from } from 'rxjs'
import { bufferCount, mergeMap } from 'rxjs/operators'
const enum InputType {
InitialData,
Direction,
}
interface InitialData {
type: InputType.InitialData
// 初期化のためのデータを追加
}
interface Direction {
type: InputData.Direction
// 並列化する処理のためのデータを追加
}
function start(initialData: InitialData, directions: Direction[], threads: number) {
const chunkSize = Math.ceil(directions.length / threads)
return from(directions)
.pipe(
bufferCount(chunkSize),
mergeMap(directions => createWorker(initialData, directions)),
)
}
const enum ReturnType {
CalculationResult,
Finished,
}
interface CalculationResult {
type: ReturnType.CalculationResult
// 計算結果を追加
}
interface Finished {
type: ReturnType.Finished
}
function createWorker(initialData: InitialData, directions: Direction[]) {
return new Observable<CalculationResult>(observer => {
const worker = new Worker('worker.js')
worker.addEventListener('message', e => {
const data: CalculationResult | Finished = e.data
switch (data.type) {
case ReturnType.CalculationResult:
observer.next(data)
break
case ReturnType.Finished:
observer.complete()
break
}
})
worker.addEventListener('error', e => {
observer.error(e)
})
worker.postMessage(initialData)
worker.postMessage(directions)
return () => {
worker.terminate()
}
})
}
ワーカー
onmessage = e => {
const data: InitialData | Direction[] = e.data
if (!Array.isArray(data)) {
// 初期化処理
} else {
data.forEach(direction => {
// 計算
postMessage({ type: ReturnType.CalculationResult })
})
postMessage({ type: ReturnType.Finished })
}
}
解説
ポイントは2つです。
一つは、スレッド数からチャンクサイズを計算して bufferCount
で directions
を分割することです。ほぼ均等な数に分割できます。
もう一つは、 Observable
のコンストラクタ引数では関数を一つ渡しますが、その関数の戻り値として worker.terminate()
を実行する関数を返すことです。これで、購読が終わった時に必ずワーカーが止まります。これを知らなかった時は、計算が終了した数を自分で計算してワーカーを停止していました。
参考にしたページ