0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Node.js の Stream を使いこなす ─ pipeline・Transform・バックプレッシャー実践ガイド

0
Posted at

Node.js の Stream を使いこなす ─ pipeline・Transform・バックプレッシャー実践ガイド

Node.js で大きなファイルを読み込んだり、APIからデータをストリーミングで受け取ったりするとき、fs.readFileSyncawait response.json() で全部読み込んでいませんか。

データが小さければ問題ないですが、数百MBのCSVや、LLMのストリーミングレスポンスなど、「終わりが見えないデータ」を扱うとき、Stream を使うとメモリ効率が大幅に改善します。

この記事で学べること:

  • Stream の基本概念(Readable・Writable・Transform)
  • pipeline で安全にストリームをつなぐ
  • Transform Stream を使って流れながらデータを変換する
  • バックプレッシャーの仕組みと対処
  • LLM のストリーミングレスポンス処理の実践例

検証環境: Node.js 18+, TypeScript 5+


Stream の基本概念

Stream は「少しずつ流れてくるデータ」を扱う仕組みです。

Readable ──→ Transform ──→ Transform ──→ Writable
(読み取り)   (変換1)     (変換2)     (書き込み)
  • Readable: データを読み取る(ファイル・HTTP レスポンス・API レスポンス等)
  • Writable: データを書き込む(ファイル・HTTP レスポンス・標準出力等)
  • Transform: 読み取りながら変換して書き込む(gzip圧縮・JSON変換等)
  • Duplex: 読み取りと書き込みの両方(TCP ソケット等)

pipe より pipeline を使う

古いコードでは stream.pipe() が使われていますが、エラーハンドリングが不完全なので stream.pipeline() を使うのが現在の推奨です。

import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

// ファイルを読み取り → gzip 圧縮 → 別ファイルに書き出す
await pipeline(
  createReadStream("large-file.txt"),
  createGzip(),
  createWriteStream("large-file.txt.gz")
);
// 全処理が完了するまで await で待てる
// エラーは例外として throw される

Readable Stream の基本

import { Readable } from "node:stream";

// 配列からReadableを作る
const readable = Readable.from(["hello", " ", "world"]);

// for await...of でイテレート(推奨)
for await (const chunk of readable) {
  process.stdout.write(chunk);
}
// hello world

// ファイルから読む
import { createReadStream } from "node:fs";

const fileStream = createReadStream("data.csv", { encoding: "utf8" });
for await (const chunk of fileStream) {
  console.log("chunk received:", chunk.length, "chars");
}

Transform Stream で流れながら変換

Transform は「受け取ったデータを変換して流す」ストリームです。

行分割 Transform の実装

import { Transform } from "node:stream";

class LineSplitter extends Transform {
  private buffer = "";

  _transform(chunk: Buffer | string, _encoding: string, callback: Function) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");

    // 最後の要素は次のチャンクへ持ち越し(改行で終わっていないかもしれない)
    this.buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (line.trim()) {
        this.push(line + "\n");
      }
    }
    callback();
  }

  _flush(callback: Function) {
    // ストリーム終端でバッファに残っているデータを流す
    if (this.buffer.trim()) {
      this.push(this.buffer);
    }
    callback();
  }
}

// 使い方
import { pipeline } from "node:stream/promises";
import { createReadStream } from "node:fs";

const lineSplitter = new LineSplitter();
let lineCount = 0;

lineSplitter.on("data", (line: Buffer) => {
  lineCount++;
  // 行ごとに処理
});

await pipeline(
  createReadStream("large-file.csv"),
  lineSplitter
);

console.log(`総行数: ${lineCount}`);

Transform.from() で簡単に作る(Node.js 18+)

import { Transform } from "node:stream";

// async generator 関数から Transform を作る
const upperCaseTransform = new Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// または Readable.from + pipeline で書く
async function* processLines(source: AsyncIterable<string>) {
  for await (const chunk of source) {
    const lines = chunk.split("\n");
    for (const line of lines) {
      if (line.trim()) {
        yield line.toUpperCase() + "\n";
      }
    }
  }
}

実践: 大きな CSV ファイルを処理する

fs.readFileSync で一気に読むとメモリが足りなくなるケースで Stream を使う例。

import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { Transform } from "node:stream";
import { stringify } from "node:querystring";

class CsvRowProcessor extends Transform {
  private buffer = "";
  private isFirstLine = true;
  private headers: string[] = [];
  private processedCount = 0;

  constructor() {
    super({ objectMode: false });
  }

