EmbulkDay 11

embulk-output-elasticsearchとelasticsearchの注意点

More than 1 year has passed since last update.

Fluentdのバッチ版Embulk(エンバルク)のまとめ 来週のMeetupまでに100になるか気にしている佐藤(@hiroysato)です

この記事はembulkアドベントカレンダー11日目の記事です。


2017/06/29 追記


2017/5/19 追記

現状


2017/3/29 追記


2017/1/30 追記

elasticsearch 5.0に接続するには、次のプラグインが使えるようです。


2016/11/17 追記

embulk-output-elasticsearch v0.2.Xは、elasticsearch-5.0.0には未対応です。

代わりにembulk-output-elasticsearch_rubyが使えるようです。


2016/1/27 追記

2016年1月26日にembulk-output-elasticsearch v0.2.0が出ました。

ご利用のelasticsearchのバージョンに合わせて適切なプラグインを使ってください。

Elasticsearch
embulkプラグイン

2.X
v0.2.0以降

1.X
v0.1.8

1.Xと2.xで互換性が無いのは、2.0になってJavaのAPIが大きく変わり、下位互換性が失われたためです。


2016/03/08 追加

embulk-output-elasticsearchはAWS ESには未対応です。

以下は古いまとめです。


まとめ


  • embulk-output-elasticsearch 0.1.8はElasticsearch 2.X系に接続できません。

  • elasticsearchは1.Xを使いましょう

  • もし諸般の事情でelasticsearch v2.Xを使いたい場合は下記の方法を試してください。

  • リリースされたらこのページの情報が変わります。ストックして変更を追跡してください。


はじめに

Embulkのチュートリアルをみて、embulkからelasticsearchへデータを投入してみたがうまくいかない。なぜだとお悩みのかた、

elasticsearchはv2.Xをご利用ではないでしょうか?、

2015年12月11日現在embulk-output-elasticsearch 0.1.8はelasticsearch v2.Xには未対応です。

embulk-output-elasticsearchを利用する場合、Elasticsearch 1.7.3を使ってください。

embulk使ってみたけど動かないじゃんという方が少なくなるといいなと思い


elasticsearch v2.Xに接続した時に出るエラー

elasticsearchのサーバがv2.Xの場合に、embulk runを実行すると次のようにエラーがでます。

これは、リリース済みのembulk-output-elasticsearchがまだv2.Xに未対応のためです。

org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream

Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:176) ~[na:na]
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:128) ~[na:na]
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[na:na]
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[na:na]
at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[na:na]
at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: java.io.StreamCorruptedException: Unsupported version: 1
at org.elasticsearch.common.io.ThrowableObjectInputStream.readStreamHeader(ThrowableObjectInputStream.java:46) ~[na:na]
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) ~[na:1.8.0_31]
at org.elasticsearch.common.io.ThrowableObjectInputStream.<init>(ThrowableObjectInputStream.java:38) ~[na:na]
at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:173) ~[na:na]
... 23 common frames omitted


embulk-output-elasticsearchを独自ビルド

どうしてもelasticsearch v2.0を試したいという方は、

githubに置いてある、embulk-output-elasticsearchを試してみてください。

githubのソースは、v2.0に対応しているようです。

(ただ不具合があるかもしれませんので、不具合があった場合はIsshueで報告をしてください)

こちらをビルドして試すことができます。(※JDKが必要です。)

次のコマンドで、elasticsearch v2.X対応のembulk-output-elasticsearchを作ることができます。

git clone https://github.com/muga/embulk-output-elasticsearch.git

cd embulk-output-elasticsearch
./gradlew gem


検証


elasticsearch導入

wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.1.0/elasticsearch-2.1.0.zip

unzip elasticsearch-2.1.0.zip
cd elasticsearch-2.1.0
./bin/elasticsearch


サンプルデータの作成

embulk example sample

embulk guess sample/example.yml -o config.yml


設定ファイルの修正

outの部分を変更します。

in:

type: file
path_prefix: /private/tmp/sample/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
#out: {type: stdout}
out:
type: elasticsearch
index: embulk
index_type: embulk
nodes:
- host: localhost


プレビュー

embulk preview config.yml

+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long | time:timestamp | purchase:timestamp | comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
| 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk |
| 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby |
| 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
| 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+


データ投入

独自ビルドしたembulk-output-elasticsearchのパスを指定してembulk runを動かします。

embulk run -I /path/to/embulk-output-elasticsearch/lib config.yml


ビルドしたパッケージをインストールする場合

次の方法でインストールできます。(バージョンが0.1.8になるので注意)

./gradlew gem

embulk gem install -l pkg/embulk-output-elasticsearch-0.1.8.gem