はじめに
以前書いたMemo: Embulk-1のときにはTreasureDataのoutputプラグインがなかったが、今朝みたらTreasureDataのoutputプラグインがあったので試してみた。
またせっかくなのでEmbulkで実装されいてるTreasureDataが提供しているDataConnectorも試してみた。
DataConnectorでは自分のリソースを使わずにデータをYBIに入れることが出来るので、今後既存データをYBIにデータを入れる主要な方法になると思う。
環境
- OS: CentOS7.1
- Ruby: ruby 2.0.0p598 (2014-11-13) [x86_64-linux]
- java: openjdk version "1.8.0_51"
- embulk: v0.6.19
- tdコマンド: 0.11.13
事前準備
Javaの実行環境やMySQLのClientツール、TDのコマンドラインツールをインストールするために必要なパッケージを以下のコマンドでインストールしておく。
$ sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
$ sudo yum install -y mariadb mariadb-devel
$ sudo yum install gcc gcc-c++ openssl-devel libyaml-devel libffi-devel readline-devel zlib-devel gdbm-devel ncurses-devel
Embulk + YBI
Embulk Install
https://github.com/embulk/embulk に書いてある通りにインストールする。
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
$ env | grep PATH
PATH=/root/.embulk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
$
Plugin Install
Fluentdと同様に既に多くのPluginが開発されている。
今回試してみるのはMySQL(MariaDB)からTreasureDataにデータをインポートさせるので、MySQLのinputプラグインとTreasureDataのoutputプラグインをインストールする。
現在のインストール可能なプラグイン一覧
77個あるらしい。。。
$ embulk gem list embulk -r
2015-07-29 12:34:06.823 +0900: Embulk v0.6.19
*** REMOTE GEMS ***
embulk (0.6.19)
embulk-decoder-commons-compress (0.3.0)
embulk-encoder-xz (0.1.0)
embulk-executor-mapreduce (0.1.5)
embulk-filter-column (0.1.6)
embulk-filter-eval (0.1.0)
embulk-filter-insert (1.0.0)
embulk-filter-row (0.1.3)
embulk-filter-speedometer (0.2.2)
embulk-formatter-jsonl (0.1.4)
embulk-input-apache-dummy-log (0.1.0)
embulk-input-command (0.1.2)
embulk-input-dynamodb (0.0.3)
embulk-input-filesplit (0.1.2)
embulk-input-ftp (0.1.2)
embulk-input-gcs (0.1.4)
embulk-input-hdfs (0.0.2)
embulk-input-http (0.0.6)
embulk-input-jdbc (0.6.0)
embulk-input-jira (0.0.6)
embulk-input-jstat (0.0.3)
embulk-input-marketo (0.1.0)
embulk-input-mixpanel (0.1.0)
embulk-input-mysql (0.6.0)
embulk-input-oracle (0.6.0)
embulk-input-pcapng-files (0.1.2)
embulk-input-postgresql (0.6.0)
embulk-input-random (0.0.2)
embulk-input-redis (0.1.6)
embulk-input-redshift (0.6.0)
embulk-input-remote (0.1.4)
embulk-input-riak_cs (0.2.1)
embulk-input-s3 (0.2.1)
embulk-input-sfdc (0.0.3)
embulk-input-slack-history (0.1.1)
embulk-input-sql (0.1.0)
embulk-input-sqlserver (0.6.0)
embulk-output-bigquery (0.1.7)
embulk-output-command (0.1.2)
embulk-output-elasticsearch (0.1.7)
embulk-output-gcs (0.1.1)
embulk-output-hdfs (0.1.1)
embulk-output-jdbc (0.4.1)
embulk-output-mysql (0.4.1)
embulk-output-oracle (0.4.1)
embulk-output-parquet (0.1.0)
embulk-output-postgres-json (0.2.0)
embulk-output-postgres-udf (0.1.2)
embulk-output-postgresql (0.4.1)
embulk-output-redis (0.1.6)
embulk-output-redshift (0.4.1)
embulk-output-s3 (0.1.0)
embulk-output-salesforce (0.1.2)
embulk-output-sqlite3 (0.0.1)
embulk-output-td (0.1.2)
embulk-output-vertica (0.2.2)
embulk-parser-apache-log (0.1.0)
embulk-parser-fluent-s3-log (0.0.1)
embulk-parser-jdbc-schema-csv (0.0.1)
embulk-parser-json (0.0.1)
embulk-parser-jsonl (0.0.1)
embulk-parser-pcapng (0.1.0)
embulk-parser-query_string (0.1.3)
embulk-parser-roo-excel (0.0.1)
embulk-parser-xml (0.0.3)
embulk-plugin-filter-convert (0.0.1)
embulk-plugin-filter-grep (0.0.2)
embulk-plugin-input-hbase (0.0.2 java, 0.0.1)
embulk-plugin-input-jstat (0.0.2)
embulk-plugin-input-pcapng-files (0.0.2)
embulk-plugin-input-random (0.0.2)
embulk-plugin-input-roo-excel (0.1.1)
embulk-plugin-input-sfdc-event-log-files (0.0.7)
embulk-plugin-mysql (0.0.3)
embulk-plugin-redis (0.1.5)
embulk-plugin-twitterstream (0.0.2)
embulk-plugin-vertica (0.0.2)
embulk-plugin-vim (0.0.2)
$
MySQL Inputプラグイン
$ embulk gem install embulk-input-mysql
TreasureData Outputプラグイン
$ embulk gem install embulk-output-td
環境設定
MySQL
以下のテーブル定義のデータ。
mysql> DESC datas;
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | mediumint(9) | NO | PRI | NULL | auto_increment |
| num | bigint(20) | YES | | NULL | |
| amount | bigint(20) | YES | | NULL | |
| date | datetime | YES | | NULL | |
+-------------+--------------+------+-----+---------+----------------+
5 rows in set (0.00 sec)
mysql> SELECT * FROM datas_dump LIMIT 1;
+--------+------------+-------------+---------------------+
| id | num | amount | date |
+--------+------------+-------------+---------------------+
| 122823 | 5529888889 | 83610957107 | 2015-07-24 20:00:32 |
+--------+------------+-------------+---------------------+
1 row in set (0.00 sec)
mysql>
YBI
API-Keyの取得方法はTreasureData + JavaScript SDKを参照。
embulk.embulkというテーブルを作成。
$ td db:create embulk
Database 'embulk' is created.
Use 'td table:create embulk <table_name>' to create a table.
$ td table:create embulk embulk
Table 'embulk.embulk' is created.
$
config.ymlの作成
以下のconfigファイルを作成
in:
type: mysql
user: ${user}
password: ${pass}
database: ybi
table: datas
host: hostname
select: "*"
out:
type: td
apikey:
endpoint: ybi.jp-east.idcfcloud.com
use_ssl: true
database: embulk
table: embulk
time_column: date
Import
Preview
$ embulk preview config.yml | head -10
2015-07-29 13:13:47.209 +0900: Embulk v0.6.19
2015-07-29 13:13:50.228 +0900 [INFO] (preview): Loaded plugin embulk-input-mysql (0.6.0)
2015-07-29 13:13:50.606 +0900 [INFO] (preview): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:13:51.629 +0900 [INFO] (preview): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:13:51.689 +0900 [INFO] (preview): SQL: SELECT id,import_num as num,volume as amount,dump_date as date FROM `datas_dump`
2015-07-29 13:13:51.694 +0900 [INFO] (preview): > 0.00 seconds
+---------+----------------+-----------------+-------------------------+
| id:long | num:long | amount:long | date:timestamp |
+---------+----------------+-----------------+-------------------------+
| 122,823 | 5,529,888,889 | 83,610,957,107 | 2015-07-24 11:00:32 UTC |
$
良さげですね。
Run
問題なければRun。
$ embulk run config.yml
2015-07-29 13:15:03.761 +0900: Embulk v0.6.19
2015-07-29 13:15:16.770 +0900 [INFO] (transaction): Loaded plugin embulk-input-mysql (0.6.0)
2015-07-29 13:15:17.095 +0900 [INFO] (transaction): Loaded plugin embulk-output-td (0.1.2)
2015-07-29 13:15:17.297 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:15:19.625 +0900 [INFO] (transaction): Logging initialized @25186ms
2015-07-29 13:15:22.233 +0900 [INFO] (transaction): Duplicating date:timestamp column to 'time' column as seconds for the data partitioning
2015-07-29 13:15:22.234 +0900 [INFO] (transaction): Create bulk_import session embulk_20150729_041509_962000000
2015-07-29 13:15:22.490 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-07-29 13:15:22.505 +0900 [INFO] (task-0000): Duplicating date:timestamp column to 'time' column as seconds for the data partitioning
2015-07-29 13:15:23.861 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:15:24.070 +0900 [INFO] (task-0000): SQL: SELECT * FROM `datas`
2015-07-29 13:15:24.076 +0900 [INFO] (task-0000): > 0.00 seconds
2015-07-29 13:15:24.823 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-07-29 13:15:25.039 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-07-29 13:15:25.583 +0900 [INFO] (task-0000): {uploading: {rows: 1944, size: 19,988 bytes (compressed)}}
2015-07-29 13:15:26.439 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-07-29 13:15:27.017 +0900 [INFO] (transaction): Performing bulk import session 'embulk_20150729_041509_962000000'
2015-07-29 13:16:04.166 +0900 [INFO] (transaction): job id: 1194645
2015-07-29 13:16:04.167 +0900 [INFO] (transaction): Committing bulk import session 'embulk_20150729_041509_962000000'
2015-07-29 13:16:04.167 +0900 [INFO] (transaction): valid records: 1944
2015-07-29 13:16:04.167 +0900 [INFO] (transaction): error records: 0
2015-07-29 13:16:04.167 +0900 [INFO] (transaction): valid parts: 1
2015-07-29 13:16:04.167 +0900 [INFO] (transaction): error parts: 0
2015-07-29 13:16:10.735 +0900 [INFO] (transaction): Deleting bulk import session 'embulk_20150729_041509_962000000'
2015-07-29 13:16:11.078 +0900 [INFO] (main): Committed.
2015-07-29 13:16:11.078 +0900 [INFO] (main): Next config diff: {"in":{},"out":{"last_session":"embulk_20150729_041509_962000000"}}
$
裏側ではTDのコマンドラインツールのバルクアップローダーが動いてそうですね。
確認
$ td query -w -t presto -d embulk "SELECT * FROM embulk LIMIT 1"
+--------+------+--------+-------------------------+------------+
| id | num | amount | date | time |
+--------+------+--------+-------------------------+------------+
| 123576 | 3108 | 0 | 2015-07-27 11:00:35.000 | 1437994835 |
+--------+------+--------+-------------------------+------------+
$
ちゃんと入っていますね。
DataConnector + YBI
TDコマンドラインツールのインストール
DataConnectorは今のところTDコマンドラインツール経由でしか利用できないので以下のコマンドでインストールする。
$ gem install td
DataConnectorの詳細については以下を参照。
http://ybi-docs.idcfcloud.com/categories/data-connector
config_dataconn.ymlの作成
Embulkと同様に以下のようなconfigファイルを作成する。
in:
type: mysql
user: ${user}
password: ${pass}
database: ybi
table: datas
host: hostname
select: "*"
out:
mode: append
modeはappend/replaceを指定可能。
guess
以下なぜload.ymlにいったん吐き出すかわからないが、config_dataconn.ymlでフォーマットがまずいところをいい感じにフォーマッティングしてくれるのかな。
$ td connector:guess config_dataconn.yml -o load.yml
$ td connector:preview load.yml
+---------+-------------+--------------+---------------------------+
| id:long | num:long | amount:long | date:timestamp |
+---------+-------------+--------------+---------------------------+
| 124870 | 5529986558 | 83611069544 | "2015-07-25 20:00:23 UTC" |
Database/Table作成
$ td db:create data_connector
$ td table:create data_connector mysql
job登録
YBIへのインポートJob登録は以下のコマンドで実行する。
$ td connector:issue load.yml --database data_connector --table mysql
Job 1202041 is queued.
Use 'td job:show 1202041' to show the status.
$
job登録後は今のところはクライアントツールでJobの状態を確認する必要がある。
$ td job:show 1202041
JobID : 1202041
Status : success
Type : bulkload
Database : data_connector
Use '-v' option to show detailed messages.
$
上の状態になったら完了。
確認
以下のクエリを実行してデータが入っているのかを確認。
$ td query -w -t presto -d data_connector "SELECT * FROM mysql LIMIT 1"
+--------+------------+-------------+-------------------------+------------+
| id | num | amount | date | time |
+--------+------------+-------------+-------------------------+------------+
| 124870 | 5529986558 | 83611069544 | 2015-07-25 20:00:23.000 | 1437854423 |
+--------+------------+-------------+-------------------------+------------+
$
入ってますね^^
おわりに
Embulkのおかげで今後様々な今あるデータを、様々な場所に簡単に送れそうですね。
TDのoutputプラグインにはtimeformatを記載する場所がないけど、それはoutputプラグインの方で良い感じでやってくれるのかな?、今度時間があるときにでも調べてみます。