#ストリームとは
ストリームとは、大きいデータを少量に分けてちょっとずつデータを運ぶ概念のことです。
英語でstreamは「流れや川」を表します。
小さいデータが連続的に流れる様子が川のようなのでstreamというのですね。
streamには、大きく分けて2つの役割があります。
- データを読み込む (write)
- データを書き込む (read)
具体的な例を見ていく前にストリームが役立つシーンを紹介します。
また、ここでは基本的にファイルモジュールを扱っていきます
ストリームが役立つ例
ファイルをコピーしてフォルダ1からフォルダ2に移す。単純な例を考えます。
.
├── フォルダ1── a.txt
└── フォルダ2
ここで2つのコードの書き方が考えられます。
1.ファイルを一度完全にバッファ(一時保存される領域)にコピーしてから、フォルダ2にファイルを書き込む
2.ファイルをバッファに読み込み、読み込めたものから順に書き込みを行う
1の場合
const fs = require('fs')
text = fs.readFileSync('./フォルダ1/a.txt')
fs.writeFileSync('./フォルダ2/a_copy.txt',text)
2の場合
const fs = require('fs')
fs.createReadStream("./フォルダ1/a.txt") //読み込みストリーム
.pipe(fs.createWriteStream("./フォルダ2/a_copy.txt")) //書き込みストリーム
.on("finish",()=>console.log("コピー終わったよ"))
1の場合ではファイルの読み込みが完了してから、ファイルの書き込みを行うのに対し、
2の場合はファイルの読み込みと書き込みを並行して行います。pipeを使っていますが、これはストリーム同士をつなぐものです。
1と比べて2のほうが、効率よく実行できますね。
一つ注意すべきことがあります。
1の場合では2GBバイト以上のファイルを読み込むことができません。
これは、バッファに入るファイルのサイズが2GBバイトまでだからです。地味に重要なことなので覚えておきましょう。2GBを超えるファイルはstreamで読み込むべきということですね。
読み込みストリーム
fs.createReadStream(ファイルパス)を指定するだけでストリームが作れます。
const fs = require('fs')
const readStream = fs.createReadStream("./フォルダ1/a.txt")
let i=0
readStream.on("readable", () => {
let chunck
while (true) {
i++
chunck = readStream.read()
if (chunck == null) break
console.log(i+"回目"+chunck.toString())
}
})
ここでは、chunckは小分けにしたデータが入ります。
readable
状態になったときにreadStream.read()
で中身を取り出すことができます。
実行結果
1回目アンパンマン
a.txtにはアンパンマンが入っていたのですね。ここではファイルサイズが小さすぎるために1回しかreadable
が読み込まれませんでした。
readable
は読み込みが可能になったよ!というイベントを通知してくれます。
.on
という書き方はイベントエミッターですね。 イベントエミッターがわからない人はこの記事を参考にしてください。簡単に言うと.on(イベント名,関数)でイベントに対するリスナーを追加できます。イベント駆動開発でよく用いられます。
書き込みストリーム
これもfs.createWriteStream(ファイルパス)を指定するだけです。
const fs = require('fs')
const fileWriteStream = fs.createWriteStream("./フォルダ1/b.txt")
fileWriteStream.write("大きな鈴がリンリンリン\n")
fileWriteStream.write("不思議な冒険ルンルンルン\n")
fileWriteStream.end("アンパン食パンカレーパン\n")
実行結果
大きな鈴がリンリンリン
不思議な冒険ルンルンルン
アンパン食パンカレーパン
書き込みストリームに対して.write(文字列)
とすることでファイルに書き込みを行うことができます。
すべてのデータを流し終えたら.end
することでストリームを明示的に閉じることができます。
ここで気になるのが、ストリームが閉じられるタイミングですよね。調べていきます。
streamが閉じられるタイミング
読み込みストリームと書き込みストリームそれぞれstreamが閉じられるタイミングを見ていきます。
調べる方法、至って簡単です。.on("finish",()=>{console.log("stream終了")
をつけて確認していきます。
およよ、調べていくとイベントはfinish
だけでなくclose
もあるみたいです。少し脱線しますが、調べましょう。
finish
とclose
のイベントの違いは何でしょうか。
書き込みストリームでみていきます。
-
finish
イベント・・・すべてのデータの書き込みを終了したが、まだファイルを開いている。 -
close
イベント・・・すべてのデータの書き込みを終了し、なおかつファイルを閉じたとき
また、読み込みストリームでは、finish
はなく、end
とclose
があります。
-
end
イベント・・・すべてのデータの読み込みを終了したが、まだファイルを開いている。 - `close@イベント・・・すべてのデータの読み込みを終了し、なおかつファイルを閉じたとき
どうやら、close
イベントは、ストリームが閉じられることはもちろん、ファイル自体がクローズしているかどうかを意識しているみたいです。
この違いも意識してみていきましょう。
読み込みストリーム
先程のコードにend
とclose
のイベントを付け加えて実行します。
const readStream = fs.createReadStream("./フォルダ1/a.txt")
+ readStream.on("end", () => console.log("end"))
+ readStream.on("close", () => console.log("close"))
let i = 0
readStream.on("readable", () => {
let chunck
while (true) {
i++
chunck = readStream.read()
if (chunck == null) break
console.log(i + "回目" + chunck.toString())
}
})
実行結果
1回目アンパンマン
end
close
自然な動きのように思えますが、endイベントが実行されるのはやや不自然に感じます。明示的にstreamの終了操作を発行していないからです。これについては、読み込みストリームの一時停止モードとフローイングモードで説明していきます。
また明示的にストリームを閉じる方法も存在します。それは読み込みストリームに対して.destroy
です。
以下のコマンドを実行します。
const readStream = fs.createReadStream("./フォルダ1/a.txt")
readStream.on("close", () => console.log("close"))
readStream.on("end", () => console.log("end"))
let i = 0
readStream.on("readable", () => {
let chunck
while (true) {
i++
chunck = readStream.read()
if (chunck == null) break
console.log(i + "回目" + chunck.toString())
}
+ readStream.destroy()
})
実行結果
1回目アンパンマン
close
どうやらすべてのデータを受け取る前にストリームを閉じちゃいました。笑
##書き込みストリーム
先程のコードを少し改良したものを動かしていきます。
const fileWriteStream = fs.createWriteStream("./フォルダ1/b.txt")
+ fileWriteStream.on("finish", () => console.log("書き込みfinish"))
+ fileWriteStream.on("close", () => console.log("書き込みclose"))
fileWriteStream.write("大きな鈴がリンリンリン\n")
fileWriteStream.write("不思議な冒険ルンルンルン\n")
fileWriteStream.end("アンパン食パンカレーパン\n")
実行結果
書き込みfinish
書き込みclose
.end()
を明示的に書くことによって、finish
とclose
のイベントの両方が走ることが確認できました。
ここで気になるのは、書き込みストリームを終了する際に、書き込みストリームに対して.end()
をするかしないかの挙動です。私の場合、以下のようにターミナルでコマンドを実行しているためファイルが閉じられればfinish
とclose
イベントが両方走ると予想していました。
node stream.js
fileWriteStream.end("アンパン食パンカレーパン\n")
のところをコメントアウトして実行してみます。
const fileWriteStream = fs.createWriteStream("./フォルダ1/b.txt")
fileWriteStream.on("finish", () => console.log("書き込みfinish"))
fileWriteStream.on("close", () => console.log("書き込みclose"))
fileWriteStream.write("大きな鈴がリンリンリン\n")
fileWriteStream.write("不思議な冒険ルンルンルン\n")
- // fileWriteStream.end("アンパン食パンカレーパン\n")
実行結果
実行結果が空だったので、finishイベントも
close`イベントも走っていないことがわかります。
これは大事な挙動ですね。覚えておきましょう。
これらの原因として考えられるのは、先にファイルがクローズすることによって、streamの終了処理が行われないということですね。
読み込みストリームの一時停止モードとフローイングモード
実は読み込みストリームには2種類のモードがあります。
それが
- 一時停止モード
- フローイングモード
です。
簡単に説明しておくと、
一時停止モードでは、read()メソッドが明示的に呼び出されない限りデータが流れてきません。ストリームは止まっています。
フローイングモードでは、データはストリームから自動的に読み込まれ、ストリームは常にオープンしています。
デフォルトでは一時停止モードです。
書き方の違いについて説明していきます。
##一時停止モード
このモードでは実際にデータを読み込むタイミングを制御でき、必要なときに必要なだけデータを読み込むことができる。
readable
イベントのリスナーの中で、読み込みストリームに対して.read()
をするだけです。
const fs = require('fs')
const readStream = fs.createReadStream("./フォルダ1/a.txt")
readStream.on("readable", () => {
let chunck
while (true) {
chunck = readStream.read()
if (chunck == null) break
}
})
データが全部流れた状態になると自動的にストリームを停止します。よって、end
イベントを実行します。これが先程の謎の答えです。
フローイングモード
このモードでは、自動的に読み込みストリームから読み込まれ、dataイベントのリスナで制御できます。
readStream.on("data",(chunk)=>{console.log(chunk)})
で実行します。
const fs = require('fs')
const readStream = fs.createReadStream("./フォルダ1/a.txt")
readStream.on("data",(chunk)=>{console.log(chunk)})
highWaterMarkについて
highWaterMarkとは、streamが保持できるバッファのサイズのことです。(バッファとは一時保存される領域のことです)
英語でhighWaterMarkは、その容器に保持できる最大水の量です。例えば、ペットボトルでいうと500mlであり缶でいうと250mlです。
nodeのファイルモジュールでは、highWaterMarkのデフォルト値が、64KBです。
よって、ストリームでは64KBずつデータが流れてくることがわかります。
ではhighWaterMarkを用いて遊んでみましょう。
highWaterMarkの設定の仕方は以下のとおりです。
const readStream = fs.createReadStream("./フォルダ1/a.txt", {
highWaterMark: 0
})
let i = 0
readStream.on("readable", () => {
let chunck
i++
while (true) {
chunck = readStream.read()
if (chunck == null) break
console.log(i + "回目" + chunck)
}
})
実行結果
# デフォルト値(64KB)のとき
1回目アンパンマン
# highWaterMark=0のとき
何もなし。バッファにデータがはいらない
# highWaterMark=0のとき
1回目�
2回目�
3回目�
4回目�
5回目�
6回目�
7回目�
8回目�
9回目�
10回目�
11回目�
12回目�
13回目�
14回目�
15回目�
16回目�
17回目�
18回目�
19回目
20回目
# highWaterMark=4のとき
1回目ア�
2回目���
3回目�ン
4回目マ�
5回目��
# highWaterMark=10のとき
1回目アンパ�
2回目��マン
面白いですね。
データの区切りが悪いところでは文字化けしてしまいますね。文字化けというか、その文字に対するデータが届いていないことがわかります。
バックプレッシャ
バックプレッシャとはストリームがそれ以上のデータを流せるかどうかを判定します。
まず前提としてストリームの上流にいる人はストリームの下流にいる人の状態を知りません。これを念頭に置いていください。
わかりやすく説明します。
ストリームの下流が詰まってしまってこれ以上データが流せないというふうになってしまった状態を想定します。
ストリームの上流の人は、下流が詰まってしまっていることを知らずにデータを流し続けます。そうすると、下流の人は処理しきれなくなりますよね。
こんなときに役立つのが、バックプレッシャです。
stream
は水面下でこの制御も行っています。覚えておきましょう。
streamの連結
stream の連結手法である。.pipe
とpipeline
の2種類があります。
まず.pipe
について説明します。
抽象的な例をあげます。
new stream1()
.pipe(stream2())
.pipe(stream3())
のように使います。
これで見やすくて十分なのですが、これではエラーハンドリングが厄介なのです。
new stream1()
.pipe(stream2())
.pipe(stream3())
.on("error",err=>console.log(err))
上のコードでは、.on("error",err=>console.log(err))
のエラーをキャッチできる範囲に問題があります。
どこまでキャッチできるのでしょうか?
これは、stream3()
のみです。stream3()
で発生したエラーのみキャッチできます。
stream1()
,stream2()
で発生したエラーについて拾えません。
ここで救世主が登場します。それは.pipeline
です。書き方は以下のとおりです。
stream.pipeline(
stream1(),
stream2(),
stream3()
err=>err? console.log("error"+err):console.log("エラー発生")
)
pipeline
を使うとどのストリームで起きたエラーも拾うことができます。
最後に
最後まで読んでくれてありがとうございます。
streamの概念は難しいと思います。だがしかし、理解できないことはないと思います。
とりあえずwaterhighMarkとバックプレッシャの概念は抑えていただきたいです。
これからも頑張っていきましょう!オー!