2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulk,DataConnector + YBI

Last updated at Posted at 2015-07-29

はじめに

以前書いたMemo: Embulk-1のときにはTreasureDataのoutputプラグインがなかったが、今朝みたらTreasureDataのoutputプラグインがあったので試してみた。
またせっかくなのでEmbulkで実装されいてるTreasureDataが提供しているDataConnectorも試してみた。
DataConnectorでは自分のリソースを使わずにデータをYBIに入れることが出来るので、今後既存データをYBIにデータを入れる主要な方法になると思う。

環境

  • OS: CentOS7.1
  • Ruby: ruby 2.0.0p598 (2014-11-13) [x86_64-linux]
  • java: openjdk version "1.8.0_51"
  • embulk: v0.6.19
  • tdコマンド: 0.11.13

事前準備

Javaの実行環境やMySQLのClientツール、TDのコマンドラインツールをインストールするために必要なパッケージを以下のコマンドでインストールしておく。

$ sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
$ sudo yum install -y mariadb mariadb-devel
$ sudo yum install gcc gcc-c++ openssl-devel libyaml-devel libffi-devel readline-devel zlib-devel gdbm-devel ncurses-devel 

Embulk + YBI

Embulk Install

https://github.com/embulk/embulk に書いてある通りにインストールする。

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
$ env | grep PATH
PATH=/root/.embulk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
$

Plugin Install

Fluentdと同様に既に多くのPluginが開発されている。
今回試してみるのはMySQL(MariaDB)からTreasureDataにデータをインポートさせるので、MySQLのinputプラグインとTreasureDataのoutputプラグインをインストールする。

現在のインストール可能なプラグイン一覧
77個あるらしい。。。

$ embulk gem list embulk -r 
2015-07-29 12:34:06.823 +0900: Embulk v0.6.19

*** REMOTE GEMS ***

embulk (0.6.19)
embulk-decoder-commons-compress (0.3.0)
embulk-encoder-xz (0.1.0)
embulk-executor-mapreduce (0.1.5)
embulk-filter-column (0.1.6)
embulk-filter-eval (0.1.0)
embulk-filter-insert (1.0.0)
embulk-filter-row (0.1.3)
embulk-filter-speedometer (0.2.2)
embulk-formatter-jsonl (0.1.4)
embulk-input-apache-dummy-log (0.1.0)
embulk-input-command (0.1.2)
embulk-input-dynamodb (0.0.3)
embulk-input-filesplit (0.1.2)
embulk-input-ftp (0.1.2)
embulk-input-gcs (0.1.4)
embulk-input-hdfs (0.0.2)
embulk-input-http (0.0.6)
embulk-input-jdbc (0.6.0)
embulk-input-jira (0.0.6)
embulk-input-jstat (0.0.3)
embulk-input-marketo (0.1.0)
embulk-input-mixpanel (0.1.0)
embulk-input-mysql (0.6.0)
embulk-input-oracle (0.6.0)
embulk-input-pcapng-files (0.1.2)
embulk-input-postgresql (0.6.0)
embulk-input-random (0.0.2)
embulk-input-redis (0.1.6)
embulk-input-redshift (0.6.0)
embulk-input-remote (0.1.4)
embulk-input-riak_cs (0.2.1)
embulk-input-s3 (0.2.1)
embulk-input-sfdc (0.0.3)
embulk-input-slack-history (0.1.1)
embulk-input-sql (0.1.0)
embulk-input-sqlserver (0.6.0)
embulk-output-bigquery (0.1.7)
embulk-output-command (0.1.2)
embulk-output-elasticsearch (0.1.7)
embulk-output-gcs (0.1.1)
embulk-output-hdfs (0.1.1)
embulk-output-jdbc (0.4.1)
embulk-output-mysql (0.4.1)
embulk-output-oracle (0.4.1)
embulk-output-parquet (0.1.0)
embulk-output-postgres-json (0.2.0)
embulk-output-postgres-udf (0.1.2)
embulk-output-postgresql (0.4.1)
embulk-output-redis (0.1.6)
embulk-output-redshift (0.4.1)
embulk-output-s3 (0.1.0)
embulk-output-salesforce (0.1.2)
embulk-output-sqlite3 (0.0.1)
embulk-output-td (0.1.2)
embulk-output-vertica (0.2.2)
embulk-parser-apache-log (0.1.0)
embulk-parser-fluent-s3-log (0.0.1)
embulk-parser-jdbc-schema-csv (0.0.1)
embulk-parser-json (0.0.1)
embulk-parser-jsonl (0.0.1)
embulk-parser-pcapng (0.1.0)
embulk-parser-query_string (0.1.3)
embulk-parser-roo-excel (0.0.1)
embulk-parser-xml (0.0.3)
embulk-plugin-filter-convert (0.0.1)
embulk-plugin-filter-grep (0.0.2)
embulk-plugin-input-hbase (0.0.2 java, 0.0.1)
embulk-plugin-input-jstat (0.0.2)
embulk-plugin-input-pcapng-files (0.0.2)
embulk-plugin-input-random (0.0.2)
embulk-plugin-input-roo-excel (0.1.1)
embulk-plugin-input-sfdc-event-log-files (0.0.7)
embulk-plugin-mysql (0.0.3)
embulk-plugin-redis (0.1.5)
embulk-plugin-twitterstream (0.0.2)
embulk-plugin-vertica (0.0.2)
embulk-plugin-vim (0.0.2)
$

