4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Web Workerによる並列処理をRxJSを用いて実装する

Last updated at Posted at 2018-08-09

はじめに

JavaScriptはシングルスレッドだけど、時々、並列処理をしたくなります。
RxJSを使うと綺麗に書けたので紹介します。

やり方

ホスト
import { Observable, from } from 'rxjs'
import { bufferCount, mergeMap } from 'rxjs/operators'

const enum InputType {
  InitialData,
  Direction,
}

interface InitialData {
  type: InputType.InitialData
  // 初期化のためのデータを追加
}

interface Direction {
  type: InputData.Direction
  // 並列化する処理のためのデータを追加
}

function start(initialData: InitialData, directions: Direction[], threads: number) {
  const chunkSize = Math.ceil(directions.length / threads)
  return from(directions)
    .pipe(
      bufferCount(chunkSize),
      mergeMap(directions => createWorker(initialData, directions)),
    )
}

const enum ReturnType {
  CalculationResult,
  Finished,
}

interface CalculationResult {
  type: ReturnType.CalculationResult
  // 計算結果を追加
}

interface Finished {
  type: ReturnType.Finished
}

function createWorker(initialData: InitialData, directions: Direction[]) {
  return new Observable<CalculationResult>(observer => {
    const worker = new Worker('worker.js')

    worker.addEventListener('message', e => {
      const data: CalculationResult | Finished = e.data
      switch (data.type) {
        case ReturnType.CalculationResult:
          observer.next(data)
          break
        case ReturnType.Finished:
          observer.complete()
          break
      }
    })

    worker.addEventListener('error', e => {
      observer.error(e)
    })

    worker.postMessage(initialData)
    worker.postMessage(directions)

    return () => {
      worker.terminate()
    }
  })
}
ワーカー
onmessage = e => {
  const data: InitialData | Direction[] = e.data

  if (!Array.isArray(data)) {
    // 初期化処理
  } else {
    data.forEach(direction => {
      // 計算
      postMessage({ type: ReturnType.CalculationResult })
    })

    postMessage({ type: ReturnType.Finished })
  }
}

解説

ポイントは2つです。

一つは、スレッド数からチャンクサイズを計算して bufferCountdirections を分割することです。ほぼ均等な数に分割できます。

もう一つは、 Observable のコンストラクタ引数では関数を一つ渡しますが、その関数の戻り値として worker.terminate() を実行する関数を返すことです。これで、購読が終わった時に必ずワーカーが止まります。これを知らなかった時は、計算が終了した数を自分で計算してワーカーを停止していました。

参考にしたページ

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?