1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Node.jsのストリーム(Stream)を理解する

Posted at

はじめに

Node.jsを利用していますが、重要な概念としてストリーム(Stream)が出てきます。何となくでしか分かっていないストリームのコードを、さらに理解しようと思いこの記事を書きました。

ストリームとは何か

読み込む時や書き込む時にデータが細切れ(chunk)に流れてきて、全てのデータの読み込み or 書き込みを終了するのを待つ必要がないことです。そのため、全てのデータをメモリに保存する必要がありません。
例えばファイルからデータをストリームを利用して読み込む時に、少しデータを読んで何らかの作業(データ書き出しなど)をして、メモリを開放します。その後、また少しデータを読んでメモリを開放し、、、の繰り返しを続け、最後までデータを読み込みます。これがストリームです。
YoutubeやNetflixもストリーミングサービスと言われており、同じ原理ですね。すべての動画データが読み込まれる前に、視聴を開始することが出来ます。

ストリームのメリット

  • メモリの効率的な利用
  • 時間の効率的な利用(全てのデータの読み込みなどを待つ必要がない、例えばNetflixですぐに視聴が始められます)

Node.jsのストリーム

Node.jsには4つのStreamsがありますが、ここでは重要なReadableとWritableのみ取り上げます。Node.jsのコアモジュールには、ストリームの特性を持つものが多くあります。ストリームはEventEmitterクラスのインスタンスです。つまり発火(Emit)と監視(Listen)が出来るという事です。

Readable streams

特徴:読み込み(消費)
例:http request、fs read streams
重要なイベント:data(データが消費されるたびに発生する)とend(消費されるデータがなくなった時に発生)
重要なfunction:pipe()とread()

Writable streams

特徴:書き込み
例:http responses(responseをクライアントに返す時の書き込み先がwritable stream)、fs write streams
重要なイベント:drainとfinish
重要なfunction:write()とend()

コード事例

次にコード事例を見ていきます。モジュールは、fsとhttpを利用します。コードの内容としては、サーバがリクエストを受けた時に、"test-file.txt"を読み込んで、それをクライアントに返すというものです。test-file.txtは大きなデータが入っていることを想定しています。solution1 ~ 3までありますが、最初と最後のコードは共通しています。共通部分の記載で、「server.on("request", (req, res) => {...})」の部分は、サーバーがリクエストを受けたときに実行されるイベントリスナーです。reqはリクエストオブジェクト、resはレスポンスオブジェクトです。

Solution1

solution 1
const fs = require("fs");
const server = require("http").createServer();

