本記事では、Google Colaboratoryを使って、KafkaConsumerでAzure EventHubsのKafka通信をすぐに確認する方法を紹介します。
1: Azure EventHubs Namespaceの作成
AzureポータルでEvent Hubs 名前空間を作成します。名前空間の作成時に、価格レベルを「標準(Standard)」以上にすることでKafkaエンドポイントを有効にすることが可能です。
2: Event Hubの作成
作成した名前空間内にEvent Hubを作成します。
3: 接続文字列の取得
名前空間の"Shared access policies"から、"RootManageSharedAccessKey"ポリシーの接続文字列を取得します。
4: Google Colaboratoryノートブックの作成
Google Colaboratoryで新しいノートブックを作成します。
5: 必要なライブラリのインストール
Google Colaboratoryノートブック上で以下のコマンドを実行して、必要なライブラリをインストールします。
!pip install kafka-python
6: Kafkaコンシューマーの実装
以下のコードをノートブックに追加し、接続文字列、Event Hubの名前などを適切に置き換えます。
# Event Hubs configuration
# Event Hubs 名前空間名
EH_NAMESPACE = "eventhubs-namespace"
# Event Hubs インスタンス名
EH_NAME = "eventhub-name"
EH_CONN_SHARED_ACCESS_KEY_NAME = "RootManageSharedAccessKey"
# Event Hubs 名前空間の 設定 > 共有アクセスポリシーのプライマリーキー
EH_CONN_SHARED_ACCESS_KEY_VALUE = "AbCdEfGhiJkLmNoPqRsT/uVwXyZaBcDe+FgH1a2b3c="
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
KafkaConsumerを使用した10秒間のデータ取得を以下で実施できます。
from kafka import KafkaConsumer
import pandas as pd
import json
consumer = KafkaConsumer(
EH_NAME,
bootstrap_servers=[f"{EH_NAMESPACE}.servicebus.windows.net:9093"],
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username="$ConnectionString",
sasl_plain_password=EH_CONN_STR
)
# データを格納するリスト
data = []
# 10秒間データを取得
import time
start_time = time.time()
while time.time() - start_time < 10:
for message in consumer:
data.append(message.value)
print(message.value)
if time.time() - start_time >= 10:
break
# データをPandas DataFrameに変換
df = pd.DataFrame(data)
ノートブック実行の10秒間の間にEventhubsにデータを送信すると、以下のような表示となり、動作が確認できます。
WARNING:kafka.coordinator.consumer:group_id is None: disabling auto-commit.
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
まとめ
この記事では、Google ColaboratoryとKafkaConsumerを使ってAzure EventHubsのKafka通信を簡単に確認する方法を紹介しました。