46
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rubyでストリーム処理をするときのパターン集

Posted at

Rubyで、ストリーム処理(どれぐらいサイズが有るかわからないデータを扱う処理)を行う際に使えるスニペットを集めてみました。

前提

ストリーム処理は、例えば標準入出力であったり、大きなファイルの処理だったり、socketからの読み込みだったりを想定しています。
以降の説明文で「データ」とぼかしているのは、IOが標準入出力でもファイルでもsocketでも同じように扱うことができるからです。

Rubyはダックタイピングを得意とする言語なので、ストリーム処理でもダックタイピングを最大限活用することになります。

オブジェクトはIO File Tempfile StringIO などが想定されます。
仮に変数ioとします。

使うメソッドはread write gets puts each ぐらいです。

どんなオブジェクトでも、メソッドの挙動が統一されていれば組み合わせて扱うことができます。

n byteずつioから読み込む

while buff = io.read(n)
  # do something
end

n byte読み込んだら別のIOに書き込むなり読み込んだbyte数を数えたりできます。

1行ずつioから読み込む

while buff = io.gets
  # do something
end
io.each do |line|
  # do something
end

例えば1行毎にjsonが書かれたjsonl形式のデータや、csvデータの読み込みに使えると思います。

さらにCSVclassでラップすれば、1行ずつ読み込みつつcsvとしてパースしてループを回すこともできます。

io = CSV.new(io)
io.each do |data|
  # do something
end

1000行ずつioから読み込む

io.each_slice(1000) do |chunk|
  # chunk.each do |line|
  #   do something
end

データが巨大なので、メモリを圧迫しないように少しずつ読みたい。しかしながらSQLのINSERT文を打つときなど、ある程度まとめて処理しないとパフォーマンスが出ない。
そんなときに使えるのがeach_sliceです。

each_sliceは第一引数に渡した行数だけストリームから読み込むことができます。

ブロック引数のchunkは、第一引数に渡したnだけのサイズの配列になり、それぞれ1行ずつのデータが格納されています。

大変便利なeach_sliceですが、例えばRackのInput Stream仕様ではeach_sliceは定義されていません。

eachが実装されていれば大抵Enumerableがclassにincludeされているのでeach_sliceが使えることが多いのですが例外もあるということですね。

gzip展開しつつ1行ずつ読み込む

CRuby標準添付ライブラリーzlibに含まれているZlib::GzipReaderを使うと、gzip圧縮されたデータでも、少ないコードでgzip圧縮されたデータに対応できます。

io = Zlib::GzipReader.new(io)
io.each do |line|
  # do something
end

さらにCSVclassと組み合わせることもできます。

io = Zlib::GzipReader.new(io)
io = CSV.new(io)
io.each do |line|
  # do something
end

このような書き方が可能になっているのも、zlibやcsvと言ったライブラリがダックタイピングを意識して実装されているためです。

具体的には、CSV#eachではCSV#shiftでデータを1行ずつ読み込んでいますが、このCSV#shiftの実装ではラップしたオブジェクトのgetsメソッドによって行なわれています。
また、Zlib::GzipReader#getsメソッドでは、ラップしているオブジェクトのreadメソッドを利用しています。
このようにメソッド同士が協調しあっているので、表向きにはシンプルなAPIで実現できるわけですね。

さらにeacheach_sliceで置き換えることによって、gzipデータを展開しながら1000行ずつで区切ってCSVとしてパースなんてことも簡単に実現できます。

io = Zlib::GzipReader.new(io)
io = CSV.new(io)
io.each_slice(1000) do |chunk|
  # do something
end

IO.copy_stream

IO.copy_streamも便利なメソッドです。
引数にIOっぽいオブジェクトを渡すと、組み合わせによって最適化してストリームにデータをコピーすることができます。

データコピーだけだとあまり使いどころがないように思えるかもしれませんが、基本的に以下のような場合はIO.copy_streamに置き換えられます。

while buff = src.read(16 * 1024)
  dst.write(buff)
end
IO.copy_stream(src, dst)

IO.copy_streamは、第一引数は最低限readメソッドが、第二引数は最低限writeメソッドが実装されていれば利用できます。
内部で持つバッファーサイズは2.5.0次点では16KBで、変更できません。内部仕様なので変わらない保証もないでしょう。
ioのflushは自動でやってくれるので考えなくて大丈夫です。
書き込んだ後にまたデータを利用したい場合は、IOclass等はrewindが必要です。

ioをgzip展開しつつTempfileに書き込む

Tempfile.create("") do |dst|
  src = Zlib::GzipReader.new(io)
  IO.copy_stream(src, dst)
  dst.rewind
  # do something
end

ioをgzip圧縮しつつTempfileに保存する

Tempfile.create("") do |dst|
  dst = Zlib::GzipWriter.new(dst)
  IO.copy_stream(io, dst)
  dst.finish
  dst.rewind
  # do something
end

複数行分のデータをまとめて書き込む

io.puts array

IO#putsは引数にarrayを渡すと、arrayの1要素につき1行書き込んでくれます。
例えばeach_sliceと組み合わせて、10000行ずつ処理して結果を一気に書き込むことで、IO負荷を減らすことができます。

read_io.each_slice(10000) do |chunk|
  result = chunk.each_with_object([]) do |line, ary|
    # do something
  end
  write_io.puts result
end

どれぐらいの性能差があるのかも測ってみました。(CRuby v2.5.0)

Tempfile.create do |f|
  Benchmark.realtime do
    10000.times { f.puts("hello") }
    f.flush
  end
end
#=> 0.004009999975096434

Tempfile.create do |f|
  Benchmark.realtime do
    f.puts(["hello"] * 10000)
    f.flush
  end
end
#=> 0.0023689999943599105

番外編:メソッドの引数にIOっぽいオブジェクトをとるようにする

Rubyでストリーム処理する時は、引数にIOっぽいオブジェクトを取るメソッドを作れば、様々なオブジェクトを渡すことができるのでテストにも便利です。
ダックタイピングの恩恵を最大限利用しましょう。

# 実装
def copy(input, output)
  input.each do |line|
    output.puts line
  end
end

# 本来の用途
File.open("input.txt") do |input|
  File.open("output.txt", "w+") do |output|
    copy(input, output)
  end
end

# テスト
input = StringIO.new("aaa\nbbb\nccc\n")
output = StringIO.new
copy(input, output)
assert_equal "aaa\nbbb\nccc\n", output.string

まとめ

RubyのIOとはフレームワークだと思っています。
このフレームワークを覚えればさまざまな場面で応用が可能になり、複雑なはずのストリーム(IO)処理が簡単にかつ汎用的に書けるようになるでしょう。

楽しいストリーム処理ライフを。

46
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
46
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?