Help us understand the problem. What is going on with this article?

Node.js Stream を使いこなす

More than 1 year has passed since last update.

Node.js Stream を使いこなす

Node.js には Stream というオブジェクトがあります。名前の通り、データストリームを扱うもので、Java や .NET にも同じようなクラスがあります。

Stream オブジェクトはデータをストリームとして扱いたいということに重宝します。

Stream の便利さ

例えば、ファイルの内容を別のファイルにコピーする場合、以下のようにファイルを全て読み込んでから全て書き込む方法があります。後述するように問題があるのですが、コードが非常に読みやすいというメリットがあります。

const text = fs.readFileSync('src.txt', 'utf8');
fs.writeFileSync('dest.txt', text);

この方法はファイルを読み込んだり書き込んだりしている間は処理がブロッキングされてしまいます。特にウェブアプリの場合、ブロッキングされている間は他のブラウザーからの応答をさばくことができません。

非ブロッキング I/O を使ってこのように書き換えます。

fs.readFile('src.txt', 'utf8', (err, data) => {
  fs.writeFile('dest.txt', data);
});

一気に読みにくくなりましたね! しかも、全てのデータを読み込んでから全てのデータを書き込むことには変わりはありませんので、2GB のテキストファイルの場合、メモリを 2GB 消費します。

そこでストリームの出番です。

const src = fs.createReadStream('src.txt', 'utf8');
const dest = fs.createWriteStream('dest.txt', 'utf8');
src.on('data', chunk => dest.write(chunk));
src.on('end', () => dest.end());

この例ではを一定量だけ読み込んでイベントを発生させています。大きなファイルでも全てを読み込むわけではないのでメモリに非常に優しいです。

ただ、ちょっと見にくいですね。pipe() を使うと一気に読みやすくなります。

const src = fs.createReadStream('src.txt', 'utf8');
const dest = fs.createWriteStream('dest.txt', 'utf8');
src.pipe(dest);

pipe() メソッドは読み取り可能なストリームと書き込み可能なストリームを関連付けるもので、後はよしなにデータを渡してくれます。

Node.js では、ここで例にあげたファイルだけでなく、いくつかのオブジェクトはこの Stream となっています。process.stdout もそうです。ファイルの内容を標準出力に出力するのもとても簡単です。

const src = fs.createReadStream('src.txt', 'utf8');
src.pipe(process.stdout);

Stream を使うとデータを流れるように扱えるのが分かります。

Stream の種類

一口に Stream と言っても四種類あります。

stream.Readable

読み取りだけができるストリームです。data イベントを利用してデータを読み込みます。

fs.createReadStream()child.stdout がこれにあたります。

const src = fs.createReadStream('src.txt' 'utf8');
src.on('data', chunk => process.stdout.write(chunk));

stream.Writable

書き込みだけができるストリームです。write() メソッド を使ってデータを書き込みます。

fs.createWriteStream()child.stdin がこれにあたります。

const dest = fs.createWriteStream('dest.txt');
dest.write('Hello!');

stream.Duplex

読み取りと書き込みの両方ができるストリームです。読み取るときは stream.Readable と同じで、書き込むときは stream.Writable と同じです。

TCP Socket がこれにあたります。

const client = net.connect(80, 'www.yahoo.co.jp');
client.pipe(process.stdout);
client.once('connect', () => client.write('GET / HTTP/1.0\r\n\r\n'));

stream.Transform

変換をするためのストリームです。書き込まれたデータを変換して出力します。

Crypt'zlib' がこれにあたります。

const gzip = zlib.createGzip();
const src = fs.createReadStream('src.txt');
const dest = fs.createWriteStream('src.txt.gz');
src.pipe(gzip).pipe(dest);

サンプルコードではひとつしか使ってませんが、変換をつなげて書くこともできます。

src
  .pipe(transformA)
  .pipe(transformB)
  .pipe(transformC)
  .pipe(dest);

ストリームの種類の判定

読み取り可能なストリームが全て stream.Readable を継承しているというわけではないようです。instanceof で読み取り可能なストリームかどうかを判定するのは難しいです。また、stream.Duplex でも process.stdout のように読み取りができないものもあります。

自分は readable もしくは writable プロパティの値が true かどうかで判断していますが、ドキュメントにその記述が見つからないので使っていいか微妙です。

さまざまな Stream

Stream をより便利に使うためにいろいろと紹介します。

fs.createReadStream/createWriteStream

ファイルの入出力を扱う fs ライブラリにはストリームでファイルをオープンするメソッドが用意されています。

細かい説明は不要ですよね!

TCP Socket

TCP Socket を扱う net モジュールもストリームが使えます。入出力の両方に対応しています。

以下のプログラムは Yahoo のサーバーに接続して、トップページの HTML を標準出力に出力しています。

const net = require('net');

const client = net.connect(80, 'www.yahoo.co.jp');
client.pipe(process.stdout);
client.once('connect', () => client.write('GET / HTTP/1.0\r\n\r\n'));

httphttps モジュールもストリームが使えます。

