Node.js Stream を使いこなす
Node.js には Stream というオブジェクトがあります。名前の通り、データストリームを扱うもので、Java や .NET にも同じようなクラスがあります。
Stream
オブジェクトはデータをストリームとして扱いたいということに重宝します。
Stream の便利さ
例えば、ファイルの内容を別のファイルにコピーする場合、以下のようにファイルを全て読み込んでから全て書き込む方法があります。後述するように問題があるのですが、コードが非常に読みやすいというメリットがあります。
const text = fs.readFileSync('src.txt', 'utf8');
fs.writeFileSync('dest.txt', text);
この方法はファイルを読み込んだり書き込んだりしている間は処理がブロッキングされてしまいます。特にウェブアプリの場合、ブロッキングされている間は他のブラウザーからの応答をさばくことができません。
非ブロッキング I/O を使ってこのように書き換えます。
fs.readFile('src.txt', 'utf8', (err, data) => {
fs.writeFile('dest.txt', data);
});
一気に読みにくくなりましたね! しかも、全てのデータを読み込んでから全てのデータを書き込むことには変わりはありませんので、2GB のテキストファイルの場合、メモリを 2GB 消費します。
そこでストリームの出番です。
const src = fs.createReadStream('src.txt', 'utf8');
const dest = fs.createWriteStream('dest.txt', 'utf8');
src.on('data', chunk => dest.write(chunk));
src.on('end', () => dest.end());
この例ではを一定量だけ読み込んでイベントを発生させています。大きなファイルでも全てを読み込むわけではないのでメモリに非常に優しいです。
ただ、ちょっと見にくいですね。pipe()
を使うと一気に読みやすくなります。
const src = fs.createReadStream('src.txt', 'utf8');
const dest = fs.createWriteStream('dest.txt', 'utf8');
src.pipe(dest);
pipe()
メソッドは読み取り可能なストリームと書き込み可能なストリームを関連付けるもので、後はよしなにデータを渡してくれます。
Node.js では、ここで例にあげたファイルだけでなく、いくつかのオブジェクトはこの Stream となっています。process.stdout
もそうです。ファイルの内容を標準出力に出力するのもとても簡単です。
const src = fs.createReadStream('src.txt', 'utf8');
src.pipe(process.stdout);
Stream
を使うとデータを流れるように扱えるのが分かります。
Stream の種類
一口に Stream
と言っても四種類あります。
-
stream.Readable
- 読み取りだけができるストリーム -
stream.Writable
- 書き込みだけができるストリーム -
stream.Duplex
- 読み取りも書き込みもできるストリーム -
stream.Transform
- 読み取ったデータを変換して出力するストリーム
stream.Readable
読み取りだけができるストリームです。data
イベントを利用してデータを読み込みます。
fs.createReadStream()
や child.stdout
がこれにあたります。
const src = fs.createReadStream('src.txt' 'utf8');
src.on('data', chunk => process.stdout.write(chunk));
stream.Writable
書き込みだけができるストリームです。write()
メソッド を使ってデータを書き込みます。
fs.createWriteStream()
や child.stdin
がこれにあたります。
const dest = fs.createWriteStream('dest.txt');
dest.write('Hello!');
stream.Duplex
読み取りと書き込みの両方ができるストリームです。読み取るときは stream.Readable
と同じで、書き込むときは stream.Writable
と同じです。
TCP Socket がこれにあたります。
const client = net.connect(80, 'www.yahoo.co.jp');
client.pipe(process.stdout);
client.once('connect', () => client.write('GET / HTTP/1.0\r\n\r\n'));
stream.Transform
変換をするためのストリームです。書き込まれたデータを変換して出力します。
const gzip = zlib.createGzip();
const src = fs.createReadStream('src.txt');
const dest = fs.createWriteStream('src.txt.gz');
src.pipe(gzip).pipe(dest);
サンプルコードではひとつしか使ってませんが、変換をつなげて書くこともできます。
src
.pipe(transformA)
.pipe(transformB)
.pipe(transformC)
.pipe(dest);
ストリームの種類の判定
読み取り可能なストリームが全て stream.Readable
を継承しているというわけではないようです。instanceof
で読み取り可能なストリームかどうかを判定するのは難しいです。また、stream.Duplex
でも process.stdout
のように読み取りができないものもあります。
自分は readable
もしくは writable
プロパティの値が true
かどうかで判断していますが、ドキュメントにその記述が見つからないので使っていいか微妙です。
さまざまな Stream
Stream をより便利に使うためにいろいろと紹介します。
fs.createReadStream/createWriteStream
ファイルの入出力を扱う fs
ライブラリにはストリームでファイルをオープンするメソッドが用意されています。
細かい説明は不要ですよね!
TCP Socket
TCP Socket を扱う net
モジュールもストリームが使えます。入出力の両方に対応しています。
以下のプログラムは Yahoo のサーバーに接続して、トップページの HTML を標準出力に出力しています。
const net = require('net');
const client = net.connect(80, 'www.yahoo.co.jp');
client.pipe(process.stdout);
client.once('connect', () => client.write('GET / HTTP/1.0\r\n\r\n'));
http や https モジュールもストリームが使えます。
child_process の stdin/stdout/stderr
child_process
は外部プロセスを扱うことができます。その標準入出力をストリームとして扱うことができます。
- child.stdin - 外部プロセスの標準入力へ書き込むストリーム
- child.stdout - 外部プロセスの標準出力から読み取るストリーム
- child.stderr - 外部プロセスの標準エラー出力から読み取るストリーム
サンプルコードでは ls
コマンドの結果をファイルに記録しています。
const ls = exec('ls');
const log = fs.createWriteStream('ls.txt', 'utf8');
ls.stdout.pipe(log);
process の stdin/stdout/stderr
process は自身のプロセスを表す Node.js のグローバルオブジェクトです。こちらの標準入出力もストリームとして扱うことができます。
- process.stdin - 標準入力から読み取るストリーム
- process.stdout - 標準出力へ書き込むストリーム
- process.stderr - 標準エラー出力へ書き込むストリーム
ls
コマンドを実行した結果をファイルではなく標準出力に出力します。
const ls = exec('ls');
ls.stdout.pipe(process.stdout);
child_process
と違い、書き込みと読み取りが反対なことに注意してください。
Crypt/Zlib
暗号化を行う Crypto や圧縮を行う Zlib は変換用の stream.Transfrom
として利用できます。
const gzip = zlib.createGzip();
const src = fs.createReadStream('src.txt');
const dest = fs.createWriteStream('src.txt.gz');
src.pipe(gzip).pipe(dest);
UNIX でパイプをよく使っている人は解ると思いますが、変換用のストリームはとても強力で、他の書き方に比べると非常に読みやすいです。
fs.createReadStream('src.txt')
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'pass'))
.pipe(fs.createWriteStream('src.txt.gz.aes'));
memory-streams
memory-streams はメモリをストリームとして扱います。Node.js の標準モジュールには、ストリームの内容をすべてメモリ上に読み取るようなオブジェクトが用意されておりません。
ファイルに Hello!
と書き込むサンプルコードです。
const streams = require('memory-streams');
const src = new streams.ReadableStream('Hello!');
const dest = fs.createWriteStream('hello.txt', 'utf8');
src.pipe(dest);
もちろん、ストリームから読み取ることもできます。
const streams = require('memory-streams');
const src = fs.createReadStream('hello.txt', 'utf8');
const dest = new streams.WritableStream();
src.pipe(dest);
// console.log(dest.toString()); <- ここに書いても何も出力されない
src.on('end', () => console.log(dest.toString()));
処理は非同期で行われるため、src.pipe(dest)
の次の行ではまだストリーム処理が開始されていないことに注意してください。
duplex-child-process
標準モジュールの child_process
に非常によく似ています。(残念ながら spawn
しか使えませんが) [duplex-child-process]` は戻り値をそのままストリームとして扱うことができます。
child_process
を使った場合はこのように書いていました。
const ls = spawn('ls');
const log = fs.createWriteStream('ls.txt', 'utf8');
ls.stdout.pipe(log);
duplex-child-process
ではこう書くことができます。
const spawn = require('duplex-child-process');
const ls = spawn('ls');
const log = fs.createWriteStream('ls.txt', 'utf8');
// ls.stdout.pipe(log);
ls.pipe(log); // <- stdout が要らない
stdin
と stdout
を結合したストリームとして扱えるため、場合によっては非常に便利です。ですが、残念ながら、stdout
と stderr
を結合したりはしてくれないようです。
pty.js
pty.js は表示を持たない仮想ターミナルです。詳しい使い方はドキュメントを読んでもらうとしますが、このターミナルはそのまま読み書き可能なストリームとして扱うことができます。
以下のサンプルコードでは、bash
を起動し、ls
コマンドと exit
コマンドを実行しています。
const pty = require('pty.js');
const streams = require('memory-streams');
const term = pty('bash');
const ls = new streams.ReadableStream('ls\nexit\n');
term.pipe(process.stdout);
ls.pipe(term);
wsstream
Node.js では比較的簡単に WebSocket を扱うことができます。ws あたりが有名だと思います。残念ながら ws
はストリームではないようです。
wsstream は ws
を読み書き可能なストリームに変換します。
サンプルコードではクライアントからの WebSocket で送信されたデータを標準出力に出力しています。
const WsStream = require('wsstream');
// Express.js の初期化
// ...
app.ws('/', (ws, req) => {
const stream = WsStream(ws);
stream.pipe(process.stdout);
});
おまけ
pty.js とブラウザー向けの仮想ターミナル xterm.js を組み合わせると、簡単にブラウザーで動くターミナルが作れます。
サーバー側はこんな感じ。
const pty.js = require('pty.js');
const WsStream = require('wsstream');
// Express.js の初期化
// ...
app.ws('/', (ws, req) => {
const stream = WsStream(ws);
const bash = pty.spawn('bash');
bash.pipe(stream);
stream.pipe(bash);
});
クライアント側はこんな感じ。
const term = new Terminal();
term.open(document.querySelector('#terminal'));
term.attach(new WebSocket('http://.../'));
ストリームって素晴らしい!