10
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

TypeScriptでワーカースレッド(Web Worker)を手軽に、そして型安全に使いたい

Last updated at Posted at 2022-06-02

ことの発端

Webブラウザで動作する javascriptWeb Wroker の存在は知っていたのですが、ワーカースレッドで動作するコードは別ファイルにする必要があるという認識で、今まで「何とかならんかなぁ?」と思いながら過ごしていました。
で、つい最近 PapaParse を使用する機会があって、この PapaParse がワーカースレッドで動作させることができるので、これは良い機会と興味本位でスレッドで動くべき javascript ファイルはどこで読み込まれるのかを Webブラウザのネットワークログを見て観測していたら、Blob URL で読み込んでいました。

「なんですと!」

そこで、もうちょい調べてみると、こんな記事が!!!

。。。ということで、Inline Workers を検証しようと思った次第です。
簡単そうなら Typescript のジェネリクスクラスを作ってみるし、「こりゃ大変だわ」だったら他力本願でライブラリを探そうと思います。(苦笑)

検証

基本形

こういうのは、まずエコー的なもですよね。

const url = window.URL.createObjectURL(new Blob([`
  self.onmessage = function (e) {
    self.postMessage("echo: " + e.data);
  };
`], {
    type: "text/javascript"
}));

const worker = new Worker(url);

worker.onmessage = function (e) {
  console.log(e.data);
  worker.terminate();
  URL.revokeObjectURL(url);
};

worker.postMessage("hello");
結果
[LOG]: "echo: hello" 

ファイルへの URL じゃなくて Blob URL にするだけなので、特に難しい感じじゃないですね。

関数を渡してみる

さて、いきなり欲が出てきて関数を渡してみましたが。。。

function x() {
    return "world";
}

const url = window.URL.createObjectURL(new Blob([`
  self.onmessage = function (e) {
    self.postMessage("echo: " + e.data());
  };
`], {
    type: "text/javascript"
}));

const worker = new Worker(url);

worker.onmessage = function (e) {
  console.log(e.data);
  worker.terminate();
  URL.revokeObjectURL(url);
};

worker.postMessage(x);
結果
[ERR]: "Executed JavaScript Failed:" 
[ERR]: Failed to execute 'postMessage' on 'Worker': function x() {
    return "world";
} could not be cloned. 

ダメでした。
postMessage() で渡せるものは、構造化複製アルゴリズム に適合するオブジェクトじゃないと渡せないです。
ザックリ言うと 「関数はダメ」って感じです。
Object も渡せますが、独自に作成したクラスObject に関数が含まれるとダメです。1

関数を文字列に展開してみる

function オブジェクトは toString() することでソースコードの文字列になるのですね~~
。。。知りませんでした。
で、ならばと「関数を文字列展開+呼び出す」のはどうでしょう?

function x() {
    return "world";
}

const url = window.URL.createObjectURL(new Blob([`
  self.onmessage = function (e) {
    self.postMessage("echo: " + e.data + ${x.toString()}());
  };
`], {
    type: "text/javascript"
}));

const worker = new Worker(url);

worker.onmessage = function (e) {
  console.log(e.data);
  worker.terminate();
  URL.revokeObjectURL(url);
};

worker.postMessage("hello");
結果
[LOG]: "echo: helloworld" 

お~できた。

関数に引数を渡せるようにしてみる

関数を呼び出せるのであれば、引数を渡してみたいですよね。

type x_param = {
    a: number;
    b: number;
};

function x(p: x_param) {
    return p.a + p.b;
}

const url = window.URL.createObjectURL(new Blob([`
  self.onmessage = function (e) {
    self.postMessage("result: " + ${x.toString()}(e.data));
  };
`], {
    type: "text/javascript"
}));

const worker = new Worker(url);

worker.onmessage = function (e) {
  console.log(e.data);
  worker.terminate();
  URL.revokeObjectURL(url);
};

worker.postMessage({a: 1, b: 2});
結果
[LOG]: "result: 3"

できました。

クラス化してみる

検証してみると、思ったほど難しい話では無さそうなので、このままクラスを作ります。
ということで、型安全に扱えるように下記のジェネリクスクラスを作成しました。

TypedWorkerクラス
class TypedWorker<FUNC extends (...args: any[]) => any> {
  private m_func: FUNC;

