ES-Hadoopを使ってSparkと連携できるが、Databricksでも同様にできるという事で、Azure Databricksを使ってやってみた。
ちなみにAzure DatabricksはAzure上で動くSparkのPaaS環境で、いろいろな処置を施しているので普通のSparkよりも高速だったり、付属のNotebookが使いやすかったりする、便利なサービスである。
https://azure.microsoft.com/ja-jp/services/databricks/
やることとしてはこちらに書いてあることそのままだが、一応メモとして記録しておく。
https://docs.azuredatabricks.net/spark/latest/data-sources/elasticsearch.html
Azure Databricksの環境準備に関しては、こちらの クイックスタート を見ながら実施。非常に簡単なのですぐに完成する。
ES-Hadoopのセットアップ
ES-Hadoopのパッケージは ここ から入手できる。ここではZIPを解凍してでてきたelasticsearch-spark-xx_x.xx-x.x.x.jarのファイルを使う。
Azure DatabricksのWorkspaceから [User] > [Create] > [Library]と選ぶ。
New Libraryという画面が開くのでここにさっきダウンロードしたES-HadoopのJARファイルをアップロードしてあげれば準備OK。
これで準備ができたので、ES-Hadoopを使う処理を書いていく。
データの下準備と通信確認
Scala版のDatabricks Notebookを起動してコードを記述。
最初にサンプルデータをロードするためのストレージをマウントする、ストレージはBlobストレージのアカウントとコンテナを事前に用意しておく必要がある。
dbutils.fs.mount(
source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
,mount_point = "/mnt/hol"
,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})
念のためElasticsearchのインスタンスとの疎通確認をとる。何も設定していないとPublic IPでの通信が必要になるが、仮想ネットワークの設定を施しておくと、Private IPでの通信も可能だ。ここで%shと書くとシェルを実行することができるところが結構便利。
%sh
ping -c 2 <Elasticsearch IP>
ちなみに以下のようにcurlコマンドでHTTPの通信も可能。
%sh
curl -XGET http://<Elasticsearch IP>:9200
続いて、ストレージにサンプルのデータセットをダウンロードする。これは公開されているサンプルデータのようで、フリーでダウンロードすることができる。
%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno
DatabcirksではDatabricks Filesystemというファイルシステムを使う。ファイルシステム系のコマンドは%fsマジックコマンドを使って呼び出すことができる。
%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/
Elasticsearchとの連携
ここからElasticsearchとの連携を進めていく。
マウントしたファイルシステムからダウンロードしたCSVファイルを読んでElasticsearchにロードする。このように、DataFrameを作って、writeで書き込むので比較的シンプルな感じ。
val esURL = "<Elasticsearch IP>"
val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
.mode("Overwrite")
.save("index/dogs")
データがロードされたことを確認するために、下記のコマンドでElasticsearchにクエリを投げると良い。
%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie
Sparkを使ってElasticsearchからデータを読み込む場合はreadを使う。
val reader = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)
SQLを使ってアクセスする場合は下記のようにテーブルを作成して、クエリを実行する。SQLを実行するときは%sqlマジックコマンドを使う。こういう切り替えができるのは非常に便利だ。
%sql
drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs',
'nodes'= '<Elasticsearch IP>',
'es.nodes.wan.only'='true',
'es.port'='9200',
'es.net.ssl'='false');
select weight_range as size, count(*) as number
from (
select case
when weight_low_lbs between 0 and 10 then 'toy'
when weight_low_lbs between 11 and 20 then 'small'
when weight_low_lbs between 21 and 40 then 'medium'
when weight_low_lbs between 41 and 80 then 'large'
else 'xlarge' end as weight_range
from dogs) d
group by weight_range