Posted at

Azure DatabricksとElasticsearchの連携

ES-Hadoopを使ってSparkと連携できるが、Databricksでも同様にできるという事で、Azure Databricksを使ってやってみた。

ちなみにAzure DatabricksはAzure上で動くSparkのPaaS環境で、いろいろな処置を施しているので普通のSparkよりも高速だったり、付属のNotebookが使いやすかったりする、便利なサービスである。

https://azure.microsoft.com/ja-jp/services/databricks/

やることとしてはこちらに書いてあることそのままだが、一応メモとして記録しておく。

https://docs.azuredatabricks.net/spark/latest/data-sources/elasticsearch.html

Azure Databricksの環境準備に関しては、こちらの クイックスタート を見ながら実施。非常に簡単なのですぐに完成する。


ES-Hadoopのセットアップ

ES-Hadoopのパッケージは ここ から入手できる。ここではZIPを解凍してでてきたelasticsearch-spark-xx_x.xx-x.x.x.jarのファイルを使う。

Azure DatabricksのWorkspaceから [User] > [Create] > [Library]と選ぶ。

library.PNG

New Libraryという画面が開くのでここにさっきダウンロードしたES-HadoopのJARファイルをアップロードしてあげれば準備OK。

CreateLibrary.PNG

これで準備ができたので、ES-Hadoopを使う処理を書いていく。


データの下準備と通信確認

Scala版のDatabricks Notebookを起動してコードを記述。

最初にサンプルデータをロードするためのストレージをマウントする、ストレージはBlobストレージのアカウントとコンテナを事前に用意しておく必要がある。

dbutils.fs.mount(

source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
,mount_point = "/mnt/hol"
,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})

念のためElasticsearchのインスタンスとの疎通確認をとる。何も設定していないとPublic IPでの通信が必要になるが、仮想ネットワークの設定を施しておくと、Private IPでの通信も可能だ。ここで%shと書くとシェルを実行することができるところが結構便利。

%sh 

ping -c 2 <Elasticsearch IP>

ちなみに以下のようにcurlコマンドでHTTPの通信も可能。

%sh 

curl -XGET http://<Elasticsearch IP>:9200

続いて、ストレージにサンプルのデータセットをダウンロードする。これは公開されているサンプルデータのようで、フリーでダウンロードすることができる。

%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno

DatabcirksではDatabricks Filesystemというファイルシステムを使う。ファイルシステム系のコマンドは%fsマジックコマンドを使って呼び出すことができる。

%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/


Elasticsearchとの連携

ここからElasticsearchとの連携を進めていく。

マウントしたファイルシステムからダウンロードしたCSVファイルを読んでElasticsearchにロードする。このように、DataFrameを作って、writeで書き込むので比較的シンプルな感じ。

val esURL = "<Elasticsearch IP>"

val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
.mode("Overwrite")
.save("index/dogs")

データがロードされたことを確認するために、下記のコマンドでElasticsearchにクエリを投げると良い。

%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie

Sparkを使ってElasticsearchからデータを読み込む場合はreadを使う。

val reader = spark.read

.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)

SQLを使ってアクセスする場合は下記のようにテーブルを作成して、クエリを実行する。SQLを実行するときは%sqlマジックコマンドを使う。こういう切り替えができるのは非常に便利だ。

%sql

drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs',
'nodes'= '<Elasticsearch IP>',
'es.nodes.wan.only'='true',
'es.port'='9200',
'es.net.ssl'='false');

select weight_range as size, count(*) as number
from (
select case
when weight_low_lbs between 0 and 10 then 'toy'
when weight_low_lbs between 11 and 20 then 'small'
when weight_low_lbs between 21 and 40 then 'medium'
when weight_low_lbs between 41 and 80 then 'large'
else 'xlarge' end as weight_range
from dogs) d
group by weight_range