  public constructor(func: FUNC) {
    this.m_func = func;
  }

  public execute(params: Parameters<FUNC>): Promise<ReturnType<FUNC>> {
    const url = window.URL.createObjectURL(
      new Blob(
        [
          `self.onmessage = function (e) { self.postMessage(${this.m_func.toString()}(...e.data)); };`
        ],
        {
          type: "text/javascript"
        }
      )
    );

    const worker = new Worker(url);
    const destruct = () => {
      worker.terminate();
      URL.revokeObjectURL(url);
    };
    return new Promise((resolve, reject) => {
      worker.onmessage = (e) => {
        resolve(e.data);
        destruct();
      };
      worker.onerror = (e) => {
        reject(e);
        destruct();
      };
      worker.postMessage(params);
    });
  }
}
サンプル
function add_in_worker(a: number, b: number, c: string) {
  if (c === "error") {
    throw new Error(`${c} : ${a} + ${b} = ${a + b}`);
  }
  return `${c} : ${a} + ${b} = ${a + b}`;
}

const worker = new TypedWorker(add_in_worker);

worker.execute([1, 2, "abc"])
  .then((re) => {
    console.log(re);
  })
  .catch((err) => {
    console.error(err.message);
  });

worker.execute([1, 2, "error"])
  .then((re) => {
    console.log(re);
  })
  .catch((err: Error) => {
    console.error(err.message);
  });
結果
[LOG]: "abc : 1 + 2 = 3"
[ERR]: "Uncaught Error: error : 1 + 2 = 3" 

エラー処理も追加してちゃんと動いています。
execute() の引数がタプルなのはご愛敬ということで。

メモ書き的なもの

1. デバッグ

考えられるアイデアは下記のとおり。

  • ソースコード上で debugger; を仕込んでデバッガに遷移させる。(他に手が無いときの常套手段ですね)
  • TypedWorker<> クラスにデバッグモードを仕込んでおいて、 execute() が「即時実行するけど Promise を返却する」みたいにすれば、関数はワーカースレッドではなくメインスレッドで実行されますが、モジュールチェック的な使い方はアリかと。

2. 所有権移転

Web Worker に「巨大、且つ、メインスレッドで不要となるオブジェクト」を渡す場合は、postMessage() の第二引数で所有権を放棄する手法を使うべきです。

注意点としては、所有権を放棄できるのは下記の Transfarable オブジェクトに限られます。

  • ArrayBuffer
  • MessagePort
  • ReadableStream
  • WritableStream
  • TransformStream
  • AudioData
  • ImageBitmap
  • VideoFrame
  • OffscreenCanvas
  • RTCDataChannel
  • Note: Transferrable

そして、この Transfarable は独自で定義することがでないため、用途は限定的です。
例えば、巨大な画像から特徴点抽出を行いハッシュ化するなんて処理ですが、仮に巨大画像をインターネット上から取得するのであれば、画像の取得処理そのものを WebWorker に移譲すべきじゃないかと考えるのですが、これはこれで大変です。
というのも、 WebWorkeraxios 等のライブラリは使えません。2
この辺、必要になれば後で考えようかと。

3. 共用メモリ

