Ruby

Rubyでストリーム処理をするときのパターン集

More than 1 year has passed since last update.

Rubyで、ストリーム処理(どれぐらいサイズが有るかわからないデータを扱う処理)を行う際に使えるスニペットを集めてみました。


前提

ストリーム処理は、例えば標準入出力であったり、大きなファイルの処理だったり、socketからの読み込みだったりを想定しています。

以降の説明文で「データ」とぼかしているのは、IOが標準入出力でもファイルでもsocketでも同じように扱うことができるからです。

Rubyはダックタイピングを得意とする言語なので、ストリーム処理でもダックタイピングを最大限活用することになります。

オブジェクトはIO File Tempfile StringIO などが想定されます。

仮に変数ioとします。

使うメソッドはread write gets puts each ぐらいです。

どんなオブジェクトでも、メソッドの挙動が統一されていれば組み合わせて扱うことができます。


n byteずつioから読み込む

while buff = io.read(n)

# do something
end

n byte読み込んだら別のIOに書き込むなり読み込んだbyte数を数えたりできます。


1行ずつioから読み込む

while buff = io.gets

# do something
end

io.each do |line|

# do something
end

例えば1行毎にjsonが書かれたjsonl形式のデータや、csvデータの読み込みに使えると思います。

さらにCSVclassでラップすれば、1行ずつ読み込みつつcsvとしてパースしてループを回すこともできます。

io = CSV.new(io)

io.each do |data|
# do something
end


1000行ずつioから読み込む

io.each_slice(1000) do |chunk|

# chunk.each do |line|
# do something
end

データが巨大なので、メモリを圧迫しないように少しずつ読みたい。しかしながらSQLのINSERT文を打つときなど、ある程度まとめて処理しないとパフォーマンスが出ない。

そんなときに使えるのがeach_sliceです。

https://docs.ruby-lang.org/ja/latest/method/Enumerable/i/each_slice.html

each_sliceは第一引数に渡した行数だけストリームから読み込むことができます。

ブロック引数のchunkは、第一引数に渡したnだけのサイズの配列になり、それぞれ1行ずつのデータが格納されています。

大変便利なeach_sliceですが、例えばRackのInput Stream仕様ではeach_sliceは定義されていません。

http://www.rubydoc.info/github/rack/rack/file/SPEC#The_Input_Stream

eachが実装されていれば大抵Enumerableがclassにincludeされているのでeach_sliceが使えることが多いのですが例外もあるということですね。


gzip展開しつつ1行ずつ読み込む

CRuby標準添付ライブラリーzlibに含まれているZlib::GzipReaderを使うと、gzip圧縮されたデータでも、少ないコードでgzip圧縮されたデータに対応できます。

io = Zlib::GzipReader.new(io)

io.each do |line|
# do something
end

さらにCSVclassと組み合わせることもできます。

io = Zlib::GzipReader.new(io)

io = CSV.new(io)
io.each do |line|
# do something
end

このような書き方が可能になっているのも、zlibやcsvと言ったライブラリがダックタイピングを意識して実装されているためです。

具体的には、CSV#eachではCSV#shiftでデータを1行ずつ読み込んでいますが、このCSV#shiftの実装ではラップしたオブジェクトのgetsメソッドによって行なわれています。

また、Zlib::GzipReader#getsメソッドでは、ラップしているオブジェクトのreadメソッドを利用しています。

このようにメソッド同士が協調しあっているので、表向きにはシンプルなAPIで実現できるわけですね。

さらにeacheach_sliceで置き換えることによって、gzipデータを展開しながら1000行ずつで区切ってCSVとしてパースなんてことも簡単に実現できます。

io = Zlib::GzipReader.new(io)

io = CSV.new(io)
io.each_slice(1000) do |chunk|
# do something
end


IO.copy_stream

https://docs.ruby-lang.org/ja/latest/method/IO/s/copy_stream.html

IO.copy_streamも便利なメソッドです。

引数にIOっぽいオブジェクトを渡すと、組み合わせによって最適化してストリームにデータをコピーすることができます。

データコピーだけだとあまり使いどころがないように思えるかもしれませんが、基本的に以下のような場合はIO.copy_streamに置き換えられます。

while buff = src.read(16 * 1024)

dst.write(buff)
end

IO.copy_stream(src, dst)

IO.copy_streamは、第一引数は最低限readメソッドが、第二引数は最低限writeメソッドが実装されていれば利用できます。

内部で持つバッファーサイズは2.5.0次点では16KBで、変更できません。内部仕様なので変わらない保証もないでしょう。

ioのflushは自動でやってくれるので考えなくて大丈夫です。

書き込んだ後にまたデータを利用したい場合は、IOclass等はrewindが必要です。


ioをgzip展開しつつTempfileに書き込む

Tempfile.create("") do |dst|

src = Zlib::GzipReader.new(io)
IO.copy_stream(src, dst)
dst.rewind
# do something
end


ioをgzip圧縮しつつTempfileに保存する

Tempfile.create("") do |dst|

dst = Zlib::GzipWriter.new(dst)
IO.copy_stream(io, dst)
dst.finish
dst.rewind
# do something
end


複数行分のデータをまとめて書き込む

io.puts array

IO#putsは引数にarrayを渡すと、arrayの1要素につき1行書き込んでくれます。

例えばeach_sliceと組み合わせて、10000行ずつ処理して結果を一気に書き込むことで、IO負荷を減らすことができます。

read_io.each_slice(10000) do |chunk|

result = chunk.each_with_object([]) do |line, ary|
# do something
end
write_io.puts result
end

どれぐらいの性能差があるのかも測ってみました。(CRuby v2.5.0)

Tempfile.create do |f|

Benchmark.realtime do
10000.times { f.puts("hello") }
f.flush
end
end
#=> 0.004009999975096434

Tempfile.create do |f|
Benchmark.realtime do
f.puts(["hello"] * 10000)
f.flush
end
end
#=> 0.0023689999943599105


番外編:メソッドの引数にIOっぽいオブジェクトをとるようにする

Rubyでストリーム処理する時は、引数にIOっぽいオブジェクトを取るメソッドを作れば、様々なオブジェクトを渡すことができるのでテストにも便利です。

ダックタイピングの恩恵を最大限利用しましょう。

# 実装

def copy(input, output)
input.each do |line|
output.puts line
end
end

# 本来の用途
File.open("input.txt") do |input|
File.open("output.txt", "w+") do |output|
copy(input, output)
end
end

# テスト
input = StringIO.new("aaa\nbbb\nccc\n")
output = StringIO.new
copy(input, output)
assert_equal "aaa\nbbb\nccc\n", output.string


まとめ

RubyのIOとはフレームワークだと思っています。

このフレームワークを覚えればさまざまな場面で応用が可能になり、複雑なはずのストリーム(IO)処理が簡単にかつ汎用的に書けるようになるでしょう。

楽しいストリーム処理ライフを。