Edited at

KafkaからのデータをStructured Streamingで処理してElasticsearchに流す

今度はApache Kafkaからのストリームデータを取得してSparkのStructured Streamingで処理してElasticsearchに流すところを試してみる。

ES-HadoopをStructured Streamingと一緒に使う方法はここの ドキュメント に書いてある。

Sparkを使おうと思った時に、自分で環境を用意するのが大変なのと、作った後に必要な機能を使うための設定をしたり管理したりするのがつらいので、個人的にはAzure Databricksを使うのが今のところ一番楽だと思う。

この図を見るといろいろできそうな感じがわかる。そのうえでやることといえばクラスタを立ち上げてNotebookを開いてコードやクエリを書くくらいだから非常に楽である。

azure-databricks-overview.png

(https://docs.microsoft.com/ja-jp/azure/azure-databricks/what-is-azure-databricks より)

前回の記事 の方法に従ってES-Hadoopのライブラリをロードしてあることと、KafkaやElasticsearch環境は作成済みであることを前提とする。


Kafkaへの接続をセットアップ

kafkaへの接続定義は以下のようにしておこなう。

val kafkaDF = (

spark
.readStream
.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "YOUR_TOPIC1,YOUR_TOPIC2")
.option("startingOffsets", "latest")
.format("kafka")
.load()
)

これで接続の準備完了。データは下記のようにして普通に確認できる。

display(kafkaDF)


スキーマの定義

明示的にスキーマを定義する。下記は手持ちのサンプルデータをもとに実行しているが、データに合わせて変更が必要な部分。

import org.apache.spark.sql.types._

val schema = new StructType()
.add("orderID", IntegerType)
.add("productID", IntegerType)
.add("orderTimestamp", TimestampType)
.add("orderQty", IntegerType)


KafkaデータをDataFrameに読み込む

さっき定義したスキーマを当てはめてDataFrameに読み込む。

import org.apache.spark.sql.functions._

val df = kafkaDF.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, java.sql.Timestamp)]
.select(from_json($"value", schema).as("order"), $"timestamp")
.select("order.*", "timestamp")


パーティションの設定

shuffle後のパーティションを設定するには以下のように実行する。以下は8に設定している例。

spark.conf.set("spark.sql.shuffle.partitions", 8)


データを集計する

集計データのストリームを書き出す際には、time windowの定義が必要になる。ここで設定しないと後ほどwriteStreamができない。

import org.apache.spark.sql.functions._

val aggregatedDF = df.withWatermark("timestamp", "1 minutes").groupBy($"productID", window($"timestamp", "1 minutes")).count()


チェックポイントディレクトリの作成

オフセットやコミットログを書き込むためのディレクトリを作成する。

%fs mkdirs /tmp/es


Elasticsearchにデータを書き込む

writeStreamはここで流しっぱなしにして、次のコードを実行していく。

df.writeStream

.option("es.nodes.wan.only","true")
.option("es.net.ssl","false")
.option("es.nodes", "<Your Elasticsearch>")
.option("checkpointLocation", "/tmp/es")
.option("es.port", "<Port>")
.format("es")
.start("orders/log")


書き込まれたインデックスの確認

Elasticsearchに書き込まれたデータをcurlコマンドで確認する。ここではElasticsearchのsearchのAPIを使用している。

%sh curl http://<Your Elasticsearch>:<Port>/orders/log/_search?q=productID:869


ElasticsearchにSparkから接続してデータを取得

ストリームデータでも特に変わらず扱うことができる。

val reader = spark.read

.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port", "<Port>")
.option("es.net.ssl","false")
.option("es.nodes", "<Your Elasticsearch>")

val SQLdf = reader.load("orders/log")
display(SQLdf)


SQLでアクセスする

Elasticsearchのドキュメントに対してテーブルを作成し、クエリを実行する。

%sql

DROP TABLE IF EXISTS dcmotor;

CREATE TEMPORARY TABLE orders
USING org.elasticsearch.spark.sql
OPTIONS('resource'='orders/log',
'nodes'= '<Your Elasticsearch>',
'es.nodes.wan.only'='true',
'es.port'='<Port>',
'es.net.ssl'='false');

クエリを実行

%sql SELECT ProductID, SUM(orderQty) AS sum FROM orders GROUP BY ProductID ORDER BY sum DESC LIMIT 10;

これでElasticsearchにはストリーム処理されたデータが続々と投入されていく。Timestampデータを使ってKibanaで時系列データの可視化/モニタリングが可能になる。