Rubyで、ストリーム処理(どれぐらいサイズが有るかわからないデータを扱う処理)を行う際に使えるスニペットを集めてみました。
前提
ストリーム処理は、例えば標準入出力であったり、大きなファイルの処理だったり、socketからの読み込みだったりを想定しています。
以降の説明文で「データ」とぼかしているのは、IOが標準入出力でもファイルでもsocketでも同じように扱うことができるからです。
Rubyはダックタイピングを得意とする言語なので、ストリーム処理でもダックタイピングを最大限活用することになります。
オブジェクトはIO
File
Tempfile
StringIO
などが想定されます。
仮に変数io
とします。
使うメソッドはread
write
gets
puts
each
ぐらいです。
どんなオブジェクトでも、メソッドの挙動が統一されていれば組み合わせて扱うことができます。
n byteずつioから読み込む
while buff = io.read(n)
# do something
end
n byte読み込んだら別のIOに書き込むなり読み込んだbyte数を数えたりできます。
1行ずつioから読み込む
while buff = io.gets
# do something
end
io.each do |line|
# do something
end
例えば1行毎にjsonが書かれたjsonl形式のデータや、csvデータの読み込みに使えると思います。
さらにCSV
classでラップすれば、1行ずつ読み込みつつcsvとしてパースしてループを回すこともできます。
io = CSV.new(io)
io.each do |data|
# do something
end
1000行ずつioから読み込む
io.each_slice(1000) do |chunk|
# chunk.each do |line|
# do something
end
データが巨大なので、メモリを圧迫しないように少しずつ読みたい。しかしながらSQLのINSERT文を打つときなど、ある程度まとめて処理しないとパフォーマンスが出ない。
そんなときに使えるのがeach_slice
です。
each_slice
は第一引数に渡した行数だけストリームから読み込むことができます。
ブロック引数のchunk
は、第一引数に渡したnだけのサイズの配列になり、それぞれ1行ずつのデータが格納されています。
大変便利なeach_slice
ですが、例えばRackのInput Stream仕様ではeach_slice
は定義されていません。
each
が実装されていれば大抵Enumerable
がclassにincludeされているのでeach_slice
が使えることが多いのですが例外もあるということですね。
gzip展開しつつ1行ずつ読み込む
CRuby標準添付ライブラリーzlib
に含まれているZlib::GzipReader
を使うと、gzip圧縮されたデータでも、少ないコードでgzip圧縮されたデータに対応できます。
io = Zlib::GzipReader.new(io)
io.each do |line|
# do something
end
さらにCSV
classと組み合わせることもできます。
io = Zlib::GzipReader.new(io)
io = CSV.new(io)
io.each do |line|
# do something
end
このような書き方が可能になっているのも、zlibやcsvと言ったライブラリがダックタイピングを意識して実装されているためです。
具体的には、CSV#each
ではCSV#shift
でデータを1行ずつ読み込んでいますが、このCSV#shift
の実装ではラップしたオブジェクトのgets
メソッドによって行なわれています。
また、Zlib::GzipReader#gets
メソッドでは、ラップしているオブジェクトのread
メソッドを利用しています。
このようにメソッド同士が協調しあっているので、表向きにはシンプルなAPIで実現できるわけですね。
さらにeach
はeach_slice
で置き換えることによって、gzipデータを展開しながら1000行ずつで区切ってCSVとしてパースなんてことも簡単に実現できます。
io = Zlib::GzipReader.new(io)
io = CSV.new(io)
io.each_slice(1000) do |chunk|
# do something
end
IO.copy_stream
IO.copy_stream
も便利なメソッドです。
引数にIOっぽいオブジェクトを渡すと、組み合わせによって最適化してストリームにデータをコピーすることができます。
データコピーだけだとあまり使いどころがないように思えるかもしれませんが、基本的に以下のような場合はIO.copy_stream
に置き換えられます。
while buff = src.read(16 * 1024)
dst.write(buff)
end
IO.copy_stream(src, dst)
IO.copy_stream
は、第一引数は最低限read
メソッドが、第二引数は最低限write
メソッドが実装されていれば利用できます。
内部で持つバッファーサイズは2.5.0次点では16KBで、変更できません。内部仕様なので変わらない保証もないでしょう。
ioのflushは自動でやってくれるので考えなくて大丈夫です。
書き込んだ後にまたデータを利用したい場合は、IO
class等はrewind
が必要です。
ioをgzip展開しつつTempfileに書き込む
Tempfile.create("") do |dst|
src = Zlib::GzipReader.new(io)
IO.copy_stream(src, dst)
dst.rewind
# do something
end
ioをgzip圧縮しつつTempfileに保存する
Tempfile.create("") do |dst|
dst = Zlib::GzipWriter.new(dst)
IO.copy_stream(io, dst)
dst.finish
dst.rewind
# do something
end
複数行分のデータをまとめて書き込む
io.puts array
IO#puts
は引数にarrayを渡すと、arrayの1要素につき1行書き込んでくれます。
例えばeach_slice
と組み合わせて、10000行ずつ処理して結果を一気に書き込むことで、IO負荷を減らすことができます。
read_io.each_slice(10000) do |chunk|
result = chunk.each_with_object([]) do |line, ary|
# do something
end
write_io.puts result
end
どれぐらいの性能差があるのかも測ってみました。(CRuby v2.5.0)
Tempfile.create do |f|
Benchmark.realtime do
10000.times { f.puts("hello") }
f.flush
end
end
#=> 0.004009999975096434
Tempfile.create do |f|
Benchmark.realtime do
f.puts(["hello"] * 10000)
f.flush
end
end
#=> 0.0023689999943599105
番外編:メソッドの引数にIOっぽいオブジェクトをとるようにする
Rubyでストリーム処理する時は、引数にIOっぽいオブジェクトを取るメソッドを作れば、様々なオブジェクトを渡すことができるのでテストにも便利です。
ダックタイピングの恩恵を最大限利用しましょう。
# 実装
def copy(input, output)
input.each do |line|
output.puts line
end
end
# 本来の用途
File.open("input.txt") do |input|
File.open("output.txt", "w+") do |output|
copy(input, output)
end
end
# テスト
input = StringIO.new("aaa\nbbb\nccc\n")
output = StringIO.new
copy(input, output)
assert_equal "aaa\nbbb\nccc\n", output.string
まとめ
RubyのIOとはフレームワークだと思っています。
このフレームワークを覚えればさまざまな場面で応用が可能になり、複雑なはずのストリーム(IO)処理が簡単にかつ汎用的に書けるようになるでしょう。
楽しいストリーム処理ライフを。