Embulk

Embulkについてまとめてみた 2017/08

More than 1 year has passed since last update.


Embulkとは 〜Pluggable Bulk Data Loader〜


  • 並列データ転送ツール

  • Fluentd開発者 古橋氏が開発

  • Fluentdのバッチ版

  • プラグインアーキテクチャ


Embulkの概念図

https://www.slideshare.net/frsyuki/embuk-making-data-integration-works-relaxed/12


特徴その1


  • プラグインが多数用意されている、かつ日々増殖

  • 独自プラグインの簡単実装追加可能

  • リトライとレジューム


    • Embulkでは失敗したタスクだけを後からやり直すリジューム機能



  • オープンソース(Gitに公開)

  • guess機能


    • 入力データを自動で推定し、設定ファイルを生成




特徴その2〜Fluentdとの違い

バッチ(バルク)処理に特化



  • fluentdstream, embulkstorage

  • 巨大なデータに対応(並列分散処理)

  • 高速性

  • トランザクション制御あり


    • すべて成功しなかったら、実行前の状態に巻き戻る



  • スキーマを使ったデータのバリデーション

  • 実行はコマンド実行(cronなどでも)


Embulkの使い方


  1. 必要最低限の設定ファイルを作成



    • seed.ymlの作成



  2. データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成


    • embulk guess seed.yml -o config.yml




  3. config.ymlを編集する


    • プラグインの設定を追加(FilterとかOutputとか)



  4. dryrun


    • embulk preview config.yml

    • エラーが起きれば、 config.yml の修正などを行う



  5. 実行


    • embulk run config.yml




プラグイン種類



  1. Input plugin


  2. Output plugin


  3. Filter plugin


  4. File parser plugin


  5. File decoder plugin


  6. File formatter plugin


  7. File encoder plugin


  8. Executor plugin


Input Plugin


  • RDBS ( mysql, postgres, jdbc... )

  • NoSQL ( redis, mongodb)

  • Cloud Service (redshift, s3 )

  • Files (CSV, JSON ...)

  • Etc ( hdfs, http, elastic search, slack-history, google analitics )


Output Plugin


  • RDBS ( mysql, postgres, oracle, jdbc...)

  • Cloud Service ( redshift, s3, bigquery)

  • NoSQL ( redis, hdfs )

  • Files

  • Etc ( elastic search, hdfs, swift)


Filter Plugin


  • column (カラムを削る)

  • insert 指定した場所にホスト名などのカラム追加する

  • row 所定の条件に合致するローのみ抽出する

  • rearrange 一行のデータを複数行に再構成する


File parser Plugin


  • json

  • xml

  • csv

  • apache log

  • query_string

  • regex


File formatter Plugin


  • json


    • レコードの内容をjsonl(1 json 1行)の形式に整形するプラグイン



  • poi_excel


    • Excel(xls,xlsx)形式のデータに変換するプラグイン




Executor Plugin


  • mapreduce


    • EmbulkのタスクをHadoop上で実行するためのプラグイン




作業環境


  • Java1.8

  • MySQL Ver 14.14 Distrib 5.7.11, for osx10.10 (x86_64) using EditLine wrapper

  • Embulk 0.8.9

  • MacBook Pro (Retina, 15-inch, Mid 2015)


    • プロセッサ:2.2 GHz Intel Core i7

    • メモリ:16 GB 1600 MHz DDR3

    • macOS Sieera (バージョン:10.12.2(16C67))




インストール


公式

予め、Javaのインストールを行っておく

curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"

chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

ターミナル閉じるたびにsourceしなきゃいけないって方は .bash_profile に以下を追加したら幸せになります


.bash_profile

if [ -f ~/.bashrc ]; then

. ~/.bashrc
fi


Homebrewを使った方法

$ brew install embulk


バージョンの確認

$ embulk --version

embulk 0.8.9


実行例1) CSVからMySQLに大量データを投入してみる

参考: EmbulkでMySQLに大量データを投入してみる - その1

livedoor グルメの研究用データセットを使い、口コミのデータを MySQL に投入する

https://github.com/livedoor/datasets

約20万件のCSV形式のファイル

ldgourmet.tar.gzをダウンロードし、解凍。この中のratings.csvのみを使用

$ wget -O test.tar.gz https://github.com/livedoor/datasets/blob/master/ldgourmet.tar.gz?raw=true

$ tar xvfz 20161219_ldgourmet.tar.gz
x areas.csv
x categories.csv
x prefs.csv
x ratings.csv
x rating_votes.csv
x restaurants.csv
x stations.csv

ただし、上記の ratings.csv では、日時フォーマットが不正( 0000−00−00 00:00:00 があり、パースに失敗する)があるので、

$ sed -i.bak -e 's/0000-00-00 00:00:00/2000-01-01 09:00:00/g' ratings.csv

とする方法の他に、config.ymlparserオプションで null_string: 0000-00-00 00:00:00 を設定する方法で対応もできる。そちらは、別途記載を行っているのでそちらを参考にすること。


