0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure EventHubsのKafka通信をGoogle ColaboratoryでKafkaConsumerを使って手軽に確認する方法

Posted at

本記事では、Google Colaboratoryを使って、KafkaConsumerでAzure EventHubsのKafka通信をすぐに確認する方法を紹介します。

1: Azure EventHubs Namespaceの作成

AzureポータルでEvent Hubs 名前空間を作成します。名前空間の作成時に、価格レベルを「標準(Standard)」以上にすることでKafkaエンドポイントを有効にすることが可能です。

2: Event Hubの作成

作成した名前空間内にEvent Hubを作成します。

3: 接続文字列の取得

名前空間の"Shared access policies"から、"RootManageSharedAccessKey"ポリシーの接続文字列を取得します。

4: Google Colaboratoryノートブックの作成

Google Colaboratoryで新しいノートブックを作成します。

5: 必要なライブラリのインストール

Google Colaboratoryノートブック上で以下のコマンドを実行して、必要なライブラリをインストールします。

!pip install kafka-python

6: Kafkaコンシューマーの実装

以下のコードをノートブックに追加し、接続文字列、Event Hubの名前などを適切に置き換えます。

# Event Hubs configuration
# Event Hubs 名前空間名
EH_NAMESPACE                    = "eventhubs-namespace"
# Event Hubs インスタンス名
EH_NAME                         = "eventhub-name"


EH_CONN_SHARED_ACCESS_KEY_NAME  = "RootManageSharedAccessKey"
# Event Hubs 名前空間の 設定 > 共有アクセスポリシーのプライマリーキー
EH_CONN_SHARED_ACCESS_KEY_VALUE = "AbCdEfGhiJkLmNoPqRsT/uVwXyZaBcDe+FgH1a2b3c="


EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"

KafkaConsumerを使用した10秒間のデータ取得を以下で実施できます。

from kafka import KafkaConsumer
import pandas as pd
import json


consumer = KafkaConsumer(
   EH_NAME,
   bootstrap_servers=[f"{EH_NAMESPACE}.servicebus.windows.net:9093"],
   security_protocol="SASL_SSL",
   sasl_mechanism="PLAIN",
   sasl_plain_username="$ConnectionString",
   sasl_plain_password=EH_CONN_STR
)


# データを格納するリスト
data = []


# 10秒間データを取得
import time
start_time = time.time()
while time.time() - start_time < 10:
   for message in consumer:
       data.append(message.value)
       print(message.value)
       if time.time() - start_time >= 10:
           break


# データをPandas DataFrameに変換
df = pd.DataFrame(data)

ノートブック実行の10秒間の間にEventhubsにデータを送信すると、以下のような表示となり、動作が確認できます。

WARNING:kafka.coordinator.consumer:group_id is None: disabling auto-commit.
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'
b'aaa bbb ccc'

まとめ

この記事では、Google ColaboratoryとKafkaConsumerを使ってAzure EventHubsのKafka通信を簡単に確認する方法を紹介しました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?