@shinyashikis@github

Loading a large volume of data into MySQL with Embulk - Part 1


Goal

Use Embulk to load a large volume of data (CSV) into MySQL.

1. Preparation

Install Embulk by following the Quick Start.

Installation

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Create the sample

$ embulk example ./sample
Creating ./sample directory...
  Creating ./sample/
  Creating ./sample/csv/
  Creating ./sample/csv/sample_01.csv.gz
  Creating ./sample/example.yml

Create the Embulk config file (config.yml)

$ embulk guess ./sample/example.yml -o ./sample/config.yml

$ cat ./sample/config.yml 
in:
  type: file
  path_prefix: /home/<user>/sample/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

Run a preview of the sample

$ embulk preview ./sample/config.yml

# A preview like the following is printed

+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                       NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+

Run the sample

$ embulk run ./sample/config.yml

# The contents of ./sample/csv/sample_01.csv.gz are printed

1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL
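
The column definitions in the guessed config.yml determine how each raw CSV field is converted. As a rough illustration only (this is not how Embulk is implemented), the sketch below applies the same two timestamp formats to the first sample row in plain Ruby. The helper name parse_sample_row is hypothetical, and it assumes no field contains a comma.

```ruby
require 'date'

# Timestamp formats copied from the columns section of the guessed config.yml.
TIME_FORMAT     = '%Y-%m-%d %H:%M:%S'
PURCHASE_FORMAT = '%Y%m%d'

# Hypothetical helper: converts one raw CSV line into typed values.
# Assumes no field contains a comma, which holds for the first sample row.
def parse_sample_row(line)
  id, account, time, purchase, comment = line.chomp.split(',', 5)
  {
    id: Integer(id),
    account: Integer(account),
    # DateTime.strptime assumes a +00:00 offset when the input has no zone.
    time: DateTime.strptime(time, TIME_FORMAT).to_time.utc,
    purchase: DateTime.strptime(purchase, PURCHASE_FORMAT).to_time.utc,
    comment: comment
  }
end

row = parse_sample_row('1,32864,2015-01-27 19:23:49,20150127,embulk')
puts row[:time].strftime('%Y-%m-%d %H:%M:%S UTC')
puts row[:purchase].strftime('%Y-%m-%d %H:%M:%S UTC')
```

The purchase column carries only a date, so its time part falls back to midnight, which is consistent with the 00:00:00 values in the preview table.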

2. Loading data into MySQL

MySQL output plugin

Install the MySQL output plugin

$ embulk gem install embulk-output-mysql

CSV

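Create a CSV file (Vim's bundled gzip plugin compresses *.gz files on save, so use zcat to view the contents)

$ vim ./sample/csv/bulk_01.csv.gz
$ zcat ./sample/csv/bulk_01.csv.gz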
col01,col02,col03,col04,col05,col06,col07,col08,col09,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20
"01","02","03","04","05","06","07","08","09","10","11","12","13","14","15","16","17","18","19","20"
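
Because the config keeps the gzip decoder, the input file has to be real gzip data. If editing a .gz file by hand is inconvenient, the file can also be produced from a script. The following is a minimal sketch using Ruby's standard zlib library. The helper names write_quoted_csv_gz and read_csv_gz are hypothetical, the demo writes to a temporary directory rather than ./sample/csv, and it assumes the values contain no double quotes.

```ruby
require 'zlib'
require 'tmpdir'

# Hypothetical helper: writes rows as double-quoted CSV lines into a gzip file.
# Assumes values contain no double quotes, so no escaping is performed.
def write_quoted_csv_gz(path, rows)
  Zlib::GzipWriter.open(path) do |gz|
    rows.each do |row|
      gz.write(row.map { |v| "\"#{v}\"" }.join(',') + "\n")
    end
  end
end

# Hypothetical helper: reads the file back as decompressed text.
def read_csv_gz(path)
  Zlib::GzipReader.open(path) { |gz| gz.read }
end

# Demo in a temporary directory: one data row with the values "01" to "20".
Dir.mktmpdir do |dir|
  path = File.join(dir, 'bulk_01.csv.gz')
  write_quoted_csv_gz(path, [(1..20).map { |n| format('%02d', n) }])
  puts read_csv_gz(path)
end
```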

DB

Create the database

$ mysql -u root
mysql> CREATE DATABASE bulk_db CHARACTER SET utf8;

Create a sample user

mysql> GRANT ALL PRIVILEGES ON bulk_db.* TO 'bulk_user'@'localhost' IDENTIFIED BY '<password>';

Create the table

mysql> CREATE TABLE bulk_tbl (
  col01 varchar(20),
  col02 varchar(20),
  col03 varchar(20),
  col04 varchar(20),
  col05 varchar(20),
  col06 varchar(20),
  col07 varchar(20),
  col08 varchar(20),
  col09 varchar(20),
  col10 varchar(20),
  col11 varchar(20),
  col12 varchar(20),
  col13 varchar(20),
  col14 varchar(20),
  col15 varchar(20),
  col16 varchar(20),
  col17 varchar(20),
  col18 varchar(20),
  col19 varchar(20),
  col20 varchar(20)) CHARACTER SET utf8;

Embulk config file

Change the CSV file path

path_prefix: /home/<user>/sample/csv/bulk_

The CSV files to be loaded have no header row, so delete the setting that skips the first line

skip_header_lines: 1

Change the column names

columns:
- {name: col01, type: string}
- {name: col02, type: string}
- {name: col03, type: string}
- {name: col04, type: string}
- {name: col05, type: string}
- {name: col06, type: string}
- {name: col07, type: string}
- {name: col08, type: string}
- {name: col09, type: string}
- {name: col10, type: string}
- {name: col11, type: string}
- {name: col12, type: string}
- {name: col13, type: string}
- {name: col14, type: string}
- {name: col15, type: string}
- {name: col16, type: string}
- {name: col17, type: string}
- {name: col18, type: string}
- {name: col19, type: string}
- {name: col20, type: string}

Define the output to MySQL, using the linked page as a reference

out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert

The finished file looks like this

$ cat ./sample/config.yml
in:
  type: file
  path_prefix: /home/<user>/sample/csv/bulk_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    trim_if_not_quoted: false
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: col01, type: string}
    - {name: col02, type: string}
    - {name: col03, type: string}
    - {name: col04, type: string}
    - {name: col05, type: string}
    - {name: col06, type: string}
    - {name: col07, type: string}
    - {name: col08, type: string}
    - {name: col09, type: string}
    - {name: col10, type: string}
    - {name: col11, type: string}
    - {name: col12, type: string}
    - {name: col13, type: string}
    - {name: col14, type: string}
    - {name: col15, type: string}
    - {name: col16, type: string}
    - {name: col17, type: string}
    - {name: col18, type: string}
    - {name: col19, type: string}
    - {name: col20, type: string}
out:
  type: mysql
  host: localhost
  user: bulk_user
  password: <password>
  database: bulk_db
  table: bulk_tbl
  mode: insert
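
With 20 columns declared in two places (the config and the table definition), a typo in either one is easy to miss. The sketch below shows one possible sanity check in Ruby: it parses an Embulk-style config with the standard yaml library and reports declared columns that are absent from a given list of table columns. The helper missing_columns and the trimmed three-column SAMPLE_CONFIG are hypothetical and only mirror the shape of the file above.

```ruby
require 'yaml'

# Hypothetical helper: returns the column names declared under in.parser.columns
# that do not appear in the given list of table column names.
def missing_columns(config_yaml, table_columns)
  config = YAML.safe_load(config_yaml)
  declared = config.dig('in', 'parser', 'columns').map { |c| c['name'] }
  declared - table_columns
end

# Trimmed-down config with the same structure as the finished file (3 columns only).
SAMPLE_CONFIG = <<~CONFIG
  in:
    type: file
    parser:
      type: csv
      columns:
      - {name: col01, type: string}
      - {name: col02, type: string}
      - {name: col03, type: string}
  out:
    type: mysql
    table: bulk_tbl
CONFIG

p missing_columns(SAMPLE_CONFIG, %w[col01 col02 col03]) # nothing missing
p missing_columns(SAMPLE_CONFIG, %w[col01 col03])       # col02 is reported
```

To check the real file, the first argument could be File.read('./sample/config.yml'), with the second argument listing the columns of bulk_tbl.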

Loading the data

Run Embulk to load the data

$ embulk run ./sample/config.yml

Confirm that the record has been inserted into the table

mysql> select * from bulk_tbl;
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| col01 | col02 | col03 | col04 | col05 | col06 | col07 | col08 | col09 | col10 | col11 | col12 | col13 | col14 | col15 | col16 | col17 | col18 | col19 | col20 |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| 01    | 02    | 03    | 04    | 05    | 06    | 07    | 08    | 09    | 10    | 11    | 12    | 13    | 14    | 15    | 16    | 17    | 18    | 19    | 20    |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
1 row in set (0.00 sec)

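3. Loading a large volume of data

Create 30 CSV files of 1 million rows each

generate_csv.rb
require 'zlib'

# Generates 30 gzip-compressed CSV files (bulk_01 .. bulk_30),
# each containing 1,000,000 rows of 20 columns and no header row.
class GenerateCsv

  def run
    for i in 1..30
      file = "./csv/bulk_#{format('%02d',i)}.csv.gz"
      data = ""
      for j in 1..1000000
        row = []
        for k in 1..20
          row.push("val#{i}_#{j}_#{k}")
        end
        data << row.join(",") + "\n"
      end
      _generate(file, data)
    end
  end

  private

  # Writes the accumulated CSV text to a gzip file.
  def _generate(file, data)
    Zlib::GzipWriter.open(file) {|gz|
      gz.puts data
    }
  end
end

instance = GenerateCsv.new
instance.run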
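
The script above builds the full text of each file in a single string before compressing it. As an alternative sketch (not the author's script), each row can be written to the gzip stream as soon as it is built, so that only one row is held in memory at a time. The method name generate_csv_gz and its rows: and cols: parameters are hypothetical, and the demo uses a temporary directory with 3 rows instead of 1,000,000.

```ruby
require 'zlib'
require 'tmpdir'

# Hypothetical streaming variant: writes each row to the gzip stream as it is
# built instead of accumulating the whole file in one string first.
def generate_csv_gz(path, file_no, rows:, cols: 20)
  Zlib::GzipWriter.open(path) do |gz|
    (1..rows).each do |j|
      row = (1..cols).map { |k| "val#{file_no}_#{j}_#{k}" }
      gz.write(row.join(',') + "\n")
    end
  end
end

# Small demo in a temp dir; the article's sizes would be 30 files of 1_000_000 rows.
Dir.mktmpdir do |dir|
  path = File.join(dir, 'bulk_01.csv.gz')
  generate_csv_gz(path, 1, rows: 3)
  puts Zlib::GzipReader.open(path) { |gz| gz.read }
end
```

The values follow the same val<file>_<row>_<column> pattern as generate_csv.rb, so the output stays compatible with the 20 string columns in config.yml.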

Run Embulk to load the data

$ embulk run ./sample/config.yml

An error occurred... (continued in "Loading a large volume of data into MySQL with Embulk - Part 2")

org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 2,839,153 milliseconds ago.  The last packet sent successfully to the server was 2,839,151 milliseconds ago.
    at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331)
    at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:529)
    at org.embulk.exec.BulkLoader.access$100(org/embulk/exec/BulkLoader.java:36)
    at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:342)
    at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:338)
    at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
    at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:338)
    at org.embulk.command.Runner.run(org/embulk/command/Runner.java:149)
    at org.embulk.command.Runner.main(org/embulk/command/Runner.java:101)
    at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:483)
....