server.on("request", (req, res) => {
  fs.readFile("test-file.txt", (err, data) => {
    if (err) return console.log(err);
    res.end(data);
  });

server.listen(8000, "127.0.0.1", () => {
  console.log("Listening...");
});

solution1では、ストリームを利用しておらず、非同期的にファイルを読み込みます。
test-file.txtを読み込み、その結果をdataに格納します。エラーがあればエラー情報を表示します。ファイルが正しく読み込まれた場合は、そのデータをレスポンスとしてクライアントに返し、レスポンスを終了します。
solution1の問題点としては、test-file.txtのデータが大きいと読み込みに時間がかかる、もしくはデータが大き過ぎて読み込めないことです。

Solution2

solution 2
const fs = require("fs");
const server = require("http").createServer();

server.on("request", (req, res) => {
  const readable = fs.createReadStream("test-file.txt");
  readable.on("data", (chunk) => {
    res.write(chunk);
  });
  readable.on("end", () => {
    res.end();
  });
  readable.on("error", (err) => {
    console.log(err);
    res.statusCode = 500;
    res.end("File not find");
  });
});

server.listen(8000, "127.0.0.1", () => {
  console.log("Listening...");
});

solution2 ではストリームを使用しています。
・fs.createReadStream("test-file.txt")
test-file.txtというファイルを読み込むための読み込み用ストリームを作成しています。これにより、ファイルを部分的に(チャンク単位で)読み込んで処理できます。これがストリーミングです。

・readable.on("data", (chunk) => {...})
ストリームがファイルの一部(チャンク)を読み込むたびに発生するdataイベントです。

・res.write(chunk)
取得したチャンクをクライアントに送信します。ここでストリーミングの利点を活かし、データを少しずつクライアントに送っています。

・readable.on("end", ...)
ファイルの読み込みが終了すると発生するendイベントです。これにより、ファイル全体が送信されたことが確認できます。

・res.end()
レスポンスを終了し、クライアントにすべてのデータが送信され終わったことを示します。

・readable.on("error", ...)
ストリームの処理中にエラーが発生した場合に実行されるerrorイベントです。ファイルが存在しない場合や、ファイルの読み込み中に何らかのエラーが発生した場合、このイベントが発火します。

solution2の問題点は、データの書き込み先(クライアント側)がデータを受信する速度が遅いため、データの送り手(サーバー側)がデータを送る速度に追いつかない点です。これは、バックプレッシャー(backpressure)と言われている問題です。うまく制御できないと、バッファがメモリに蓄積され、システムが過負荷状態に陥ります。それをsolution 3で解決します。

Solution3-1

solution 3-1
const fs = require("fs");
const server = require("http").createServer();

server.on("request", (req, res) => {
  const readable = fs.createReadStream("test-file.txt");
  readable.pipe(res);
});

server.listen(8000, "127.0.0.1", () => {
  console.log("Listening...");
});

・readable.pipe(res)
pipe()メソッドは、読み取り用ストリーム(readable)の出力をそのままレスポンス(res)に送ることを意味します。pipe()はストリーミング処理を自動化し、データのチャンクを読み取るたびにレスポンスとして送信します。バックプレッシャーの管理も自動的に行われ、書き込み側(res)がデータの処理に追いつかない場合、読み込み側(readable)は自動的に一時停止します。
pipe()を使うことで、data、end、errorイベントを手動で扱う必要がなくなり、コードがシンプルになります。

おまけとして、pipe()メソッドを使わない方法は以下です。

Solution3-2

solution 3-2
const fs = require("fs");
const server = require("http").createServer();

server.on("request", (req, res) => {
  const readable = fs.createReadStream("test-file.txt");

  // チャンクが送信されるたびに呼び出される
  readable.on("data", (chunk) => {
    const shouldContinue = res.write(chunk); // 書き込みができるかどうかを確認
    if (!shouldContinue) {
      // 書き込みが追いつかない場合は読み込みを一時停止
      readable.pause();
    }
  });

  // 書き込みバッファが解消されたときに再度読み込みを再開
  res.on("drain", () => {
    readable.resume();
  });

  readable.on("end", () => {
    res.end();
  });

  readable.on("error", (err) => {
    console.log(err);
    res.statusCode = 500;
    res.end("File not found");
  });
});

server.listen(8000, "127.0.0.1", () => {
  console.log("Listening...");
});

・const shouldContinue = res.write(chunk);
res.write(chunk)でレスポンスとしてデータをクライアントに送信します。res.write()がデータの書き込みに成功したかどうかをshouldContinue変数で確認します。

・if (!shouldContinue)
shouldContinueがfalseの場合、書き込み先のバッファがいっぱいで、これ以上データを受け入れられない状態です。このとき、読み込みストリーム(readable)をpause()で一時停止します。

・res.on("drain", ...)
書き込みバッファが空になり、再びデータを受け入れられる状態になるとdrainイベントが発生します。このときに、readable.resume()を呼び出してファイルの読み込みを再開します。

・readable.on("end", ...)
ファイルの全てのデータが読み込まれるとendイベントが発生します。このときに、res.end()でクライアントへのレスポンスを終了します。

drainイベント

Node.jsのストリーム(特に書き込み可能ストリーム)において、書き込みバッファが解放された(空いた)際に発生するイベントです。このイベントは、ストリームに対してwrite()メソッドを使ってデータを書き込むときに、書き込みバッファが一杯になり、それ以上データを一時的に受け付けられない状況から回復したことを知らせるために使われます。
Node.jsでは、書き込み可能なストリーム(例えば、HTTPレスポンスやファイル書き込みストリーム)にデータを送信する際、データがすぐに書き込まれるわけではなく、内部バッファに保存されます。バッファがいっぱいになると、write()メソッドはfalseを返し、それ以上のデータ書き込みを防ぐ必要があります。

まとめ

Node.jsのストリームを理解するために、実際にコードを書きながらまとめました。

Reference

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?