2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Azure DatabricksとElasticsearchの連携

Posted at

ES-Hadoopを使ってSparkと連携できるが、Databricksでも同様にできるという事で、Azure Databricksを使ってやってみた。
ちなみにAzure DatabricksはAzure上で動くSparkのPaaS環境で、いろいろな処置を施しているので普通のSparkよりも高速だったり、付属のNotebookが使いやすかったりする、便利なサービスである。
https://azure.microsoft.com/ja-jp/services/databricks/

やることとしてはこちらに書いてあることそのままだが、一応メモとして記録しておく。
https://docs.azuredatabricks.net/spark/latest/data-sources/elasticsearch.html
Azure Databricksの環境準備に関しては、こちらの クイックスタート を見ながら実施。非常に簡単なのですぐに完成する。

ES-Hadoopのセットアップ

ES-Hadoopのパッケージは ここ から入手できる。ここではZIPを解凍してでてきたelasticsearch-spark-xx_x.xx-x.x.x.jarのファイルを使う。

Azure DatabricksのWorkspaceから [User] > [Create] > [Library]と選ぶ。
library.PNG

New Libraryという画面が開くのでここにさっきダウンロードしたES-HadoopのJARファイルをアップロードしてあげれば準備OK。
CreateLibrary.PNG
これで準備ができたので、ES-Hadoopを使う処理を書いていく。

データの下準備と通信確認

Scala版のDatabricks Notebookを起動してコードを記述。

最初にサンプルデータをロードするためのストレージをマウントする、ストレージはBlobストレージのアカウントとコンテナを事前に用意しておく必要がある。

dbutils.fs.mount(
  source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
 ,mount_point = "/mnt/hol"
 ,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})

念のためElasticsearchのインスタンスとの疎通確認をとる。何も設定していないとPublic IPでの通信が必要になるが、仮想ネットワークの設定を施しておくと、Private IPでの通信も可能だ。ここで%shと書くとシェルを実行することができるところが結構便利。

%sh 
ping -c 2 <Elasticsearch IP>

ちなみに以下のようにcurlコマンドでHTTPの通信も可能。

%sh 
curl -XGET http://<Elasticsearch IP>:9200

続いて、ストレージにサンプルのデータセットをダウンロードする。これは公開されているサンプルデータのようで、フリーでダウンロードすることができる。

%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno

DatabcirksではDatabricks Filesystemというファイルシステムを使う。ファイルシステム系のコマンドは%fsマジックコマンドを使って呼び出すことができる。

%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/

Elasticsearchとの連携

ここからElasticsearchとの連携を進めていく。
マウントしたファイルシステムからダウンロードしたCSVファイルを読んでElasticsearchにロードする。このように、DataFrameを作って、writeで書き込むので比較的シンプルな感じ。

val esURL = "<Elasticsearch IP>"
val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
  .mode("Overwrite")
  .save("index/dogs")

データがロードされたことを確認するために、下記のコマンドでElasticsearchにクエリを投げると良い。

%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie

Sparkを使ってElasticsearchからデータを読み込む場合はreadを使う。

val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)

SQLを使ってアクセスする場合は下記のようにテーブルを作成して、クエリを実行する。SQLを実行するときは%sqlマジックコマンドを使う。こういう切り替えができるのは非常に便利だ。

%sql
drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs', 
  'nodes'= '<Elasticsearch IP>',
  'es.nodes.wan.only'='true',
  'es.port'='9200',
  'es.net.ssl'='false');
  
select weight_range as size, count(*) as number 
from (
  select case 
    when weight_low_lbs between 0 and 10 then 'toy'
    when weight_low_lbs between 11 and 20 then 'small'
    when weight_low_lbs between 21 and 40 then 'medium'
    when weight_low_lbs between 41 and 80 then 'large'
    else 'xlarge' end as weight_range
  from dogs) d
group by weight_range
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?