設定ファイルを自動生成

$ embulk guess seed.yml -o config.yml

設定ファイルを自動生成するコマンド

コマンドを使って、config.yml を作成

自動生成するにしても最低限の情報がなければいけないので、

seed.yml ファイルに次のように記述

in:

type:
file
path_prefix: "./ratings.csv"
out:
type:
stdout

$ embulk guess seed.yml -o config.yml

2016-09-05 16:30:57.386 +0000: Embulk v0.8.9
2016-09-05 16:30:58.470 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-09-05 16:30:58.474 +0000 [INFO] (0001:guess): Loading files [ratings.csv]
2016-09-05 16:30:58.620 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-09-05 16:30:58.630 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-09-05 16:30:58.645 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-09-05 16:30:58.653 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path

in:

type:
file
path_prefix: ./ratings.csv
parser:
charset:
UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: restaurant_id, type: long}
- {name: user_id, type: string}
- {name: total, type: long}
- {name: food, type: long}
- {name: service, type: long}
- {name: atmosphere, type: long}
- {name: cost_performance, type: long}
- {name: title, type: string}
- {name: body, type: string}
- {name: purpose, type: long}
- {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}
Created 'config.yml' file.

in:

type:
file
path_prefix: ./ratings.csv
parser:
charset:
UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: restaurant_id, type: long}
- {name: user_id, type: string}
- {name: total, type: long}
- {name: food, type: long}
- {name: service, type: long}
- {name: atmosphere, type: long}
- {name: cost_performance, type: long}
- {name: title, type: string}
- {name: body, type: string}
- {name: purpose, type: long}
- {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}

156445,310595,ee02f26a,5,0,0,0,0,,"名前は忘れましたが、札幌で食べたお店よりも、全然こっちの方が美味しかったので、載せました。お店も綺麗(新規オープン・・)でランチは結構混んでいます。個人

的にはゆったりと食事できるので夜の方がオススメです。 辛さが0倍から50倍まで選べるのもGOOD!、スープも2種類みたいで、友達は黄色がオススメと言っていましたが、自分は赤の方を食べました。かなり美味しかったです。店長も好感のもてるお兄さんでした。 駅近くなので一度お試しあれです!",0,"2006-10-07 05:06:09"

↓↓↓↓↓↓↓↓↓↓

+--------+---------------+----------+-------+---------------------+

| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09 |
+--------+---------------+----------+-------+---------------------+

このままだと、TIMEZONEが default:UTC なので、設定を追加する

in:

(略)
parser:
(略)
allow_optional_columns: false
default_timezone: Asia/Tokyo
columns:
- {name: id, type: long}
(略)

mysql> select * from ratings where id = 156445;

+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 05:06:09 |
+--------+---------------+----------+-------+---------------------+

日時が、 0000-00-00 00:00:00 だと、デフォルトだとCSVのParseに失敗し、

2016-12-19 18:47:24.258 +0900 [WARN] (0018:task-0000): Skipped line 201422 (org.embulk.spi.time.TimestampParseException: 

Failed to parse '0000-00-00 00:00:00'):
126548,304,d55363c1,5,0,0,0,0,,"道玄坂中腹に、風俗店ひしめく妖しいエリアがあります。渋谷百軒店商店街です。昼間でも入り込むのをためらってしまうようなその中心部に、ひっそりと昔からそのままの姿を変えずに営業している、老舗のカレー店が、このムルギーです。 秘伝の味を守るのは、今にも倒れてしまいそうに腰の曲がったお婆さん。何十年もかけて、鍋に継ぎ足して作り続けているそのカレーは、日々進化し続け、複雑で奥深い味となっています。 天を突き抜けるように高く盛り上げられたご飯は、そびえ立つ山のよう。そして、その麓には無限に広がる大海のようにカレーがかかっています。一口噛みしめるごとに、その長い蓄積による歴史の重みが胃袋に流れ込み、時の流れより解き放たれ、遥か彼方へのタイムトリップをしているようです。 どうやら、無事に跡継ぎの娘さんがいてくれたようで、今後も新たな未来へと歴史は続いていくことになりそう。安心です。",0,"0000-00-00 00:00:00"

このようにSKIPしてしまう

in:

(略)
parser:
(略)
default_timezone: Asia/Tokyo
null_string: 0000-00-00 00:00:00
columns:
- {name: id, type: long}
(略)

null_string: 0000-00-00 00:00:00 を追加するとSKIPせず取り込まれる。

mysql> select * from ratings where id = 126223;

+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 126223 | 16360 | 1a12db7d | 3 | 2016-12-19 18:54:35 |
+--------+---------------+----------+-------+---------------------+
1 row in set (0.13 sec)

mysql> desc ratings;
+---------------+------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+------------+------+-----+-------------------+-----------------------------+
| id | bigint(20) | YES | | NULL | |
| restaurant_id | bigint(20) | YES | | NULL | |
| user_id | text | YES | | NULL | |
| total | bigint(20) | YES | | NULL | |
| created_on | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.00 sec)


