MySQLに入っている時系列データをelasticsearchに入れて、kibanaで可視化したときのメモ.
「elasticsearch MySQL」とかでググると、JDBC river pluginを使え的な記事が出てくるのだが、どうやら
このriver pluginはdeprecated になっているようだった。
そこでembulkを使ってみたのだが、思いのほかさくっとできたのでメモ。
ざっくり外観
こんな感じのことをやる.
elasticsearch + kibana のインストール
どちらも、ダウンロードして起動プログラム・スクリプトを実行するだけですぐに動作するので詳細は本家サイトを参照.
elasticsearchはJavaで動くのでJavaのインストールが必要. kibanaはnodeで動作するが、node自体が同梱されているので気にしなくても動作する.
ダウンロードサイト
Embulkのセットアップ
Embulkは異なる種類のデータストア間で、データをよしなにエクスポート・インポートしてくれる君みたいなもの。
Fluentdと同じく様々なデータストアに対応したPluginが公開されており、これらを利用することで、多様な組み合わせのデータストア間でデータを移動できる。
インストール・セットアップは以下を参考にした。とても簡単。
Scheduled bulk data loading to Elasticsearch + Kibana 4 from CSV files
# Embulkをグローバルにインストール
sudo wget http://dl.embulk.org/embulk-latest.jar -O /usr/local/bin/embulk
sudo chmod +x /usr/local/bin/embulk
EmbulkにPluginをインストール
上記の状態では、mysqlやelasticsearchを扱うためのPluginは入ってないので入れる. embulk自身にpluginを管理する
コマンドがあるのでこれを使う.
# embulk pluginの確認
# embulk-input-mysql embulk-output-elasticsearchは最初は入ってない.
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4
*** LOCAL GEMS ***
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)
# mysqlのinputプラグイン、elasticsearchのoutputプラグインをインストール.
$ embulk gem install embulk-input-mysql embulk-output-elasticsearch
...
# 確認
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4
*** LOCAL GEMS ***
embulk-input-mysql (0.6.0)
embulk-output-elasticsearch (0.1.8)
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)
Bulk Loadの実行
Embulkの実行には、Inputプラグイン、Outputプラグインの設定ファイルを記述する.
以下のようなconfigファイルを書く. 特にデータフォーマットの変換やカラムのリネームはせず、全てのデータを愚直にelasticsearchに入れる.
in:
type: mysql
host: $mysql_host
user: $user
password: $password
database: $database
query: |
SELECT * FROM $table
out:
type: elasticsearch
index: test
index_type: embulk
nodes:
- host: $elasticsearch_host
これをEmbulkに読み込ませて実行すればO.K.
# ロードされるデータの確認.
$ embulk preview config.yml
# load
$ embulk run config.yml
2015-09-07 00:07:17.785 +0900: Embulk v0.7.4
2015-09-07 00:07:21.932 +0900 [INFO] (transaction): Loaded plugin embulk-input-mysql (0.6.0)
2015-09-07 00:07:22.825 +0900 [INFO] (transaction): Loaded plugin embulk-output-elasticsearch (0.1.8)
2015-09-07 00:07:22.959 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:23.846 +0900 [INFO] (transaction): [Ms. MODOK] loaded [], sites []
2015-09-07 00:07:25.364 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-09-07 00:07:25.388 +0900 [INFO] (task-0000): [Arkady Rossovich] loaded [], sites []
2015-09-07 00:07:25.625 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:25.650 +0900 [INFO] (task-0000): SQL: SELECT * FROM test
2015-09-07 00:07:25.664 +0900 [INFO] (task-0000): > 0.01 seconds
2015-09-07 00:07:26.065 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-09-07 00:07:26.174 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-09-07 00:07:26.188 +0900 [INFO] (task-0000): Execute 1000 bulk actions
2015-09-07 00:07:26.241 +0900 [INFO] (task-0000): Execute 71 bulk actions
2015-09-07 00:07:26.379 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#6]{New I/O worker #15}): 71 bulk actions succeeded
2015-09-07 00:07:26.841 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#5]{New I/O worker #14}): 1000 bulk actions succeeded
2015-09-07 00:07:26.954 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-09-07 00:07:27.047 +0900 [INFO] (main): Committed.
2015-09-07 00:07:27.048 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
以下のコマンドで、Elasticsearch側にmappingが生成されていることを確認.
curl -XGET 'http://$elastisearch_host/test?pretty'
あとはKibanaにアクセスしてごにょごにょすれば、MySQL内の時系列データをきれいなグラフで見ることができた!
メモ:Elasticsearch Pluginの中で自動でElasticsearch内のobject id(_id)を設定してくれるらしく、上記のembulk runを二回実行すると二回とも成功してMySQL内の同一のデータが二重に取り込まれてしまうよう. バッチ等で実行するときには注意が必要そう