11
3

More than 1 year has passed since last update.

【Nodejs】ストリーム(stream)に念入りに気合を込めて正拳突きを繰り返しながら説明していく

Last updated at Posted at 2021-12-13

#ストリームとは
ストリームとは、大きいデータを少量に分けてちょっとずつデータを運ぶ概念のことです。
英語でstreamは「流れや川」を表します。
小さいデータが連続的に流れる様子が川のようなのでstreamというのですね。

streamには、大きく分けて2つの役割があります。

  • データを読み込む (write)
  • データを書き込む (read)

具体的な例を見ていく前にストリームが役立つシーンを紹介します。
また、ここでは基本的にファイルモジュールを扱っていきます

ストリームが役立つ例

ファイルをコピーしてフォルダ1からフォルダ2に移す。単純な例を考えます。

.
├── フォルダ1── a.txt
└── フォルダ2

ここで2つのコードの書き方が考えられます。

1.ファイルを一度完全にバッファ(一時保存される領域)にコピーしてから、フォルダ2にファイルを書き込む
2.ファイルをバッファに読み込み、読み込めたものから順に書き込みを行う

1の場合

const fs = require('fs')
text = fs.readFileSync('./フォルダ1/a.txt')
fs.writeFileSync('./フォルダ2/a_copy.txt',text)

2の場合

const fs = require('fs')
fs.createReadStream("./フォルダ1/a.txt")  //読み込みストリーム
  .pipe(fs.createWriteStream("./フォルダ2/a_copy.txt")) //書き込みストリーム
  .on("finish",()=>console.log("コピー終わったよ"))

1の場合ではファイルの読み込みが完了してから、ファイルの書き込みを行うのに対し、
2の場合はファイルの読み込みと書き込みを並行して行います。pipeを使っていますが、これはストリーム同士をつなぐものです。
1と比べて2のほうが、効率よく実行できますね。

一つ注意すべきことがあります。
1の場合では2GBバイト以上のファイルを読み込むことができません。
これは、バッファに入るファイルのサイズが2GBバイトまでだからです。地味に重要なことなので覚えておきましょう。2GBを超えるファイルはstreamで読み込むべきということですね。

streamのイメージ図
image.png

読み込みストリーム

fs.createReadStream(ファイルパス)を指定するだけでストリームが作れます。

const fs = require('fs')
const readStream = fs.createReadStream("./フォルダ1/a.txt")
let i=0
readStream.on("readable", () => {
  let chunck
  while (true) {
    i++
    chunck = readStream.read()
    if (chunck == null) break
    console.log(i+"回目"+chunck.toString())
  }
})

ここでは、chunckは小分けにしたデータが入ります。
readable状態になったときにreadStream.read()で中身を取り出すことができます。

実行結果

1回目アンパンマン

a.txtにはアンパンマンが入っていたのですね。ここではファイルサイズが小さすぎるために1回しかreadableが読み込まれませんでした。

readableは読み込みが可能になったよ!というイベントを通知してくれます。
.onという書き方はイベントエミッターですね。 イベントエミッターがわからない人はこの記事を参考にしてください。簡単に言うと.on(イベント名,関数)でイベントに対するリスナーを追加できます。イベント駆動開発でよく用いられます。

書き込みストリーム

これもfs.createWriteStream(ファイルパス)を指定するだけです。

const fs = require('fs')
const fileWriteStream = fs.createWriteStream("./フォルダ1/b.txt")
fileWriteStream.write("大きな鈴がリンリンリン\n")
fileWriteStream.write("不思議な冒険ルンルンルン\n")
fileWriteStream.end("アンパン食パンカレーパン\n")

実行結果

b.txt
大きな鈴がリンリンリン
不思議な冒険ルンルンルン
アンパン食パンカレーパン

書き込みストリームに対して.write(文字列)とすることでファイルに書き込みを行うことができます。
すべてのデータを流し終えたら.endすることでストリームを明示的に閉じることができます。

ここで気になるのが、ストリームが閉じられるタイミングですよね。調べていきます。

streamが閉じられるタイミング