embulk-filter-column プラグイン

このまま MySQL にデータを取り込んでも良いのですが、今回は必要なカラムを絞り、 id, restaurant_id, user_id, total, created_on の5つのみのデータを扱うことにしたいと思います。

まずは プラグインのインストール

$ embulk gem install embulk-filter-column

in:

(略)
filters:
- type: column
columns:
- {name: 'id'}
- {name: 'restaurant_id'}
- {name: 'user_id'}
- {name: 'total'}
- {name: 'created_on'}
out: {type: stdout}


MySQLへ取り込む


embulk-output-mysqlプラグインのインストール

$ embulk gem install embulk-output-mysql


MySQLにテーブルを作成

GRANT ALL PRIVILEGES ON *.* to embulk@localhost IDENTIFIED BY '********';

CREATE DATABASE testdb;


config.xmlにMySQLプラグイン情報を追記

out:

type:
mysql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
mode: replace

mode という部分は insertinsert_direct, truncate_insert, merge, merge_direct, replace も選択することが可能。

mode
挙動

insert
書込み(PRIMARY制約に引っかかると、全部失敗)

insert_direct
書込み(PRIMARY制約に引っかかると、全部失敗)

truncate_insert
置き換え、テーブル定義は変わらない

merge
上書き更新、追加

merge_direct
上書き更新、追加

replace
置き換え、テーブル定義は変更できる、 config.ymlの定義次第

データベースは事前に作成しておく必要がありますが、テーブルやスキーマの作成は Embulk 側で自動で作成してくれるので、事前に作成する必要はありません(ただ、INDEXなどは張らないといけないから自分で作っておいたほうが良い)

out:

type:
mysql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
mode: replace

   column_options:

id:
{type: INT NOT NULLPRIMARY KEY}
restaurant_id: {type: INT DEFAULT 0}
user_id: {type: varchar(255) DEFAULT NULL}
total {type: INT}
created_on: {type: TIMESTAMP}

というようにテーブルの定義を記載することも可能

DROP TABLE IF EXISTS `ratings`;

CREATE TABLE `ratings` (
`id` INT DEFAULT 0,
`restaurant_id` INT DEFAULT 0,
`user_id` varchar(255) DEFAULT NULL,
`total` INT DEFAULT 0,
`created_on` datetime NOT NULL default '1970-01-01 00:00:00'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 MAX_ROWS=1000000000 AVG_ROW_LENGTH=200;

mysql> desc ratings;

+---------------+--------------+------+-----+---------------------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------------------+-------+
| id | int(11) | YES | | 0 | |
| restaurant_id | int(11) | YES | | 0 | |
| user_id | varchar(255) | YES | | NULL | |
| total | int(11) | YES | | 0 | |
| created_on | datetime | NO | | 1970-01-01 00:00:00 | |
+---------------+--------------+------+-----+---------------------+-------+
5 rows in set (0.00 sec)

テーブルを予め作成しない場合

mysql> desc ratings;

+---------------+------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+------------+------+-----+-------------------+-----------------------------+
| id | bigint(20) | YES | | NULL | |
| restaurant_id | bigint(20) | YES | | NULL | |
| user_id | text | YES | | NULL | |
| total | bigint(20) | YES | | NULL | |
| created_on | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.00 sec)

Embulkが config.yml を元に、テーブルを自動的に作成されますが、カラムの型やDefault、Indexを決めるので、しっかりとした運用をする場合は、自分でTable定義をしっかりつめて、作成した方がいい。

テーブル定期もある程度はできるのでそちらに記載しておくのも良い。

複数のカラムを対象にユニークキー制約は記載はできないと思われる。


Embulkの実行


dryrun

$ embulk preview config.yml


で、dryrun

エラーが起きれば、原因を調査し、config.ymlなどを修正

$ embulk run config.yml

で、実行。20万件のインストールが30秒程度で出来ました

$ mysql testdb

mysql> select count(*) from ratings;
+----------+
| count(*) |
+----------+
| 205832 |
+----------+

mysql> select * from ratings limit 10;
+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09 |
| 3842 | 10237 | fcc21401 | 1 | 2004-10-20 09:34:28 |
| 144379 | 3334 | 06412af7 | 2 | 2006-06-04 01:07:43 |
| 144377 | 15163 | 06412af7 | 5 | 2006-06-04 00:14:45 |
| 75967 | 567 | 4ceec99d | 3 | 2004-12-02 08:12:29 |
| 104898 | 1026 | 4ceec99d | 5 | 2005-01-04 12:57:02 |
| 86073 | 1058 | 4ceec99d | 5 | 2004-11-09 09:34:17 |
| 13968 | 2569 | 4ceec99d | 3 | 2004-09-23 08:29:57 |
| 97833 | 3309 | 4ceec99d | 4 | 2005-05-29 08:17:16 |
| 13991 | 3648 | 4ceec99d | 4 | 2004-09-27 20:14:50 |
+--------+---------------+----------+-------+---------------------+
10 rows in set (0.01 sec)


