Fluentdのバッチ版Embulk(エンバルク)のまとめ 来週のMeetupまでに100になるか気にしている佐藤(@hiroysato)です
この記事はembulkアドベントカレンダー11日目の記事です。
2017/06/29 追記
- embulk-output-elasticsearch 4ではメモリリークがあるらしく、大きなデータをロードするとOutOfMemoryになる模様(参考)
2017/5/19 追記
現状
- Amazon Elasticsearch Serviceを使う場合embulk-output-elasticsearch_using_url を利用 参考
- elasticsearch1~5を利用する場合: embulk-output-elasticsearch 4.0以降を利用
- AzureやGCPは情報をおまちしております。
2017/3/29 追記
- elasticsearch5に対応した、embulk-output-elasticsearch0.4.0がリリースされました。
2017/1/30 追記
elasticsearch 5.0に接続するには、次のプラグインが使えるようです。
2016/11/17 追記
embulk-output-elasticsearch v0.2.Xは、elasticsearch-5.0.0には未対応です。
代わりにembulk-output-elasticsearch_rubyが使えるようです。
2016/1/27 追記
2016年1月26日にembulk-output-elasticsearch v0.2.0が出ました。
ご利用のelasticsearchのバージョンに合わせて適切なプラグインを使ってください。
Elasticsearch | embulkプラグイン |
---|---|
2.X | v0.2.0以降 |
1.X | v0.1.8 |
1.Xと2.xで互換性が無いのは、2.0になってJavaのAPIが大きく変わり、下位互換性が失われたためです。
2016/03/08 追加
embulk-output-elasticsearchはAWS ESには未対応です。
@nora96o @matetsu さんの先のツイートで書かれてる通りで、embulk-output-elasticsearchはTransport Client使ってるんですが、AWS ESで使えないので繋がらないです。。 https://t.co/5jKHbNzCdj
— oreradio (@oreradio) 2016年3月8日
以下は古いまとめです。
まとめ
- embulk-output-elasticsearch 0.1.8はElasticsearch 2.X系に接続できません。
- elasticsearchは1.Xを使いましょう
- もし諸般の事情でelasticsearch v2.Xを使いたい場合は下記の方法を試してください。
- リリースされたらこのページの情報が変わります。ストックして変更を追跡してください。
はじめに
Embulkのチュートリアルをみて、embulkからelasticsearchへデータを投入してみたがうまくいかない。なぜだとお悩みのかた、
elasticsearchはv2.Xをご利用ではないでしょうか?、
2015年12月11日現在embulk-output-elasticsearch 0.1.8はelasticsearch v2.Xには未対応です。
embulk-output-elasticsearchを利用する場合、Elasticsearch 1.7.3を使ってください。
embulk使ってみたけど動かないじゃんという方が少なくなるといいなと思い
elasticsearch v2.Xに接続した時に出るエラー
elasticsearchのサーバがv2.Xの場合に、embulk run
を実行すると次のようにエラーがでます。
これは、リリース済みのembulk-output-elasticsearchがまだv2.Xに未対応のためです。
org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:176) ~[na:na]
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:128) ~[na:na]
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[na:na]
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[na:na]
at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[na:na]
at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: java.io.StreamCorruptedException: Unsupported version: 1
at org.elasticsearch.common.io.ThrowableObjectInputStream.readStreamHeader(ThrowableObjectInputStream.java:46) ~[na:na]
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) ~[na:1.8.0_31]
at org.elasticsearch.common.io.ThrowableObjectInputStream.<init>(ThrowableObjectInputStream.java:38) ~[na:na]
at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:173) ~[na:na]
... 23 common frames omitted
embulk-output-elasticsearchを独自ビルド
どうしてもelasticsearch v2.0を試したいという方は、
githubに置いてある、embulk-output-elasticsearchを試してみてください。
githubのソースは、v2.0に対応しているようです。
(ただ不具合があるかもしれませんので、不具合があった場合はIsshueで報告をしてください)
こちらをビルドして試すことができます。(※JDKが必要です。)
次のコマンドで、elasticsearch v2.X対応のembulk-output-elasticsearchを作ることができます。
git clone https://github.com/muga/embulk-output-elasticsearch.git
cd embulk-output-elasticsearch
./gradlew gem
検証
elasticsearch導入
wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.1.0/elasticsearch-2.1.0.zip
unzip elasticsearch-2.1.0.zip
cd elasticsearch-2.1.0
./bin/elasticsearch
サンプルデータの作成
embulk example sample
embulk guess sample/example.yml -o config.yml
設定ファイルの修正
outの部分を変更します。
in:
type: file
path_prefix: /private/tmp/sample/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
#out: {type: stdout}
out:
type: elasticsearch
index: embulk
index_type: embulk
nodes:
- host: localhost
プレビュー
embulk preview config.yml
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long | time:timestamp | purchase:timestamp | comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
| 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk |
| 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby |
| 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
| 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+
データ投入
独自ビルドしたembulk-output-elasticsearchのパスを指定してembulk run
を動かします。
embulk run -I /path/to/embulk-output-elasticsearch/lib config.yml
ビルドしたパッケージをインストールする場合
次の方法でインストールできます。(バージョンが0.1.8になるので注意)
./gradlew gem
embulk gem install -l pkg/embulk-output-elasticsearch-0.1.8.gem