#Embulkとは 〜Pluggable Bulk Data Loader〜
- 並列データ転送ツール
- Fluentd開発者 古橋氏が開発
- Fluentdのバッチ版
- プラグインアーキテクチャ
#Embulkの概念図
https://www.slideshare.net/frsyuki/embuk-making-data-integration-works-relaxed/12
#特徴その1
- プラグインが多数用意されている、かつ日々増殖
- 独自プラグインの簡単実装追加可能 ( ruby, java )
- リトライとレジューム
- Embulkでは失敗したタスクだけを後からやり直すリジューム機能
- オープンソース(Gitに公開)
- guess機能
- 入力データを自動で推定し、設定ファイルを生成
- 対応していないプラグインもある
#特徴その2〜Fluentdとの違い
バッチ(バルク)処理に特化
-
fluentd
はstream,embulk
はstorage - 巨大なデータに対応(並列分散処理)
- 高速性
- トランザクション制御あり
- すべて成功しなかったら、実行前の状態に巻き戻る
- スキーマを使ったデータのバリデーション
- 実行はコマンド実行(cronなどでも)
#Embulkの使い方
- 必要最低限の設定ファイルを作成
-
seed.yml
の作成
-
- データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成
embulk guess seed.yml -o config.yml
-
config.yml
を編集する- プラグインの設定を追加(FilterとかOutputとか)
- dryrun
embulk preview config.yml
- エラーが起きれば、
config.yml
の修正などを行う
- 実行
embulk run config.yml
#プラグイン種類
- Input plugin
- Output plugin
- Filter plugin
- File parser plugin
- File decoder plugin
- File formatter plugin
- File encoder plugin
- Executor plugin
##Input Plugin
- RDBS ( mysql, postgres, jdbc... )
- NoSQL ( redis, mongodb)
- Cloud Service (redshift, s3 )
- Files (CSV, JSON ...)
- Etc ( hdfs, http, elastic search, slack-history, google analitics )
##Output Plugin
- RDBS ( mysql, postgres, oracle, jdbc...)
- Cloud Service ( redshift, s3, bigquery)
- NoSQL ( redis, hdfs )
- Files
- Etc ( elastic search, hdfs, swift)
##Filter Plugin
- column (カラムを削る)
- insert 指定した場所にホスト名などのカラム追加する
- row 所定の条件に合致するローのみ抽出する
- rearrange 一行のデータを複数行に再構成する
##File parser Plugin
- json
- xml
- csv
- apache log
- query_string
- regex
##File formatter Plugin
- json
- レコードの内容をjsonl(1 json 1行)の形式に整形するプラグイン
- poi_excel
- Excel(xls,xlsx)形式のデータに変換するプラグイン
##Executor Plugin
- mapreduce
- EmbulkのタスクをHadoop上で実行するためのプラグイン
#作業環境
- Java1.8
- MySQL Ver 14.14 Distrib 5.7.11, for osx10.10 (x86_64) using EditLine wrapper
- Embulk 0.8.9
- MacBook Pro (Retina, 15-inch, Mid 2015)
- プロセッサ:2.2 GHz Intel Core i7
- メモリ:16 GB 1600 MHz DDR3
- macOS Sieera (バージョン:10.12.2(16C67))
#インストール
##公式
予め、Javaのインストールを行っておく
curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
ターミナル閉じるたびにsourceしなきゃいけないって方は .bash_profile
に以下を追加したら幸せになります
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
##Homebrewを使った方法
$ brew install embulk
##バージョンの確認
$ embulk --version
embulk 0.8.9
#実行例1) CSVからMySQLに大量データを投入してみる
参考: EmbulkでMySQLに大量データを投入してみる - その1
livedoor グルメの研究用データセットを使い、口コミのデータを MySQL に投入する
→https://github.com/livedoor/datasets
約20万件のCSV形式のファイル
ldgourmet.tar.gz
をダウンロードし、解凍。この中のratings.csv
のみを使用
$ wget -O test.tar.gz https://github.com/livedoor/datasets/blob/master/ldgourmet.tar.gz?raw=true
$ tar xvfz 20161219_ldgourmet.tar.gz
x areas.csv
x categories.csv
x prefs.csv
x ratings.csv
x rating_votes.csv
x restaurants.csv
x stations.csv
ただし、上記の ratings.csv
では、日時フォーマットが不正( 0000−00−00 00:00:00
があり、パースに失敗する)があるので、
$ sed -i.bak -e 's/0000-00-00 00:00:00/2000-01-01 09:00:00/g' ratings.csv
とする方法の他に、config.yml
のparser
オプションで null_string: 0000-00-00 00:00:00
を設定する方法で対応もできる。そちらは、別途記載を行っているのでそちらを参考にすること。
設定ファイルを自動生成
$ embulk guess seed.yml -o config.yml
設定ファイルを自動生成するコマンド
コマンドを使って、config.yml
を作成
自動生成するにしても最低限の情報がなければいけないので、
seed.yml
ファイルに次のように記述
in:
type: file
path_prefix: "./ratings.csv"
out:
type: stdout
$ embulk guess seed.yml -o config.yml
2016-09-05 16:30:57.386 +0000: Embulk v0.8.9
2016-09-05 16:30:58.470 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-09-05 16:30:58.474 +0000 [INFO] (0001:guess): Loading files [ratings.csv]
2016-09-05 16:30:58.620 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-09-05 16:30:58.630 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-09-05 16:30:58.645 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-09-05 16:30:58.653 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
type: file
path_prefix: ./ratings.csv
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: restaurant_id, type: long}
- {name: user_id, type: string}
- {name: total, type: long}
- {name: food, type: long}
- {name: service, type: long}
- {name: atmosphere, type: long}
- {name: cost_performance, type: long}
- {name: title, type: string}
- {name: body, type: string}
- {name: purpose, type: long}
- {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}
Created 'config.yml' file.
in:
type: file
path_prefix: ./ratings.csv
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: restaurant_id, type: long}
- {name: user_id, type: string}
- {name: total, type: long}
- {name: food, type: long}
- {name: service, type: long}
- {name: atmosphere, type: long}
- {name: cost_performance, type: long}
- {name: title, type: string}
- {name: body, type: string}
- {name: purpose, type: long}
- {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}
156445,310595,ee02f26a,5,0,0,0,0,,"名前は忘れましたが、札幌で食べたお店よりも、全然こっちの方が美味しかったので、載せました。お店も綺麗(新規オープン・・)でランチは結構混んでいます。個人
的にはゆったりと食事できるので夜の方がオススメです。 辛さが0倍から50倍まで選べるのもGOOD!、スープも2種類みたいで、友達は黄色がオススメと言っていましたが、自分は赤の方を食べました。かなり美味しかったです。店長も好感のもてるお兄さんでした。 駅近くなので一度お試しあれです!",0,"2006-10-07 05:06:09"
↓↓↓↓↓↓↓↓↓↓
+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09 |
+--------+---------------+----------+-------+---------------------+
このままだと、TIMEZONE
が default:UTC なので、設定を追加する
in:
(略)
parser:
(略)
allow_optional_columns: false
default_timezone: Asia/Tokyo
columns:
- {name: id, type: long}
(略)
mysql> select * from ratings where id = 156445;
+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 05:06:09 |
+--------+---------------+----------+-------+---------------------+
日時が、 0000-00-00 00:00:00
だと、デフォルトだとCSVのParseに失敗し、
2016-12-19 18:47:24.258 +0900 [WARN] (0018:task-0000): Skipped line 201422 (org.embulk.spi.time.TimestampParseException:
Failed to parse '0000-00-00 00:00:00'):
126548,304,d55363c1,5,0,0,0,0,,"道玄坂中腹に、風俗店ひしめく妖しいエリアがあります。渋谷百軒店商店街です。昼間でも入り込むのをためらってしまうようなその中心部に、ひっそりと昔からそのままの姿を変えずに営業している、老舗のカレー店が、このムルギーです。 秘伝の味を守るのは、今にも倒れてしまいそうに腰の曲がったお婆さん。何十年もかけて、鍋に継ぎ足して作り続けているそのカレーは、日々進化し続け、複雑で奥深い味となっています。 天を突き抜けるように高く盛り上げられたご飯は、そびえ立つ山のよう。そして、その麓には無限に広がる大海のようにカレーがかかっています。一口噛みしめるごとに、その長い蓄積による歴史の重みが胃袋に流れ込み、時の流れより解き放たれ、遥か彼方へのタイムトリップをしているようです。 どうやら、無事に跡継ぎの娘さんがいてくれたようで、今後も新たな未来へと歴史は続いていくことになりそう。安心です。",0,"0000-00-00 00:00:00"
このようにSKIPしてしまう
in:
(略)
parser:
(略)
default_timezone: Asia/Tokyo
null_string: 0000-00-00 00:00:00
columns:
- {name: id, type: long}
(略)
と null_string: 0000-00-00 00:00:00
を追加するとSKIPせず取り込まれる。
mysql> select * from ratings where id = 126223;
+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 126223 | 16360 | 1a12db7d | 3 | 2016-12-19 18:54:35 |
+--------+---------------+----------+-------+---------------------+
1 row in set (0.13 sec)
mysql> desc ratings;
+---------------+------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+------------+------+-----+-------------------+-----------------------------+
| id | bigint(20) | YES | | NULL | |
| restaurant_id | bigint(20) | YES | | NULL | |
| user_id | text | YES | | NULL | |
| total | bigint(20) | YES | | NULL | |
| created_on | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.00 sec)
embulk-filter-column プラグイン
このまま MySQL にデータを取り込んでも良いのですが、今回は必要なカラムを絞り、 id
, restaurant_id
, user_id
, total
, created_on
の5つのみのデータを扱うことにしたいと思います。
まずは プラグインのインストール
$ embulk gem install embulk-filter-column
in:
(略)
filters:
- type: column
columns:
- {name: 'id'}
- {name: 'restaurant_id'}
- {name: 'user_id'}
- {name: 'total'}
- {name: 'created_on'}
out: {type: stdout}
MySQLへ取り込む
embulk-output-mysqlプラグインのインストール
$ embulk gem install embulk-output-mysql
MySQLにテーブルを作成
GRANT ALL PRIVILEGES ON *.* to embulk@localhost IDENTIFIED BY '********';
CREATE DATABASE testdb;
config.xmlにMySQLプラグイン情報を追記
out:
type: mysql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
mode: replace
mode
という部分は insert
、insert_direct
, truncate_insert
, merge
, merge_direct
, replace
も選択することが可能。
mode | 挙動 |
---|---|
insert | 書込み(PRIMARY制約に引っかかると、全部失敗) |
insert_direct | 書込み(PRIMARY制約に引っかかると、全部失敗) |
truncate_insert | 置き換え、テーブル定義は変わらない |
merge | 上書き更新、追加 |
merge_direct | 上書き更新、追加 |
replace | 置き換え、テーブル定義は変更できる、 config.ymlの定義次第 |
データベースは事前に作成しておく必要がありますが、テーブルやスキーマの作成は Embulk 側で自動で作成してくれるので、事前に作成する必要はありません(ただ、INDEXなどは張らないといけないから自分で作っておいたほうが良い)
out:
type: mysql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
mode: replace
column_options:
id: {type: INT NOT NULLPRIMARY KEY}
restaurant_id: {type: INT DEFAULT 0}
user_id: {type: varchar(255) DEFAULT NULL}
total {type: INT}
created_on: {type: TIMESTAMP}
というようにテーブルの定義を記載することも可能
DROP TABLE IF EXISTS `ratings`;
CREATE TABLE `ratings` (
`id` INT DEFAULT 0,
`restaurant_id` INT DEFAULT 0,
`user_id` varchar(255) DEFAULT NULL,
`total` INT DEFAULT 0,
`created_on` datetime NOT NULL default '1970-01-01 00:00:00'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 MAX_ROWS=1000000000 AVG_ROW_LENGTH=200;
mysql> desc ratings;
+---------------+--------------+------+-----+---------------------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------------------+-------+
| id | int(11) | YES | | 0 | |
| restaurant_id | int(11) | YES | | 0 | |
| user_id | varchar(255) | YES | | NULL | |
| total | int(11) | YES | | 0 | |
| created_on | datetime | NO | | 1970-01-01 00:00:00 | |
+---------------+--------------+------+-----+---------------------+-------+
5 rows in set (0.00 sec)
テーブルを予め作成しない場合
mysql> desc ratings;
+---------------+------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+------------+------+-----+-------------------+-----------------------------+
| id | bigint(20) | YES | | NULL | |
| restaurant_id | bigint(20) | YES | | NULL | |
| user_id | text | YES | | NULL | |
| total | bigint(20) | YES | | NULL | |
| created_on | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.00 sec)
Embulkが config.yml
を元に、テーブルを自動的に作成されますが、カラムの型やDefault、Indexを決めるので、しっかりとした運用をする場合は、自分でTable定義をしっかりつめて、作成した方がいい。
テーブル定期もある程度はできるのでそちらに記載しておくのも良い。
複数のカラムを対象にユニークキー制約は記載はできないと思われる。
##Embulkの実行
$ embulk preview config.yml
で、dryrun
エラーが起きれば、原因を調査し、config.yml
などを修正
$ embulk run config.yml
で、実行。20万件のインストールが30秒程度で出来ました
$ mysql testdb
mysql> select count(*) from ratings;
+----------+
| count(*) |
+----------+
| 205832 |
+----------+
mysql> select * from ratings limit 10;
+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09 |
| 3842 | 10237 | fcc21401 | 1 | 2004-10-20 09:34:28 |
| 144379 | 3334 | 06412af7 | 2 | 2006-06-04 01:07:43 |
| 144377 | 15163 | 06412af7 | 5 | 2006-06-04 00:14:45 |
| 75967 | 567 | 4ceec99d | 3 | 2004-12-02 08:12:29 |
| 104898 | 1026 | 4ceec99d | 5 | 2005-01-04 12:57:02 |
| 86073 | 1058 | 4ceec99d | 5 | 2004-11-09 09:34:17 |
| 13968 | 2569 | 4ceec99d | 3 | 2004-09-23 08:29:57 |
| 97833 | 3309 | 4ceec99d | 4 | 2005-05-29 08:17:16 |
| 13991 | 3648 | 4ceec99d | 4 | 2004-09-27 20:14:50 |
+--------+---------------+----------+-------+---------------------+
10 rows in set (0.01 sec)
#実行例2 CSVをPostgreSQLに大量データを投入してみる
参考: EmbulkでMySQLに大量データを投入してみる - その1
実行例1と同様に、livedoor グルメの研究用データセットを使い、口コミのデータを PostgreSQL に投入する
→https://github.com/livedoor/datasets
約20万件のCSV形式のファイル
ldgourmet.tar.gzをダウンロードし、解凍。この中のratings.csvのみを使用
Postgresはローカルにインストール済みとする
seed_csv2postgres.ymlの作成
in:
type: file
path_prefix: "./ratings.csv"
out:
type: stdout
###Posgres Output Pluginのインストールと設定の追加
$ embulk gem install embulk-output-postgresql
2016-12-20 19:08:11.615 +0900: Embulk v0.8.9
Fetching: embulk-output-postgresql-0.7.2.gem (100%)
Successfully installed embulk-output-postgresql-0.7.2
1 gem installed
参考)PostgreSQLに関連するEmbulkのPlugins
-
embulk-output-postgresql
PostgreSQLのテーブルへデータを出力するJDBCプラグイン -
embulk-output-postgres-json
PostgreSQLのJSONカラムへデータを出力するプラグイン -
embulk-output-postgres-udf
PostgreSQLのユーザー定義関数を実行するプラグイン -
embulk-input-postgresql
PostgreSQLのテーブルからデータを取得するJDBCプラグイン
postgreSQLのインストールと設定(参考までに)
brew install postgres
postgres -D /usr/local/var/postgres &
$ postgres -D /usr/local/var/postgres
LOG: database system was shut down at 2016-12-19 16:40:42 JST
LOG: MultiXact member wraparound protections are now enabled
LOG: database system is ready to accept connections
LOG: autovacuum launcher started
$ createdb testdb
$ psql testdb
psql (9.6.1)
Type "help" for help.
testdb=#
postgres=# CREATE ROLE embulk LOGIN PASSWORD '*******';
DROP TABLE IF EXISTS ratings;
CREATE TABLE ratings (
id INT DEFAULT 0,
restaurant_id INT DEFAULT 0,
user_id varchar(255) DEFAULT NULL,
total INT DEFAULT 0,
created_on timestamp NOT NULL default '1970-01-01 00:00:00'
);
testdb=# grant all on ratings to embulk;
GRANT
testdb=# \z
Access privileges
Schema | Name | Type | Access privileges | Column privileges | Policies
--------+---------+-------+-----------------------+-------------------+----------
public | ratings | table | a11052=arwdDxt/a11052+| |
| | | embulk=arwdDxt/a11052 | |
(1 row)
testdb=# \d ratings
Table "public.ratings"
Column | Type | Modifiers
---------------+-----------------------------+---------------------------------------------------------------------
id | integer | default 0
restaurant_id | integer | default 0
user_id | character varying(255) | default NULL::character varying
total | integer | default 0
created_on | timestamp without time zone | not null default '1970-01-01 00:00:00'::timestamp without time zone
config_csv2postgres.ymlの作成
###設定ファイルの雛形作成
$ embulk guess seed_csv2postgres.yml -o config_csv2postgres.yml
日時の 0000-00-00 00:00:00
へのParseError対応のために
$ diff config_csv2postgres.yml.genarated config_csv2postgres.yml
14a15
> null_string: 0000-00-00 00:00:00
PostgreSQLの出力の設定を追加
out:
type: postgresql
host: localhost
user: embulk
password: "*********"
database: testdb
table: ratings
mode: insert_direct
modeは
mode | 説明 |
---|---|
insert | 幾つかの中間テーブルに一旦書き出した後に、INSERT。Resume可能 |
insert_direct | 直接TABLEにINSERT。Resume不可 |
truncate_insert | mode:insertと挙動は一緒、ただしそれまでのTableの内容は破棄される。Resume可 |
replace | 一旦中間テーブルに書き出し、無事書込みが完了したら、以前のTableをDROPし、中間テーブルの名前をRenameします。Resume不可 |
merge | 一旦、中間テーブルにINSERT、そちらが正常に書き込まれれば、既存のテーブルに追記。既に存在するデータに関しては、Updateされる。(MySQLでいうところの、REPLACEな挙動) |
の5modeがある。
FilterPluginを用いて、DBへ投入する項目を5つ抜き出す
in:
(略)
filters:
- type: column
columns:
- {name: 'id'}
- {name: 'restaurant_id'}
- {name: 'user_id'}
- {name: 'total'}
- {name: 'created_on'}
out:
(略)
embulk preview config_csv2postgres.yml
$ embulk preview config_csv2postgres.yml
2016-12-21 15:49:17.779 +0900: Embulk v0.8.9
2016-12-21 15:49:18.639 +0900 [INFO] (0001:preview): Loaded plugin embulk-filter-column (0.6.0)
2016-12-21 15:49:18.654 +0900 [INFO] (0001:preview): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-12-21 15:49:18.660 +0900 [INFO] (0001:preview): Loading files [ratings.csv]
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long | created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 | 310,595 | ee02f26a | 5 | 2006-10-07 05:06:09 UTC |
| 3,842 | 10,237 | fcc21401 | 1 | 2004-10-20 00:34:28 UTC |
UTCではなく、TimeZoneはTokyoにするので
in:
type: file
path_prefix: ./ratings.csv
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
default_timezone: Asia/Tokyo
null_string: 0000-00-00 00:00:00
columns:
- {name: id, type: long}
(略)
$ embulk preview config_csv2postgres.yml
2016-12-21 15:51:29.392 +0900: Embulk v0.8.9
2016-12-21 15:51:30.245 +0900 [INFO] (0001:preview): Loaded plugin embulk-filter-column (0.6.0)
2016-12-21 15:51:30.263 +0900 [INFO] (0001:preview): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-12-21 15:51:30.267 +0900 [INFO] (0001:preview): Loading files [ratings.csv]
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long | created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 | 310,595 | ee02f26a | 5 | 2006-10-06 20:06:09 UTC |
embulk run config_csv2postgres.yml
testdb=# select * from ratings;
id | restaurant_id | user_id | total | created_on
----+---------------+---------+-------+------------
(0 rows)
$ embulk run config_csv2postgres.yml
2016-12-21 15:54:21.597 +0900 [ERROR] (0018:task-0000): Operation failed (0:23502)
2016-12-21 15:54:21.603 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: org.postgresql.util.PSQLException: ERROR: null value in column "created_on" violates not-null constraint
詳細: Failing row contains (137714, 302419, 0826905a, 4, null).
場所: COPY ratings, line 7312: "137714 302419 0826905a 4 \N"
Column: create_on の定義で nullを許容しないのに、 0000-00-00 00:00:00
をNULLとしたので怒られた。
DROP TABLE IF EXISTS ratings;
CREATE TABLE ratings (
id INT DEFAULT 0,
restaurant_id INT DEFAULT 0,
user_id varchar(255) DEFAULT NULL,
total INT DEFAULT 0,
created_on timestamp default '1970-01-01 00:00:00'
);
GRANT ALL ON ratings TO embulk;
created_on の NOT NULL 制約を外した。(うまいやり方が見つからなかった)
testdb=# select count(*) from ratings;
count
--------
205832
(1 row)
testdb=# select * from ratings limit 10;
id | restaurant_id | user_id | total | created_on
--------+---------------+----------+-------+---------------------
156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09
3842 | 10237 | fcc21401 | 1 | 2004-10-20 09:34:28
144379 | 3334 | 06412af7 | 2 | 2006-06-04 01:07:43
144377 | 15163 | 06412af7 | 5 | 2006-06-04 00:14:45
75967 | 567 | 4ceec99d | 3 | 2004-12-02 08:12:29
104898 | 1026 | 4ceec99d | 5 | 2005-01-04 12:57:02
86073 | 1058 | 4ceec99d | 5 | 2004-11-09 09:34:17
13968 | 2569 | 4ceec99d | 3 | 2004-09-23 08:29:57
97833 | 3309 | 4ceec99d | 4 | 2005-05-29 08:17:16
13991 | 3648 | 4ceec99d | 4 | 2004-09-27 20:14:50
testdb=# select * from ratings where id = 126548;
id | restaurant_id | user_id | total | created_on
--------+---------------+----------+-------+------------
126548 | 304 | d55363c1 | 5 |
(1 row)
created_on が 0000-00-00 00:00:00
はNULL
として取り込まれていた。
#実行例3) PostgreSQLからMySQLに大量データを投入してみる
実行例2でPostgreSQLに投入したデータをMySQLへデータ転送してみます。
プラグインのインストール
embulk-input-postgresql プラグインのインストール
$ embulk gem install embulk-input-postgresql
2016-12-21 17:06:41.072 +0900: Embulk v0.8.9
Fetching: embulk-input-postgresql-0.8.0.gem (100%)
Successfully installed embulk-input-postgresql-0.8.0
1 gem installed
embulk-output-mysqlプラグインのインストール
※既に実行例1でインストールしている場合は不要
$ embulk gem install embulk-output-mysql
(略)
seed_postgres2mysql.ymlの作成
in:
type: postgresql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
out:
type: mysql
host: localhost
user: embulk
password: ********
database: testdb
table: ratings
mode: replace
config_postgresql2mysql.ymlの作成
$ embulk guess seed_postgresql2mysql.yml -o config_postgresql2mysql.yml
in: {type: postgresql, host: localhost, user: embulk, password: ********, database: testdb,
table: ratings}
out: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
table: ratings, mode: replace}
embulk preview config_postgresql2mysql.yml
$ embulk preview config_postgresql2mysql.yml
2016-12-21 17:19:51.908 +0900: Embulk v0.8.9
2016-12-21 17:19:52.737 +0900 [INFO] (0001:preview): Loaded plugin embulk-input-postgresql (0.8.0)
2016-12-21 17:19:52.809 +0900 [INFO] (0001:preview): SQL: SET search_path TO "public"
2016-12-21 17:19:52.956 +0900 [INFO] (0001:preview): SQL: SET search_path TO "public"
2016-12-21 17:19:52.958 +0900 [INFO] (0001:preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "ratings"
2016-12-21 17:19:52.960 +0900 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2016-12-21 17:19:52.969 +0900 [INFO] (0001:preview): > 0.01 seconds
2016-12-21 17:19:52.986 +0900 [INFO] (0001:preview): Fetched 500 rows.
+---------+--------------------+----------------+------------+-------------------------+
| id:long | restaurant_id:long | user_id:string | total:long | created_on:timestamp |
+---------+--------------------+----------------+------------+-------------------------+
| 156,445 | 310,595 | ee02f26a | 5 | 2006-10-07 05:06:09 UTC |
| 3,842 | 10,237 | fcc21401 | 1 | 2004-10-20 00:34:28 UTC |
embulk run config_postgresql2mysql.yml
$ embulk run config_postgresql2mysql.yml
20万件が6秒程度で転送されました。
testdb=# select count(*) from ratings;
count
--------
205832
(1 row)
mysql> select count(*) from ratings;
+----------+
| count(*) |
+----------+
| 205832 |
+----------+
1 row in set (0.05 sec)
testdb=# select * from ratings where id = 156445;
id | restaurant_id | user_id | total | created_on
--------+---------------+----------+-------+---------------------
156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09
(1 row)
mysql> select * from ratings where id = 156445;
+--------+---------------+----------+-------+---------------------+
| id | restaurant_id | user_id | total | created_on |
+--------+---------------+----------+-------+---------------------+
| 156445 | 310595 | ee02f26a | 5 | 2006-10-07 14:06:09 |
+--------+---------------+----------+-------+---------------------+
日付も問題なく転送されています。
#Embulkの性能評価
##テストデータの作成(embulk-input-random)
まず、embulk-input-random
をプラグインを用いて、1000万件のデータを作成します
参考: http://hito4-t.hatenablog.com/entry/2015/03/04/122051
$ embulk gem install embulk-input-random
2016-12-21 17:59:02.029 +0900: Embulk v0.8.9
Fetching: embulk-plugin-input-random-0.0.2.gem (100%)
Successfully installed embulk-plugin-input-random-0.0.2
1 gem installed
exec: {}
in:
type: random
rows: 10000000
schema:
id: primary_key
number: integer
value: string
date: date
out:
type: file
path_prefix: ./testdata
file_ext: csv
formatter:
type: csv
header_line: true
charset: UTF-8
newline: CRLF
$ embulk run config_testdata.yml
として、実行
出力ファイルが7つ作成されます。
id,number,value,date
0,6429,AhU9wES1oNowLqSpbU9UjRDctMuRJhGe24LPeR_zi9k,1982-11-23 21:38:49.810582 +0000
1,5239,OzzGzHNK5pqTXiRL9kpLF5rJ2I5UU5U053nm3p2PNRk,2015-03-11 10:20:37.964128 +0000
2,6841,AHRodbeQ2kDFzsiZG2tPJb6O0zaJ-kKGczKXJF0ADZ8,2014-07-17 09:27:21.998475 +0000
3,1212,kanc2z0YjujfzE8ceYsv5RFbtj6JsMGTD92vPvApQss,1978-11-20 09:54:52.473764 +0000
というファイルが7つできるので、testdata000.00.csv
以外のファイルの1行目を削除して、マージ
testdata.csv
を作成した。
行数は、10,000,000件(一行目は、ヘッダーとして id,number,value,date
となっている)
id,number,value,date
0,6429,AhU9wES1oNowLqSpbU9UjRDctMuRJhGe24LPeR_zi9k,1982-11-23 21:38:49.810582 +0000
1,5239,OzzGzHNK5pqTXiRL9kpLF5rJ2I5UU5U053nm3p2PNRk,2015-03-11 10:20:37.964128 +0000
........
CSV to MySQLの計測
seed_speedtest.yml の作成
in:
type: file
path_prefix: "./testdata.csv"
out:
type: mysql
host: localhost
user: embulk
password: ********
database: testdb
table: speedtest
mode: replace
config_speedtest.yml の作成
$ embulk guess seed_speedtest.yml -o config_speedtest.yml
で生成されたファイルに
default_timezone: Asia/Tokyo
とout: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb, table: speedtest, mode: insert}
と mode: insert
にして作成
in:
type: file
path_prefix: ./testdata.csv
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
default_timezone: Asia/Tokyo
columns:
- {name: id, type: long}
- {name: number, type: long}
- {name: value, type: string}
- {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
out: {type: mysql, host: localhost, user: embulk, password: ********, database: testdb,
table: speedtest, mode: insert}
スピードテストの内容
$ time embulk run config_speedtest.yml
にてテスト
mysql> select table_name, table_rows from information_schema.TABLES where table_schema = 'testdb';
+----------------------------------------------+------------+
| table_name | table_rows |
+----------------------------------------------+------------+
| ratings | 205216 |
| speedtest_00000000585b76f40243d580_bl_tmp000 | 90168 |
| speedtest_00000000585b76f40243d580_bl_tmp001 | 90272 |
| speedtest_00000000585b76f40243d580_bl_tmp002 | 89948 |
| speedtest_00000000585b76f40243d580_bl_tmp003 | 89908 |
| speedtest_00000000585b76f40243d580_bl_tmp004 | 89908 |
| speedtest_00000000585b76f40243d580_bl_tmp005 | 90077 |
| speedtest_00000000585b76f40243d580_bl_tmp006 | 89908 |
| speedtest_00000000585b76f40243d580_bl_tmp007 | 90272 |
+----------------------------------------------+------------+
9 rows in set (0.00 sec)
途中経過はこのように。8つのThreadで中間テーブルに一旦INSERT処理をしているのがわかる
計測結果
Item | ||
---|---|---|
Input | CSVFile | 固定 |
Output | MySQL | 固定 |
件数 | 1000万件 | 100万、200万、300万、、、、1000万 |
mode: | insert | insert, insert_direct, truncate_insert, merge, merge_direct, replace |
実行環境
- Java1.8
- MySQL Ver 14.14 Distrib 5.7.11, for osx10.10 (x86_64) using EditLine wrapper
- Embulk 0.8.9
- MacBook Pro (Retina, 15-inch, Mid 2015)
- プロセッサ:2.2 GHz Intel Core i7
- メモリ:16 GB 1600 MHz DDR3
- macOS Sieera (バージョン:10.12.2(16C67))
+--------+------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+--------+------------+------+-----+-------------------+-----------------------------+
| id | bigint(20) | YES | | NULL | |
| number | bigint(20) | YES | | NULL | |
| value | text | YES | | NULL | |
| date | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------+------------+------+-----+-------------------+-----------------------------+
実行は、time embulk run config_speedtest.yml
にて実行時間を計測
####CSVtoMySQL mode:insert, 1000万件
Time | 平均 | 1回目 | 2回目 | 3回目 | 4回目 | 5回目 |
---|---|---|---|---|---|---|
real | 26m34s | 26m38s | 26m24s | 27m6s | 26m41s | 26m2s |
user | 27m59s | 27m38s | 32m18s | 32m0s | 27m25s | |
sys | 0m10s | 0m9s | 0m14s | 0m12s | 0m9s |
####CSVtoMySQL mode:insert、取込件数100万件〜1000万件
|件数|100万|200万|300万|400万|500万|600万|700万|800万|900万|1000万|
|----|----|----|----|----|----|----|----|----|----|----|----|----|
|Time|2m49s|5m34s|8m10s|10m39s|12m54s|17m7s|19m59s|22m16s|25m21s|33m50s|
600万以降は別の日に行ったので若干傾向が変わってしまっています。
####CSV to MySQL NewTable
取込件数:200万件、mode:insert, insert_direct, truncate_insert, marge, marge_direct, replace
事前に、MySQLのテーブルをDROP
mode | insert | insert_direct | truncate_insert | marge | marge_direct | replace |
---|---|---|---|---|---|---|
Time | 6m39s | 6m46s | 6m43s | 6m50s | 6m25s | 6m38s |
こちらは、実施前に、テーブルをDROPしているので、各種オプションの挙動に大きな違いが出なかったものと思われる。
CSVtoMySQL 追記Version(200万件あるデータに重複ID100万、新規ID100万)
取込件数:200万件、mode:insert, insert_direct, truncate_insert, marge, marge_direct, replace
事前に、ID:1〜200,000のデータが入ったTableを準備、追記するデータは200万件のデータ。(100万件は重複ID、100万件は新規ID)
mode | insert | insert_direct | truncate_insert | marge | marge_direct | replace |
---|---|---|---|---|---|---|
Time | 6m4s | 5m33s | 6m32s | 6m28s | 6m13s | 6m32s |
count(before) | 2,000,000 | 2,000,000 | 2,000,000 | 2,000,000 | 2,000,000 | 2,000,000 |
count(after) | 2,000,000 | 2,000,000 | 2,000,000 | 2,999,936 | 2,999,936 | 2,000,000 |
State | 更新なし、異常終了 *1 | 更新なし、異常終了 *1 | 置き換わり | 更新、追加 *2 | 更新、追加 *2 | 置き換わり |
*1 idのPRYMARY KEYの制約により書込み不能
*2 約100万件の重複データが更新、約100万件が追加、約100万がそのまま
truncate_insert と replaceの違いは、truncate_insertは、すでにあるTABLEをTRUNCATEしているので、テーブル定義は変更なし。replaceはテーブル定義は設定ファイルのものになる(すでにあるテーブルをDROPして、中間テーブルをRENAME)
#独自プラグインの作成(Java)
$ embulk new java-filter reports
016-12-23 21:50:50.486 +0900: Embulk v0.8.9
Creating embulk-filter-reports/
Creating embulk-filter-reports/README.md
Creating embulk-filter-reports/LICENSE.txt
Creating embulk-filter-reports/.gitignore
Creating embulk-filter-reports/gradle/wrapper/gradle-wrapper.jar
Creating embulk-filter-reports/gradle/wrapper/gradle-wrapper.properties
Creating embulk-filter-reports/gradlew.bat
Creating embulk-filter-reports/gradlew
Creating embulk-filter-reports/config/checkstyle/checkstyle.xml
Creating embulk-filter-reports/config/checkstyle/default.xml
Creating embulk-filter-reports/build.gradle
Creating embulk-filter-reports/lib/embulk/filter/reports.rb
Creating embulk-filter-reports/src/main/java/org/embulk/filter/reports/ReportsFilterPlugin.java
Creating embulk-filter-reports/src/test/java/org/embulk/filter/reports/TestReportsFilterPlugin.java
Plugin template is successfully generated.
Next steps:
$ cd embulk-filter-reports
$ ./gradlew package
こちらのコマンドを使って、雛形を作成する。
$ cd embulk-filter-reports
$ git init
Initialized empty Git repository in /Users/a11052/git/embulk-filter-reports/.git/
$ cat .git/config
[core]
repositoryformatversion = 0
filemode = true
bare = false
logallrefupdates = true
ignorecase = true
precomposeunicode = true
$ git remote add origin git@github.com:tashirogakuca/embulk-filter-reports.git
$ cat .git/config
[core]
repositoryformatversion = 0
filemode = true
bare = false
logallrefupdates = true
ignorecase = true
precomposeunicode = true
[remote "origin"]
url = git@github.com:tashirogakuca/embulk-filter-reports.git
fetch = +refs/heads/*:refs/remotes/origin/*
こちらで、GitにPushできるようする
実際には、
package org.embulk.filter.reports;
import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
public class ReportsFilterPlugin implements FilterPlugin{
public interface PluginTask extends Task {
// configuration option 1 (required integer)
@Config("option1")
public int getOption1();
// configuration option 2 (optional string, null is not allowed)
@Config("option2")
@ConfigDefault("\"myvalue\"")
public String getOption2();
// configuration option 3 (optional string, null is allowed)
@Config("option3")
@ConfigDefault("null")
public Optional<String> getOption3();
}
@Override
public void transaction(ConfigSource config, Schema inputSchema, FilterPlugin.Control control) {
PluginTask task = config.loadConfig(PluginTask.class);
Schema outputSchema = inputSchema;
control.run(task.dump(), outputSchema);
}
@Override
public PageOutput open(TaskSource taskSource, Schema inputSchema, Schema outputSchema, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
// Write your code here :)
throw new UnsupportedOperationException("ReportsFilterPlugin.open method is not implemented yet");
}
}
こちらのファイルを触り、所望の挙動を実装する。
(以下、引用)
開発時には試行錯誤がつきものですので、Embulk plugin 開発時に Embulk 本体へ手早く開発中の plugin を読み込ませ動作確認をしたいときがあります。Embulk の run コマンドの -I オプションで、その plugin の gem ファイルまで作成せずに Embulk 本体にその plugin を読み込ませテストすることができます。
$ embulk run -I <path-to-plugin>/lib config.yml
ただし、上記のコマンドを繰り返し実行する前に、./gradlew classpath
や ./gradle gem
等を利用し、あらかじめ classpath/
にプラグインに依存関係のある jar files
を取得しておく必要があります
#まとめ
Embulkを使って、様々な形式の入力をデータ加工して出力した
性能評価、Embulkの有用性がよく分かった。
- 手順が簡単
- 必要最低限の設定ファイルを作成
- データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成
-
config.yml
を編集する - dryrun
- 実行
- わかりやすい、直感的
- 入力(input)→解析(parse)→加工(filter)→出力(output) という流れの設定ファイル
- guess機能(入力データを自動で推定し、設定ファイルを生成)が非常に便利
- かゆいところに手が届く
- CSVなら項目をすべて抜き出し、データ形式を推定
- 圧縮ファイルならば、decodeフィルターを自動的に設定
- 3つの形で転送を行った
- CSV → MySQL
- CSV → PostgreSQL
- PostgreSQL → MySQL
- 大量デモデータの作成を行った、プラグイン(
embulk-input-random
)使用 - 性能評価
- 1000万件のCSV(866MB)をMySQLに→約30分
- 処理時間はほぼ件数に比例する(100万件、200万件〜1000万件)
- MySQLのOutputのmodeによる時間の比較と動作内容
#今後
Orionの統計情報をEmbulkを使い、MySQLへ転送する
MySQLへ転送したデータを用い、BIツール「Tableau」で、レポート化を行う予定
(画像精度評価レポート、監視進捗報告レポート)
独自プラグインの実装については、やり方の調査までだった。
実際にプラグインを実装してみたい
#参考