実行例2 CSVをPostgreSQLに大量データを投入してみる

参考: EmbulkでMySQLに大量データを投入してみる - その1

実行例1と同様に、livedoor グルメの研究用データセットを使い、口コミのデータを PostgreSQL に投入する

https://github.com/livedoor/datasets

約20万件のCSV形式のファイル

ldgourmet.tar.gzをダウンロードし、解凍。この中のratings.csvのみを使用

Postgresはローカルにインストール済みとする


seed_csv2postgres.ymlの作成


seed_csv2postgres.yml

in:

type:
file
path_prefix: "./ratings.csv"
out:
type:
stdout


Posgres Output Pluginのインストールと設定の追加

$ embulk gem install embulk-output-postgresql

2016-12-20 19:08:11.615 +0900: Embulk v0.8.9
Fetching: embulk-output-postgresql-0.7.2.gem (100%)
Successfully installed embulk-output-postgresql-0.7.2
1 gem installed

参考)PostgreSQLに関連するEmbulkのPlugins



  • embulk-output-postgresql PostgreSQLのテーブルへデータを出力するJDBCプラグイン


  • embulk-output-postgres-json PostgreSQLのJSONカラムへデータを出力するプラグイン


  • embulk-output-postgres-udf PostgreSQLのユーザー定義関数を実行するプラグイン


  • embulk-input-postgresql PostgreSQLのテーブルからデータを取得するJDBCプラグイン


postgreSQLのインストールと設定(参考までに)


PostgreSQLのインストールと起動

brew install postgres

postgres -D /usr/local/var/postgres &

$ postgres -D /usr/local/var/postgres
LOG: database system was shut down at 2016-12-19 16:40:42 JST
LOG: MultiXact member wraparound protections are now enabled
LOG: database system is ready to accept connections
LOG: autovacuum launcher started


$ createdb testdb

$ psql testdb
psql (9.6.1)
Type "help" for help.

testdb=#
postgres=# CREATE ROLE embulk LOGIN PASSWORD '*******';


テーブル作成

DROP TABLE IF EXISTS ratings;

CREATE TABLE ratings (
id INT DEFAULT 0,
restaurant_id INT DEFAULT 0,
user_id varchar(255) DEFAULT NULL,
total INT DEFAULT 0,
created_on timestamp NOT NULL default '1970-01-01 00:00:00'
);


権限の付与

testdb=# grant all on ratings to embulk;

GRANT
testdb=# \z
Access privileges
Schema | Name | Type | Access privileges | Column privileges | Policies
--------+---------+-------+-----------------------+-------------------+----------
public | ratings | table | a11052=arwdDxt/a11052+| |
| | | embulk=arwdDxt/a11052 | |
(1 row)

testdb=# \d ratings

Table "public.ratings"
Column | Type | Modifiers
---------------+-----------------------------+---------------------------------------------------------------------
id | integer | default 0
restaurant_id | integer | default 0
user_id | character varying(255) | default NULL::character varying
total | integer | default 0
created_on | timestamp without time zone | not null default '1970-01-01 00:00:00'::timestamp without time zone


config_csv2postgres.ymlの作成


設定ファイルの雛形作成

$ embulk guess seed_csv2postgres.yml -o config_csv2postgres.yml

日時の 0000-00-00 00:00:00 へのParseError対応のために

$ diff config_csv2postgres.yml.genarated config_csv2postgres.yml

14a15
> null_string: 0000-00-00 00:00:00

PostgreSQLの出力の設定を追加

out:

type:
postgresql
host: localhost
user: embulk
password: "*********"
database: testdb
table: ratings
mode: insert_direct

modeは

mode
説明

insert
幾つかの中間テーブルに一旦書き出した後に、INSERT。Resume可能

insert_direct
直接TABLEにINSERT。Resume不可

truncate_insert
mode:insertと挙動は一緒、ただしそれまでのTableの内容は破棄される。Resume可

replace
一旦中間テーブルに書き出し、無事書込みが完了したら、以前のTableをDROPし、中間テーブルの名前をRenameします。Resume不可

merge
一旦、中間テーブルにINSERT、そちらが正常に書き込まれれば、既存のテーブルに追記。既に存在するデータに関しては、Updateされる。(MySQLでいうところの、REPLACEな挙動)

の5modeがある。

FilterPluginを用いて、DBへ投入する項目を5つ抜き出す

in:

(略)
filters:
- type: column
columns:
- {name: 'id'}
- {name: 'restaurant_id'}
- {name: 'user_id'}
- {name: 'total'}
- {name: 'created_on'}
out:
(略)


embulk preview config_csv2postgres.yml

$ embulk preview config_csv2postgres.yml

