1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Python モジュールの confluent-kafka-python で Azure Event Hubs に Pub/Sub する方法

Last updated at Posted at 2023-09-19

Python モジュールの confluent-kafka-python で Azure Event Hubs に Pub/Sub する方法を解説します。

Azure Event Hubs に Kafka プロトコルで Python から Pub/Sub するには azure-eventhub モジュールを使われることが多いですが、 Azure Event Hubs 以外の Kafka ブローカーにも Pub/Sub できる汎用的なコードが必要になり、調べて作ってみました。

モジュールのインストール

事前準備としてモジュールのインストールを行います。

pip install confluent-kafka
pip install certifi

Apache Kafka への Pub/Sub

まずは、 Apache Kafka へ Pub/Sub する方法です。

Apache Kafka への Publish

下記の定数は、各自の環境に置き換えてください。

  • HOST
  • TOPIC
from confluent_kafka import Producer

# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'

# Apache Kafka の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-producer'
}

# Apache Kafka へ Publish
kafka = Producer(conf)
message = 'hello'
try:
    kafka.produce(topic = TOPIC, value = message)
finally:
    kafka.flush()

Apache Kafka からの Subscribe

下記の定数は、各自の環境に置き換えてください。

  • HOST
  • TOPIC
from confluent_kafka import Consumer

# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'
GROUP = 'foo'

# Apache Kafka の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-consumer'
    , 'group.id': GROUP
    , 'auto.offset.reset': 'smallest'
}

# Apache Kafka から Subscribe
kafka = Consumer(conf)
try:
    kafka.subscribe([TOPIC])
    msg_count = 0
    while True:
        msg = kafka.poll(timeout = 1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    kafka.close()

Azure Event Hubs への Pub/Sub

次は、 Azure Event Hubs へ Pub/Sub する方法です。

Azure Event Hubs への Publish

下記の定数は、各自の環境に置き換えてください。

  • HOST
  • TOPIC
  • CONNECTION_STRING
from confluent_kafka import Producer
import certifi

# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'


# Azure Event Hubs の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-producer'
    , 'security.protocol': 'SASL_SSL'
    , 'sasl.mechanism': 'PLAIN'
    , 'sasl.username': '$ConnectionString'
    , 'sasl.password': CONNECTION_STRING
    , 'ssl.ca.location': certifi.where()
}

# Azure Event Hubs へ Publish
kafka = Producer(conf)
message = 'hello'
try:
    kafka.produce(topic = TOPIC, value = message)
finally:
    kafka.flush()

Azure Event Hubs からの Subscribe

下記の定数は、各自の環境に置き換えてください。

  • HOST
  • TOPIC
  • CONNECTION_STRING
  • GROUP
from confluent_kafka import Consumer
import certifi

# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'
GROUP = 'foo'

# Azure Event Hubs の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-consumer'
    , 'security.protocol': 'SASL_SSL'
    , 'sasl.mechanism': 'PLAIN'
    , 'sasl.username': '$ConnectionString'
    , 'sasl.password': CONNECTION_STRING
    , 'ssl.ca.location': certifi.where()
    , 'group.id': GROUP
    , 'auto.offset.reset': 'smallest'
}

# Azure Event Hubs から Subscribe
kafka = Consumer(conf)
try:
    kafka.subscribe([TOPIC])
    msg_count = 0
    while True:
        msg = kafka.poll(timeout = 1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    kafka.close()

Azure Event Hubs の接続文字列の確認方法

Azure Event Hubs の 接続文字列 は、 Azure Portal から確認できます。

  1.  共有アクセスポリシー をクリックします。
  2.  [共有アクセスポリシー名] をクリックします。(※デフォルトは RootManageSharedAccessKey です)
  3.  接続文字列 – 主キー が Azure Event Hubs に接続する際に必要な 接続文字列 になります。

Qiita_azure_shared-acces-key.png

掲載されているスクリーンショットの各キーなどは既に利用できないものになりますが、分かりやすさのために隠さずに掲載しています。

参考文献

下記のサイトを参考にさせて頂きました。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?