Goal
Use Embulk to load a large amount of CSV data into MySQL.
1. Preparation
Install Embulk, following the Quick Start.
Installation
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
Create the sample
$ embulk example ./sample
Creating ./sample directory...
Creating ./sample/
Creating ./sample/csv/
Creating ./sample/csv/sample_01.csv.gz
Creating ./sample/example.yml
Create the Embulk configuration file (config.yml)
$ embulk guess ./sample/example.yml -o ./sample/config.yml
$ cat ./sample/config.yml
in:
  type: file
  path_prefix: /home/<user>/sample/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}
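To double-check what `embulk guess` produced, the column definitions can be read back programmatically. The following is a minimal sketch, assuming a trimmed copy of the config embedded as a string; the helper name `column_summary` is hypothetical and not part of Embulk.

```ruby
require 'yaml'

# A trimmed copy of the guessed config, embedded so the sketch is self-contained.
# In practice the text would come from File.read('./sample/config.yml').
CONFIG_YAML = <<~YAML
  in:
    type: file
    path_prefix: /home/<user>/sample/csv/sample_
    decoders:
    - {type: gzip}
    parser:
      type: csv
      skip_header_lines: 1
      columns:
      - {name: id, type: long}
      - {name: account, type: long}
      - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
      - {name: purchase, type: timestamp, format: '%Y%m%d'}
      - {name: comment, type: string}
  out: {type: stdout}
YAML

# Returns [name, type] pairs for the columns declared under in.parser.columns.
# (Hypothetical helper for illustration only.)
def column_summary(yaml_text)
  config = YAML.safe_load(yaml_text)
  config.dig('in', 'parser', 'columns').map { |c| [c['name'], c['type']] }
end

column_summary(CONFIG_YAML).each { |name, type| puts "#{name}: #{type}" }
```

The printed names and types should match the header row of the preview table that `embulk preview` shows.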
Preview the sample
$ embulk preview ./sample/config.yml
# A preview like the following is printed
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long | time:timestamp | purchase:timestamp | comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
| 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk |
| 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby |
| 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
| 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+
Run the sample
$ embulk run ./sample/config.yml
# The contents of ./sample/csv/sample_01.csv.gz are printed
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL
2. Loading data into MySQL
MySQL output plugin
Install the MySQL output plugin
$ embulk gem install embulk-output-mysql
CSV
Create a CSV file
$ vim ./sample/csv/bulk_01.csv.gz
$ gzip -dc ./sample/csv/bulk_01.csv.gz
col01,col02,col03,col04,col05,col06,col07,col08,col09,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20
"01","02","03","04","05","06","07","08","09","10","11","12","13","14","15","16","17","18","19","20"
DB
Create the database
$ mysql -u root
mysql> CREATE DATABASE bulk_db CHARACTER SET utf8;
Create a sample user
mysql> GRANT ALL PRIVILEGES ON bulk_db.* TO bulk_user@localhost IDENTIFIED BY '<password>';
Create the table
mysql> CREATE TABLE bulk_tbl (
col01 varchar(20),
col02 varchar(20),
col03 varchar(20),
col04 varchar(20),
col05 varchar(20),
col06 varchar(20),
col07 varchar(20),
col08 varchar(20),
col09 varchar(20),
col10 varchar(20),
col11 varchar(20),
col12 varchar(20),
col13 varchar(20),
col14 varchar(20),
col15 varchar(20),
col16 varchar(20),
col17 varchar(20),
col18 varchar(20),
col19 varchar(20),
col20 varchar(20)) CHARACTER SET utf8;
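Since all twenty columns share the same definition, the statement can also be generated rather than typed by hand. This is a small sketch under that assumption; `create_table_sql` is a hypothetical helper, and its output would still be pasted into the `mysql>` prompt manually.

```ruby
# Builds a CREATE TABLE statement for columns col01..colNN, all varchar(width).
# (Hypothetical helper for illustration; it only builds a string.)
def create_table_sql(table, columns: 20, width: 20)
  defs = (1..columns).map { |n| format('  col%02d varchar(%d)', n, width) }
  "CREATE TABLE #{table} (\n#{defs.join(",\n")}\n) CHARACTER SET utf8;"
end

puts create_table_sql('bulk_tbl')
```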
Embulk configuration file
Change the CSV file path
path_prefix: /home/<user>/sample/csv/bulk_
The CSV used here has no header row, so remove the setting that skips the first line
skip_header_lines: 1
Change the column names
columns:
- {name: col01, type: string}
- {name: col02, type: string}
- {name: col03, type: string}
- {name: col04, type: string}
- {name: col05, type: string}
- {name: col06, type: string}
- {name: col07, type: string}
- {name: col08, type: string}
- {name: col09, type: string}
- {name: col10, type: string}
- {name: col11, type: string}
- {name: col12, type: string}
- {name: col13, type: string}
- {name: col14, type: string}
- {name: col15, type: string}
- {name: col16, type: string}
- {name: col17, type: string}
- {name: col18, type: string}
- {name: col19, type: string}
- {name: col20, type: string}
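The twenty entries follow a single pattern, so they can be generated and pasted into config.yml. A minimal sketch, assuming every column is a `string` named `colNN`; `column_lines` is a hypothetical helper.

```ruby
# Returns the YAML list items for `count` string columns named col01, col02, ...
# (Hypothetical helper for illustration.)
def column_lines(count)
  (1..count).map { |n| format('- {name: col%02d, type: string}', n) }
end

puts 'columns:'
puts column_lines(20)
```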
Define the output to MySQL, referring to the linked reference
out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert
The finished file looks like this
$ cat ./sample/config.yml
in:
  type: file
  path_prefix: /home/<user>/sample/csv/bulk_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: col01, type: string}
    - {name: col02, type: string}
    - {name: col03, type: string}
    - {name: col04, type: string}
    - {name: col05, type: string}
    - {name: col06, type: string}
    - {name: col07, type: string}
    - {name: col08, type: string}
    - {name: col09, type: string}
    - {name: col10, type: string}
    - {name: col11, type: string}
    - {name: col12, type: string}
    - {name: col13, type: string}
    - {name: col14, type: string}
    - {name: col15, type: string}
    - {name: col16, type: string}
    - {name: col17, type: string}
    - {name: col18, type: string}
    - {name: col19, type: string}
    - {name: col20, type: string}
out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert
Load the data
Run Embulk to load the data
$ embulk run ./sample/config.yml
Confirm that the record has been inserted into the table
mysql> select * from bulk_tbl;
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| col01 | col02 | col03 | col04 | col05 | col06 | col07 | col08 | col09 | col10 | col11 | col12 | col13 | col14 | col15 | col16 | col17 | col18 | col19 | col20 |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
1 row in set (0.00 sec)
3. Loading a large amount of data
Create 30 CSV files of one million rows each
generate_csv.rb
require 'zlib'

# Generates 30 gzip-compressed CSV files of 1,000,000 rows x 20 columns each.
# The ./csv directory must already exist.
class GenerateCsv
  FILES = 30
  ROWS = 1_000_000
  COLUMNS = 20

  def run
    (1..FILES).each do |i|
      file = "./csv/bulk_#{format('%02d', i)}.csv.gz"
      generate(file, i)
    end
  end

  private

  # Streams each row into the gzip writer instead of building one huge string.
  def generate(file, i)
    Zlib::GzipWriter.open(file) do |gz|
      (1..ROWS).each do |j|
        row = (1..COLUMNS).map { |k| "val#{i}_#{j}_#{k}" }
        gz.puts row.join(',')
      end
    end
  end
end

GenerateCsv.new.run
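Before starting a long load, it can be worth confirming that a generated file has the expected shape. The sketch below is one way to do that, assuming the same `valI_J_K` row layout as the generator; `inspect_gzip_csv` is a hypothetical helper, and the demo writes only 1,000 rows to a temporary directory so that it finishes quickly.

```ruby
require 'zlib'
require 'tmpdir'

# Streams a gzip-compressed CSV and returns
# [row_count, distinct field counts seen across all rows].
# (Hypothetical helper; it splits on commas and does not handle quoted fields.)
def inspect_gzip_csv(path)
  rows = 0
  field_counts = []
  Zlib::GzipReader.open(path) do |gz|
    gz.each_line do |line|
      rows += 1
      field_counts |= [line.chomp.split(',').size]
    end
  end
  [rows, field_counts]
end

# Small-scale demo: 1,000 rows instead of 1,000,000, written to a temp dir.
Dir.mktmpdir do |dir|
  path = File.join(dir, 'bulk_01.csv.gz')
  Zlib::GzipWriter.open(path) do |gz|
    (1..1000).each do |j|
      gz.puts((1..20).map { |k| "val1_#{j}_#{k}" }.join(','))
    end
  end
  rows, field_counts = inspect_gzip_csv(path)
  puts "rows=#{rows} field_counts=#{field_counts.inspect}"
end
```

For a real generated file, the expectation would be 1,000,000 rows and a single field count of 20, matching the twenty columns in config.yml.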
Run Embulk to load the data
$ embulk run ./sample/config.yml
An error occurred...
Loading a large amount of data into MySQL with Embulk - Part 2
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 2,839,153 milliseconds ago. The last packet sent successfully to the server was 2,839,151 milliseconds ago.
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331)
at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:529)
at org.embulk.exec.BulkLoader.access$100(org/embulk/exec/BulkLoader.java:36)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:342)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:338)
at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:338)
at org.embulk.command.Runner.run(org/embulk/command/Runner.java:149)
at org.embulk.command.Runner.main(org/embulk/command/Runner.java:101)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:483)
....
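The exception message gives its timings in milliseconds, which are hard to read at a glance. As a small aid, this sketch converts the reported value to minutes; `ms_to_minutes` is a hypothetical helper, and the conversion says nothing about why the connection went silent.

```ruby
# Converts a millisecond count, as printed in the exception message, to minutes
# rounded to one decimal place. (Hypothetical helper for illustration.)
def ms_to_minutes(ms)
  (ms / 60_000.0).round(1)
end

puts ms_to_minutes(2_839_153)
```

In other words, the last packet had been received roughly 47 minutes before the failure was reported.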