みなさん、こんにちは!
Apache Kafka を利用可能なマネージド型のストリーミングサービスである Azure Event Hubs のセットアップ方法とストリーミングの基本的な操作についてご紹介します。
セットアップ手順
1. Event Hubs 名前空間の作成
基本タブで以下の項目を設定します。
- サブスクリプション:対象のサブスクリプション
- リソースグループ:対象のリソースグループ
- 名前空間の名前:任意の名前
- 場所:指定のリージョン
- 価格レベル:標準(Basic では Kafka がサポートされないため)
- スループットユニット:1
- 自動インフレを有効にする:なし
※スループットユニットはデータ処理能力を表す指標です。今回は検証目的のため、課金を抑えた設定にしています。
その他はデフォルトのままとします。
以上で Event Hubs の作成は完了です。
後ほど利用するため、Event Hubs 名前空間の「設定」→「共有アクセスポリシー」→「RootManageSharedAccessKey」から Primary connection string を控えておきます。
2. JRE インストール
Java 製の Kafka を実行できるよう、JRE をインストールしておきます。
$ sudo apt update
$ sudo apt install default-jre
3. Kafka インストール
Kafka のインストールを行います。
※以降の処理はローカル環境のWSL(Ubuntu 22.04.3 LTS)上で実施していますが、VM を作成して実施しても問題ありません。
はじめに Kafka をインストールします。以下のコマンドを実行します。
$ curl -OL https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
$ tar -xzf kafka_2.13-3.9.0.tgz
これで Kafka のインストールは完了です。
4. Kafka CLI 導入
Event Hubs に対してメッセージを送受信できるようにするため、以下のコマンドを実行し Kafka CLI を導入します。
$ git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
下記コマンドで kafka-cli ディレクトリに移動します。
$ cd azure-event-hubs-for-kafka/quickstart/kafka-cli
jaas.conf に先ほど取得した「Primary connection string」を書き込んで上書き保存します。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="";
};
5. 動作確認
まず、以下のように環境変数を設定しておきます。
$ export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
$ export KAFKA_INSTALL_HOME=[Kafka をインストールしたディレクトリ(例:/home/tani/kafka_2.13-3.9.0)]
Producer 接続
Producer を起動し、メッセージを送信してみます。
$ $KAFKA_INSTALL_HOME/bin/kafka-console-producer.sh --topic test-topic --broker-list <名前空間名>.servicebus.windows.net:9093 --producer.config client_common.properties
> aaa
> bbb
> ccc
トピックは事前に作成していなくても上記コマンドを実行した時点で自動的に作成されます。
Consumer 起動
別ターミナルを開き、環境変数を設定したのち Consumer を起動します。
$ $KAFKA_INSTALL_HOME/bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server <名前空間名>.servicebus.windows.net:9093 --consumer.config client_common.properties
aaa
bbb
ccc
上記により、メッセージを送受信できていることが確認できます。
Event Hubs 名前空間の概要でも送受信の記録を確認することが可能です。
追記
上記ではコマンドによるメッセージ送受信の例をご紹介しましたが、confluent-kafka-python等のライブラリを利用してプログラムからメッセージ送受信を行うことも可能です。
以下は「Primary connection string」を使って認証を行い、AdminClient
を利用してトピックの一覧を表示するPythonプログラムの例です。
from confluent_kafka.admin import AdminClient
def main():
conf = {
'bootstrap.servers': '<名前空間名>:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': '<Primary connection string>'
}
ac = AdminClient(conf)
cluster_metadata = ac.list_topics()
print(f"topics: {cluster_metadata.topics}")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"Error: {e}")
# 実行結果
# topics: {'test-topic': TopicMetadata(test-topic, 1 partitions)}