  _transform(chunk: Buffer, _encoding: string, callback: Function) {
    this.buffer += chunk.toString("utf8");
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (!line.trim()) continue;

      if (this.isFirstLine) {
        this.headers = line.split(",").map(h => h.trim());
        this.push(line + "\n"); // ヘッダー行はそのまま流す
        this.isFirstLine = false;
        continue;
      }

      const values = line.split(",");
      const row = Object.fromEntries(
        this.headers.map((h, i) => [h, values[i]?.trim() ?? ""])
      );

      // 加工: price を10%増しにする
      if (row.price) {
        row.price = String(Math.round(parseFloat(row.price) * 1.1));
      }

      this.push(Object.values(row).join(",") + "\n");
      this.processedCount++;
    }
    callback();
  }

  _flush(callback: Function) {
    if (this.buffer.trim()) {
      this.push(this.buffer);
      this.processedCount++;
    }
    console.log(`処理完了: ${this.processedCount}行`);
    callback();
  }
}

await pipeline(
  createReadStream("products.csv"),
  new CsvRowProcessor(),
  createWriteStream("products-updated.csv")
);

実践: LLM ストリーミングレスポンスを処理する

OpenAI / Anthropic の streaming API を Stream として処理する例。

import { Readable } from "node:stream";

async function* streamLLMResponse(
  prompt: string
): AsyncGenerator<string> {
  const response = await fetch("https://openrouter.ai/api/v1/chat/completions", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${process.env.OPENROUTER_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      model: "anthropic/claude-3-haiku",
      stream: true,
      messages: [{ role: "user", content: prompt }],
    }),
  });

  if (!response.body) throw new Error("No response body");

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value);
    const lines = text.split("\n");

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const data = line.slice(6).trim();
      if (data === "[DONE]") return;

      try {
        const parsed = JSON.parse(data);
        const content = parsed.choices?.[0]?.delta?.content;
        if (content) yield content;
      } catch {
        // JSON parse エラーは無視
      }
    }
  }
}

// 使い方: ストリーミングで表示
process.stdout.write("回答: ");
for await (const chunk of streamLLMResponse("Pythonの特徴を3点で")) {
  process.stdout.write(chunk);
}
process.stdout.write("\n");

バックプレッシャーの仕組み

バックプレッシャーは「下流が処理しきれない場合に上流を遅らせる」仕組みです。

import { Transform } from "node:stream";

class SlowTransform extends Transform {
  _transform(chunk: Buffer, _encoding: string, callback: Function) {
    // 重い処理を模擬(実際はAPI呼び出しなど)
    setTimeout(() => {
      this.push(chunk);
      callback(); // callbackを呼ぶまで次のchunkが来ない
    }, 100);
  }
}

callback() を呼んだタイミングで「次のデータを受け取れます」というシグナルが上流に伝わります。pipeline を使っていれば、バックプレッシャーは自動的に処理されます。

Writable.write() の戻り値を確認する(手動管理が必要な場合)

import { Writable } from "node:stream";

const writable = createWriteStream("output.txt");

async function writeWithBackpressure(data: string[]) {
  for (const item of data) {
    const canContinue = writable.write(item);
    if (!canContinue) {
      // バッファが満杯 → drain イベントまで待つ
      await new Promise(resolve => writable.once("drain", resolve));
    }
  }
  writable.end();
}

pipeline を使えばこの管理が不要になるので、可能な限り pipeline を使うのが推奨です。


よくある落とし穴

エラーハンドリングを忘れる

// ❌ pipe はエラーを適切に伝播しない
readable.pipe(transform).pipe(writable);
// readable でエラーが起きても writable は閉じられない

// ✅ pipeline を使う
await pipeline(readable, transform, writable);
// どこかでエラーが起きても全ストリームが適切にクリーンアップされる

objectMode の扱い

// 文字列・バッファ以外のオブジェクトを流す場合は objectMode: true が必要
const objectStream = new Transform({
  objectMode: true, // ← これが必要
  transform(obj, _encoding, callback) {
    callback(null, { ...obj, processed: true });
  },
});

まとめ

Node.js の Stream を使うポイントをまとめると:

  • pipeline を使う(pipe は使わない)→ エラーハンドリングが安全
  • for await...of でイテレート → シンプルで読みやすい
  • Transform でデータを流しながら変換する
  • バックプレッシャー は pipeline が自動管理してくれる
  • 大きなデータは一括読み込みではなく Stream で処理する

LLM のストリーミングレスポンスや大容量ファイル処理など、「全部読んでから処理」が難しい場面で Stream を使うと、メモリ使用量を抑えながらレスポンスよく処理できます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?