embulk-output-elasticsearchとelasticsearchの注意点

  • 13
    いいね
  • 0
    コメント

Fluentdのバッチ版Embulk(エンバルク)のまとめ 来週のMeetupまでに100になるか気にしている佐藤(@hiroysato)です

この記事はembulkアドベントカレンダー11日目の記事です。

2017/06/29 追記

2017/5/19 追記

現状

2017/3/29 追記

2017/1/30 追記

elasticsearch 5.0に接続するには、次のプラグインが使えるようです。

2016/11/17 追記

embulk-output-elasticsearch v0.2.Xは、elasticsearch-5.0.0には未対応です。
代わりにembulk-output-elasticsearch_rubyが使えるようです。

2016/1/27 追記

2016年1月26日にembulk-output-elasticsearch v0.2.0が出ました。
ご利用のelasticsearchのバージョンに合わせて適切なプラグインを使ってください。

Elasticsearch embulkプラグイン
2.X v0.2.0以降
1.X v0.1.8

1.Xと2.xで互換性が無いのは、2.0になってJavaのAPIが大きく変わり、下位互換性が失われたためです。

2016/03/08 追加

embulk-output-elasticsearchはAWS ESには未対応です。

以下は古いまとめです。

まとめ

  • embulk-output-elasticsearch 0.1.8はElasticsearch 2.X系に接続できません。
  • elasticsearchは1.Xを使いましょう
  • もし諸般の事情でelasticsearch v2.Xを使いたい場合は下記の方法を試してください。
  • リリースされたらこのページの情報が変わります。ストックして変更を追跡してください。

はじめに

Embulkのチュートリアルをみて、embulkからelasticsearchへデータを投入してみたがうまくいかない。なぜだとお悩みのかた、
elasticsearchはv2.Xをご利用ではないでしょうか?、

2015年12月11日現在embulk-output-elasticsearch 0.1.8はelasticsearch v2.Xには未対応です。

embulk-output-elasticsearchを利用する場合、Elasticsearch 1.7.3を使ってください。

embulk使ってみたけど動かないじゃんという方が少なくなるといいなと思い

elasticsearch v2.Xに接続した時に出るエラー

elasticsearchのサーバがv2.Xの場合に、embulk runを実行すると次のようにエラーがでます。

これは、リリース済みのembulk-output-elasticsearchがまだv2.Xに未対応のためです。

org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
    at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:176) ~[na:na]
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:128) ~[na:na]
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[na:na]
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[na:na]
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[na:na]
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[na:na]
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[na:na]
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[na:na]
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[na:na]
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[na:na]
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[na:na]
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_31]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_31]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: java.io.StreamCorruptedException: Unsupported version: 1
    at org.elasticsearch.common.io.ThrowableObjectInputStream.readStreamHeader(ThrowableObjectInputStream.java:46) ~[na:na]
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) ~[na:1.8.0_31]
    at org.elasticsearch.common.io.ThrowableObjectInputStream.<init>(ThrowableObjectInputStream.java:38) ~[na:na]
    at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:173) ~[na:na]
    ... 23 common frames omitted

embulk-output-elasticsearchを独自ビルド

どうしてもelasticsearch v2.0を試したいという方は、
githubに置いてある、embulk-output-elasticsearchを試してみてください。
githubのソースは、v2.0に対応しているようです。
(ただ不具合があるかもしれませんので、不具合があった場合はIsshueで報告をしてください)

こちらをビルドして試すことができます。(※JDKが必要です。)

次のコマンドで、elasticsearch v2.X対応のembulk-output-elasticsearchを作ることができます。

git clone https://github.com/muga/embulk-output-elasticsearch.git
cd embulk-output-elasticsearch
./gradlew gem

検証

elasticsearch導入

wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.1.0/elasticsearch-2.1.0.zip
unzip elasticsearch-2.1.0.zip 
cd elasticsearch-2.1.0
./bin/elasticsearch

サンプルデータの作成

embulk example sample
embulk guess sample/example.yml -o config.yml

設定ファイルの修正

outの部分を変更します。

in:
  type: file
  path_prefix: /private/tmp/sample/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
#out: {type: stdout}
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
    - host: localhost

プレビュー

embulk preview config.yml
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                       NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+

データ投入

独自ビルドしたembulk-output-elasticsearchのパスを指定してembulk runを動かします。

embulk run -I /path/to/embulk-output-elasticsearch/lib config.yml

ビルドしたパッケージをインストールする場合

次の方法でインストールできます。(バージョンが0.1.8になるので注意)

./gradlew gem
embulk gem install -l pkg/embulk-output-elasticsearch-0.1.8.gem
この投稿は Embulk Advent Calendar 201511日目の記事です。