はじめに
とある理由でブロックチェーンのデータを読める形にしてCSV出力する機会があった.
その際にブロックのデータ、トランザクションのデータ...と順番に出力していたらとてつもなく時間がかかっていたので高速化することにした.
この記事はその時の対策の記録である.
同様の問題で困っている方の参考になれば嬉しい.
やりたいこと
1.ブロックチェーンデータの読み取り
2.データを加工してCSV出力 ← ここが今回の課題
3.CSVを使ってデータベースにインポート
結果
高速化前は2時間かかっていた処理が6分で終わるレベルまで高速化することができた.
ベースプログラム
実際に使っていたプログラムはそのまま載せるわけにはいかないので多少簡略化して載せる.
require 'csv'
block_file = CSV.open("block.csv", "w")
tx_file = CSV.open("tx.csv", "w")
tx_in_file = CSV.open("tx_in.csv", "w")
tx_out_file = CSV.open("tx_out.csv", "w")
blocks.each do |block|
block_file << [block.block_hash, block.header.version, block.header.merkle_root, block.header.time, block.header.bits, block.header.nonce, block.size, block.height, block.tx_count, block.input_count, block.output_count]
block.transactions.each do |tx|
tx_file << [tx.txid, tx.version, tx.marker, tx.flag, tx.lock_time]
tx.inputs.each do |tx_in|
tx_in_file << [tx_in.script_sig.to_hex, tx_in.script_witness.to_s, tx_in.sequence, tx_in.out_point.index, tx_in.out_point.hash]
end
tx.outputs.each_with_index do |tx_out, n|
tx_out_file << [tx_out.value, n, tx_out.script_pubkey.to_hex]
end
end
end
...
# file close 処理
上記のような感じのコードで出力を行なっていたがデータ量が多いと結構な時間がかかってしまう.
この出力部分がボトルネックになって全体の処理時間が延びていたため対策を考えることにした.
高速化のアプローチ
思いついた対策は次の3つ
- 記述を工夫して処理の高速化を計る
- 並列化
- 出力と入力を直接繋ぐ(pipeを使って出力と入力を同時にやる)
今回とったアプローチは次の2つ
- 記述を工夫して処理の高速化を計る
- 並列化
理由はベースプログラムの作り的に大きな変更を加えずに対応できそうだったため.
成果物
class CsvExport
attr_reader :block_file
attr_reader :tx_file
attr_reader :tx_in_file
attr_reader :tx_out_file
def initialize
@block_file = BlockFile.new
@tx_file = TxFile.new
@tx_in_file = TxInFile.new
@tx_out_file = TxOutFile.new
@block_data = []
@tx_data = []
@tx_in_data = []
@tx_out_data = []
end
def parallel_format_data(blocks)
Parallel.map(blocks, in_thread: 4, finish: -> (item, i, result) {
@block_data << result[0]
result[1].each{ |data| @tx_data << data }
result[2].each{ |data| @tx_in_data << data }
result[3].each{ |data| @tx_out_data << data }
}) do |block|
tx_data = []
tx_in_data = []
tx_out_data = []
block_node = [block.block_hash, block.header.version, block.header.merkle_root, block.header.time, block.header.bits, block.header.nonce, block.size, block.height, block.tx_count, block.input_count, block.output_count]
block.transactions.each do |tx|
tx_data << [tx.txid, tx.version, tx.marker, tx.flag, tx.lock_time]
tx.inputs.each do |tx_in|
tx_in_data << [tx_in.script_sig.to_hex, tx_in.script_witness.to_s, tx_in.sequence, tx_in.out_point.index, tx_in.out_point.hash]
end
tx.outputs.each_with_index do |tx_out, n|
tx_out_data << [tx_out.value, n, tx_out.script_pubkey.to_hex]
end
end
[block_data, tx_data, tx_in_data, tx_out_data]
end
end
def parallel_export
Parallel.map([[@block_file, @block_data], [@tx_file, @tx_data], [@tx_in_file, @tx_in_data], [@tx_out_file, @tx_out_data]], in_thread: 4
) do |file, datas|
file.export(datas)
end
end
def export(blocks)
parallel_format_data(blocks)
parallel_export
end
end
require 'csv'
class FileManager
attr_reader :file
attr_reader :dir
def initialize
@dir = Dir.pwd #CSV作成するディレクトリ指定
end
def path(file_name)
File.join(dir, file_name)
end
def open(file_name, mode = "r")
@file = CSV.open(path(file_name + ".csv"), mode, force_quotes: true)
end
def close
file.close
end
end
class BlockFile < FileManager
def initialize
super
end
def open(file_name, mode = "r")
super(file_name, mode)
end
def export(datas)
open("block", "w", header: true)
datas.each{ |data| file << data }
close
end
end
-
記述を工夫して処理の高速化を計る
ファイル操作は逐次処理より一括で処理した方が早いという記事[1]があったので適用 (効果が出たかは不明) -
並列化
gem 'parallel'
を使用
IOバウンドな処理と判断したのでスレッドを使って並列化した[2]
おまけ
parallelを使わない場合はThreadを使って書くことができる.
threads = []
threads << Thread.new {
# このスレッドでさせたい処理
}
threads.each {|t| t.join}
参考にした記事
[1]https://qiita.com/kotauchisunsun/items/84e01c6fb621fcc1a647
[2]https://qiita.com/yuroyoro/items/92c5bc864fa9c05127a9