How to Fetch Data from Azure Event Hubs in the eventhubs Format with PySpark on Colaboratory

In this article, I'll try fetching data from Azure Event Hubs using PySpark's eventhubs format. We use azure-eventhubs-spark, an open-source Spark plugin provided by Microsoft.

1. Installing PySpark

!pip install pyspark

2. Creating a SparkSession

Create a SparkSession explicitly, configuring it to download and use the azure-eventhubs-spark plugin (jar).

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName("EventhubSpakSession") \
 .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22') \
 .getOrCreate()

You can check that the jars have been downloaded and placed where Spark can use them with the following command.

print(spark.sparkContext.getConf().get("spark.jars"))

# => file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.1.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-eventhubs-spark_2.12-2.3.22.jar,file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.1.jar,file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-3.4.1.jar,file:///root/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.0.jar,file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.11.1.jar,file:///root/.ivy2/jars/org.apache.hadoop_hadoop-client-runtime-3.3.4.jar,file:///root/.ivy2/jars/org.lz4_lz4-java-1.8.0.jar,file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.10.3.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-2.0.7.jar,file:///root/.ivy2/jars/org.apache.hadoop_hadoop-client-api-3.3.4.jar,file:///root/.ivy2/jars/commons-logging_commons-logging-1.1.3.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-eventhubs-3.3.0.jar,file:///root/.ivy2/jars/org.scala-lang.modules_scala-java8-compat_2.12-0.9.0.jar,file:///root/.ivy2/jars/org.apache.qpid_proton-j-0.33.8.jar,file:///root/.ivy2/jars/com.microsoft.azure_qpid-proton-j-extensions-1.2.4.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-client-authentication-1.7.3.jar,file:///root/.ivy2/jars/com.nimbusds_nimbus-jose-jwt-9.8.1.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-client-runtime-1.7.3.jar,file:///root/.ivy2/jars/commons-codec_commons-codec-1.11.jar,file:///root/.ivy2/jars/com.microsoft.azure_adal4j-1.6.4.jar,file:///root/.ivy2/jars/com.microsoft.azure_azure-annotations-1.10.0.jar,file:///root/.ivy2/jars/com.microsoft.rest_client-runtime-1.7.3.jar,file:///root/.ivy2/jars/com.google.guava_guava-24.1.1-jre.jar,file:///root/.ivy2/jars/com.squareup.retrofit2_retrofit-2.7.2.jar,file:///root/.ivy2/jars/com.squareup.okhttp3_okhttp-3.12.6.jar,file:///root/.ivy2/jars/com.squareup.okhttp3_logging-interceptor-3.12.2.jar,file:///root/.ivy2/jars/com.squareup.okhttp3_okhttp-urlconnection-3.12.2.jar,file:///root/.ivy2/jars/com.squareup.retrofit2_converter-jackson-2.7.2.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.datatype_jackson-datatype-joda-2.10.1.jar,file:///root/.ivy2/jars/org.apache.commons_commons-lang3-3.4.jar,file:///root/.ivy2/jars/io.reactivex_rxjava-1.3.8.jar,file:///root/.ivy2/jars/com.squareup.retrofit2_adapter-rxjava-2.7.2.jar,file:///root/.ivy2/jars/org.checkerframework_checker-compat-qual-2.0.0.jar,file:///root/.ivy2/jars/com.google.errorprone_error_prone_annotations-2.1.3.jar,file:///root/.ivy2/jars/com.google.j2objc_j2objc-annotations-1.1.jar,file:///root/.ivy2/jars/org.codehaus.mojo_animal-sniffer-annotations-1.14.jar,file:///root/.ivy2/jars/com.squareup.okio_okio-1.15.0.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-databind-2.10.1.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-annotations-2.10.1.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-core-2.10.1.jar,file:///root/.ivy2/jars/joda-time_joda-time-2.9.9.jar,file:///root/.ivy2/jars/com.nimbusds_oauth2-oidc-sdk-6.5.jar,file:///root/.ivy2/jars/com.google.code.gson_gson-2.8.0.jar,file:///root/.ivy2/jars/com.sun.mail_javax.mail-1.6.1.jar,file:///root/.ivy2/jars/com.github.stephenc.jcip_jcip-annotations-1.0-1.jar,file:///root/.ivy2/jars/net.minidev_json-smart-2.3.jar,file:///root/.ivy2/jars/com.nimbusds_lang-tag-1.7.jar,file:///root/.ivy2/jars/javax.activation_activation-1.1.jar,file:///root/.ivy2/jars/net.minidev_accessors-smart-1.2.jar,file:///root/.ivy2/jars/org.ow2.asm_asm-5.0.4.jar

3. Preparing the connection string for Azure Event Hubs

Add the following code to your notebook and replace the connection string, Event Hub name, and so on with your own values. The key point is to explicitly include EntityPath in the connection string.

# Event Hubs configuration
# Event Hubs namespace name
EH_NAMESPACE                    = "eventhubs-namespace"
# Event Hubs instance name
EH_NAME                         = "eventhub-name"


EH_CONN_SHARED_ACCESS_KEY_NAME  = "RootManageSharedAccessKey"
# Primary key from Settings > Shared access policies in the Event Hubs namespace
EH_CONN_SHARED_ACCESS_KEY_VALUE = "AbCdEfGhiJkLmNoPqRsT/uVwXyZaBcDe+FgH1a2b3c="


EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE};EntityPath={EH_NAME}"
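
As a side note, if no events are flowing into your Event Hub yet, you can send a few test events with the azure-eventhub Python SDK. This SDK is separate from the Spark plugin, and the following is just a minimal sketch: it assumes you have installed the package with !pip install azure-eventhub, and it reuses the variables defined above.

from azure.eventhub import EventHubProducerClient, EventData

# Reuses EH_CONN_STR and EH_NAME defined above
producer = EventHubProducerClient.from_connection_string(
    conn_str=EH_CONN_STR, eventhub_name=EH_NAME)
batch = producer.create_batch()
batch.add(EventData("aaa bbb"))  # sample text payload
producer.send_batch(batch)
producer.close()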

4. Connecting with PySpark using the eventhubs format

With the following code, the result of connecting for about 10 seconds is captured in df, and the stream is saved to an in-memory table named eventhubsStream.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()


# The plugin expects the connection string to be encrypted with its EventHubsUtils helper
ehConf = {
 'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EH_CONN_STR)
}
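
The plugin also accepts optional settings in ehConf, such as the consumer group and the starting position. The following is a minimal sketch based on the options documented for azure-eventhubs-spark; "$Default" and the beginning-of-stream position are placeholder choices:

import json

# Assumption: read from the "$Default" consumer group
ehConf['eventhubs.consumerGroup'] = "$Default"

# Start from the beginning of the stream (offset "-1")
startingEventPosition = {
    "offset": "-1",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}
ehConf['eventhubs.startingPosition'] = json.dumps(startingEventPosition)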


df = spark \
 .readStream \
 .format("eventhubs") \
 .options(**ehConf) \
 .load()


# Write the stream to an in-memory table
query = df.writeStream \
   .outputMode("append") \
   .format("memory") \
   .queryName("eventhubsStream") \
   .start()
# Let the stream run for up to 10 seconds
query.awaitTermination(10)


query.stop()

The contents of the stream can be retrieved as follows.

spark.sql("SELECT * FROM eventhubsStream").show()
+--------------------+---------+------+--------------+--------------------+---------+------------+----------+--------------------+
|                body|partition|offset|sequenceNumber|        enqueuedTime|publisher|partitionKey|properties|    systemProperties|
+--------------------+---------+------+--------------+--------------------+---------+------------+----------+--------------------+
|[61 61 61 20 62 6...|        0| 24592|           313|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 24672|           314|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 24752|           315|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 24832|           316|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 24912|           317|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 24992|           318|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 25072|           319|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 25152|           320|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
|[61 61 61 20 62 6...|        0| 25232|           321|2024-09-14 08:59:...|     NULL|        NULL|        {}|{x-opt-sequence-n...|
+--------------------+---------+------+--------------+--------------------+---------+------------+----------+--------------------+

The body column is binary, but you can see that the data is being received. By parsing the DataFrame, you can convert it to match whatever input format you expect.
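
For example, if the events are plain UTF-8 text, casting body to a string is enough (for JSON payloads you would additionally apply from_json with a schema). A minimal sketch, assuming UTF-8 text bodies:

from pyspark.sql.functions import col

# Cast the binary body column to a UTF-8 string
decoded = spark.sql("SELECT * FROM eventhubsStream") \
    .withColumn("body", col("body").cast("string"))
decoded.select("body", "enqueuedTime").show(truncate=False)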

Summary

In this article, I explained how to fetch data from Azure Event Hubs in the eventhubs format using PySpark.
