JavaScript をお使いの皆様、 Streams API はお使いでしょうか。直接使う機会はフロントでは特に少ないと思われるこの API に今回は焦点を当てていきたいと思います。この記事では Streams API の使い方を確認する他、言語処理系を Streams API の上に乗せる際に面白かった部分を紹介します。
Streams API の概要
Streams API ではデータのストリーム (小さなサイズの連続したデータの流れ) を扱うことで、メモリ効率よくデータの加工を行うことができます。例えばブラウザで fetch
したファイルを利用者のコンピュータに保存する機能を考えると、ストリームを使うことでメモリ上にファイルの内容すべてを乗せる必要がなくなります。
またストリームにはバッファが確保されるだけでなく背圧の概念があり、読み取りや書き込みの速度に応じてストリームに流れるデータの速度が調整されます。
Streams API で利用可能なストリームのインターフェースには ReadableStream
・TransformStream
・WritableStream
の 3 種類があり、それぞれデータの生成・加工・消費を担当します。これらのインターフェースは Iterator
に対するインターフェースと似ており、次の表のように対応します。
役割 | Streams API | Iterators |
---|---|---|
生成 | ReadableStream |
Array.values , Object.entries など |
加工 | TransformStream |
Iterator.map , Iterator.filter など |
消費 | WritableStream |
Iterator.toArray , Array.from , スプレッド構文 など |
Iterator
の例で上げたメソッド (Iterator Helpers) は 2024 年 11 月時点で最近 Stage 4 になった仕様で、Safari では使用できません。
(今まで Array にしか実装されていなかったので喜んだ人も多いはず......)
ファイルのエンコーディングを変換する例
Streams API の一般的な使用例としてファイルのエンコーディングを変換する例を示します。次のコードは Deno で file1.txt
の内容を Shift_JIS として読み込み file2.txt
に UTF-8 として書き出すものです。
(await Deno.open("file1.txt")).readable // ファイル -> Uint8Array
.pipeThrough(new TextDecoderStream("shift_jis")) // Uint8Array -> string
.pipeThrough(new TextEncoderStream()) // string -> Uint8Array
.pipeTo((await Deno.create("file2.txt")).writable); // Uint8Array -> ファイル
Deno.open(...)
や Deno.create(...)
が返す Promise
の値はファイルを表しており、readable
, writable
プロパティでファイルへ読み書きを行う ReadableStream
と WritableStream
を取得することができます。
例を簡潔にするために Deno を用いましたが、実際のブラウザでは fetch
の結果を TextDecoderStream
に通し、FileSystemWritableFileStream
に書き込むなどのユースケースが考えられます。(参考: 【JavaScript】Streams API を使って fetch の逐次ダウンロード & ファイル保存 – webfrontend.ninja)
ストリームを作る
ストリームはコンストラクタにオブジェクトを与える形で生成することができます。ReadableStream
・WritableStream
・TransformStream
のいずれもインスタンス化時に関数を与える方法と継承してコンストラクタ内で関数を与える方法で定義することができます。以下の例は無限にランダムな数値を出力する ReadableStream
と、今までに受け取った文字列の総和を出力する TransformStream
です。
// ReadableStream コンストラクタを用いて生成する
const randomNumberStream = new ReadableStream({
pull(controller) {
// ストリームには文字列以外も流せる
return controller.enqueue(Math.random());
}
});
// TransformStream を継承して生成する
class ConcatStringStream extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
// this.str に今までの文字列を結合した値が入っている
this.str += chunk;
controller.enqueue(this.str);
}
});
this.str = "";
}
}
構造化ログを読み取る例
構造化ログを扱う例を考えてみます。入力として次のような改行ごとに JSON データが流れてくることを想定します。これを行ごとにパースして利用することを考えます。
{ "type": "login", "time": 1733277105445, "user_id": 4874 }
{ "type": "login", "time": 1733277105624, "user_id": 7015 }
{ "type": "login", "time": 1733277106026, "user_id": 3399 }
{ "type": "favorite_post", "time": 1733277106935, "user_id": 7911, post_id: 9552 }
{ "type": "get_post", "time": 1733277107355, "user_id": 5830, post_id: 6381 }
{ "type": "get_post", "time": 1733277107947, "user_id": 8825, post_id: 2737 }
{ "type": "login", "time": 1733277108595, "user_id": 7789 }
ストリームを次のように作ることを想定します。標準入力はどの位置で分割されるかわからない文字列のストリームのためこれを改行ごとに変換し、JSON パースしてログに出力するものです。
TextLineStream
は細切れの文字列を受け取ると改行ごとに分割するものです。このとき最後の改行以降の部分は次のチャンクの先頭に結合して用います。
Deno.stdin.readable // 標準入力
.pipeThrough(new TextDecoderStream()) // Uint8Array -> string に変換
.pipeThrough(new TextLineStream()) // string -> 行ごとの string
.pipeThrough(new JsonParseStream()) // string (JSON) -> object
.pipeTo(new ConsoleLogWriter("Log:")); // console.log("Log:", object)
// 文字列を受け取り、行ごとの文字列 (0 個以上) を出力する
class TextLineStream extends TransformStream {
constructor() {
super({
start() {
this.chunk = "";
},
transform(chunk, controller) {
// this.chunk には最後の改行以降の部分が入る
this.chunk += chunk;
const split = this.chunk.split("\n");
for (const line of split.slice(0, -1)) {
controller.enqueue(line);
}
this.chunk = split[split.length - 1];
},
flush(controller) {
// ストリームが閉じた際に残っているチャンクを出力する
if(this.chunk !== "") {
controller.enqueue(this.chunk);
}
},
});
}
}
// 文字列を受け取り、それを JSON.parse したものを出力する
class JsonParseStream extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
controller.enqueue(JSON.parse(chunk));
},
});
}
}
// 任意の値を受け取り、コンソールに出力する
class ConsoleLogWriter extends WritableStream {
constructor(...prefix) {
super({
write(data) {
console.log(...prefix, data);
},
});
}
}
Stream の正体
前述の通り Stream は任意のデータを流すことができました。紹介しきれませんでしたがデータの生成・加工は非同期に行うことができ (終了するまで次の pull
や transform
などは呼ばれません)、返す値は 0 個以上の好きな数だけ返すことができます。
ストリームは AsyncIterator
をラップしたものと考えることができ、AsyncIterator
が非同期に返す連続なデータをバッファし、速度をコントロールをした上で次のストリームに流してくれます。例えば次のようにして AsyncIterator
を ReadableStream
に変化することができます。また、WritableStream
は AsyncIterator
を実装しています。
new ReadableStream({
async pull(controller) {
const next = await someAsyncIterator.next();
if(next.done) {
controller.close();
return;
}
controller.enqueue(next.value);
}
});
2024/12/04 時点では実験的機能ですが、例示の実装をしなくても ReadableStream.from
を用いることで AsyncIterator
を ReadableStream
に変換することが可能です。
言語処理系を作る
ストリームが任意のデータを好きな数だけ非同期に流すことができるという強力なものだということはお分かりいただけたと思います。このような特性は一列に並んだデータの一部のみを見るようなユースケースに非常に合致していると言えます。つまりオートマトンと形式言語に理論を持つ (形式) 言語処理も当然相性がいいと考えられます。そこで言語処理系の API を Streams API に乗せていくという挑戦をしてみました。
最終的に出来上がる処理系は次のように実行されます。
await (await Deno.open(Deno.args[0])).readable // ファイル入力
.pipeThrough(new TextDecoderStream()) // Uint8Array -> string
.pipeThrough(new TokenizerStream()) // string -> Token
.pipeThrough(parserStream()) // Token -> Node
.pipeThrough(new SelectorStream(Deno.args[1])) // Node -> Node (フィルタ)
.pipeTo(consoleLogWriter()); // console.log(Node)
成果物は次のレポジトリにて公開しています。
言語処理系の概要
今回は言語処理系の中でもトークナイザーとパーサーに焦点を当てます。トークナイザは文字列を扱いやすいある一定の単位に分割するもので、パーサーはトークン列を解析してコードの構造を表すデータ (一般的に木構造) を構成するものです。例えば以下のような JSON 文字列を考えます。
{ "name": "John", "age": 24, "web_site": null, "tags": ["engineer", "javascript-lover"], "online": true }
今回作成するトークナイザーでは、この文字列は次のようなトークン列に分割されます。
{
"name"
:
"John"
,
"age"
:
24
,
"web_site"
:
null
,
"tags"
:
[
"engineer"
,
"javascript-lover"
]
,
"online"
:
true
}
さらにパーサーでこのトークン列を解析して次のような AST (Abstract Syntax Tree; 抽象構文木) が作成されます。AST はコードの構造を表しており、特に JSON の場合はオブジェクトの構造そのものを表していることがわかると思います。
トークナイザーを作る
トークナイザーは文字列をトークン列に変換するものです。JSON の構文は RFCV 7159 に定義されています。今回は [
, ]
, {
, }
, :
, ,
, null
, 論理値, 文字列値, 数値 をトークン (終端記号) としました。トークナイザーは文字列を先頭から順に処理していきます。文字列の先頭をあるトークンとして解釈し、取り除く操作 (消費) を繰り返すことにより文字列を処理していきます。
const numberRegExp = /^-?(0|[1-9]\d*)(\.\d+)?([eE][+\-]?\d+)?/;
if(str[0] === '[') {
// チャンクを `begin-array` として消費
str = str.slice(1);
return { type: 'begin-array' };
} else if(str[0] === ']') {
// チャンクを `end-array` として消費
} else if(str[0] === '{') {
// チャンクを `begin-object` として消費
} else if(str[0] === '}') {
// チャンクを `end-object` として消費
} else if(str[0] === ':') {
// チャンクを `name-saparator` として消費
} else if(str[0] === '.') {
// チャンクを `value-separator` として消費
} else if(str.startsWith('null')){
// チャンクを `null` として消費
} else if(str.startsWith('true')){
// チャンクを論理値として消費
} else if(str.startsWith('false')){
// チャンクを論理値として消費
} else if (str[0] === '"') {
// 次の " までを文字列リテラルとして消費
// ただしエスケープされている場合 (\") は無視する
} else if(numberRegExp.test(str) {
// 正規表現に一致する範囲を数値として消費
} else {
throw Error("不明な文字列");
}
ストリームでこの処理を行うには、文字列がチャンクごとに与えられることにより困難が発生します。例えば現在指定処理列が tr
で終わる場合、次のチャンクは ue
から始まり true
にマッチする可能性があります (もし違う文字列であればエラーにする必要があります)。
この問題に対処するため、まずは TextLineStream
の例と同様に処理しきれなかった文字列を次のチャンクの先頭に結合する処理を追加します。
let chunk: string = '';
const transform = function (currentChunk, controller) {
chunk += currentChunk;
// ...以降の処理
};
以降の処理では上記のトークナイザーの処理に加え、処理している文字列がトークンの先頭の一部にマッチするかを判定します。例えば null
を考えた時、チャンクが n
, nu
, nul
のいずれか場合はエラーを出さずに次のチャンクを待つ必要があります。これは "null".startsWith(chunk)
という条件で判定することができます。次のコードに示すようなイメージです。
if(chunk.startsWith('null')){
// null として消費する
} else if('null'.startsWith(chunk)) {
// null トークンの途中のため、次のチャンクの処理に回す
} else {
throw Error("不明な文字列");
}
文字列の場合は 2 つ目の "
が見つからなかったら文字列トークンの途中と判定することができます。
if (chunk[0] === '"') {
// 次の " の場所 (ただし \" を無視する)
const endIndex = chunk.slice(1).match(/(?<!\\)"/)?.index;
if (typeof endIndex === 'number') {
// 文字列として消費する
} else {
// null トークンの途中のため、次のチャンクの処理に回す
}
}
次に正規表現でマッチングしている数値の場合を考えます。正規表現の途中までにマッチする正規表現は一般的に困難ですが、数値の場合は数値の途中までがそのまま数値として解釈できるため簡単になっています。チャンク全体が数値にマッチする場合は次のチャンクでの処理に回すだけでよいです。
const numberRegexp = /^-?(0|[1-9]\d*)(\.\d+)?([eE][+\-]?\d+)?/;
if(match?.[0]) {
if(match[0].length === chunk.length) {
// 数値トークンの途中のため、次のチャンクの処理に回す
}
// 数値として消費する
}
これまでの処理をまとめると、ストリームを用いた tokenizer の処理は次のようになります。
- チャンクを前のチャンクの残りと結合する
- チャンクの先頭がトークンとして消費できなくなるまで以下の処理を繰り返す
- チャンクの先頭が決まった文字列 (
[
,{
,null
など) ならトークンとして消費する - チャンクの先頭が決まった文字列の一部なら次のチャンクの処理に持ち越す
- チャンクの先頭が
"
なら- もう 1 つの
"
があるなら文字列トークンとして消費する - もう 1 つの
"
がないなら次のチャンクの処理に持ち越す
- もう 1 つの
- チャンク全体が数値の途中までに一致するなら次のチャンクの処理に持ち越す
- チャンクの先頭が数値に一致するなら数値トークンとして消費する
- 不明な文字列としてエラーを出す
- チャンクの先頭が決まった文字列 (
エラー判定を最後まで延期する方法の検討
もっとシンプルな方法として、エラー判定を最後まで延期する方法が考えられます。この方法ではトークナイザーを一般的な場合と同様に構成することができますが、ストリームが閉じられるまで判定が延期され、メモリ使用量も増えるという欠点があります。パーサーを作る
パーサーの構成方法にはいくつもの種類がありますが、再帰下降法と LR 法はその代表的な例です。再帰下降法はトークン列を AST のノードに展開する関数を再帰的に呼び出すことで AST を構成するものです。一方で LR 法は構文を受理するプッシュダウンオートマトンを構成し、状態遷移時にノードを出力するものです。トークン列が流れてくるという性質上 LR 法がより適していると考えられますが、構成方法が難しいため再起下降パーサを用いることにします。
LR パーサーはトークンの push によって動作するのに対し再起下降パーサは pull によって動作します。パーサーを (AsyncInterator) -> AsyncIterable
として作成し、push 型に変換することで TransformStream
への変換ができるようになります。
パーサーを実装する関数 (parser: (AsyncIterator) => AsyncIterable
) を TransformStream
に変換するには、TransformStream
への入力 (transform
関数の呼び出し) を parser
に渡し、parser
の出力を controller.enqueue
に渡す必要があります。しかしこの処理は次に挙げる難しい点があります。
-
parser
に渡すAsyncIterator
を予め作成し、transform
が呼び出されたら値を供給しなければならない- 愚直に実装するなら
AsyncIterator
から返すPromise
をtransform
呼び出し時に解決する必要がある
- 愚直に実装するなら
-
parser
の出力は非同期で、入力した値を消費しきったかがわからない- そのため
transform
の呼び出し内でparser
の出力をすべて待つ事ができない -
transform
に渡されるcontroller
のライフタイムに注意しなければならない-
transform
(async 関数) が解決するまでの間しか使えない
-
- そのため
前者の問題を解決するには「予め定義でき」「AsyncIterator
として消費できて」「あとから値を入れられる」ものが必要となります。どこかで聞いたことがありますね......。そう、ReadableStream
を利用するとこれらの問題をすべて解決できます。
const inputBufferStream = new TransformStream();
const inputWriter = inputBufferStream.writable.getWriter();
(async () => {
for await (const out of parser(inputBufferStream.readable.values())) {
// 出力値を処理する
}
})();
const transform = async (input, controller) => {
await inputWriter.write(input);
};
後者の問題は単純に出力をキューにためておき transform
や flush
が呼ばれたときに controller.enqueue
することで解決できます。また parser
が値を返しきったかを知ることができれば正常にストリームを閉じることができます。これらの処理を前者の処理に加えると次のようになります。
const inputBufferStream = new TransformStream();
const inputWriter = inputBufferStream.writable.getWriter();
// parser が値を返しきったら解決する Promise
let endPromiseResolve: (() => void) | undefined;
const endPromise = new Promise<void>((resolve) => {
endPromiseResolve = resolve;
});
// 出力値をためておくキュー
const queue = [];
(async () => {
for await (const out of parser(inputBufferStream.readable.values())) {
queue.push(out);
}
endPromiseResolve();
})();
const transform = async (input, controller) => {
// キューが空になるまで出力する
while (queue.length > 0) {
controller.enqueue(queue.shift());
}
await inputWriter.write(input);
};
const flush = async (controller) => {
while (queue.length > 0) {
controller.enqueue(queue.shift());
}
// parser がすべての結果を返すまで待つ
await endPromise;
inputWriter.close();
};
セレクターを作る
ここまでで文字列のノード列に変換するストリームを作ることができました。今回は jq にあるようなセレクターを実装してみます。といっても実装は単純で、ノード列のうち条件にマッチするもののみを抽出するだけです。ノードにパス (JSON オブジェクトのルート) の情報をもたせ、パスに対してマッチさせます。
// パス: (string | number)[]
const pathMatch = function (path, filterPath) {
const length = Math.max(path.length, filterPath.length);
for (let i = 0; i < length; i++) {
// * なら無条件にマッチする
if (filterPath[i] === '*') continue;
// ** ならそれ以下のパスセグメントは無視する
if (filterPath[i] === '**') return true;
// それ以外の場合セグメントを比較する
if (path[i] !== filterPath[i]) return false;
}
return true;
};
const transformOptions = (selector) => {
// string -> (string | number)[]
const filterPath = parseSelector(selector);
const transform = (node, controller) => {
// フィルターする
if (pathMatch(node.path, filterPath)) {
controller.enqueue(node);
}
};
return { transform };
};
使ってみる
以上ですべてのパーツが揃いました。次のようにして JSON ファイルを解析してみます。解析対象には GitHub の API から取得した Qiita Inc. のレポジトリ一覧を使用します。セレクターを *.name
としてすべてのレポジトリの名前を取得してみます。
const consoleLogWriter = () => {
return new WritableStream({
write(value) {
console.log(`.${value.path.join('.')}`, value.token);
},
});
};
await (await Deno.open(Deno.args[0])).readable
.pipeThrough(new TextDecoderStream())
// .pipeThrough(splitToCharactersStream())
.pipeThrough(new TokenizerStream())
.pipeThrough(parserStream())
.pipeThrough(new SelectorStream(Deno.args[1]))
.pipeTo(consoleLogWriter());
$ curl -sSL https://api.github.com/orgs/increments/repos > gh.json
$ deno run --allow-read main.ts gh.json '.*.name'
次のように出力されました。パブリックレポジトリが列挙できていることがわかりますね。
.0.name { kind: "string-literal", value: "qiita-rb" }
.0.name { kind: "string-literal", value: "active_admin" }
.0.name { kind: "string-literal", value: "Qiita-Team-Templates" }
.0.name { kind: "string-literal", value: "i18n-js" }
.0.name { kind: "string-literal", value: "bootstrap3-sass" }
.0.name { kind: "string-literal", value: "homebrew-cask" }
.0.name { kind: "string-literal", value: "vagrant" }
.0.name { kind: "string-literal", value: "qiita-markdown" }
.0.name { kind: "string-literal", value: "tasklist.js" }
.0.name { kind: "string-literal", value: "emoji-plist" }
.0.name { kind: "string-literal", value: "qiitan-rb" }
.0.name { kind: "string-literal", value: "qiita-js" }
.0.name { kind: "string-literal", value: "es-query-builder" }
.0.name { kind: "string-literal", value: "rubocop" }
.0.name { kind: "string-literal", value: "wicked_pdf" }
.0.name { kind: "string-literal", value: "compass-core" }
.0.name { kind: "string-literal", value: "shoulda-callback-matchers" }
.0.name { kind: "string-literal", value: "greenmat" }
.0.name { kind: "string-literal", value: "ruby-test-reporter" }
.0.name { kind: "string-literal", value: "increments-schedule" }
.0.name { kind: "string-literal", value: "tag-data-parser" }
.0.name { kind: "string-literal", value: "slacken" }
.0.name { kind: "string-literal", value: "jquery-highlighttextarea" }
.0.name { kind: "string-literal", value: "isomorphic-fetch" }
.0.name { kind: "string-literal", value: "node-fetch" }
.0.name { kind: "string-literal", value: "qiita-coat" }
.0.name { kind: "string-literal", value: "qiita-team-services" }
.0.name { kind: "string-literal", value: "react_phoenix" }
.0.name { kind: "string-literal", value: "libxml-ruby" }
.0.name { kind: "string-literal", value: "isucon5q" }
総評
これまで StreamAPI で JSON パーサを作ってきました。個人的には実装としてはおもしろい一方で実用性は微妙だと感じました。特にトークナイザーの構成と非同期のイテレータを使うパーサの扱いが複雑であるためわざわざ Stream API に乗せるほどのメリットは感じませんでした。今回はセレクターを用いて JSON から値を抽出するということをしましたがその後また文字列に戻す処理を実装することでフォーマッターとして使うなど他にも面白いことができそうです。長い文章を読んでいただきありがとうございました。