2016-12-21 15:49:17.779 +0900: Embulk v0.8.9
2016-12-21 15:49:18.639 +0900 [INFO] (0001:preview): Loaded plugin embulk-filter-column (0.6.0)
2016-12-21 15:49:18.654 +0900 [INFO] (0001:preview): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-12-21 15:49:18.660 +0900 [INFO] (0001:preview): Loading files [ratings.csv]
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long | created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 | 310,595 | ee02f26a | 5 | 2006-10-07 05:06:09 UTC |
| 3,842 | 10,237 | fcc21401 | 1 | 2004-10-20 00:34:28 UTC |

UTCではなく、TimeZoneはTokyoにするので


config_csv2postgres.yml

in:

type:
file
path_prefix: ./ratings.csv
parser:
charset:
UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
default_timezone: Asia/Tokyo
null_string: 0000-00-00 00:00:00
columns:
- {name: id, type: long}
(略)


Dryrun

$ embulk preview config_csv2postgres.yml

2016-12-21 15:51:29.392 +0900: Embulk v0.8.9
2016-12-21 15:51:30.245 +0900 [INFO] (0001:preview): Loaded plugin embulk-filter-column (0.6.0)
2016-12-21 15:51:30.263 +0900 [INFO] (0001:preview): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-12-21 15:51:30.267 +0900 [INFO] (0001:preview): Loading files [ratings.csv]
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long | created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 | 310,595 | ee02f26a | 5 | 2006-10-06 20:06:09 UTC |


embulk run config_csv2postgres.yml


テーブルの確認(前)

testdb=# select * from ratings;

id | restaurant_id | user_id | total | created_on
----+---------------+---------+-------+------------
(0 rows)


実行

$ embulk run config_csv2postgres.yml

2016-12-21 15:54:21.597 +0900 [ERROR] (0018:task-0000): Operation failed (0:23502)
2016-12-21 15:54:21.603 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: org.postgresql.util.PSQLException: ERROR: null value in column "created_on" violates not-null constraint
詳細: Failing row contains (137714, 302419, 0826905a, 4, null).
場所: COPY ratings, line 7312: "137714 302419 0826905a 4 \N"

Column: create_on の定義で nullを許容しないのに、 0000-00-00 00:00:00 をNULLとしたので怒られた。

DROP TABLE IF EXISTS ratings;

CREATE TABLE ratings (
id INT DEFAULT 0,
restaurant_id INT DEFAULT 0,
user_id varchar(255) DEFAULT NULL,
total INT DEFAULT 0,
created_on timestamp default '1970-01-01 00:00:00'
);
GRANT ALL ON ratings TO embulk;

created_on の NOT NULL 制約を外した。(うまいやり方が見つからなかった)


実行結果

testdb=# select count(*) from ratings;

count
--------
205832
(1 row)

testdb=# select * from ratings limit 10;
id | restaurant_id | user_id | total | created_on
--------+---------------+----------+-------+---------------------
156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09
3842 | 10237 | fcc21401 | 1 | 2004-10-20 09:34:28
144379 | 3334 | 06412af7 | 2 | 2006-06-04 01:07:43
144377 | 15163 | 06412af7 | 5 | 2006-06-04 00:14:45
75967 | 567 | 4ceec99d | 3 | 2004-12-02 08:12:29
104898 | 1026 | 4ceec99d | 5 | 2005-01-04 12:57:02
86073 | 1058 | 4ceec99d | 5 | 2004-11-09 09:34:17
13968 | 2569 | 4ceec99d | 3 | 2004-09-23 08:29:57
97833 | 3309 | 4ceec99d | 4 | 2005-05-29 08:17:16
13991 | 3648 | 4ceec99d | 4 | 2004-09-27 20:14:50

testdb=# select * from ratings where id = 126548;
id | restaurant_id | user_id | total | created_on
--------+---------------+----------+-------+------------
126548 | 304 | d55363c1 | 5 |
(1 row)


created_on が 0000-00-00 00:00:00NULLとして取り込まれていた。


実行例3) PostgreSQLからMySQLに大量データを投入してみる

実行例2でPostgreSQLに投入したデータをMySQLへデータ転送してみます。


プラグインのインストール


embulk-input-postgresql プラグインのインストール


embulk-input-postgresqlのインストール

$ embulk gem install embulk-input-postgresql

2016-12-21 17:06:41.072 +0900: Embulk v0.8.9
Fetching: embulk-input-postgresql-0.8.0.gem (100%)
Successfully installed embulk-input-postgresql-0.8.0
1 gem installed


embulk-output-mysqlプラグインのインストール

※既に実行例1でインストールしている場合は不要


embulk-output-mysqlのインストール

$ embulk gem install embulk-output-mysql

(略)


seed_postgres2mysql.ymlの作成

in:

type:
postgresql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
out:
type:
mysql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
mode: replace


config_postgresql2mysql.ymlの作成

$ embulk guess seed_postgresql2mysql.yml -o config_postgresql2mysql.yml


config_postgresql2mysql.yml

in: {type: postgresql, host: localhost, user: embulk, password: ********, database: testdb,

table: ratings}
out: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
table: ratings, mode: replace}


