LoginSignup
13
13

More than 5 years have passed since last update.

PySpark 1.5.2 + Elasticsearch 2.1.0 導入手順と実行

Posted at

はじめに

  • pyspark から Elasticsearchを触りたい

環境

  • Elasticsearch 2.1.0
  • Spark 1.5.2

Spark インストール

省略します。本日Spark 1.6も出ましたが、1.5.2で。

Elasticsearch+hadoop ダウンロード

  • 2016/1/6時点では、Elasticsearch 2.1.0では、elasticsearch-hadoop-2.2.0-beta1が必要です。

公式ページからダウンロードして展開するだけ

$ wget http://download.elastic.co/hadoop/elasticsearch-hadoop-2.2.0-beta1.zip
$ unzip elasticsearch-hadoop-2.2.0-beta1.zip

pyspark + elasticsearch起動

/usr/local/share/spark/bin/pyspark --master local[4] --driver-class-path=elasticsearch-hadoop-2.2.0-beta1/dist/elasticsearch-spark_2.11-2.2.0-beta1.jar

RDD生成

>>> conf = {"es.nodes" : "XXX.XXX.XXX.XXX:[port]", "es.resource" : "[index name]/[type]"}
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat","org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)

基本操作

>>> rdd.first()
>>> rdd.count()
>>> rdd.filter(lambda s: 'aaa' in s).count()

Map / Reduce

# name別にいくつのレコードがあるかカウント
counts = rdd.map(lambda item: item[1]["name"])
counts = counts.map(lambda ip: (ip, 1))
counts = counts.reduceByKey(lambda a, b: a+b)

# 実行
>>> counts.collect()

ESへのsave

rdd.saveToEs('test/docs')

ハマったとこ

  • Elasticsearch側での、Network設定は注意。network.publish_hostが正しくないと、接続が拒否されました系のエラーが出てはまった。
<snip>
File "/usr/local/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [_nodes/http] failed; server[hostname/XXX.XXX.XXX.XXX:Ports] returned [400|Bad Request:]
<snip>

参考

Spark

Elasticsearch

13
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
13