Python モジュールの confluent-kafka-python
で Azure Event Hubs に Pub/Sub する方法を解説します。
Azure Event Hubs に Kafka プロトコルで Python から Pub/Sub するには azure-eventhub
モジュールを使われることが多いですが、 Azure Event Hubs 以外の Kafka ブローカーにも Pub/Sub できる汎用的なコードが必要になり、調べて作ってみました。
モジュールのインストール
事前準備としてモジュールのインストールを行います。
pip install confluent-kafka
pip install certifi
Apache Kafka への Pub/Sub
まずは、 Apache Kafka へ Pub/Sub する方法です。
Apache Kafka への Publish
下記の定数は、各自の環境に置き換えてください。
- HOST
- TOPIC
from confluent_kafka import Producer
# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'
# Apache Kafka の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-producer'
}
# Apache Kafka へ Publish
kafka = Producer(conf)
message = 'hello'
try:
kafka.produce(topic = TOPIC, value = message)
finally:
kafka.flush()
Apache Kafka からの Subscribe
下記の定数は、各自の環境に置き換えてください。
- HOST
- TOPIC
from confluent_kafka import Consumer
# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'
GROUP = 'foo'
# Apache Kafka の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-consumer'
, 'group.id': GROUP
, 'auto.offset.reset': 'smallest'
}
# Apache Kafka から Subscribe
kafka = Consumer(conf)
try:
kafka.subscribe([TOPIC])
msg_count = 0
while True:
msg = kafka.poll(timeout = 1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
kafka.close()
Azure Event Hubs への Pub/Sub
次は、 Azure Event Hubs へ Pub/Sub する方法です。
Azure Event Hubs への Publish
下記の定数は、各自の環境に置き換えてください。
- HOST
- TOPIC
- CONNECTION_STRING
from confluent_kafka import Producer
import certifi
# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'
# Azure Event Hubs の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-producer'
, 'security.protocol': 'SASL_SSL'
, 'sasl.mechanism': 'PLAIN'
, 'sasl.username': '$ConnectionString'
, 'sasl.password': CONNECTION_STRING
, 'ssl.ca.location': certifi.where()
}
# Azure Event Hubs へ Publish
kafka = Producer(conf)
message = 'hello'
try:
kafka.produce(topic = TOPIC, value = message)
finally:
kafka.flush()
Azure Event Hubs からの Subscribe
下記の定数は、各自の環境に置き換えてください。
- HOST
- TOPIC
- CONNECTION_STRING
- GROUP
from confluent_kafka import Consumer
import certifi
# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'
GROUP = 'foo'
# Azure Event Hubs の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-consumer'
, 'security.protocol': 'SASL_SSL'
, 'sasl.mechanism': 'PLAIN'
, 'sasl.username': '$ConnectionString'
, 'sasl.password': CONNECTION_STRING
, 'ssl.ca.location': certifi.where()
, 'group.id': GROUP
, 'auto.offset.reset': 'smallest'
}
# Azure Event Hubs から Subscribe
kafka = Consumer(conf)
try:
kafka.subscribe([TOPIC])
msg_count = 0
while True:
msg = kafka.poll(timeout = 1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
kafka.close()
Azure Event Hubs の接続文字列の確認方法
Azure Event Hubs の 接続文字列
は、 Azure Portal から確認できます。
-
共有アクセスポリシー
をクリックします。 -
[共有アクセスポリシー名]
をクリックします。(※デフォルトはRootManageSharedAccessKey
です) -
接続文字列 – 主キー
が Azure Event Hubs に接続する際に必要な接続文字列
になります。
掲載されているスクリーンショットの各キーなどは既に利用できないものになりますが、分かりやすさのために隠さずに掲載しています。
参考文献
下記のサイトを参考にさせて頂きました。