Loading Large Amounts of Data into MySQL with Embulk - Part 1

Posted at 2015-08-01

Goal

Use Embulk to load a large amount of data (CSV) into MySQL.

1. Preparation

Install Embulk by following the Quick Start.

Installation

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Create the sample

$ embulk example ./sample
Creating ./sample directory...
  Creating ./sample/
  Creating ./sample/csv/
  Creating ./sample/csv/sample_01.csv.gz
  Creating ./sample/example.yml

Create the Embulk config file (config.yml)

$ embulk guess ./sample/example.yml -o ./sample/config.yml

$ cat ./sample/config.yml 
in:
  type: file
  path_prefix: /home/<user>/sample/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

Preview the sample

$ embulk preview ./sample/config.yml

# A preview like the following is printed

+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                       NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+

Run the sample

$ embulk run ./sample/config.yml

# The contents of ./sample/csv/sample_01.csv.gz are printed

1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL

2. Loading data into MySQL

MySQL output plugin

Install the MySQL output plugin

$ embulk gem install embulk-output-mysql

CSV

Create a CSV file

$ vim ./sample/csv/bulk_01.csv.gz
$ cat ./sample/csv/bulk_01.csv.gz
col01,col02,col03,col04,col05,col06,col07,col08,col09,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20
"01","02","03","04","05","06","07","08","09","10","11","12","13","14","15","16","17","18","19","20"
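
Because the input config applies the gzip decoder, the file has to contain real gzip data. As a minimal sketch (not the author's method), the one-row sample could also be written with Ruby's standard zlib library. The helper names `write_gzip_csv` and `read_gzip_lines` and the temporary output path are hypothetical, and only the data row is written; add a header row to the array if your file should have one.

```ruby
require 'zlib'
require 'tmpdir'

# Hypothetical helper: write an array of rows as a gzip-compressed CSV file.
# Every value is wrapped in double quotes, matching the sample row above.
def write_gzip_csv(path, rows)
  Zlib::GzipWriter.open(path) do |gz|
    rows.each do |row|
      gz.puts row.map { |v| %("#{v}") }.join(',')
    end
  end
end

# Hypothetical helper: read the file back as an array of lines.
def read_gzip_lines(path)
  Zlib::GzipReader.open(path) { |gz| gz.read.lines.map(&:chomp) }
end

# One row holding "01".."20", like the sample data in this section.
SAMPLE_ROW = (1..20).map { |n| format('%02d', n) }

if __FILE__ == $PROGRAM_NAME
  # Assumption: a temp directory is used here; point this at
  # ./sample/csv/bulk_01.csv.gz to produce the file used in this article.
  path = File.join(Dir.tmpdir, 'bulk_01.csv.gz')
  write_gzip_csv(path, [SAMPLE_ROW])
  puts read_gzip_lines(path)
end
```

Reading the file back through `Zlib::GzipReader` is a quick way to confirm that it really is compressed before handing it to Embulk.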

DB

Create the database

$ mysql -u root
mysql> CREATE DATABASE bulk_db CHARACTER SET utf8;

Create a sample user

mysql> GRANT ALL PRIVILEGES ON bulk_db.* TO bulk_user@localhost IDENTIFIED BY '<password>';

Create the table

mysql> CREATE TABLE bulk_tbl (
  col01 varchar(20),
  col02 varchar(20),
  col03 varchar(20),
  col04 varchar(20),
  col05 varchar(20),
  col06 varchar(20),
  col07 varchar(20),
  col08 varchar(20),
  col09 varchar(20),
  col10 varchar(20),
  col11 varchar(20),
  col12 varchar(20),
  col13 varchar(20),
  col14 varchar(20),
  col15 varchar(20),
  col16 varchar(20),
  col17 varchar(20),
  col18 varchar(20),
  col19 varchar(20),
  col20 varchar(20)) CHARACTER SET utf8;

Embulk config file

Change the CSV file path

path_prefix: /home/<user>/sample/csv/bulk_

The CSV used here has no header, so remove the setting that skips the first line

skip_header_lines: 1

Change the column names

columns:
- {name: col01, type: string}
- {name: col02, type: string}
- {name: col03, type: string}
- {name: col04, type: string}
- {name: col05, type: string}
- {name: col06, type: string}
- {name: col07, type: string}
- {name: col08, type: string}
- {name: col09, type: string}
- {name: col10, type: string}
- {name: col11, type: string}
- {name: col12, type: string}
- {name: col13, type: string}
- {name: col14, type: string}
- {name: col15, type: string}
- {name: col16, type: string}
- {name: col17, type: string}
- {name: col18, type: string}
- {name: col19, type: string}
- {name: col20, type: string}
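
Typing twenty near-identical entries by hand invites typos, so the list can also be generated. The following is a small sketch under that assumption; the function name `column_lines` and its `indent` parameter are hypothetical, and the default indent of four spaces matches where the entries sit under `parser:` in the config file.

```ruby
# Build the column definitions (col01, col02, ..., all of type string)
# as YAML lines that can be pasted under "columns:" in config.yml.
def column_lines(count, indent: 4)
  (1..count).map do |i|
    "#{' ' * indent}- {name: #{format('col%02d', i)}, type: string}"
  end
end

puts column_lines(20)
```

Redirecting the output to a file and pasting it into config.yml keeps the names in step with the `col01` to `col20` columns of `bulk_tbl`.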

Define the output to MySQL, using the linked plugin documentation as a reference

out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert

The finished file looks like this

$ cat ./sample/config.yml
in:
  type: file
  path_prefix: /home/<user>/sample/csv/bulk_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: col01, type: string}
    - {name: col02, type: string}
    - {name: col03, type: string}
    - {name: col04, type: string}
    - {name: col05, type: string}
    - {name: col06, type: string}
    - {name: col07, type: string}
    - {name: col08, type: string}
    - {name: col09, type: string}
    - {name: col10, type: string}
    - {name: col11, type: string}
    - {name: col12, type: string}
    - {name: col13, type: string}
    - {name: col14, type: string}
    - {name: col15, type: string}
    - {name: col16, type: string}
    - {name: col17, type: string}
    - {name: col18, type: string}
    - {name: col19, type: string}
    - {name: col20, type: string}
out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert

Loading the data

Run Embulk to load the data

$ embulk run ./sample/config.yml

Confirm that the record has been loaded into the table

mysql> select * from bulk_tbl;
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| col01 | col02 | col03 | col04 | col05 | col06 | col07 | col08 | col09 | col10 | col11 | col12 | col13 | col14 | col15 | col16 | col17 | col18 | col19 | col20 |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| 01    | 02    | 03    | 04    | 05    | 06    | 07    | 08    | 09    | 10    | 11    | 12    | 13    | 14    | 15    | 16    | 17    | 18    | 19    | 20    |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
1 row in set (0.00 sec)

3. Loading a large amount of data

Create 30 CSV files of 1,000,000 rows each

generate_csv.rb
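require 'zlib'
require 'fileutils'

# Generates 30 gzip-compressed CSV files of 1,000,000 rows x 20 columns each.
class GenerateCsv
  FILE_COUNT = 30
  ROW_COUNT = 1_000_000
  COLUMN_COUNT = 20

  def run
    FileUtils.mkdir_p('./csv') # make sure the output directory exists
    (1..FILE_COUNT).each do |i|
      file = "./csv/bulk_#{format('%02d', i)}.csv.gz"
      generate(file, i)
    end
  end

  private

  # Write each row straight into the gzip stream instead of first
  # building the whole file as one huge string in memory.
  def generate(file, i)
    Zlib::GzipWriter.open(file) do |gz|
      (1..ROW_COUNT).each do |j|
        row = (1..COLUMN_COUNT).map { |k| "val#{i}_#{j}_#{k}" }
        gz.puts row.join(',')
      end
    end
  end
end

GenerateCsv.new.run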
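
Before starting a long load, it may be worth checking that a generated file has the expected shape. The sketch below is an illustration rather than part of the original procedure; the function name `inspect_gzip_csv` is hypothetical. It streams one file, counting rows and tallying how many columns each row has. It splits on commas naively, which is only safe because the generated values contain no commas or quotes.

```ruby
require 'zlib'
require 'tmpdir'

# Count the rows of a gzip-compressed CSV and tally the column count of
# each row, reading line by line so the file is never fully held in memory.
def inspect_gzip_csv(path)
  rows = 0
  widths = Hash.new(0) # column count => number of rows with that count
  Zlib::GzipReader.open(path) do |gz|
    gz.each_line do |line|
      rows += 1
      widths[line.chomp.split(',').size] += 1
    end
  end
  { rows: rows, widths: widths }
end

if __FILE__ == $PROGRAM_NAME
  # Assumption: run from the directory that contains ./csv
  path = './csv/bulk_01.csv.gz'
  p inspect_gzip_csv(path) if File.exist?(path)
end
```

For a file produced by the generator, the row count should be 1,000,000 and the only column count in the tally should be 20.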

Run Embulk to load the data

$ embulk run ./sample/config.yml

An error occurred...
Continued in: Loading Large Amounts of Data into MySQL with Embulk - Part 2

org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 2,839,153 milliseconds ago.  The last packet sent successfully to the server was 2,839,151 milliseconds ago.
	at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331)
	at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:529)
	at org.embulk.exec.BulkLoader.access$100(org/embulk/exec/BulkLoader.java:36)
	at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:342)
	at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:338)
	at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
	at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:338)
	at org.embulk.command.Runner.run(org/embulk/command/Runner.java:149)
	at org.embulk.command.Runner.main(org/embulk/command/Runner.java:101)
	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:483)
....
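
The millisecond figures in the message are easier to judge in minutes. The sketch below only performs that unit conversion (the function name `ms_to_min_sec` is hypothetical) and does not diagnose the cause of the failure.

```ruby
# Convert a millisecond count from the log into whole minutes and seconds.
def ms_to_min_sec(ms)
  total_seconds = ms / 1000 # integer division drops the fractional second
  [total_seconds / 60, total_seconds % 60]
end

minutes, seconds = ms_to_min_sec(2_839_153)
puts "#{minutes} min #{seconds} s" # prints "47 min 19 s"
```

In other words, when the failure was reported, roughly 47 minutes had passed since the last packet was exchanged with the MySQL server.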