embulk preview config_postgresql2mysql.yml

$ embulk preview config_postgresql2mysql.yml

2016-12-21 17:19:51.908 +0900: Embulk v0.8.9
2016-12-21 17:19:52.737 +0900 [INFO] (0001:preview): Loaded plugin embulk-input-postgresql (0.8.0)
2016-12-21 17:19:52.809 +0900 [INFO] (0001:preview): SQL: SET search_path TO "public"
2016-12-21 17:19:52.956 +0900 [INFO] (0001:preview): SQL: SET search_path TO "public"
2016-12-21 17:19:52.958 +0900 [INFO] (0001:preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "ratings"
2016-12-21 17:19:52.960 +0900 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2016-12-21 17:19:52.969 +0900 [INFO] (0001:preview): > 0.01 seconds
2016-12-21 17:19:52.986 +0900 [INFO] (0001:preview): Fetched 500 rows.
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long | created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 | 310,595 | ee02f26a | 5 | 2006-10-07 05:06:09 UTC |
| 3,842 | 10,237 | fcc21401 | 1 | 2004-10-20 00:34:28 UTC |


embulk run config_postgresql2mysql.yml

$ embulk run config_postgresql2mysql.yml

20万件が6秒程度で転送されました。


転送前のPostgreSQLでの件数

testdb=# select count(*) from ratings;

count
--------
205832
(1 row)


転送後のMySQLでの件数

mysql> select count(*) from ratings;

+----------+
| count(*) |
+----------+
| 205832 |
+----------+
1 row in set (0.05 sec)


移行元のPostgreSQL

testdb=# select * from ratings where id = 156445;

id | restaurant_id | user_id | total | created_on
--------+---------------+----------+-------+---------------------
156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09
(1 row)


転送後のMySQL

mysql> select * from ratings where id = 156445;

+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09 |
+--------+---------------+----------+-------+---------------------+

日付も問題なく転送されています。


Embulkの性能評価


テストデータの作成(embulk-input-random)

まず、embulk-input-randomをプラグインを用いて、1000万件のデータを作成します

参考: http://hito4-t.hatenablog.com/entry/2015/03/04/122051 


embulk-plugin-input-randomのインストール

$ embulk gem install embulk-input-random

2016-12-21 17:59:02.029 +0900: Embulk v0.8.9
Fetching: embulk-plugin-input-random-0.0.2.gem (100%)
Successfully installed embulk-plugin-input-random-0.0.2
1 gem installed


config_testdata.yml

exec: {}

in:
type:
random
rows: 10000000
schema:
id:
primary_key
number: integer
value: string
date: date
out:
type:
file
path_prefix: ./testdata
file_ext: csv
formatter:
type:
csv
header_line: true
charset: UTF-8
newline: CRLF

$ embulk run config_testdata.yml

として、実行

出力ファイルが7つ作成されます。


testdata000.00.csv

id,number,value,date

0,6429,AhU9wES1oNowLqSpbU9UjRDctMuRJhGe24LPeR_zi9k,1982-11-23 21:38:49.810582 +0000
1,5239,OzzGzHNK5pqTXiRL9kpLF5rJ2I5UU5U053nm3p2PNRk,2015-03-11 10:20:37.964128 +0000
2,6841,AHRodbeQ2kDFzsiZG2tPJb6O0zaJ-kKGczKXJF0ADZ8,2014-07-17 09:27:21.998475 +0000
3,1212,kanc2z0YjujfzE8ceYsv5RFbtj6JsMGTD92vPvApQss,1978-11-20 09:54:52.473764 +0000

というファイルが7つできるので、testdata000.00.csv以外のファイルの1行目を削除して、マージ

testdata.csv を作成した。 

行数は、10,000,000件(一行目は、ヘッダーとして id,number,value,dateとなっている)


testdata.csv

id,number,value,date

0,6429,AhU9wES1oNowLqSpbU9UjRDctMuRJhGe24LPeR_zi9k,1982-11-23 21:38:49.810582 +0000
1,5239,OzzGzHNK5pqTXiRL9kpLF5rJ2I5UU5U053nm3p2PNRk,2015-03-11 10:20:37.964128 +0000
........


CSV to MySQLの計測


seed_speedtest.yml の作成


seed_speedtest.yml

in:

type:
file
path_prefix: "./testdata.csv"
out:
type:
mysql
host: localhost
user: embulk
password: ********
database: testdb
table: speedtest
mode: replace


config_speedtest.yml の作成

$ embulk guess seed_speedtest.yml -o config_speedtest.yml

で生成されたファイルに

default_timezone: Asia/Tokyoout: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,

table: speedtest, mode: insert}
mode: insert

にして作成

in:

type:
file
path_prefix: ./testdata.csv
parser:
charset:
UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
default_timezone: Asia/Tokyo
columns:
- {name: id, type: long}
- {name: number, type: long}
- {name: value, type: string}
- {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
out: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
table: speedtest, mode: insert}