MySQL Inputプラグイン

$ embulk gem install embulk-input-mysql

TreasureData Outputプラグイン

$ embulk gem install embulk-output-td

環境設定

MySQL

以下のテーブル定義のデータ。

mysql> DESC datas;
+-------------+--------------+------+-----+---------+----------------+
| Field       | Type         | Null | Key | Default | Extra          |
+-------------+--------------+------+-----+---------+----------------+
| id          | mediumint(9) | NO   | PRI | NULL    | auto_increment |
| num         | bigint(20)   | YES  |     | NULL    |                |
| amount      | bigint(20)   | YES  |     | NULL    |                |
| date        | datetime     | YES  |     | NULL    |                |
+-------------+--------------+------+-----+---------+----------------+
5 rows in set (0.00 sec)

mysql> SELECT * FROM datas_dump LIMIT 1;
+--------+------------+-------------+---------------------+
| id     | num        | amount      | date                |
+--------+------------+-------------+---------------------+
| 122823 | 5529888889 | 83610957107 | 2015-07-24 20:00:32 |
+--------+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> 

YBI

API-Keyの取得方法はTreasureData + JavaScript SDKを参照。

embulk.embulkというテーブルを作成。

$ td db:create embulk
Database 'embulk' is created.
Use 'td table:create embulk <table_name>' to create a table.
$ td table:create embulk embulk
Table 'embulk.embulk' is created.
$ 

config.ymlの作成

以下のconfigファイルを作成

in:
  type: mysql
  user: ${user}
  password: ${pass}
  database: ybi
  table: datas
  host: hostname
  select: "*"
  
out:
  type: td
  apikey: 
  endpoint: ybi.jp-east.idcfcloud.com
  use_ssl: true
  database: embulk
  table: embulk
  time_column: date

Import

Preview

$ embulk preview config.yml | head -10
2015-07-29 13:13:47.209 +0900: Embulk v0.6.19
2015-07-29 13:13:50.228 +0900 [INFO] (preview): Loaded plugin embulk-input-mysql (0.6.0)
2015-07-29 13:13:50.606 +0900 [INFO] (preview): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:13:51.629 +0900 [INFO] (preview): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:13:51.689 +0900 [INFO] (preview): SQL: SELECT id,import_num as num,volume as amount,dump_date as date FROM `datas_dump`
2015-07-29 13:13:51.694 +0900 [INFO] (preview): > 0.00 seconds
+---------+----------------+-----------------+-------------------------+
| id:long |       num:long |     amount:long |          date:timestamp |
+---------+----------------+-----------------+-------------------------+
| 122,823 |  5,529,888,889 |  83,610,957,107 | 2015-07-24 11:00:32 UTC |
$

良さげですね。

Run

問題なければRun。

