pyparkのeventhgubsフォーマットを使ってAzure Eventhubsからデータを取得してみたいと思います。
Microsoftが提供しているOSSのsparkでのプラグインとして azure-eventhubs-spark
を使用します。
1.pysparkのインストール
!pip install pyspark
2.SparkSessionの作成
SparkSessionを明示的に作成します。その際に、azure-eventhubs-spark
プラグイン(jar)をダウンロードして使用するように指定します。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("EventhubSpakSession") \
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22') \
.getOrCreate()
以下のコマンドでjarがダウンロードされてsparkで使用できるよう配置されているか確認できます。
print(spark.sparkContext.getConf().get("spark.jars"))
# => file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.1.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-eventhubs-spark_2.12-2.3.22.jar,file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.1.jar,file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-3.4.1.jar,file:///root/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.0.jar,file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.11.1.jar,file:///root/.ivy2/jars/org.apache.hadoop_hadoop-client-runtime-3.3.4.jar,file:///root/.ivy2/jars/org.lz4_lz4-java-1.8.0.jar,file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.10.3.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-2.0.7.jar,file:///root/.ivy2/jars/org.apache.hadoop_hadoop-client-api-3.3.4.jar,file:///root/.ivy2/jars/commons-logging_commons-logging-1.1.3.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-eventhubs-3.3.0.jar,file:///root/.ivy2/jars/org.scala-lang.modules_scala-java8-compat_2.12-0.9.0.jar,file:///root/.ivy2/jars/org.apache.qpid_proton-j-0.33.8.jar,file:///root/.ivy2/jars/com.microsoft.azure_qpid-proton-j-extensions-1.2.4.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-client-authentication-1.7.3.jar,file:///root/.ivy2/jars/com.nimbusds_nimbus-jose-jwt-9.8.1.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-client-runtime-1.7.3.jar,file:///root/.ivy2/jars/commons-codec_commons-codec-1.11.jar,file:///root/.ivy2/jars/com.microsoft.azure_adal4j-1.6.4.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-annotations-1.10.0.jar,file:///root/.ivy2/jars/com.microsoft.rest_client-runtime-1.7.3.jar,file:///root/.ivy2/jars/com.google.guava_guava-24.1.1-jre.jar,file:///root/.ivy2/jars/com.squareup.retrofit2_retrofit-2.7.2.jar,file:///root/.ivy2/jars/com.squareup.okhttp3_okhttp-3.12.6.jar,file:///root/.ivy2/jars/com.squareup.okhttp3_logging-interceptor-3.12.2.jar,file:///root/.ivy2/jars/com.squareup.okhttp3_okhttp-urlconnection-3.12.2.jar,file:///root/.ivy2/jars/com.squareup.retrofit2_converter-jackson-2.7.2.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.datatype_jackson-datatype-joda-2.10.1.jar,file:///root/.ivy2/jars/org.apache.commons_commons-lang3-3.4.jar,file:///root/.ivy2/jars/io.reactivex_rxjava-1.3.8.jar,file:///root/.ivy2/jars/com.squareup.retrofit2_adapter-rxjava-2.7.2.jar,file:///root/.ivy2/jars/org.checkerframework_checker-compat-qual-2.0.0.jar,file:///root/.ivy2/jars/com.google.errorprone_error_prone_annotations-2.1.3.jar,file:///root/.ivy2/jars/com.google.j2objc_j2objc-annotations-1.1.jar,file:///root/.ivy2/jars/org.codehaus.mojo_animal-sniffer-annotations-1.14.jar,file:///root/.ivy2/jars/com.squareup.okio_okio-1.15.0.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-databind-2.10.1.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-annotations-2.10.1.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-core-2.10.1.jar,file:///root/.ivy2/jars/joda-time_joda-time-2.9.9.jar,file:///root/.ivy2/jars/com.nimbusds_oauth2-oidc-sdk-6.5.jar,file:///root/.ivy2/jars/com.google.code.gson_gson-2.8.0.jar,file:///root/.ivy2/jars/com.sun.mail_javax.mail-1.6.1.jar,file:///root/.ivy2/jars/com.github.stephenc.jcip_jcip-annotations-1.0-1.jar,file:///root/.ivy2/jars/net.minidev_json-smart-2.3.jar,file:///root/.ivy2/jars/com.nimbusds_lang-tag-1.7.jar,file:///root/.ivy2/jars/javax.activation_activation-1.1.jar,file:///root/.ivy2/jars/net.minidev_accessors-smart-1.2.jar,file:///root/.ivy2/jars/org.ow2.asm_asm-5.0.4.jar
4: Azure Eventhubsへの接続文字列の用意
以下のコードをノートブックに追加し、接続文字列、Event Hubの名前などを適切に置き換えます。EntityPathを接続文字列に明示的に指定するのがポイントです。
# Event Hubs configuration
# Event Hubs 名前空間名
EH_NAMESPACE = "eventhubs-namespace"
# Event Hubs インスタンス名
EH_NAME = "eventhub-name"
EH_CONN_SHARED_ACCESS_KEY_NAME = "RootManageSharedAccessKey"
# Event Hubs 名前空間の 設定 > 共有アクセスポリシーのプライマリーキー
EH_CONN_SHARED_ACCESS_KEY_VALUE = "AbCdEfGhiJkLmNoPqRsT/uVwXyZaBcDe+FgH1a2b3c="
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE};EntityPath={EH_NAME}"
5: eventhubsフォーマットを使用したpysparkでの接続
以下のコマンドで10秒間接続した結果をdfに格納できます。eventhubsStreamというテーブル名でメモリ上に保存しています。
from pyspark import SparkContext
sc = SparkContext.getOrCreate();
ehConf = {
'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EH_CONN_STR)
}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
import time
# データをメモリに保存する
query = df.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("eventhubsStream") \
.start()
query.awaitTermination(10)
query.stop()
dfの中身は以下のように取得できます。
spark.sql("SELECT * FROM eventhubsStream").show()
+--------------------+---------+------+--------------+--------------------+---------+------------+----------+--------------------+
| body|partition|offset|sequenceNumber| enqueuedTime|publisher|partitionKey|properties| systemProperties|
+--------------------+---------+------+--------------+--------------------+---------+------------+----------+--------------------+
|[61 61 61 20 62 6...| 0| 24592| 313|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 24672| 314|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 24752| 315|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 24832| 316|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 24912| 317|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 24992| 318|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 25072| 319|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 25152| 320|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...| 0| 25232| 321|2024-09-14 08:59:...| NULL| NULL| {}|{x-opt-sequence-n...|
+--------------------+---------+------+--------------+--------------------+---------+------------+----------+--------------------+
bodyはバイナリデータとなっていますが、取得できているのがわかります。dfのparseを実施することで任意の入力形式に合わせて変換することができます。
まとめ
この記事では、PySpark を使用して Azure Event Hubs から eventhubs フォーマットでデータを取得する方法を解説しました。