スピードテストの内容

$ time embulk run config_speedtest.yml

にてテスト

mysql> select table_name, table_rows from information_schema.TABLES where table_schema = 'testdb';

+----------------------------------------------+------------+
| table_name | table_rows |
+----------------------------------------------+------------+
| ratings | 205216 |
| speedtest_00000000585b76f40243d580_bl_tmp000 | 90168 |
| speedtest_00000000585b76f40243d580_bl_tmp001 | 90272 |
| speedtest_00000000585b76f40243d580_bl_tmp002 | 89948 |
| speedtest_00000000585b76f40243d580_bl_tmp003 | 89908 |
| speedtest_00000000585b76f40243d580_bl_tmp004 | 89908 |
| speedtest_00000000585b76f40243d580_bl_tmp005 | 90077 |
| speedtest_00000000585b76f40243d580_bl_tmp006 | 89908 |
| speedtest_00000000585b76f40243d580_bl_tmp007 | 90272 |
+----------------------------------------------+------------+
9 rows in set (0.00 sec)

途中経過はこのように。8つのThreadで中間テーブルに一旦INSERT処理をしているのがわかる


計測結果

Item

Input
CSVFile
固定

Output
MySQL
固定

件数
1000万件
100万、200万、300万、、、、1000万

mode:
insert
insert, insert_direct, truncate_insert, merge, merge_direct, replace

実行環境


  • Java1.8

  • MySQL Ver 14.14 Distrib 5.7.11, for osx10.10 (x86_64) using EditLine wrapper

  • Embulk 0.8.9

  • MacBook Pro (Retina, 15-inch, Mid 2015)


    • プロセッサ:2.2 GHz Intel Core i7

    • メモリ:16 GB 1600 MHz DDR3

    • macOS Sieera (バージョン:10.12.2(16C67))



+--------+------------+------+-----+-------------------+-----------------------------+

| Field | Type | Null | Key | Default | Extra |
+--------+------------+------+-----+-------------------+-----------------------------+
| id | bigint(20) | YES | | NULL | |
| number | bigint(20) | YES | | NULL | |
| value | text | YES | | NULL | |
| date | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------+------------+------+-----+-------------------+-----------------------------+

実行は、time embulk run config_speedtest.yml にて実行時間を計測


CSVtoMySQL mode:insert, 1000万件

Time
平均
1回目
2回目
3回目
4回目
5回目

real
26m34s
26m38s
26m24s
27m6s
26m41s
26m2s

user

27m59s
27m38s
32m18s
32m0s
27m25s

sys

0m10s
0m9s
0m14s
0m12s
0m9s


CSVtoMySQL mode:insert、取込件数100万件〜1000万件

件数
100万
200万
300万
400万
500万
600万
700万
800万
900万
1000万

Time
2m49s
5m34s
8m10s
10m39s
12m54s
17m7s
19m59s
22m16s
25m21s
33m50s

600万以降は別の日に行ったので若干傾向が変わってしまっています。


CSV to MySQL NewTable

取込件数:200万件、mode:insert, insert_direct, truncate_insert, marge, marge_direct, replace

事前に、MySQLのテーブルをDROP

mode
insert
insert_direct
truncate_insert
marge
marge_direct
replace

Time
6m39s
6m46s
6m43s
6m50s
6m25s
6m38s

こちらは、実施前に、テーブルをDROPしているので、各種オプションの挙動に大きな違いが出なかったものと思われる。


CSVtoMySQL 追記Version(200万件あるデータに重複ID100万、新規ID100万)

取込件数:200万件、mode:insert, insert_direct, truncate_insert, marge, marge_direct, replace

事前に、ID:1〜200,000のデータが入ったTableを準備、追記するデータは200万件のデータ。(100万件は重複ID、100万件は新規ID)

mode
insert
insert_direct
truncate_insert
marge
marge_direct
replace

Time
6m4s
5m33s
6m32s
6m28s
6m13s
6m32s

count(before)
2,000,000
2,000,000
2,000,000
2,000,000
2,000,000
2,000,000

count(after)
2,000,000
2,000,000
2,000,000
2,999,936
2,999,936
2,000,000

State
更新なし、異常終了 *1
更新なし、異常終了 *1
置き換わり
更新、追加 *2
更新、追加 *2
置き換わり

*1 idのPRYMARY KEYの制約により書込み不能

*2 約100万件の重複データが更新、約100万件が追加、約100万がそのまま

truncate_insert と replaceの違いは、truncate_insertは、すでにあるTABLEをTRUNCATEしているので、テーブル定義は変更なし。replaceはテーブル定義は設定ファイルのものになる(すでにあるテーブルをDROPして、中間テーブルをRENAME)


独自プラグインの作成(Java)

$ embulk new java-filter reports

