Ruby

Rubyで大規模なデータを高速にCSV出力する

はじめに

とある理由でブロックチェーンのデータを読める形にしてCSV出力する機会があった.
その際にブロックのデータ、トランザクションのデータ...と順番に出力していたらとてつもなく時間がかかっていたので高速化することにした.
この記事はその時の対策の記録である.
同様の問題で困っている方の参考になれば嬉しい.

やりたいこと

1.ブロックチェーンデータの読み取り
2.データを加工してCSV出力 ← ここが今回の課題
3.CSVを使ってデータベースにインポート

結果

高速化前は2時間かかっていた処理が6分で終わるレベルまで高速化することができた.

ベースプログラム

実際に使っていたプログラムはそのまま載せるわけにはいかないので多少簡略化して載せる.

csv_export.rb
require 'csv'

block_file = CSV.open("block.csv", "w")
tx_file = CSV.open("tx.csv", "w")
tx_in_file = CSV.open("tx_in.csv", "w")
tx_out_file = CSV.open("tx_out.csv", "w")
blocks.each do |block|
  block_file << [block.block_hash, block.header.version, block.header.merkle_root, block.header.time, block.header.bits, block.header.nonce, block.size, block.height, block.tx_count, block.input_count, block.output_count]

  block.transactions.each do |tx|
    tx_file << [tx.txid, tx.version, tx.marker, tx.flag, tx.lock_time]

    tx.inputs.each do |tx_in|
      tx_in_file << [tx_in.script_sig.to_hex, tx_in.script_witness.to_s, tx_in.sequence, tx_in.out_point.index, tx_in.out_point.hash]
    end

    tx.outputs.each_with_index do |tx_out, n|
      tx_out_file << [tx_out.value, n, tx_out.script_pubkey.to_hex]
    end
  end
end

...
# file close 処理

上記のような感じのコードで出力を行なっていたがデータ量が多いと結構な時間がかかってしまう.
この出力部分がボトルネックになって全体の処理時間が延びていたため対策を考えることにした.

高速化のアプローチ

思いついた対策は次の3つ
* 記述を工夫して処理の高速化を計る
* 並列化
* 出力と入力を直接繋ぐ(pipeを使って出力と入力を同時にやる)

今回とったアプローチは次の2つ
* 記述を工夫して処理の高速化を計る
* 並列化

理由はベースプログラムの作り的に大きな変更を加えずに対応できそうだったため.

成果物

csv_export.rb
class CsvExport
  attr_reader :block_file
  attr_reader :tx_file
  attr_reader :tx_in_file
  attr_reader :tx_out_file

  def initialize
    @block_file = BlockFile.new
    @tx_file = TxFile.new
    @tx_in_file = TxInFile.new
    @tx_out_file = TxOutFile.new
    @block_data = []
    @tx_data = []
    @tx_in_data = []
    @tx_out_data = []
  end

  def parallel_format_data(blocks)
    Parallel.map(blocks, in_thread: 4, finish: -> (item, i, result) {
      @block_data << result[0]
      result[1].each{ |data| @tx_data << data }
      result[2].each{ |data| @tx_in_data << data }
      result[3].each{ |data| @tx_out_data << data }
    }) do |block|
      tx_data = []
      tx_in_data = []
      tx_out_data = []
      block_node = [block.block_hash, block.header.version, block.header.merkle_root, block.header.time, block.header.bits, block.header.nonce, block.size, block.height, block.tx_count, block.input_count, block.output_count]

      block.transactions.each do |tx|
        tx_data << [tx.txid, tx.version, tx.marker, tx.flag, tx.lock_time]

        tx.inputs.each do |tx_in|
          tx_in_data << [tx_in.script_sig.to_hex, tx_in.script_witness.to_s, tx_in.sequence, tx_in.out_point.index, tx_in.out_point.hash]
        end

        tx.outputs.each_with_index do |tx_out, n|
          tx_out_data << [tx_out.value, n, tx_out.script_pubkey.to_hex]
        end
      end
      [block_data, tx_data, tx_in_data, tx_out_data]
    end
  end

  def parallel_export
    Parallel.map([[@block_file, @block_data], [@tx_file, @tx_data], [@tx_in_file, @tx_in_data], [@tx_out_file, @tx_out_data]], in_thread: 4
    ) do |file, datas|
      file.export(datas)
    end
  end

  def export(blocks)
    parallel_format_data(blocks)
    parallel_export
  end
end
file_manager.rb
require 'csv'

class FileManager
  attr_reader :file
  attr_reader :dir

  def initialize
    @dir = Dir.pwd #CSV作成するディレクトリ指定
  end

  def path(file_name)
    File.join(dir, file_name)
  end

  def open(file_name, mode = "r")
    @file = CSV.open(path(file_name + ".csv"), mode, force_quotes: true)
  end

  def close
    file.close
  end
end
block_file.rb(tx_file等も同様)
class BlockFile < FileManager

  def initialize
    super
  end

  def open(file_name, mode = "r")
    super(file_name, mode)
  end

  def export(datas)
    open("block", "w", header: true)
    datas.each{ |data| file << data }
    close
  end
end
  • 記述を工夫して処理の高速化を計る
    ファイル操作は逐次処理より一括で処理した方が早いという記事[1]があったので適用 (効果が出たかは不明)

  • 並列化
    gem 'parallel'を使用
    IOバウンドな処理と判断したのでスレッドを使って並列化した[2]

おまけ

parallelを使わない場合はThreadを使って書くことができる.

threads = []
threads << Thread.new {
  # このスレッドでさせたい処理
}
threads.each {|t| t.join}

参考にした記事

[1]https://qiita.com/kotauchisunsun/items/84e01c6fb621fcc1a647
[2]https://qiita.com/yuroyoro/items/92c5bc864fa9c05127a9