LoginSignup
28
29

More than 5 years have passed since last update.

EmbulkでMySQLからElasticSearchにデータをロードしてみた

Last updated at Posted at 2015-09-06

MySQLに入っている時系列データをelasticsearchに入れて、kibanaで可視化したときのメモ.

「elasticsearch MySQL」とかでググると、JDBC river pluginを使え的な記事が出てくるのだが、どうやら
このriver pluginはdeprecated になっているようだった。

そこでembulkを使ってみたのだが、思いのほかさくっとできたのでメモ。

ざっくり外観

system.png

こんな感じのことをやる.

elasticsearch + kibana のインストール

どちらも、ダウンロードして起動プログラム・スクリプトを実行するだけですぐに動作するので詳細は本家サイトを参照.
elasticsearchはJavaで動くのでJavaのインストールが必要. kibanaはnodeで動作するが、node自体が同梱されているので気にしなくても動作する.

ダウンロードサイト

Elasticsearch
Kibana

Embulkのセットアップ

Embulkは異なる種類のデータストア間で、データをよしなにエクスポート・インポートしてくれる君みたいなもの。
Fluentdと同じく様々なデータストアに対応したPluginが公開されており、これらを利用することで、多様な組み合わせのデータストア間でデータを移動できる。

Embulk

インストール・セットアップは以下を参考にした。とても簡単。

Scheduled bulk data loading to Elasticsearch + Kibana 4 from CSV files

# Embulkをグローバルにインストール
sudo wget http://dl.embulk.org/embulk-latest.jar -O /usr/local/bin/embulk
sudo chmod +x /usr/local/bin/embulk

EmbulkにPluginをインストール

上記の状態では、mysqlやelasticsearchを扱うためのPluginは入ってないので入れる. embulk自身にpluginを管理する
コマンドがあるのでこれを使う.


# embulk pluginの確認
# embulk-input-mysql embulk-output-elasticsearchは最初は入ってない.
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4

*** LOCAL GEMS ***
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)

# mysqlのinputプラグイン、elasticsearchのoutputプラグインをインストール.
$ embulk gem install embulk-input-mysql embulk-output-elasticsearch  
...

# 確認
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4

*** LOCAL GEMS ***

embulk-input-mysql (0.6.0)
embulk-output-elasticsearch (0.1.8)
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)

Bulk Loadの実行

Embulkの実行には、Inputプラグイン、Outputプラグインの設定ファイルを記述する.
以下のようなconfigファイルを書く. 特にデータフォーマットの変換やカラムのリネームはせず、全てのデータを愚直にelasticsearchに入れる.

config.yml
in:
    type: mysql
    host: $mysql_host
    user: $user
    password: $password
    database: $database
    query: |
        SELECT * FROM $table

out:
    type: elasticsearch
    index: test
    index_type: embulk
    nodes:
    - host: $elasticsearch_host

これをEmbulkに読み込ませて実行すればO.K.


# ロードされるデータの確認.
$ embulk preview config.yml

# load 
$ embulk run config.yml 
2015-09-07 00:07:17.785 +0900: Embulk v0.7.4
2015-09-07 00:07:21.932 +0900 [INFO] (transaction): Loaded plugin embulk-input-mysql (0.6.0)
2015-09-07 00:07:22.825 +0900 [INFO] (transaction): Loaded plugin embulk-output-elasticsearch (0.1.8)
2015-09-07 00:07:22.959 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:23.846 +0900 [INFO] (transaction): [Ms. MODOK] loaded [], sites []
2015-09-07 00:07:25.364 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-09-07 00:07:25.388 +0900 [INFO] (task-0000): [Arkady Rossovich] loaded [], sites []
2015-09-07 00:07:25.625 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:25.650 +0900 [INFO] (task-0000): SQL: SELECT * FROM test

2015-09-07 00:07:25.664 +0900 [INFO] (task-0000): > 0.01 seconds
2015-09-07 00:07:26.065 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-09-07 00:07:26.174 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-09-07 00:07:26.188 +0900 [INFO] (task-0000): Execute 1000 bulk actions
2015-09-07 00:07:26.241 +0900 [INFO] (task-0000): Execute 71 bulk actions
2015-09-07 00:07:26.379 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#6]{New I/O worker #15}): 71 bulk actions succeeded
2015-09-07 00:07:26.841 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#5]{New I/O worker #14}): 1000 bulk actions succeeded
2015-09-07 00:07:26.954 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-09-07 00:07:27.047 +0900 [INFO] (main): Committed.
2015-09-07 00:07:27.048 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

以下のコマンドで、Elasticsearch側にmappingが生成されていることを確認.

curl -XGET 'http://$elastisearch_host/test?pretty'

あとはKibanaにアクセスしてごにょごにょすれば、MySQL内の時系列データをきれいなグラフで見ることができた!

Visualize_-_Kibana_4.png

メモ:Elasticsearch Pluginの中で自動でElasticsearch内のobject id(_id)を設定してくれるらしく、上記のembulk runを二回実行すると二回とも成功してMySQL内の同一のデータが二重に取り込まれてしまうよう. バッチ等で実行するときには注意が必要そう

28
29
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
28
29