Loading a Large Amount of Data into MySQL with Embulk - Part 1


Goal

Use Embulk to load a large amount of data (CSV) into MySQL


1. Preparation


Install Embulk by following the Quick Start

Installation

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"

$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Create the sample

$ embulk example ./sample

Creating ./sample directory...
Creating ./sample/
Creating ./sample/csv/
Creating ./sample/csv/sample_01.csv.gz
Creating ./sample/example.yml

Create the Embulk configuration file (config.yml)

$ embulk guess ./sample/example.yml -o ./sample/config.yml

$ cat ./sample/config.yml
in:
  type: file
  path_prefix: /home/<user>/sample/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

Preview the sample

$ embulk preview ./sample/config.yml

# A preview like the following is printed

+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                       NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+

Run the sample

$ embulk run ./sample/config.yml

# The contents of ./sample/csv/sample_01.csv.gz are printed

1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL


2. Loading data into MySQL


MySQL output plugin

Install the output plugin for MySQL

$ embulk gem install embulk-output-mysql


CSV

Create a CSV file

$ vim ./sample/csv/bulk_01.csv.gz

$ zcat ./sample/csv/bulk_01.csv.gz
col01,col02,col03,col04,col05,col06,col07,col08,col09,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20
"01","02","03","04","05","06","07","08","09","10","11","12","13","14","15","16","17","18","19","20"

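As an alternative to editing the compressed file by hand, the same one-row file could be produced with a short Ruby script. This is only a sketch: the helper names `write_gzip_csv` and `read_gzip_csv` are made up for illustration, and the file is written to a temporary directory rather than to ./sample/csv/. Only the data row is written, on the assumption that the file is meant to have no header row (the parser settings used later in this article do not skip one).

```ruby
require 'zlib'
require 'tmpdir'

# Write the given CSV lines into a gzip-compressed file.
def write_gzip_csv(path, lines)
  Zlib::GzipWriter.open(path) do |gz|
    lines.each { |line| gz.puts line }
  end
end

# Read the decompressed lines back, i.e. what a gzip decoder would see.
def read_gzip_csv(path)
  Zlib::GzipReader.open(path) { |gz| gz.read.lines.map(&:chomp) }
end

# One row of twenty quoted values: "01","02",...,"20"
row = (1..20).map { |n| format('"%02d"', n) }.join(',')

Dir.mktmpdir do |dir|
  path = File.join(dir, 'bulk_01.csv.gz') # temporary location, for illustration only
  write_gzip_csv(path, [row])
  puts read_gzip_csv(path)
end
```

Reading the file back through `Zlib::GzipReader` is a cheap way to confirm that it really is gzip-compressed before pointing Embulk at it.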

DB

Create the database

$ mysql -u root

mysql> CREATE DATABASE bulk_db CHARACTER SET utf8;

Create a sample user

mysql> GRANT ALL PRIVILEGES ON bulk_db.* TO bulk_user@localhost IDENTIFIED BY '<password>';

Create the table

mysql> CREATE TABLE bulk_tbl (
col01 varchar(20),
col02 varchar(20),
col03 varchar(20),
col04 varchar(20),
col05 varchar(20),
col06 varchar(20),
col07 varchar(20),
col08 varchar(20),
col09 varchar(20),
col10 varchar(20),
col11 varchar(20),
col12 varchar(20),
col13 varchar(20),
col14 varchar(20),
col15 varchar(20),
col16 varchar(20),
col17 varchar(20),
col18 varchar(20),
col19 varchar(20),
col20 varchar(20)) CHARACTER SET utf8;
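
Since the twenty columns differ only by their number, the statement above can also be generated rather than typed. The following is a minimal sketch; `column_names` and `create_table_sql` are hypothetical helper names, not part of Embulk or MySQL.

```ruby
# Build the column names col01..col20.
def column_names(count = 20)
  (1..count).map { |n| format('col%02d', n) }
end

# Assemble a CREATE TABLE statement that gives every column the same type.
def create_table_sql(table, columns, type: 'varchar(20)')
  defs = columns.map { |c| "  #{c} #{type}" }.join(",\n")
  "CREATE TABLE #{table} (\n#{defs}\n) CHARACTER SET utf8;"
end

puts create_table_sql('bulk_tbl', column_names)
```

The printed statement can be pasted at the mysql> prompt in place of the hand-written one.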


Embulk configuration file

Change the path to the CSV files

path_prefix: /home/<user>/sample/csv/bulk_

The CSV used here has no header row, so delete the setting that skips the first line



skip_header_lines: 1

Change the column names

columns:

- {name: col01, type: string}
- {name: col02, type: string}
- {name: col03, type: string}
- {name: col04, type: string}
- {name: col05, type: string}
- {name: col06, type: string}
- {name: col07, type: string}
- {name: col08, type: string}
- {name: col09, type: string}
- {name: col10, type: string}
- {name: col11, type: string}
- {name: col12, type: string}
- {name: col13, type: string}
- {name: col14, type: string}
- {name: col15, type: string}
- {name: col16, type: string}
- {name: col17, type: string}
- {name: col18, type: string}
- {name: col19, type: string}
- {name: col20, type: string}

Define the output to MySQL, following the referenced documentation

out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert

The finished file looks like this

$ cat ./sample/config.yml

in:
  type: file
  path_prefix: /home/<user>/sample/csv/bulk_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: col01, type: string}
    - {name: col02, type: string}
    - {name: col03, type: string}
    - {name: col04, type: string}
    - {name: col05, type: string}
    - {name: col06, type: string}
    - {name: col07, type: string}
    - {name: col08, type: string}
    - {name: col09, type: string}
    - {name: col10, type: string}
    - {name: col11, type: string}
    - {name: col12, type: string}
    - {name: col13, type: string}
    - {name: col14, type: string}
    - {name: col15, type: string}
    - {name: col16, type: string}
    - {name: col17, type: string}
    - {name: col18, type: string}
    - {name: col19, type: string}
    - {name: col20, type: string}
out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert
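
Because nesting matters in YAML, one way to avoid indentation mistakes is to build the same settings as a Ruby hash and let the standard `yaml` library serialize it. The sketch below mirrors the values shown above; `<user>` and `<password>` are placeholders, and the emitted YAML uses block style for the column list rather than the `{...}` flow style that `embulk guess` prints. It has not been run against Embulk here.

```ruby
require 'yaml'

# Twenty string columns named col01..col20.
columns = (1..20).map { |n| { 'name' => format('col%02d', n), 'type' => 'string' } }

config = {
  'in' => {
    'type' => 'file',
    'path_prefix' => '/home/<user>/sample/csv/bulk_', # placeholder path
    'decoders' => [{ 'type' => 'gzip' }],
    'parser' => {
      'charset' => 'UTF-8',
      'newline' => 'CRLF',
      'type' => 'csv',
      'delimiter' => ',',
      'quote' => '"',
      'escape' => '',
      'trim_if_not_quoted' => false,
      'allow_extra_columns' => false,
      'allow_optional_columns' => false,
      'columns' => columns
    }
  },
  'out' => {
    'type' => 'mysql',
    'host' => 'localhost',
    'user' => 'bulk_user',
    'password' => '<password>', # placeholder
    'database' => 'bulk_db',
    'table' => 'bulk_tbl',
    'mode' => 'insert'
  }
}

yaml = config.to_yaml
puts yaml
```

Redirecting the output to a file (for example `ruby make_config.rb > ./sample/config.yml`, where the script name is hypothetical) would give a file with the same structure as the one above.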


Loading the data

Run Embulk to load the data

$ embulk run ./sample/config.yml

Confirm that the record has been inserted into the table

mysql> select * from bulk_tbl;
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| col01 | col02 | col03 | col04 | col05 | col06 | col07 | col08 | col09 | col10 | col11 | col12 | col13 | col14 | col15 | col16 | col17 | col18 | col19 | col20 |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| 01    | 02    | 03    | 04    | 05    | 06    | 07    | 08    | 09    | 10    | 11    | 12    | 13    | 14    | 15    | 16    | 17    | 18    | 19    | 20    |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
1 row in set (0.00 sec)


3. Loading a large amount of data

Create 30 CSV files of 1,000,000 rows each


generate_csv.rb

require 'zlib'

# Generates 30 gzip-compressed CSV files of 1,000,000 rows x 20 columns each.
class GenerateCsv
  def run
    (1..30).each do |i|
      file = "./csv/bulk_#{format('%02d', i)}.csv.gz"
      generate(file, i)
    end
  end

  private

  # Write each row straight into the gzip stream instead of first
  # building the whole file as one large string in memory.
  def generate(file, i)
    Zlib::GzipWriter.open(file) do |gz|
      (1..1_000_000).each do |j|
        row = (1..20).map { |k| "val#{i}_#{j}_#{k}" }
        gz.puts row.join(',')
      end
    end
  end
end

GenerateCsv.new.run
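
Before starting a long load, it can be worth checking that a generated file has the expected shape. The sketch below streams a gzip-compressed CSV and counts rows and fields per row; `csv_gz_shape` is a hypothetical helper, and the plain `split(',')` is only adequate because the generated values contain no quotes or embedded commas. For illustration it runs on a three-row temporary file rather than on a real 1,000,000-row file.

```ruby
require 'zlib'
require 'tmpdir'

# Stream a gzip-compressed CSV and return [row_count, {field_count => rows_with_that_count}].
def csv_gz_shape(path)
  rows = 0
  widths = Hash.new(0)
  Zlib::GzipReader.open(path) do |gz|
    gz.each_line do |line|
      rows += 1
      widths[line.chomp.split(',').size] += 1
    end
  end
  [rows, widths]
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'bulk_check.csv.gz') # small stand-in file, for illustration only
  Zlib::GzipWriter.open(path) do |gz|
    (1..3).each do |j|
      gz.puts((1..20).map { |k| "val1_#{j}_#{k}" }.join(','))
    end
  end
  rows, widths = csv_gz_shape(path)
  puts "rows=#{rows} field_counts=#{widths.keys.sort.inspect}"
end
```

For a file produced by generate_csv.rb, the row count should be 1,000,000 and the only field count should be 20.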


Run Embulk to load the data

$ embulk run ./sample/config.yml

An error occurred...

Continued in "Loading a Large Amount of Data into MySQL with Embulk - Part 2"

org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 2,839,153 milliseconds ago. The last packet sent successfully to the server was 2,839,151 milliseconds ago.
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331)
at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:529)
at org.embulk.exec.BulkLoader.access$100(org/embulk/exec/BulkLoader.java:36)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:342)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:338)
at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:338)
at org.embulk.command.Runner.run(org/embulk/command/Runner.java:149)
at org.embulk.command.Runner.main(org/embulk/command/Runner.java:101)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:483)
....
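
The exception message gives the time since the last packet in milliseconds, which is hard to read at a glance. As a small reading aid (the helper name `ms_to_minutes` is made up for this sketch), the figure can be converted to minutes:

```ruby
# Convert a millisecond count, such as the one in the exception message, to minutes.
def ms_to_minutes(ms)
  (ms / 60_000.0).round(1)
end

puts ms_to_minutes(2_839_153) # the 2,839,153 ms reported in the log above
```

In other words, the connection had not exchanged a packet with the server for roughly 47 minutes when the failure was reported.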