Node.js の Stream を使いこなす ─ pipeline・Transform・バックプレッシャー実践ガイド
Node.js で大きなファイルを読み込んだり、APIからデータをストリーミングで受け取ったりするとき、fs.readFileSync や await response.json() で全部読み込んでいませんか。
データが小さければ問題ないですが、数百MBのCSVや、LLMのストリーミングレスポンスなど、「終わりが見えないデータ」を扱うとき、Stream を使うとメモリ効率が大幅に改善します。
この記事で学べること:
- Stream の基本概念(Readable・Writable・Transform)
-
pipelineで安全にストリームをつなぐ - Transform Stream を使って流れながらデータを変換する
- バックプレッシャーの仕組みと対処
- LLM のストリーミングレスポンス処理の実践例
検証環境: Node.js 18+, TypeScript 5+
Stream の基本概念
Stream は「少しずつ流れてくるデータ」を扱う仕組みです。
Readable ──→ Transform ──→ Transform ──→ Writable
(読み取り) (変換1) (変換2) (書き込み)
- Readable: データを読み取る(ファイル・HTTP レスポンス・API レスポンス等)
- Writable: データを書き込む(ファイル・HTTP レスポンス・標準出力等)
- Transform: 読み取りながら変換して書き込む(gzip圧縮・JSON変換等)
- Duplex: 読み取りと書き込みの両方(TCP ソケット等)
pipe より pipeline を使う
古いコードでは stream.pipe() が使われていますが、エラーハンドリングが不完全なので stream.pipeline() を使うのが現在の推奨です。
import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
// ファイルを読み取り → gzip 圧縮 → 別ファイルに書き出す
await pipeline(
createReadStream("large-file.txt"),
createGzip(),
createWriteStream("large-file.txt.gz")
);
// 全処理が完了するまで await で待てる
// エラーは例外として throw される
Readable Stream の基本
import { Readable } from "node:stream";
// 配列からReadableを作る
const readable = Readable.from(["hello", " ", "world"]);
// for await...of でイテレート(推奨)
for await (const chunk of readable) {
process.stdout.write(chunk);
}
// hello world
// ファイルから読む
import { createReadStream } from "node:fs";
const fileStream = createReadStream("data.csv", { encoding: "utf8" });
for await (const chunk of fileStream) {
console.log("chunk received:", chunk.length, "chars");
}
Transform Stream で流れながら変換
Transform は「受け取ったデータを変換して流す」ストリームです。
行分割 Transform の実装
import { Transform } from "node:stream";
class LineSplitter extends Transform {
private buffer = "";
_transform(chunk: Buffer | string, _encoding: string, callback: Function) {
this.buffer += chunk.toString();
const lines = this.buffer.split("\n");
// 最後の要素は次のチャンクへ持ち越し(改行で終わっていないかもしれない)
this.buffer = lines.pop() ?? "";
for (const line of lines) {
if (line.trim()) {
this.push(line + "\n");
}
}
callback();
}
_flush(callback: Function) {
// ストリーム終端でバッファに残っているデータを流す
if (this.buffer.trim()) {
this.push(this.buffer);
}
callback();
}
}
// 使い方
import { pipeline } from "node:stream/promises";
import { createReadStream } from "node:fs";
const lineSplitter = new LineSplitter();
let lineCount = 0;
lineSplitter.on("data", (line: Buffer) => {
lineCount++;
// 行ごとに処理
});
await pipeline(
createReadStream("large-file.csv"),
lineSplitter
);
console.log(`総行数: ${lineCount}`);
Transform.from() で簡単に作る(Node.js 18+)
import { Transform } from "node:stream";
// async generator 関数から Transform を作る
const upperCaseTransform = new Transform({
transform(chunk, _encoding, callback) {
callback(null, chunk.toString().toUpperCase());
},
});
// または Readable.from + pipeline で書く
async function* processLines(source: AsyncIterable<string>) {
for await (const chunk of source) {
const lines = chunk.split("\n");
for (const line of lines) {
if (line.trim()) {
yield line.toUpperCase() + "\n";
}
}
}
}
実践: 大きな CSV ファイルを処理する
fs.readFileSync で一気に読むとメモリが足りなくなるケースで Stream を使う例。
import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { Transform } from "node:stream";
import { stringify } from "node:querystring";
class CsvRowProcessor extends Transform {
private buffer = "";
private isFirstLine = true;
private headers: string[] = [];
private processedCount = 0;
constructor() {
super({ objectMode: false });
}
_transform(chunk: Buffer, _encoding: string, callback: Function) {
this.buffer += chunk.toString("utf8");
const lines = this.buffer.split("\n");
this.buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.trim()) continue;
if (this.isFirstLine) {
this.headers = line.split(",").map(h => h.trim());
this.push(line + "\n"); // ヘッダー行はそのまま流す
this.isFirstLine = false;
continue;
}
const values = line.split(",");
const row = Object.fromEntries(
this.headers.map((h, i) => [h, values[i]?.trim() ?? ""])
);
// 加工: price を10%増しにする
if (row.price) {
row.price = String(Math.round(parseFloat(row.price) * 1.1));
}
this.push(Object.values(row).join(",") + "\n");
this.processedCount++;
}
callback();
}
_flush(callback: Function) {
if (this.buffer.trim()) {
this.push(this.buffer);
this.processedCount++;
}
console.log(`処理完了: ${this.processedCount}行`);
callback();
}
}
await pipeline(
createReadStream("products.csv"),
new CsvRowProcessor(),
createWriteStream("products-updated.csv")
);
実践: LLM ストリーミングレスポンスを処理する
OpenAI / Anthropic の streaming API を Stream として処理する例。
import { Readable } from "node:stream";
async function* streamLLMResponse(
prompt: string
): AsyncGenerator<string> {
const response = await fetch("https://openrouter.ai/api/v1/chat/completions", {
method: "POST",
headers: {
"Authorization": `Bearer ${process.env.OPENROUTER_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "anthropic/claude-3-haiku",
stream: true,
messages: [{ role: "user", content: prompt }],
}),
});
if (!response.body) throw new Error("No response body");
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split("\n");
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (data === "[DONE]") return;
try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) yield content;
} catch {
// JSON parse エラーは無視
}
}
}
}
// 使い方: ストリーミングで表示
process.stdout.write("回答: ");
for await (const chunk of streamLLMResponse("Pythonの特徴を3点で")) {
process.stdout.write(chunk);
}
process.stdout.write("\n");
バックプレッシャーの仕組み
バックプレッシャーは「下流が処理しきれない場合に上流を遅らせる」仕組みです。
import { Transform } from "node:stream";
class SlowTransform extends Transform {
_transform(chunk: Buffer, _encoding: string, callback: Function) {
// 重い処理を模擬(実際はAPI呼び出しなど)
setTimeout(() => {
this.push(chunk);
callback(); // callbackを呼ぶまで次のchunkが来ない
}, 100);
}
}
callback() を呼んだタイミングで「次のデータを受け取れます」というシグナルが上流に伝わります。pipeline を使っていれば、バックプレッシャーは自動的に処理されます。
Writable.write() の戻り値を確認する(手動管理が必要な場合)
import { Writable } from "node:stream";
const writable = createWriteStream("output.txt");
async function writeWithBackpressure(data: string[]) {
for (const item of data) {
const canContinue = writable.write(item);
if (!canContinue) {
// バッファが満杯 → drain イベントまで待つ
await new Promise(resolve => writable.once("drain", resolve));
}
}
writable.end();
}
pipeline を使えばこの管理が不要になるので、可能な限り pipeline を使うのが推奨です。
よくある落とし穴
エラーハンドリングを忘れる
// ❌ pipe はエラーを適切に伝播しない
readable.pipe(transform).pipe(writable);
// readable でエラーが起きても writable は閉じられない
// ✅ pipeline を使う
await pipeline(readable, transform, writable);
// どこかでエラーが起きても全ストリームが適切にクリーンアップされる
objectMode の扱い
// 文字列・バッファ以外のオブジェクトを流す場合は objectMode: true が必要
const objectStream = new Transform({
objectMode: true, // ← これが必要
transform(obj, _encoding, callback) {
callback(null, { ...obj, processed: true });
},
});
まとめ
Node.js の Stream を使うポイントをまとめると:
- pipeline を使う(pipe は使わない)→ エラーハンドリングが安全
- for await...of でイテレート → シンプルで読みやすい
- Transform でデータを流しながら変換する
- バックプレッシャー は pipeline が自動管理してくれる
- 大きなデータは一括読み込みではなく Stream で処理する
LLM のストリーミングレスポンスや大容量ファイル処理など、「全部読んでから処理」が難しい場面で Stream を使うと、メモリ使用量を抑えながらレスポンスよく処理できます。