読み込みストリームと書き込みストリームそれぞれstreamが閉じられるタイミングを見ていきます。
調べる方法、至って簡単です。.on("finish",()=>{console.log("stream終了")をつけて確認していきます。

およよ、調べていくとイベントはfinishだけでなくcloseもあるみたいです。少し脱線しますが、調べましょう。
finishcloseのイベントの違いは何でしょうか。

書き込みストリームでみていきます。

  • finishイベント・・・すべてのデータの書き込みを終了したが、まだファイルを開いている。
  • closeイベント・・・すべてのデータの書き込みを終了し、なおかつファイルを閉じたとき

また、読み込みストリームでは、finishはなく、endcloseがあります。

  • endイベント・・・すべてのデータの読み込みを終了したが、まだファイルを開いている。
  • `close@イベント・・・すべてのデータの読み込みを終了し、なおかつファイルを閉じたとき

どうやら、closeイベントは、ストリームが閉じられることはもちろん、ファイル自体がクローズしているかどうかを意識しているみたいです。
この違いも意識してみていきましょう。

読み込みストリーム

先程のコードにendcloseのイベントを付け加えて実行します。

const readStream = fs.createReadStream("./フォルダ1/a.txt")
+ readStream.on("end", () => console.log("end"))
+ readStream.on("close", () => console.log("close"))
let i = 0
readStream.on("readable", () => {
  let chunck
  while (true) {
    i++
    chunck = readStream.read()
    if (chunck == null) break
    console.log(i + "回目" + chunck.toString())    
  }
})

実行結果

1回目アンパンマン

end
close

自然な動きのように思えますが、endイベントが実行されるのはやや不自然に感じます。明示的にstreamの終了操作を発行していないからです。これについては、読み込みストリームの一時停止モードとフローイングモードで説明していきます。

また明示的にストリームを閉じる方法も存在します。それは読み込みストリームに対して.destroyです。
以下のコマンドを実行します。

const readStream = fs.createReadStream("./フォルダ1/a.txt")
readStream.on("close", () => console.log("close"))
readStream.on("end", () => console.log("end"))
let i = 0
readStream.on("readable", () => {
  let chunck
  while (true) {
    i++
    chunck = readStream.read()
    if (chunck == null) break
    console.log(i + "回目" + chunck.toString())
  }
+  readStream.destroy()
})

実行結果

1回目アンパンマン

close

どうやらすべてのデータを受け取る前にストリームを閉じちゃいました。笑

##書き込みストリーム
先程のコードを少し改良したものを動かしていきます。

stream.js
const fileWriteStream = fs.createWriteStream("./フォルダ1/b.txt")
+ fileWriteStream.on("finish", () => console.log("書き込みfinish"))
+ fileWriteStream.on("close", () => console.log("書き込みclose"))
fileWriteStream.write("大きな鈴がリンリンリン\n")
fileWriteStream.write("不思議な冒険ルンルンルン\n")
fileWriteStream.end("アンパン食パンカレーパン\n")

実行結果

書き込みfinish
書き込みclose

.end()を明示的に書くことによって、finishcloseのイベントの両方が走ることが確認できました。

ここで気になるのは、書き込みストリームを終了する際に、書き込みストリームに対して.end()をするかしないかの挙動です。私の場合、以下のようにターミナルでコマンドを実行しているためファイルが閉じられればfinishcloseイベントが両方走ると予想していました。

terminal
node stream.js

fileWriteStream.end("アンパン食パンカレーパン\n")のところをコメントアウトして実行してみます。

stream.js
const fileWriteStream = fs.createWriteStream("./フォルダ1/b.txt")
fileWriteStream.on("finish", () => console.log("書き込みfinish"))
fileWriteStream.on("close", () => console.log("書き込みclose"))
fileWriteStream.write("大きな鈴がリンリンリン\n")
fileWriteStream.write("不思議な冒険ルンルンルン\n")
- // fileWriteStream.end("アンパン食パンカレーパン\n")

実行結果

terminal

実行結果が空だったので、finishイベントもclose`イベントも走っていないことがわかります。
これは大事な挙動ですね。覚えておきましょう。 
これらの原因として考えられるのは、先にファイルがクローズすることによって、streamの終了処理が行われないということですね。

読み込みストリームの一時停止モードとフローイングモード

実は読み込みストリームには2種類のモードがあります。
それが

  • 一時停止モード
  • フローイングモード

です。
簡単に説明しておくと、
一時停止モードでは、read()メソッドが明示的に呼び出されない限りデータが流れてきません。ストリームは止まっています。
フローイングモードでは、データはストリームから自動的に読み込まれ、ストリームは常にオープンしています。
デフォルトでは一時停止モードです。
書き方の違いについて説明していきます。

##一時停止モード
このモードでは実際にデータを読み込むタイミングを制御でき、必要なときに必要なだけデータを読み込むことができる。
readableイベントのリスナーの中で、読み込みストリームに対して.read()をするだけです。

const fs = require('fs')
const readStream = fs.createReadStream("./フォルダ1/a.txt")
readStream.on("readable", () => {
  let chunck
  while (true) {
    chunck = readStream.read()
    if (chunck == null) break
  }
})

データが全部流れた状態になると自動的にストリームを停止します。よって、endイベントを実行します。これが先程の謎の答えです。

フローイングモード

このモードでは、自動的に読み込みストリームから読み込まれ、dataイベントのリスナで制御できます。
readStream.on("data",(chunk)=>{console.log(chunk)})で実行します。

const fs = require('fs')
const readStream = fs.createReadStream("./フォルダ1/a.txt")
readStream.on("data",(chunk)=>{console.log(chunk)})

highWaterMarkについて

highWaterMarkとは、streamが保持できるバッファのサイズのことです。(バッファとは一時保存される領域のことです)
英語でhighWaterMarkは、その容器に保持できる最大水の量です。例えば、ペットボトルでいうと500mlであり缶でいうと250mlです。
nodeのファイルモジュールでは、highWaterMarkのデフォルト値が、64KBです。

よって、ストリームでは64KBずつデータが流れてくることがわかります。

ではhighWaterMarkを用いて遊んでみましょう。
highWaterMarkの設定の仕方は以下のとおりです。

const readStream = fs.createReadStream("./フォルダ1/a.txt", {
  highWaterMark: 0
})
let i = 0
readStream.on("readable", () => {
  let chunck
  i++
  while (true) {
    chunck = readStream.read()
    if (chunck == null) break
    console.log(i + "回目" + chunck)
  }
})

実行結果

# デフォルト値(64KB)のとき
1回目アンパンマン

# highWaterMark=0のとき
何もなし。バッファにデータがはいらない

# highWaterMark=0のとき
1回目�
2回目�
3回目�
4回目�
5回目�
6回目�
7回目�
8回目�
9回目�
10回目�
11回目�
12回目�
13回目�
14回目�
15回目�
16回目�
17回目�
18回目�
19回目
20回目

# highWaterMark=4のとき
1回目ア�
2回目���
3回目�ン
4回目マ�
5回目��

# highWaterMark=10のとき
1回目アンパ�
2回目��マン

面白いですね。
データの区切りが悪いところでは文字化けしてしまいますね。文字化けというか、その文字に対するデータが届いていないことがわかります。

バックプレッシャ

バックプレッシャとはストリームがそれ以上のデータを流せるかどうかを判定します。

まず前提としてストリームの上流にいる人はストリームの下流にいる人の状態を知りません。これを念頭に置いていください。
わかりやすく説明します。
ストリームの下流が詰まってしまってこれ以上データが流せないというふうになってしまった状態を想定します。
ストリームの上流の人は、下流が詰まってしまっていることを知らずにデータを流し続けます。そうすると、下流の人は処理しきれなくなりますよね。
こんなときに役立つのが、バックプレッシャです。

streamは水面下でこの制御も行っています。覚えておきましょう。

streamの連結

stream の連結手法である。.pipepipelineの2種類があります。
まず.pipeについて説明します。
抽象的な例をあげます。

new stream1()
  .pipe(stream2())
  .pipe(stream3())

のように使います。
これで見やすくて十分なのですが、これではエラーハンドリングが厄介なのです。

new stream1()
  .pipe(stream2())
  .pipe(stream3())
  .on("error",err=>console.log(err))

上のコードでは、.on("error",err=>console.log(err))のエラーをキャッチできる範囲に問題があります。
どこまでキャッチできるのでしょうか?
これは、stream3()のみです。stream3()で発生したエラーのみキャッチできます。
stream1(),stream2()で発生したエラーについて拾えません。

ここで救世主が登場します。それは.pipelineです。書き方は以下のとおりです。

stream.pipeline(
  stream1(),
  stream2(),
  stream3()
  err=>err? console.log("error"+err):console.log("エラー発生")
)

pipelineを使うとどのストリームで起きたエラーも拾うことができます。

最後に

最後まで読んでくれてありがとうございます。

streamの概念は難しいと思います。だがしかし、理解できないことはないと思います。
とりあえずwaterhighMarkとバックプレッシャの概念は抑えていただきたいです。

これからも頑張っていきましょう!オー!

11
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3