$ embulk run config.yml
2015-07-29 13:15:03.761 +0900: Embulk v0.6.19
2015-07-29 13:15:16.770 +0900 [INFO] (transaction): Loaded plugin embulk-input-mysql (0.6.0)
2015-07-29 13:15:17.095 +0900 [INFO] (transaction): Loaded plugin embulk-output-td (0.1.2)
2015-07-29 13:15:17.297 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:15:19.625 +0900 [INFO] (transaction): Logging initialized @25186ms
2015-07-29 13:15:22.233 +0900 [INFO] (transaction): Duplicating date:timestamp column to 'time' column as seconds for the data partitioning
2015-07-29 13:15:22.234 +0900 [INFO] (transaction): Create bulk_import session embulk_20150729_041509_962000000
2015-07-29 13:15:22.490 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-07-29 13:15:22.505 +0900 [INFO] (task-0000): Duplicating date:timestamp column to 'time' column as seconds for the data partitioning
2015-07-29 13:15:23.861 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-07-29 13:15:24.070 +0900 [INFO] (task-0000): SQL: SELECT * FROM `datas`
2015-07-29 13:15:24.076 +0900 [INFO] (task-0000): > 0.00 seconds
2015-07-29 13:15:24.823 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-07-29 13:15:25.039 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-07-29 13:15:25.583 +0900 [INFO] (task-0000): {uploading: {rows: 1944, size: 19,988 bytes (compressed)}}
2015-07-29 13:15:26.439 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-07-29 13:15:27.017 +0900 [INFO] (transaction): Performing bulk import session 'embulk_20150729_041509_962000000'
2015-07-29 13:16:04.166 +0900 [INFO] (transaction):     job id: 1194645
2015-07-29 13:16:04.167 +0900 [INFO] (transaction): Committing bulk import session 'embulk_20150729_041509_962000000'
2015-07-29 13:16:04.167 +0900 [INFO] (transaction):     valid records: 1944
2015-07-29 13:16:04.167 +0900 [INFO] (transaction):     error records: 0
2015-07-29 13:16:04.167 +0900 [INFO] (transaction):     valid parts: 1
2015-07-29 13:16:04.167 +0900 [INFO] (transaction):     error parts: 0
2015-07-29 13:16:10.735 +0900 [INFO] (transaction): Deleting bulk import session 'embulk_20150729_041509_962000000'
2015-07-29 13:16:11.078 +0900 [INFO] (main): Committed.
2015-07-29 13:16:11.078 +0900 [INFO] (main): Next config diff: {"in":{},"out":{"last_session":"embulk_20150729_041509_962000000"}}
$

裏側ではTDのコマンドラインツールのバルクアップローダーが動いてそうですね。

確認

$ td query -w -t presto -d embulk "SELECT * FROM embulk LIMIT 1"
+--------+------+--------+-------------------------+------------+
| id     | num  | amount | date                    | time       |
+--------+------+--------+-------------------------+------------+
| 123576 | 3108 | 0      | 2015-07-27 11:00:35.000 | 1437994835 |
+--------+------+--------+-------------------------+------------+
$

ちゃんと入っていますね。

DataConnector + YBI

TDコマンドラインツールのインストール

DataConnectorは今のところTDコマンドラインツール経由でしか利用できないので以下のコマンドでインストールする。

$ gem install td

DataConnectorの詳細については以下を参照。
http://ybi-docs.idcfcloud.com/categories/data-connector

config_dataconn.ymlの作成

Embulkと同様に以下のようなconfigファイルを作成する。

in:
  type: mysql
  user: ${user}
  password: ${pass}
  database: ybi
  table: datas
  host: hostname
  select: "*"
  
out:
  mode: append

modeはappend/replaceを指定可能。

guess

以下なぜload.ymlにいったん吐き出すかわからないが、config_dataconn.ymlでフォーマットがまずいところをいい感じにフォーマッティングしてくれるのかな。

$ td connector:guess config_dataconn.yml -o load.yml
$ td connector:preview load.yml
+---------+-------------+--------------+---------------------------+
| id:long | num:long    | amount:long  | date:timestamp            |
+---------+-------------+--------------+---------------------------+
| 124870  | 5529986558  | 83611069544  | "2015-07-25 20:00:23 UTC" |

Database/Table作成

$ td db:create data_connector
$ td table:create data_connector mysql

job登録

YBIへのインポートJob登録は以下のコマンドで実行する。

$ td connector:issue load.yml --database data_connector --table mysql
Job 1202041 is queued.
Use 'td job:show 1202041' to show the status.
$

job登録後は今のところはクライアントツールでJobの状態を確認する必要がある。

$ td job:show 1202041
JobID       : 1202041
Status      : success
Type        : bulkload
Database    : data_connector
Use '-v' option to show detailed messages.                    
$

上の状態になったら完了。

確認

以下のクエリを実行してデータが入っているのかを確認。

$ td query -w -t presto -d data_connector "SELECT * FROM mysql LIMIT 1"
+--------+------------+-------------+-------------------------+------------+                                                                                
| id     | num        | amount      | date                    | time       |
+--------+------------+-------------+-------------------------+------------+
| 124870 | 5529986558 | 83611069544 | 2015-07-25 20:00:23.000 | 1437854423 |
+--------+------------+-------------+-------------------------+------------+
$

入ってますね^^

おわりに

Embulkのおかげで今後様々な今あるデータを、様々な場所に簡単に送れそうですね。
TDのoutputプラグインにはtimeformatを記載する場所がないけど、それはoutputプラグインの方で良い感じでやってくれるのかな?、今度時間があるときにでも調べてみます。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?