child_process の stdin/stdout/stderr

child_process は外部プロセスを扱うことができます。その標準入出力をストリームとして扱うことができます。

  • child.stdin - 外部プロセスの標準入力へ書き込むストリーム
  • child.stdout - 外部プロセスの標準出力から読み取るストリーム
  • child.stderr - 外部プロセスの標準エラー出力から読み取るストリーム

サンプルコードでは ls コマンドの結果をファイルに記録しています。

const ls = exec('ls');
const log = fs.createWriteStream('ls.txt', 'utf8');
ls.stdout.pipe(log);

process の stdin/stdout/stderr

process は自身のプロセスを表す Node.js のグローバルオブジェクトです。こちらの標準入出力もストリームとして扱うことができます。

ls コマンドを実行した結果をファイルではなく標準出力に出力します。

const ls = exec('ls');
ls.stdout.pipe(process.stdout);

child_process と違い、書き込みと読み取りが反対なことに注意してください。

Crypt/Zlib

暗号化を行う Crypto や圧縮を行う Zlib は変換用の stream.Transfrom として利用できます。

const gzip = zlib.createGzip();
const src = fs.createReadStream('src.txt');
const dest = fs.createWriteStream('src.txt.gz');
src.pipe(gzip).pipe(dest);

UNIX でパイプをよく使っている人は解ると思いますが、変換用のストリームはとても強力で、他の書き方に比べると非常に読みやすいです。

fs.createReadStream('src.txt')
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'pass'))
  .pipe(fs.createWriteStream('src.txt.gz.aes'));

memory-streams

memory-streams はメモリをストリームとして扱います。Node.js の標準モジュールには、ストリームの内容をすべてメモリ上に読み取るようなオブジェクトが用意されておりません。

ファイルに Hello! と書き込むサンプルコードです。

const streams = require('memory-streams');

const src = new streams.ReadableStream('Hello!');
const dest = fs.createWriteStream('hello.txt', 'utf8');
src.pipe(dest);

もちろん、ストリームから読み取ることもできます。

const streams = require('memory-streams');

const src = fs.createReadStream('hello.txt', 'utf8');
const dest = new streams.WritableStream();
src.pipe(dest);

// console.log(dest.toString()); <- ここに書いても何も出力されない
src.on('end', () => console.log(dest.toString()));

処理は非同期で行われるため、src.pipe(dest) の次の行ではまだストリーム処理が開始されていないことに注意してください。

duplex-child-process

標準モジュールの child_process に非常によく似ています。(残念ながら spawn しか使えませんが) [duplex-child-process]` は戻り値をそのままストリームとして扱うことができます。

child_process を使った場合はこのように書いていました。

const ls = spawn('ls');
const log = fs.createWriteStream('ls.txt', 'utf8');
ls.stdout.pipe(log);

duplex-child-process ではこう書くことができます。

const spawn = require('duplex-child-process');

const ls = spawn('ls');
const log = fs.createWriteStream('ls.txt', 'utf8');
// ls.stdout.pipe(log);
ls.pipe(log); // <- stdout が要らない

stdinstdout を結合したストリームとして扱えるため、場合によっては非常に便利です。ですが、残念ながら、stdoutstderr を結合したりはしてくれないようです。

pty.js

pty.js は表示を持たない仮想ターミナルです。詳しい使い方はドキュメントを読んでもらうとしますが、このターミナルはそのまま読み書き可能なストリームとして扱うことができます。

以下のサンプルコードでは、bash を起動し、ls コマンドと exit コマンドを実行しています。

const pty = require('pty.js');
const streams = require('memory-streams');

const term = pty('bash');
const ls = new streams.ReadableStream('ls\nexit\n');
term.pipe(process.stdout);
ls.pipe(term);

wsstream

Node.js では比較的簡単に WebSocket を扱うことができます。ws あたりが有名だと思います。残念ながら ws はストリームではないようです。

wsstreamws を読み書き可能なストリームに変換します。

サンプルコードではクライアントからの WebSocket で送信されたデータを標準出力に出力しています。

const WsStream = require('wsstream');

// Express.js の初期化
// ...

app.ws('/', (ws, req) => {
  const stream = WsStream(ws);
  stream.pipe(process.stdout);
});

おまけ

pty.js とブラウザー向けの仮想ターミナル xterm.js を組み合わせると、簡単にブラウザーで動くターミナルが作れます。

サーバー側はこんな感じ。

const pty.js = require('pty.js');
const WsStream = require('wsstream');

// Express.js の初期化
// ...

app.ws('/', (ws, req) => {
  const stream = WsStream(ws);
  const bash = pty.spawn('bash');

  bash.pipe(stream);
  stream.pipe(bash);
});

クライアント側はこんな感じ。

const term = new Terminal();
term.open(document.querySelector('#terminal'));
term.attach(new WebSocket('http://.../'));

ストリームって素晴らしい!

masakura
gitlab-jp
Git/DevOpsツールチェーンプラットフォーム「GitLab」についての日本コミュニティ。
https://gitlab-jp.connpass.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした