Embulk

Embulkについてまとめてみた 2017/08

Embulkとは 〜Pluggable Bulk Data Loader〜

  • 並列データ転送ツール
  • Fluentd開発者 古橋氏が開発
  • Fluentdのバッチ版
  • プラグインアーキテクチャ

Embulkの概念図

https://www.slideshare.net/frsyuki/embuk-making-data-integration-works-relaxed/12

特徴その1

  • プラグインが多数用意されている、かつ日々増殖
  • 独自プラグインの簡単実装追加可能
  • リトライとレジューム
    • Embulkでは失敗したタスクだけを後からやり直すリジューム機能
  • オープンソース(Gitに公開)
  • guess機能
    • 入力データを自動で推定し、設定ファイルを生成

特徴その2〜Fluentdとの違い

バッチ(バルク)処理に特化

  • fluentdstream, embulkstorage
  • 巨大なデータに対応(並列分散処理)
  • 高速性
  • トランザクション制御あり
    • すべて成功しなかったら、実行前の状態に巻き戻る
  • スキーマを使ったデータのバリデーション
  • 実行はコマンド実行(cronなどでも)

Embulkの使い方

  1. 必要最低限の設定ファイルを作成
    • seed.ymlの作成
  2. データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成
    • embulk guess seed.yml -o config.yml
  3. config.ymlを編集する
    • プラグインの設定を追加(FilterとかOutputとか)
  4. dryrun
    • embulk preview config.yml
    • エラーが起きれば、 config.yml の修正などを行う
  5. 実行
    • embulk run config.yml

プラグイン種類

  1. Input plugin
  2. Output plugin
  3. Filter plugin
  4. File parser plugin
  5. File decoder plugin
  6. File formatter plugin
  7. File encoder plugin
  8. Executor plugin

Input Plugin

  • RDBS ( mysql, postgres, jdbc... )
  • NoSQL ( redis, mongodb)
  • Cloud Service (redshift, s3 )
  • Files (CSV, JSON ...)
  • Etc ( hdfs, http, elastic search, slack-history, google analitics )

Output Plugin

  • RDBS ( mysql, postgres, oracle, jdbc...)
  • Cloud Service ( redshift, s3, bigquery)
  • NoSQL ( redis, hdfs )
  • Files
  • Etc ( elastic search, hdfs, swift)

Filter Plugin

  • column (カラムを削る)
  • insert 指定した場所にホスト名などのカラム追加する
  • row 所定の条件に合致するローのみ抽出する
  • rearrange 一行のデータを複数行に再構成する

File parser Plugin

  • json
  • xml
  • csv
  • apache log
  • query_string
  • regex

File formatter Plugin

  • json
    • レコードの内容をjsonl(1 json 1行)の形式に整形するプラグイン
  • poi_excel
    • Excel(xls,xlsx)形式のデータに変換するプラグイン

Executor Plugin

  • mapreduce
    • EmbulkのタスクをHadoop上で実行するためのプラグイン

作業環境

  • Java1.8
  • MySQL Ver 14.14 Distrib 5.7.11, for osx10.10 (x86_64) using EditLine wrapper
  • Embulk 0.8.9
  • MacBook Pro (Retina, 15-inch, Mid 2015)
    • プロセッサ:2.2 GHz Intel Core i7
    • メモリ:16 GB 1600 MHz DDR3
    • macOS Sieera (バージョン:10.12.2(16C67))

インストール

公式

予め、Javaのインストールを行っておく

curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

ターミナル閉じるたびにsourceしなきゃいけないって方は .bash_profile に以下を追加したら幸せになります

.bash_profile
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi

Homebrewを使った方法

$ brew install embulk

バージョンの確認

$ embulk --version
embulk 0.8.9

実行例1) CSVからMySQLに大量データを投入してみる

参考: EmbulkでMySQLに大量データを投入してみる - その1
livedoor グルメの研究用データセットを使い、口コミのデータを MySQL に投入する
https://github.com/livedoor/datasets
約20万件のCSV形式のファイル
ldgourmet.tar.gzをダウンロードし、解凍。この中のratings.csvのみを使用

$ wget -O test.tar.gz https://github.com/livedoor/datasets/blob/master/ldgourmet.tar.gz?raw=true
$ tar xvfz 20161219_ldgourmet.tar.gz
x areas.csv
x categories.csv
x prefs.csv
x ratings.csv
x rating_votes.csv
x restaurants.csv
x stations.csv

ただし、上記の ratings.csv では、日時フォーマットが不正( 0000−00−00 00:00:00 があり、パースに失敗する)があるので、

$ sed -i.bak -e 's/0000-00-00 00:00:00/2000-01-01 09:00:00/g' ratings.csv

とする方法の他に、config.ymlparserオプションで null_string: 0000-00-00 00:00:00 を設定する方法で対応もできる。そちらは、別途記載を行っているのでそちらを参考にすること。

設定ファイルを自動生成

$ embulk guess seed.yml -o config.yml

設定ファイルを自動生成するコマンド
コマンドを使って、config.yml を作成

自動生成するにしても最低限の情報がなければいけないので、
seed.yml ファイルに次のように記述

in:
  type: file
  path_prefix: "./ratings.csv"
out:
  type: stdout
$ embulk guess seed.yml -o config.yml
2016-09-05 16:30:57.386 +0000: Embulk v0.8.9
2016-09-05 16:30:58.470 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-09-05 16:30:58.474 +0000 [INFO] (0001:guess): Loading files [ratings.csv]
2016-09-05 16:30:58.620 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-09-05 16:30:58.630 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-09-05 16:30:58.645 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-09-05 16:30:58.653 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./ratings.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: restaurant_id, type: long}
    - {name: user_id, type: string}
    - {name: total, type: long}
    - {name: food, type: long}
    - {name: service, type: long}
    - {name: atmosphere, type: long}
    - {name: cost_performance, type: long}
    - {name: title, type: string}
    - {name: body, type: string}
    - {name: purpose, type: long}
    - {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}
Created 'config.yml' file.
in:
  type: file
  path_prefix: ./ratings.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: restaurant_id, type: long}
    - {name: user_id, type: string}
    - {name: total, type: long}
    - {name: food, type: long}
    - {name: service, type: long}
    - {name: atmosphere, type: long}
    - {name: cost_performance, type: long}
    - {name: title, type: string}
    - {name: body, type: string}
    - {name: purpose, type: long}
    - {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}
156445,310595,ee02f26a,5,0,0,0,0,,"名前は忘れましたが、札幌で食べたお店よりも、全然こっちの方が美味しかったので、載せました。お店も綺麗(新規オープン・・)でランチは結構混んでいます。個人
的にはゆったりと食事できるので夜の方がオススメです。  辛さが0倍から50倍まで選べるのもGOOD!、スープも2種類みたいで、友達は黄色がオススメと言っていましたが、自分は赤の方を食べました。かなり美味しかったです。店長も好感のもてるお兄さんでした。  駅近くなので一度お試しあれです!",0,"2006-10-07 05:06:09"

↓↓↓↓↓↓↓↓↓↓

+--------+---------------+----------+-------+---------------------+
| id     | restaurant_id | user_id  | total | created_on          |
+--------+---------------+----------+-------+---------------------+
| 156445 |        310595 | ee02f26a |     5 | 2006-10-07 14:06:09 |
+--------+---------------+----------+-------+---------------------+

このままだと、TIMEZONEが default:UTC なので、設定を追加する

in:
(略)
parser:
    (略)
    allow_optional_columns: false
    default_timezone: Asia/Tokyo
    columns:
    - {name: id, type: long}
    (略)
mysql> select * from ratings where id = 156445;
+--------+---------------+----------+-------+---------------------+
| id     | restaurant_id | user_id  | total | created_on          |
+--------+---------------+----------+-------+---------------------+
| 156445 |        310595 | ee02f26a |     5 | 2006-10-07 05:06:09 |
+--------+---------------+----------+-------+---------------------+

日時が、 0000-00-00 00:00:00 だと、デフォルトだとCSVのParseに失敗し、

2016-12-19 18:47:24.258 +0900 [WARN] (0018:task-0000): Skipped line 201422 (org.embulk.spi.time.TimestampParseException: 
Failed to parse '0000-00-00 00:00:00'): 
126548,304,d55363c1,5,0,0,0,0,,"道玄坂中腹に、風俗店ひしめく妖しいエリアがあります。渋谷百軒店商店街です。昼間でも入り込むのをためらってしまうようなその中心部に、ひっそりと昔からそのままの姿を変えずに営業している、老舗のカレー店が、このムルギーです。    秘伝の味を守るのは、今にも倒れてしまいそうに腰の曲がったお婆さん。何十年もかけて、鍋に継ぎ足して作り続けているそのカレーは、日々進化し続け、複雑で奥深い味となっています。    天を突き抜けるように高く盛り上げられたご飯は、そびえ立つ山のよう。そして、その麓には無限に広がる大海のようにカレーがかかっています。一口噛みしめるごとに、その長い蓄積による歴史の重みが胃袋に流れ込み、時の流れより解き放たれ、遥か彼方へのタイムトリップをしているようです。    どうやら、無事に跡継ぎの娘さんがいてくれたようで、今後も新たな未来へと歴史は続いていくことになりそう。安心です。",0,"0000-00-00 00:00:00"

このようにSKIPしてしまう

in:
(略)
parser:
    (略)
    default_timezone: Asia/Tokyo
    null_string: 0000-00-00 00:00:00
    columns:
    - {name: id, type: long}
    (略)

null_string: 0000-00-00 00:00:00 を追加するとSKIPせず取り込まれる。

mysql> select * from ratings where id = 126223;
+--------+---------------+----------+-------+---------------------+
| id     | restaurant_id | user_id  | total | created_on          |
+--------+---------------+----------+-------+---------------------+
| 126223 |         16360 | 1a12db7d |     3 | 2016-12-19 18:54:35 |
+--------+---------------+----------+-------+---------------------+
1 row in set (0.13 sec)

mysql> desc ratings;
+---------------+------------+------+-----+-------------------+-----------------------------+
| Field         | Type       | Null | Key | Default           | Extra                       |
+---------------+------------+------+-----+-------------------+-----------------------------+
| id            | bigint(20) | YES  |     | NULL              |                             |
| restaurant_id | bigint(20) | YES  |     | NULL              |                             |
| user_id       | text       | YES  |     | NULL              |                             |
| total         | bigint(20) | YES  |     | NULL              |                             |
| created_on    | timestamp  | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.00 sec)

embulk-filter-column プラグイン

このまま MySQL にデータを取り込んでも良いのですが、今回は必要なカラムを絞り、 id, restaurant_id, user_id, total, created_on の5つのみのデータを扱うことにしたいと思います。

まずは プラグインのインストール

$ embulk gem install embulk-filter-column
in:
(略)
filters:
  - type: column
    columns:
      - {name: 'id'}
      - {name: 'restaurant_id'}
      - {name: 'user_id'}
      - {name: 'total'}
      - {name: 'created_on'}
out: {type: stdout}

MySQLへ取り込む

embulk-output-mysqlプラグインのインストール

$ embulk gem install embulk-output-mysql

MySQLにテーブルを作成

GRANT ALL PRIVILEGES ON *.* to embulk@localhost IDENTIFIED BY '********';
CREATE DATABASE testdb;

config.xmlにMySQLプラグイン情報を追記

out:
  type: mysql
  host: localhost
  user: embulk
  password: ********
  database: testdb
  table: ratings
  mode: replace

mode という部分は insertinsert_direct, truncate_insert, merge, merge_direct, replace も選択することが可能。

mode 挙動
insert 書込み(PRIMARY制約に引っかかると、全部失敗)
insert_direct 書込み(PRIMARY制約に引っかかると、全部失敗)
truncate_insert 置き換え、テーブル定義は変わらない
merge 上書き更新、追加
merge_direct 上書き更新、追加
replace 置き換え、テーブル定義は変更できる、 config.ymlの定義次第

データベースは事前に作成しておく必要がありますが、テーブルやスキーマの作成は Embulk 側で自動で作成してくれるので、事前に作成する必要はありません(ただ、INDEXなどは張らないといけないから自分で作っておいたほうが良い)

out:
  type: mysql
  host: localhost
  user: embulk
  password: ********
  database: testdb
  table: ratings
  mode: replace
   column_options:
      id: {type: INT NOT NULLPRIMARY KEY}
      restaurant_id: {type: INT DEFAULT 0}
      user_id: {type: varchar(255) DEFAULT NULL}
      total {type: INT}
      created_on: {type: TIMESTAMP}

というようにテーブルの定義を記載することも可能

DROP TABLE IF EXISTS `ratings`;
CREATE TABLE `ratings` (
  `id` INT DEFAULT 0,
  `restaurant_id` INT DEFAULT 0,
  `user_id` varchar(255) DEFAULT NULL,
  `total` INT DEFAULT 0,
  `created_on` datetime NOT NULL default '1970-01-01 00:00:00'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 MAX_ROWS=1000000000 AVG_ROW_LENGTH=200;
mysql> desc ratings;
+---------------+--------------+------+-----+---------------------+-------+
| Field         | Type         | Null | Key | Default             | Extra |
+---------------+--------------+------+-----+---------------------+-------+
| id            | int(11)      | YES  |     | 0                   |       |
| restaurant_id | int(11)      | YES  |     | 0                   |       |
| user_id       | varchar(255) | YES  |     | NULL                |       |
| total         | int(11)      | YES  |     | 0                   |       |
| created_on    | datetime     | NO   |     | 1970-01-01 00:00:00 |       |
+---------------+--------------+------+-----+---------------------+-------+
5 rows in set (0.00 sec)

テーブルを予め作成しない場合

mysql> desc ratings;
+---------------+------------+------+-----+-------------------+-----------------------------+
| Field         | Type       | Null | Key | Default           | Extra                       |
+---------------+------------+------+-----+-------------------+-----------------------------+
| id            | bigint(20) | YES  |     | NULL              |                             |
| restaurant_id | bigint(20) | YES  |     | NULL              |                             |
| user_id       | text       | YES  |     | NULL              |                             |
| total         | bigint(20) | YES  |     | NULL              |                             |
| created_on    | timestamp  | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.00 sec)

Embulkが config.yml を元に、テーブルを自動的に作成されますが、カラムの型やDefault、Indexを決めるので、しっかりとした運用をする場合は、自分でTable定義をしっかりつめて、作成した方がいい。

テーブル定期もある程度はできるのでそちらに記載しておくのも良い。
複数のカラムを対象にユニークキー制約は記載はできないと思われる。

Embulkの実行

dryrun
$ embulk preview config.yml

で、dryrun
エラーが起きれば、原因を調査し、config.ymlなどを修正

$ embulk run config.yml

で、実行。20万件のインストールが30秒程度で出来ました

$ mysql testdb
mysql> select count(*) from ratings;
+----------+
| count(*) |
+----------+
|   205832 |
+----------+

mysql> select * from ratings limit 10;
+--------+---------------+----------+-------+---------------------+
| id     | restaurant_id | user_id  | total | created_on          |
+--------+---------------+----------+-------+---------------------+
| 156445 |        310595 | ee02f26a |     5 | 2006-10-07 14:06:09 |
|   3842 |         10237 | fcc21401 |     1 | 2004-10-20 09:34:28 |
| 144379 |          3334 | 06412af7 |     2 | 2006-06-04 01:07:43 |
| 144377 |         15163 | 06412af7 |     5 | 2006-06-04 00:14:45 |
|  75967 |           567 | 4ceec99d |     3 | 2004-12-02 08:12:29 |
| 104898 |          1026 | 4ceec99d |     5 | 2005-01-04 12:57:02 |
|  86073 |          1058 | 4ceec99d |     5 | 2004-11-09 09:34:17 |
|  13968 |          2569 | 4ceec99d |     3 | 2004-09-23 08:29:57 |
|  97833 |          3309 | 4ceec99d |     4 | 2005-05-29 08:17:16 |
|  13991 |          3648 | 4ceec99d |     4 | 2004-09-27 20:14:50 |
+--------+---------------+----------+-------+---------------------+
10 rows in set (0.01 sec)

実行例2 CSVをPostgreSQLに大量データを投入してみる

参考: EmbulkでMySQLに大量データを投入してみる - その1
実行例1と同様に、livedoor グルメの研究用データセットを使い、口コミのデータを PostgreSQL に投入する
https://github.com/livedoor/datasets
約20万件のCSV形式のファイル
ldgourmet.tar.gzをダウンロードし、解凍。この中のratings.csvのみを使用

Postgresはローカルにインストール済みとする

seed_csv2postgres.ymlの作成

seed_csv2postgres.yml
in:
  type: file
  path_prefix: "./ratings.csv"
out:
  type: stdout

Posgres Output Pluginのインストールと設定の追加

$ embulk gem install embulk-output-postgresql
2016-12-20 19:08:11.615 +0900: Embulk v0.8.9
Fetching: embulk-output-postgresql-0.7.2.gem (100%)
Successfully installed embulk-output-postgresql-0.7.2
1 gem installed

参考)PostgreSQLに関連するEmbulkのPlugins

  • embulk-output-postgresql PostgreSQLのテーブルへデータを出力するJDBCプラグイン
  • embulk-output-postgres-json PostgreSQLのJSONカラムへデータを出力するプラグイン
  • embulk-output-postgres-udf PostgreSQLのユーザー定義関数を実行するプラグイン
  • embulk-input-postgresql PostgreSQLのテーブルからデータを取得するJDBCプラグイン

postgreSQLのインストールと設定(参考までに)

PostgreSQLのインストールと起動
brew install postgres
postgres -D /usr/local/var/postgres &

$ postgres -D /usr/local/var/postgres
LOG:  database system was shut down at 2016-12-19 16:40:42 JST
LOG:  MultiXact member wraparound protections are now enabled
LOG:  database system is ready to accept connections
LOG:  autovacuum launcher started
$ createdb testdb
$ psql testdb
psql (9.6.1)
Type "help" for help.

testdb=#
postgres=# CREATE ROLE embulk LOGIN PASSWORD '*******';
テーブル作成
DROP TABLE IF EXISTS ratings;
CREATE TABLE ratings (
  id INT DEFAULT 0,
  restaurant_id INT DEFAULT 0,
  user_id varchar(255) DEFAULT NULL,
  total INT DEFAULT 0,
  created_on timestamp NOT NULL default '1970-01-01 00:00:00'
);
権限の付与
testdb=# grant all on ratings to embulk;
GRANT
testdb=# \z
                                Access privileges
 Schema |  Name   | Type  |   Access privileges   | Column privileges | Policies
--------+---------+-------+-----------------------+-------------------+----------
 public | ratings | table | a11052=arwdDxt/a11052+|                   |
        |         |       | embulk=arwdDxt/a11052 |                   |
(1 row)
testdb=# \d ratings
                                              Table "public.ratings"
    Column     |            Type             |                              Modifiers
---------------+-----------------------------+---------------------------------------------------------------------
 id            | integer                     | default 0
 restaurant_id | integer                     | default 0
 user_id       | character varying(255)      | default NULL::character varying
 total         | integer                     | default 0
 created_on    | timestamp without time zone | not null default '1970-01-01 00:00:00'::timestamp without time zone

config_csv2postgres.ymlの作成

設定ファイルの雛形作成

$ embulk guess seed_csv2postgres.yml -o config_csv2postgres.yml

日時の 0000-00-00 00:00:00 へのParseError対応のために

$ diff config_csv2postgres.yml.genarated config_csv2postgres.yml
14a15
>     null_string: 0000-00-00 00:00:00

PostgreSQLの出力の設定を追加

out:
  type: postgresql
  host: localhost
  user: embulk
  password: "*********"
  database: testdb
  table: ratings
  mode: insert_direct

modeは

mode 説明
insert 幾つかの中間テーブルに一旦書き出した後に、INSERT。Resume可能
insert_direct 直接TABLEにINSERT。Resume不可
truncate_insert mode:insertと挙動は一緒、ただしそれまでのTableの内容は破棄される。Resume可
replace 一旦中間テーブルに書き出し、無事書込みが完了したら、以前のTableをDROPし、中間テーブルの名前をRenameします。Resume不可
merge 一旦、中間テーブルにINSERT、そちらが正常に書き込まれれば、既存のテーブルに追記。既に存在するデータに関しては、Updateされる。(MySQLでいうところの、REPLACEな挙動)

の5modeがある。

FilterPluginを用いて、DBへ投入する項目を5つ抜き出す

in:
  (略)
filters:
  - type: column
    columns:
      - {name: 'id'}
      - {name: 'restaurant_id'}
      - {name: 'user_id'}
      - {name: 'total'}
      - {name: 'created_on'}
out:
  (略)

embulk preview config_csv2postgres.yml

$ embulk preview config_csv2postgres.yml
2016-12-21 15:49:17.779 +0900: Embulk v0.8.9
2016-12-21 15:49:18.639 +0900 [INFO] (0001:preview): Loaded plugin embulk-filter-column (0.6.0)
2016-12-21 15:49:18.654 +0900 [INFO] (0001:preview): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-12-21 15:49:18.660 +0900 [INFO] (0001:preview): Loading files [ratings.csv]
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long |    created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 |            310,595 |       ee02f26a |          5 | 2006-10-07 05:06:09 UTC |
|   3,842 |             10,237 |       fcc21401 |          1 | 2004-10-20 00:34:28 UTC |

UTCではなく、TimeZoneはTokyoにするので

config_csv2postgres.yml
in:
  type: file
  path_prefix: ./ratings.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    default_timezone: Asia/Tokyo
    null_string: 0000-00-00 00:00:00
    columns:
    - {name: id, type: long}
    (略)
Dryrun
$ embulk preview config_csv2postgres.yml
2016-12-21 15:51:29.392 +0900: Embulk v0.8.9
2016-12-21 15:51:30.245 +0900 [INFO] (0001:preview): Loaded plugin embulk-filter-column (0.6.0)
2016-12-21 15:51:30.263 +0900 [INFO] (0001:preview): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-12-21 15:51:30.267 +0900 [INFO] (0001:preview): Loading files [ratings.csv]
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long |    created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 |            310,595 |       ee02f26a |          5 | 2006-10-06 20:06:09 UTC |

embulk run config_csv2postgres.yml

テーブルの確認(前)
testdb=# select * from ratings;
 id | restaurant_id | user_id | total | created_on
----+---------------+---------+-------+------------
(0 rows)
実行
$ embulk run config_csv2postgres.yml
2016-12-21 15:54:21.597 +0900 [ERROR] (0018:task-0000): Operation failed (0:23502)
2016-12-21 15:54:21.603 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: org.postgresql.util.PSQLException: ERROR: null value in column "created_on" violates not-null constraint
  詳細: Failing row contains (137714, 302419, 0826905a, 4, null).
  場所: COPY ratings, line 7312: "137714  302419  0826905a    4   \N"

Column: create_on の定義で nullを許容しないのに、 0000-00-00 00:00:00 をNULLとしたので怒られた。

DROP TABLE IF EXISTS ratings;
CREATE TABLE ratings (
  id INT DEFAULT 0,
  restaurant_id INT DEFAULT 0,
  user_id varchar(255) DEFAULT NULL,
  total INT DEFAULT 0,
  created_on timestamp default '1970-01-01 00:00:00'
);
GRANT ALL ON ratings TO embulk;

created_on の NOT NULL 制約を外した。(うまいやり方が見つからなかった)

実行結果
testdb=# select count(*) from ratings;
 count
--------
 205832
(1 row)

testdb=# select * from ratings limit 10;
   id   | restaurant_id | user_id  | total |     created_on
--------+---------------+----------+-------+---------------------
 156445 |        310595 | ee02f26a |     5 | 2006-10-07 14:06:09
   3842 |         10237 | fcc21401 |     1 | 2004-10-20 09:34:28
 144379 |          3334 | 06412af7 |     2 | 2006-06-04 01:07:43
 144377 |         15163 | 06412af7 |     5 | 2006-06-04 00:14:45
  75967 |           567 | 4ceec99d |     3 | 2004-12-02 08:12:29
 104898 |          1026 | 4ceec99d |     5 | 2005-01-04 12:57:02
  86073 |          1058 | 4ceec99d |     5 | 2004-11-09 09:34:17
  13968 |          2569 | 4ceec99d |     3 | 2004-09-23 08:29:57
  97833 |          3309 | 4ceec99d |     4 | 2005-05-29 08:17:16
  13991 |          3648 | 4ceec99d |     4 | 2004-09-27 20:14:50

testdb=# select * from ratings where id = 126548;
   id   | restaurant_id | user_id  | total | created_on
--------+---------------+----------+-------+------------
 126548 |           304 | d55363c1 |     5 |
(1 row)

created_on が 0000-00-00 00:00:00NULLとして取り込まれていた。

実行例3) PostgreSQLからMySQLに大量データを投入してみる

実行例2でPostgreSQLに投入したデータをMySQLへデータ転送してみます。

プラグインのインストール

embulk-input-postgresql プラグインのインストール

embulk-input-postgresqlのインストール
$ embulk gem install embulk-input-postgresql
2016-12-21 17:06:41.072 +0900: Embulk v0.8.9
Fetching: embulk-input-postgresql-0.8.0.gem (100%)
Successfully installed embulk-input-postgresql-0.8.0
1 gem installed

embulk-output-mysqlプラグインのインストール

※既に実行例1でインストールしている場合は不要

embulk-output-mysqlのインストール
$ embulk gem install embulk-output-mysql
(略)

seed_postgres2mysql.ymlの作成

in:
  type: postgresql
  host: localhost
  user: embulk
  password: ********
  database: testdb
  table: ratings
out:
  type: mysql
  host: localhost
  user: embulk
  password: ********
  database: testdb
  table: ratings
  mode: replace

config_postgresql2mysql.ymlの作成

$ embulk guess seed_postgresql2mysql.yml -o config_postgresql2mysql.yml
config_postgresql2mysql.yml
in: {type: postgresql, host: localhost, user: embulk, password: ********, database: testdb,
  table: ratings}
out: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
  table: ratings, mode: replace}

embulk preview config_postgresql2mysql.yml

$ embulk preview config_postgresql2mysql.yml
2016-12-21 17:19:51.908 +0900: Embulk v0.8.9
2016-12-21 17:19:52.737 +0900 [INFO] (0001:preview): Loaded plugin embulk-input-postgresql (0.8.0)
2016-12-21 17:19:52.809 +0900 [INFO] (0001:preview): SQL: SET search_path TO "public"
2016-12-21 17:19:52.956 +0900 [INFO] (0001:preview): SQL: SET search_path TO "public"
2016-12-21 17:19:52.958 +0900 [INFO] (0001:preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "ratings"
2016-12-21 17:19:52.960 +0900 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2016-12-21 17:19:52.969 +0900 [INFO] (0001:preview): > 0.01 seconds
2016-12-21 17:19:52.986 +0900 [INFO] (0001:preview): Fetched 500 rows.
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long |    created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 |            310,595 |       ee02f26a |          5 | 2006-10-07 05:06:09 UTC |
|   3,842 |             10,237 |       fcc21401 |          1 | 2004-10-20 00:34:28 UTC |

embulk run config_postgresql2mysql.yml

$ embulk run config_postgresql2mysql.yml

20万件が6秒程度で転送されました。

転送前のPostgreSQLでの件数
testdb=# select count(*) from ratings;
 count
--------
 205832
(1 row)
転送後のMySQLでの件数
mysql> select count(*) from ratings;
+----------+
| count(*) |
+----------+
|   205832 |
+----------+
1 row in set (0.05 sec)
移行元のPostgreSQL
testdb=# select * from ratings where id = 156445;
   id   | restaurant_id | user_id  | total |     created_on
--------+---------------+----------+-------+---------------------
 156445 |        310595 | ee02f26a |     5 | 2006-10-07 14:06:09
(1 row)
転送後のMySQL
mysql> select * from ratings where id = 156445;
+--------+---------------+----------+-------+---------------------+
| id     | restaurant_id | user_id  | total | created_on          |
+--------+---------------+----------+-------+---------------------+
| 156445 |        310595 | ee02f26a |     5 | 2006-10-07 14:06:09 |
+--------+---------------+----------+-------+---------------------+

日付も問題なく転送されています。

Embulkの性能評価

テストデータの作成(embulk-input-random)

まず、embulk-input-randomをプラグインを用いて、1000万件のデータを作成します

参考: http://hito4-t.hatenablog.com/entry/2015/03/04/122051 

embulk-plugin-input-randomのインストール
$ embulk gem install embulk-input-random
2016-12-21 17:59:02.029 +0900: Embulk v0.8.9
Fetching: embulk-plugin-input-random-0.0.2.gem (100%)
Successfully installed embulk-plugin-input-random-0.0.2
1 gem installed
config_testdata.yml
exec: {}
in:
  type: random
  rows: 10000000
  schema:
    id: primary_key
    number: integer
    value: string
    date: date
out:
  type: file
  path_prefix: ./testdata
  file_ext: csv
  formatter:
    type: csv
    header_line: true
    charset: UTF-8
    newline: CRLF
$ embulk run config_testdata.yml

として、実行
出力ファイルが7つ作成されます。

testdata000.00.csv
id,number,value,date
0,6429,AhU9wES1oNowLqSpbU9UjRDctMuRJhGe24LPeR_zi9k,1982-11-23 21:38:49.810582 +0000
1,5239,OzzGzHNK5pqTXiRL9kpLF5rJ2I5UU5U053nm3p2PNRk,2015-03-11 10:20:37.964128 +0000
2,6841,AHRodbeQ2kDFzsiZG2tPJb6O0zaJ-kKGczKXJF0ADZ8,2014-07-17 09:27:21.998475 +0000
3,1212,kanc2z0YjujfzE8ceYsv5RFbtj6JsMGTD92vPvApQss,1978-11-20 09:54:52.473764 +0000

というファイルが7つできるので、testdata000.00.csv以外のファイルの1行目を削除して、マージ
testdata.csv を作成した。 
行数は、10,000,000件(一行目は、ヘッダーとして id,number,value,dateとなっている)

testdata.csv
id,number,value,date
0,6429,AhU9wES1oNowLqSpbU9UjRDctMuRJhGe24LPeR_zi9k,1982-11-23 21:38:49.810582 +0000
1,5239,OzzGzHNK5pqTXiRL9kpLF5rJ2I5UU5U053nm3p2PNRk,2015-03-11 10:20:37.964128 +0000
........

CSV to MySQLの計測

seed_speedtest.yml の作成

seed_speedtest.yml
in:
  type: file
  path_prefix: "./testdata.csv"
out:
  type: mysql
  host: localhost
  user: embulk
  password: ********
  database: testdb
  table: speedtest
  mode: replace

config_speedtest.yml の作成

$ embulk guess seed_speedtest.yml -o config_speedtest.yml

で生成されたファイルに
default_timezone: Asia/Tokyoout: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
table: speedtest, mode: insert}
mode: insert
にして作成

in:
  type: file
  path_prefix: ./testdata.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    default_timezone: Asia/Tokyo
    columns:
    - {name: id, type: long}
    - {name: number, type: long}
    - {name: value, type: string}
    - {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
out: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
  table: speedtest, mode: insert}

スピードテストの内容

$ time embulk run config_speedtest.yml

にてテスト

mysql> select table_name, table_rows from information_schema.TABLES where table_schema = 'testdb';
+----------------------------------------------+------------+
| table_name                                   | table_rows |
+----------------------------------------------+------------+
| ratings                                      |     205216 |
| speedtest_00000000585b76f40243d580_bl_tmp000 |      90168 |
| speedtest_00000000585b76f40243d580_bl_tmp001 |      90272 |
| speedtest_00000000585b76f40243d580_bl_tmp002 |      89948 |
| speedtest_00000000585b76f40243d580_bl_tmp003 |      89908 |
| speedtest_00000000585b76f40243d580_bl_tmp004 |      89908 |
| speedtest_00000000585b76f40243d580_bl_tmp005 |      90077 |
| speedtest_00000000585b76f40243d580_bl_tmp006 |      89908 |
| speedtest_00000000585b76f40243d580_bl_tmp007 |      90272 |
+----------------------------------------------+------------+
9 rows in set (0.00 sec)

途中経過はこのように。8つのThreadで中間テーブルに一旦INSERT処理をしているのがわかる

計測結果

Item
Input CSVFile 固定
Output MySQL 固定
件数 1000万件 100万、200万、300万、、、、1000万
mode: insert insert, insert_direct, truncate_insert, merge, merge_direct, replace

実行環境

  • Java1.8
  • MySQL Ver 14.14 Distrib 5.7.11, for osx10.10 (x86_64) using EditLine wrapper
  • Embulk 0.8.9
  • MacBook Pro (Retina, 15-inch, Mid 2015)
    • プロセッサ:2.2 GHz Intel Core i7
    • メモリ:16 GB 1600 MHz DDR3
    • macOS Sieera (バージョン:10.12.2(16C67))
+--------+------------+------+-----+-------------------+-----------------------------+
| Field  | Type       | Null | Key | Default           | Extra                       |
+--------+------------+------+-----+-------------------+-----------------------------+
| id     | bigint(20) | YES  |     | NULL              |                             |
| number | bigint(20) | YES  |     | NULL              |                             |
| value  | text       | YES  |     | NULL              |                             |
| date   | timestamp  | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------+------------+------+-----+-------------------+-----------------------------+

実行は、time embulk run config_speedtest.yml にて実行時間を計測

CSVtoMySQL mode:insert, 1000万件

Time 平均 1回目 2回目 3回目 4回目 5回目
real 26m34s 26m38s 26m24s 27m6s 26m41s 26m2s
user 27m59s 27m38s 32m18s 32m0s 27m25s
sys 0m10s 0m9s 0m14s 0m12s 0m9s

CSVtoMySQL mode:insert、取込件数100万件〜1000万件

件数 100万 200万 300万 400万 500万 600万 700万 800万 900万 1000万
Time 2m49s 5m34s 8m10s 10m39s 12m54s 17m7s 19m59s 22m16s 25m21s 33m50s

600万以降は別の日に行ったので若干傾向が変わってしまっています。

CSV to MySQL NewTable

取込件数:200万件、mode:insert, insert_direct, truncate_insert, marge, marge_direct, replace
事前に、MySQLのテーブルをDROP

mode insert insert_direct truncate_insert marge marge_direct replace
Time 6m39s 6m46s 6m43s 6m50s 6m25s 6m38s

こちらは、実施前に、テーブルをDROPしているので、各種オプションの挙動に大きな違いが出なかったものと思われる。

CSVtoMySQL 追記Version(200万件あるデータに重複ID100万、新規ID100万)

取込件数:200万件、mode:insert, insert_direct, truncate_insert, marge, marge_direct, replace
事前に、ID:1〜200,000のデータが入ったTableを準備、追記するデータは200万件のデータ。(100万件は重複ID、100万件は新規ID)

mode insert insert_direct truncate_insert marge marge_direct replace
Time 6m4s 5m33s 6m32s 6m28s 6m13s 6m32s
count(before) 2,000,000 2,000,000 2,000,000 2,000,000 2,000,000 2,000,000
count(after) 2,000,000 2,000,000 2,000,000 2,999,936 2,999,936 2,000,000
State 更新なし、異常終了 *1 更新なし、異常終了 *1 置き換わり 更新、追加 *2 更新、追加 *2 置き換わり

*1 idのPRYMARY KEYの制約により書込み不能
*2 約100万件の重複データが更新、約100万件が追加、約100万がそのまま

truncate_insert と replaceの違いは、truncate_insertは、すでにあるTABLEをTRUNCATEしているので、テーブル定義は変更なし。replaceはテーブル定義は設定ファイルのものになる(すでにあるテーブルをDROPして、中間テーブルをRENAME)

独自プラグインの作成(Java)

$ embulk new java-filter reports
016-12-23 21:50:50.486 +0900: Embulk v0.8.9
Creating embulk-filter-reports/
  Creating embulk-filter-reports/README.md
  Creating embulk-filter-reports/LICENSE.txt
  Creating embulk-filter-reports/.gitignore
  Creating embulk-filter-reports/gradle/wrapper/gradle-wrapper.jar
  Creating embulk-filter-reports/gradle/wrapper/gradle-wrapper.properties
  Creating embulk-filter-reports/gradlew.bat
  Creating embulk-filter-reports/gradlew
  Creating embulk-filter-reports/config/checkstyle/checkstyle.xml
  Creating embulk-filter-reports/config/checkstyle/default.xml
  Creating embulk-filter-reports/build.gradle
  Creating embulk-filter-reports/lib/embulk/filter/reports.rb
  Creating embulk-filter-reports/src/main/java/org/embulk/filter/reports/ReportsFilterPlugin.java
  Creating embulk-filter-reports/src/test/java/org/embulk/filter/reports/TestReportsFilterPlugin.java

Plugin template is successfully generated.
Next steps:

$ cd embulk-filter-reports
$ ./gradlew package

こちらのコマンドを使って、雛形を作成する。

$ cd embulk-filter-reports
$ git init
Initialized empty Git repository in /Users/a11052/git/embulk-filter-reports/.git/
$ cat .git/config
[core]
    repositoryformatversion = 0
    filemode = true
    bare = false
    logallrefupdates = true
    ignorecase = true
    precomposeunicode = true
$ git remote add origin git@github.com:tashirogakuca/embulk-filter-reports.git
$ cat .git/config
[core]
    repositoryformatversion = 0
    filemode = true
    bare = false
    logallrefupdates = true
    ignorecase = true
    precomposeunicode = true
[remote "origin"]
    url = git@github.com:tashirogakuca/embulk-filter-reports.git
    fetch = +refs/heads/*:refs/remotes/origin/*

こちらで、GitにPushできるようする

実際には、

org.embulk.filter.reports.ReportsFilterPlugin
package org.embulk.filter.reports;

import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;

public class ReportsFilterPlugin implements FilterPlugin{
    public interface PluginTask extends Task {
        // configuration option 1 (required integer)
        @Config("option1")
        public int getOption1();

        // configuration option 2 (optional string, null is not allowed)
        @Config("option2")
        @ConfigDefault("\"myvalue\"")
        public String getOption2();

        // configuration option 3 (optional string, null is allowed)
        @Config("option3")
        @ConfigDefault("null")
        public Optional<String> getOption3();
    }

    @Override
    public void transaction(ConfigSource config, Schema inputSchema, FilterPlugin.Control control) {
        PluginTask task = config.loadConfig(PluginTask.class);

        Schema outputSchema = inputSchema;

        control.run(task.dump(), outputSchema);
    }

    @Override
    public PageOutput open(TaskSource taskSource, Schema inputSchema, Schema outputSchema, PageOutput output) {
        PluginTask task = taskSource.loadTask(PluginTask.class);

        // Write your code here :)
        throw new UnsupportedOperationException("ReportsFilterPlugin.open method is not implemented yet");
    }
}

こちらのファイルを触り、所望の挙動を実装する。

(以下、引用)

開発時には試行錯誤がつきものですので、Embulk plugin 開発時に Embulk 本体へ手早く開発中の plugin を読み込ませ動作確認をしたいときがあります。Embulk の run コマンドの -I オプションで、その plugin の gem ファイルまで作成せずに Embulk 本体にその plugin を読み込ませテストすることができます。

$ embulk run -I <path-to-plugin>/lib config.yml

ただし、上記のコマンドを繰り返し実行する前に、./gradlew classpath./gradle gem 等を利用し、あらかじめ classpath/ にプラグインに依存関係のある jar files を取得しておく必要があります

まとめ

Embulkを使って、様々な形式の入力をデータ加工して出力した
性能評価、Embulkの有用性がよく分かった。

  • 手順が簡単
    1. 必要最低限の設定ファイルを作成
    2. データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成
    3. config.ymlを編集する
    4. dryrun
    5. 実行
  • わかりやすい、直感的
    • 入力(input)→解析(parse)→加工(filter)→出力(output) という流れの設定ファイル
  • guess機能(入力データを自動で推定し、設定ファイルを生成)が非常に便利 
    • かゆいところに手が届く
    • CSVなら項目をすべて抜き出し、データ形式を推定
    • 圧縮ファイルならば、decodeフィルターを自動的に設定
  • 3つの形で転送を行った
    • CSV → MySQL
    • CSV → PostgreSQL
    • PostgreSQL → MySQL
  • 大量デモデータの作成を行った、プラグイン(embulk-input-random)使用
  • 性能評価
    • 1000万件のCSV(866MB)をMySQLに→約30分
    • 処理時間はほぼ件数に比例する(100万件、200万件〜1000万件)
    • MySQLのOutputのmodeによる時間の比較と動作内容

今後

Orionの統計情報をEmbulkを使い、MySQLへ転送する
MySQLへ転送したデータを用い、BIツール「Tableau」で、レポート化を行う予定
(画像精度評価レポート、監視進捗報告レポート)

独自プラグインの実装については、やり方の調査までだった。
実際にプラグインを実装してみたい

参考