ワーカスレッドが使えるのであれば、共用メモリ的なものは無いのかなと思ったらありました。
SharedArrayBufferpostMessage() に渡して、他のスレッドと同期をする際に Atomics を使い、データの更新を notify で通知して、 wait で更新を待つことができます。(waitはメインスレッドでは使ってはいけません3

準備

この SharedArrayBuffer を使うためには http のレスポンスヘッダに下記の項目が必要になります。

Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: require-corp

参考までに vue-cliserv を使っているなら vue.config.js に下記を追加します。

vue.config.js
module.exports = defineConfig({
  devServer: {
    headers: {
      "Cross-Origin-Opener-Policy": "same-origin",
      "Cross-Origin-Embedder-Policy": "require-corp"
    }
  }
})

サンプルコード

共用メモリのサンプル
  function generateRandom(arr: Float64Array, counter: Int32Array) {
    console.log("generateRandom begin");
    arr.forEach((x, i) => {
      arr[i] = Math.random();
    });
    console.log("generateRandom end");
    counter[0] = 1;
    const n = Atomics.notify(counter, 0);
    console.log(`Atomics.notify() -> ${n}`);
  }

  function calculateArray(arr: Float64Array, counter: Int32Array) {
    console.log("calculateArray begin");
    const re = Atomics.wait(counter, 0, 0);
    console.log(`calculateArray Atomics.wait() -> ${re}`);
    return arr.reduce((p, c) => p + c);
  }

  const counter = new Int32Array(new SharedArrayBuffer(1 * 4));
  const arr = new Float64Array(new SharedArrayBuffer(8 * 50 * 1000 * 1000));
  counter[0] = 0;

  const worker_generateRandom = new TypedWorker(generateRandom);
  const worker_calculateArray = new TypedWorker(calculateArray);

  worker_calculateArray
    .execute([arr, counter])
    .promise.then((x) => console.log(x));

  worker_generateRandom.execute([arr, counter]);
[LOG]: calculateArray begin
[LOG]: generateRandom begin
[LOG]: generateRandom end
[LOG]: Atomics.notify() -> 1
[LOG]: calculateArray Atomics.wait() -> ok
[LOG]: 31250214.50791968

一見、Float64Array だけで良さそうですが、Atomicsnotify() できるものは Int32Array に限られるので、ここでは同期オブジェクト的に counter を使っています。(よもやカウンターとしての機能がないので、変数名がアレですが)
ちなみに、SharedArrayBuffer のサイズですがこれはバイト数です。なのでInt32Arrayであれば4バイトで1要素、Float64Array であれば8バイトで1要素となります。
あと、JavaScript にはアトミック操作はありますが、ブロッキングをするにはひと手間必要です。(このあたり、急に低レベル言語並みにAPIのサポートが減る気がする)

癖がある Atomics.wait()

個人的に Atomics.wait() の癖が強いと感じたので忘備録。 4

Atomics.wait(
  typedArray: BigInt64Array | Int32Array,
  index: number,
  value: bigint,
  timeout?: number | undefined
): "ok" | "not-equal" | "timed-out"
「待つ条件」の罠

Atomics.wait()typedArrayindex に存在する値が value だったら待ちます。言い換えると、value と異なる値になるまで待ちます。value で指定した値になるまで待つのではありません

返却値の罠

timeout をミリ秒で指定して、タイムアウトすると "timed-out" が返却されます。先述の「待つ条件」が偽になると "ok" を返却します。
次に timeout を未指定(undefined)とすると無限に待ち、先述の「待つ条件」が偽になると "ok" を返却します。

そうなると、"not-equal" ってどのタイミングで返却さえれるんだ? という話です。

これは timeout の設定どうこうじゃなくて、Atmics.wait() を実行した際に既に value じゃない時に即時に返却されるのが "not-equal"です。

従って、サンプルコードでは Atomics.wait() の返却値を無視していますが、 "ok" の場合は Atomics.wait() を実行した時点でまだ generateRandom() が終了していないことを意味し、"not-equal" の場合は Atomics.wait() を実行した時点で generateRandom() が終了していることを意味します。しがたって、 "ok" だろうが "not-equal" だろうが generateRandom() が終了しているので timeout を指定しない限り判定は不要です。

4. 機能を追加しようか悩むところ

  • 中断が用意されていないので作ろうかな?(→実装して GitHub で公開済み)
  • ワーカースレッドからプログレス的な通知って要るかな?(→ TypedWorker<> が複雑になるのでやめよう)
  • 仮に通知機能を追加するのであれば、その通知を型安全にしたい。この場合、通信クラス的なものを作ってワーカースレッド側でそのオブジェクトを生成するソースコードを展開する必要があるが、これまでの検証で技術的には問題なさそう。(→ TypedWorker<> が複雑になるのでやめよう)

公開しました

あまりに簡単なコードなので公開するか悩んだのですが、取り敢えず、業務で使い始めたのでライブラリ化しようと思い GitHubnpm で公開しました。

  1. postMessage() の引数としては渡せますが、ワーカースレッド内でオブジェクトのインスタンス関数は undefined となり、呼び出すと例外が発生します。

  2. 理論上可能だけど、今回の TypedWorker<> クラスでは axios のソースコードを全て文字列展開するレベルなので現実的ではないという含みです。

  3. メインスレッドは大切なCPUリソースなので、これを消費するようなコードは余程の理由があったとしても書いてはいけない。

  4. C++std::condition_variable::wait() で実装しているんかなぁ~と想像しつつ腹落ちしました。

10
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?