概要
Confluent のサイトにて紹介されているGetting Started with Apache Kafka and Python
をノートブック型 python 環境である Databricks 上で実行する方法を紹介します。
引用元:Apache Kafka and Python - Getting Started Tutorial
実施手順では左側の目次に合わせて実施内容を説明します。細かい手順は紹介しないため、必要に応じて下記ページを参照してください。
- Introduction
- Prerequisites
- Create Project
- Kafka Setup
- Create Topic
- Build Producer
- Build Consumer
- Produce Events
- Consume Events
- Where next?
引用元:Introduction
実施手順
Introduction
Confluent の無償トライアルに申し込みます。Google アカウントでもうしこんだところ、すぐにアカウントが払い出されました。
引用元:無料版の Confluent をお試しください:あらゆるクラウドでお使いいただけるマネージド型 Kafka です | JP
Prerequisites
Databricks 上でシングルノードのクラスターを作成します。
Create Project
Databricks 上でconfluent-kafka
をインストールします。
%pip install confluent-kafka -q
dbutils.library.restartPython()
Kafka Setup
Confuent にて kafka 環境の作成と認証情報を取得します。
kafka 環境の作成については、ドキュメント通りの手順を実施しました。クレジットカードの入力が求められた場合には、CONFLUENTDEV1
というプロモーションコードを入力することでスキップできます。
引用元:Apache Kafka and Python - Getting Started Tutorial
認証情報についてはBASIC
の手順を実施しました。APIのキーとシークレットが表示されますが、Download and continue
を選択してダウンロードしたテキストファイルを参照することをおすすめします。接続先に関する情報を確認できるためです。
引用元:Apache Kafka and Python - Getting Started Tutorial
Databricks のノートブック上で、接続情報と認証情報を変数にセットします。シークレットなどの機微なデータは、通常であれば Databricks シークレットに格納しますが、動作確認であるため平文でセットしています。
bootstrap_servers = ""
sasl_username = ""
sasl_password = ""
Create Topic
Confluet 上でpurchases
という topic を作成します。
Build Producer
Databricks ノートブックに、ノートブック上で動作することを考慮した修正済みコードを記述します。
#!/usr/bin/env python
from random import choice
from confluent_kafka import Producer
config = {
# User-specific properties that you must set
'bootstrap.servers': bootstrap_servers,
'sasl.username': sasl_username,
'sasl.password': sasl_password,
# Fixed properties
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'acks': 'all'
}
# Create Producer instance
producer = Producer(config)
# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
if err:
print('ERROR: Message failed delivery: {}'.format(err))
else:
print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
# Produce data by selecting random values from these lists.
topic = "purchases"
user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']
count = 0
for _ in range(10):
user_id = choice(user_ids)
product = choice(products)
producer.produce(topic, product, user_id, callback=delivery_callback)
count += 1
# Block until the messages are sent.
producer.poll(10000)
producer.flush()
記事を書く前の検証時にはコードを修正したのですが、修正しなくても動作はするようです。
if __name__ == '__main__':
print(__name__)
Build Consumer
Databricks ノートブックに、ノートブック上で動作することを考慮した修正済みコードを記述します。
#!/usr/bin/env python
from confluent_kafka import Consumer
config = {
# User-specific properties that you must set
'bootstrap.servers': bootstrap_servers,
'sasl.username': sasl_username,
'sasl.password': sasl_password,
# Fixed properties
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'group.id': 'kafka-python-getting-started',
'auto.offset.reset': 'earliest'
}
# Create Consumer instance
consumer = Consumer(config)
# Subscribe to topic
topic = "purchases"
consumer.subscribe([topic])
# Poll for new messages from Kafka and print them.
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
# Initial message consumption may take up to
# `session.timeout.ms` for the consumer group to
# rebalance and start consuming
print("Waiting...")
elif msg.error():
print("ERROR: %s".format(msg.error()))
else:
# Extract the (optional) key and value, and print.
print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
except KeyboardInterrupt:
pass
finally:
# Leave group and commit final offsets
consumer.close()
Produce Events
Build Producer
の手順で記述したコードを実行します。Confluent 上においてもデータが書きこまれたことを確認できます。
Produced event to topic purchases: key = jbernard value = batteries
Produced event to topic purchases: key = jbernard value = book
Produced event to topic purchases: key = awalther value = batteries
Produced event to topic purchases: key = eabara value = batteries
Produced event to topic purchases: key = jbernard value = alarm clock
Produced event to topic purchases: key = htanaka value = gift card
Produced event to topic purchases: key = htanaka value = book
Produced event to topic purchases: key = eabara value = book
Produced event to topic purchases: key = jsmith value = book
Produced event to topic purchases: key = awalther value = batteries
0
Consume Events
Build Consumer
の手順で記述したコードを実行します。処理が継続実行されるため、セルの実行をとめることを忘れないようでしてください。
Consumed event from topic purchases: key = jbernard value = batteries
Consumed event from topic purchases: key = jbernard value = book
Consumed event from topic purchases: key = awalther value = batteries
Consumed event from topic purchases: key = eabara value = batteries
Consumed event from topic purchases: key = jbernard value = alarm clock
Consumed event from topic purchases: key = htanaka value = gift card
Consumed event from topic purchases: key = htanaka value = book
Consumed event from topic purchases: key = eabara value = book
Consumed event from topic purchases: key = jsmith value = book
Consumed event from topic purchases: key = awalther value = batteries
Waiting...
セルを再実行した際には、想定通りに過去に取得したデータを取得しませんでした。
Where next?
Confluent のサイトでは無償のトレーニングが動画が公開されているためおすすめです。
引用元:Learn Apache Kafka® & Flink®
Python 開発者向けのコンテンツもあります。