JavaScript
WebWorker
RxJS

Web Workerによる並列処理をRxJSを用いて実装する

はじめに

JavaScriptはシングルスレッドだけど、時々、並列処理をしたくなります。
RxJSを使うと綺麗に書けたので紹介します。

やり方

ホスト
import { Observable, from } from 'rxjs'
import { bufferCount, mergeMap } from 'rxjs/operators'

const enum InputType {
    InitialData,
    Direction,
}

interface InitialData {
    type: InputType.InitialData
    // 初期化のためのデータを追加
}

interface Direction {
    type: InputData.Direction
    // 並列化する処理のためのデータを追加
}

function start(initialData: InitialData, directions: Direction[], threads: number) {
    const chunkSize = Math.ceil(directions.length / threads)
    return from(directions)
        .pipe(bufferCount(chunkSize))
        .pipe(mergeMap(directions => createWorker(initialData, directions)))
}

const enum ReturnType {
    CalculationResult,
    Finished,
}

interface CalculationResult {
    type: ReturnType.CalculationResult
    // 計算結果を追加
}

interface Finished {
    type: ReturnType.Finished
}

function createWorker(initialData: InitialData, directions: Direction[]) {
    return new Observable<CalculationResult>(observer => {
        const worker = new Worker('worker.js')

        worker.addEventListener('message', e => {
            const data: CalculationResult | Finished = e.data
            if (data.type === ReturnType.CalculationResult) {
                worker.next(data)
            } else if (data.type === ReturnType.Finished) {
                worker.complete()
            }
        })

        worker.addEventListener('error', e => {
            worker.error(e)
        })

        worker.postMessage(initialData)
        worker.postMessage(directions)

        return () => { worker.terminate() }
    })
}
ワーカー
onmessage = e => {
    const data: InitialData | Direction[] = e.data

    if (!Array.isArray(data)) {
        // 初期化処理
    } else {
        data.forEach(direction => {
            // 計算
            postMessage({ type: ReturnType.CalculationResult })
        })

        postMessage({ type: ReturnType.Finished })
    }
}

解説

ポイントは2つです。

一つは、スレッド数からチャンクサイズを計算して bufferCountdirections を分割することです。ほぼ均等な数に分割できます。

もう一つは、 Observable のコンストラクタ引数では関数を一つ渡しますが、その関数の戻り値として worker.terminate() を実行する関数を返すことです。これで、購読が終わった時に必ずワーカーが止まります。これを知らなかった時は、計算が終了した数を自分で計算してワーカーを停止していました。

参考にしたページ

https://qiita.com/_likr/items/d382dc120a942ba4c6fe