016-12-23 21:50:50.486 +0900: Embulk v0.8.9
Creating embulk-filter-reports/
Creating embulk-filter-reports/README.md
Creating embulk-filter-reports/LICENSE.txt
Creating embulk-filter-reports/.gitignore
Creating embulk-filter-reports/gradle/wrapper/gradle-wrapper.jar
Creating embulk-filter-reports/gradle/wrapper/gradle-wrapper.properties
Creating embulk-filter-reports/gradlew.bat
Creating embulk-filter-reports/gradlew
Creating embulk-filter-reports/config/checkstyle/checkstyle.xml
Creating embulk-filter-reports/config/checkstyle/default.xml
Creating embulk-filter-reports/build.gradle
Creating embulk-filter-reports/lib/embulk/filter/reports.rb
Creating embulk-filter-reports/src/main/java/org/embulk/filter/reports/ReportsFilterPlugin.java
Creating embulk-filter-reports/src/test/java/org/embulk/filter/reports/TestReportsFilterPlugin.java

Plugin template is successfully generated.
Next steps:

$ cd embulk-filter-reports
$ ./gradlew package

こちらのコマンドを使って、雛形を作成する。

$ cd embulk-filter-reports

$ git init
Initialized empty Git repository in /Users/a11052/git/embulk-filter-reports/.git/
$ cat .git/config
[core]
repositoryformatversion = 0
filemode = true
bare = false
logallrefupdates = true
ignorecase = true
precomposeunicode = true
$ git remote add origin git@github.com:tashirogakuca/embulk-filter-reports.git
$ cat .git/config
[core]
repositoryformatversion = 0
filemode = true
bare = false
logallrefupdates = true
ignorecase = true
precomposeunicode = true
[remote "origin"]
url = git@github.com:tashirogakuca/embulk-filter-reports.git
fetch = +refs/heads/*:refs/remotes/origin/*

こちらで、GitにPushできるようする

実際には、


org.embulk.filter.reports.ReportsFilterPlugin

package org.embulk.filter.reports;

import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;

public class ReportsFilterPlugin implements FilterPlugin{
public interface PluginTask extends Task {
// configuration option 1 (required integer)
@Config("option1")
public int getOption1();

// configuration option 2 (optional string, null is not allowed)
@Config("option2")
@ConfigDefault("\"myvalue\"")
public String getOption2();

// configuration option 3 (optional string, null is allowed)
@Config("option3")
@ConfigDefault("null")
public Optional<String> getOption3();
}

@Override
public void transaction(ConfigSource config, Schema inputSchema, FilterPlugin.Control control) {
PluginTask task = config.loadConfig(PluginTask.class);

Schema outputSchema = inputSchema;

control.run(task.dump(), outputSchema);
}

@Override
public PageOutput open(TaskSource taskSource, Schema inputSchema, Schema outputSchema, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);

// Write your code here :)
throw new UnsupportedOperationException("ReportsFilterPlugin.open method is not implemented yet");
}
}


こちらのファイルを触り、所望の挙動を実装する。

(以下、引用)


開発時には試行錯誤がつきものですので、Embulk plugin 開発時に Embulk 本体へ手早く開発中の plugin を読み込ませ動作確認をしたいときがあります。Embulk の run コマンドの -I オプションで、その plugin の gem ファイルまで作成せずに Embulk 本体にその plugin を読み込ませテストすることができます。


$ embulk run -I <path-to-plugin>/lib config.yml

ただし、上記のコマンドを繰り返し実行する前に、./gradlew classpath./gradle gem 等を利用し、あらかじめ classpath/ にプラグインに依存関係のある jar files を取得しておく必要があります


まとめ

Embulkを使って、様々な形式の入力をデータ加工して出力した

性能評価、Embulkの有用性がよく分かった。


  • 手順が簡単


    1. 必要最低限の設定ファイルを作成

    2. データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成


    3. config.ymlを編集する

    4. dryrun

    5. 実行



  • わかりやすい、直感的


    • 入力(input)→解析(parse)→加工(filter)→出力(output) という流れの設定ファイル



  • guess機能(入力データを自動で推定し、設定ファイルを生成)が非常に便利 


    • かゆいところに手が届く

    • CSVなら項目をすべて抜き出し、データ形式を推定

    • 圧縮ファイルならば、decodeフィルターを自動的に設定



  • 3つの形で転送を行った


    • CSV → MySQL

    • CSV → PostgreSQL

    • PostgreSQL → MySQL



  • 大量デモデータの作成を行った、プラグイン(embulk-input-random)使用

  • 性能評価


    • 1000万件のCSV(866MB)をMySQLに→約30分

    • 処理時間はほぼ件数に比例する(100万件、200万件〜1000万件)

    • MySQLのOutputのmodeによる時間の比較と動作内容




今後

Orionの統計情報をEmbulkを使い、MySQLへ転送する

MySQLへ転送したデータを用い、BIツール「Tableau」で、レポート化を行う予定

(画像精度評価レポート、監視進捗報告レポート)

独自プラグインの実装については、やり方の調査までだった。

実際にプラグインを実装してみたい


参考