Confluet の公式トレーニングコースであるApache Kafka® for Python Developers
におけるハンズオンを Databricks で実行する手順を紹介します。
引用元:Introduction to Python for Kafka
- Hands On: Setup the Exercise Environment for Confluent Cloud and Python
- Hands On: Use the Python Producer Class
- Hands on: Use the Python Consumer Class
- Hands On: Use the Python Producer Class with Schemas
- Hands On: Use the Python Consumer Class with Schemas
- Hands On: Use the Python AdminClient Class
Hands On: Setup the Exercise Environment for Confluent Cloud and Python
Sign up for Confluent Cloud
Confluent の無償トライアルに申し込みます。Google アカウントでもうしこんだところ、すぐにアカウントが払い出されました。
引用元:無料版の Confluent をお試しください:あらゆるクラウドでお使いいただけるマネージド型 Kafka です | JP
Login to the Confluent Cloud Console
Confluent 上で learn-kafka-python
という environment を作成します。
Enable Streams Governance
environment 作成時に、Stream Governance package
で作成してしまった場合には、 environment のページにて右側にあるStream Governance package
のUpgrade now
にてadvanced governance controls
Create a Cluster in the learn-kafka-python
という名称の cluster を作成します。
Create and Download Python Client Configuration Properties
Confluent のサイトでは client から API を取得する手順となっておりますが、 API Keys を取得するための手順であるためAPI Keys
からキーの生成をしました。Download and continue
を選択するとBoostrap server
Create Python Dictionary
Databricks 上にノートブックと同じディレクトリ上にconfig.py
config = {
"bootstrap.servers": "<bootstrap-server-endpoint>",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": "<CLUSTER_API_KEY>",
"sasl.password": "<CLUSTER_API_SECRET>",
Databricks のノートブック上で下記のコードを実行して想定通りの値が表示されることを確認します。
from config import config
Install Python 3.X and Related Prerequisites on Local Machine
Databricks 上でシングルノードのクラスターを作成します。
Create a New Virtual Environment with virtualenv
Install Confluent Kafka
Databricks ノートブック上で下記のコードを実行します。
%pip install confluent-kafka -q
Hands On: Use the Python Producer Class
Create Topics
Project Setup
Databricks 利用時には不要な手順であるため省略。
Add Required Imports
Databricks のノートブック上で下記コードを実行します。
from confluent_kafka import Producer
from config import config
Create callback function
Databricks のノートブック上で下記コードを実行します。
def callback(err, event):
if err:
print(f'Produce to topic {event.topic()} failed for event: {event.key()}')
val = event.value().decode('utf8')
print(f'{val} sent to partition {event.partition()}.')
Create function to produce to hello_topic
Databricks のノートブック上で下記コードを実行します。
def say_hello(producer, key):
value = f'Hello {key}!'
producer.produce('hello_topic', value, key, on_delivery=callback)
Add Main Block
Databricks のノートブック上で下記コードを記述します。
if __name__ == '__main__':
producer = Producer(config)
keys = ['Amy', 'Brenda', 'Cindy', 'Derrick', 'Elaine', 'Fred']
[say_hello(producer, key) for key in keys]
Run the Program
Databricks 上で前の手順を実行します。実行後に Confluent 上でデータが書きこまれたことを確認します。
Hands on: Use the Python Consumer Class
Use Consumer to Read Events from Kafka
Project Setup
Databricks 利用時には不要な手順であるため省略。
Add Required Imports
from confluent_kafka import Consumer, KafkaException
from config import config
Create function to update configuration
def set_consumer_configs():
config['group.id'] = 'hello_group'
config['auto.offset.reset'] = 'earliest'
config['enable.auto.commit'] = False
Create callback function for partition assignment
def assignment_callback(consumer, partitions):
for p in partitions:
print(f'Assigned to {p.topic}, partition {p.partition}')
Add Main Block
if __name__ == '__main__':
consumer = Consumer(config)
consumer.subscribe(['hello_topic'], on_assign=assignment_callback)
while True:
event = consumer.poll(1.0)
if event is None:
if event.error():
raise KafkaException(event.error())
val = event.value().decode('utf8')
partition = event.partition()
print(f'Received: {val} from partition {partition} ')
# consumer.commit(event)
except KeyboardInterrupt:
print('Canceled by user.')
Run the Consumer
Produce New Events
省略します。本手順を実施したい場合には、Hands On: Use the Python Producer Class
とHands on: Use the Python Consumer Class
%pip install confluent-kafka -q
Observe Consumer Group Rebalance Behavior
Hands On: Use the Python Producer Class with Schemas
Confluent 上でtemp_readings
という topic を作成します。
Confluent にて environment の画面に右下にあるStream Governance API
のAdd key
を選択して、 Schema Registry API の認証情報を取得します。
Databricks のノートブック上で下記のコードを実行します。
%pip install jsonschema -q
%pip install requests -q
というファイルに、 Schema Registry API に対する認証情報を追記します。
sr_config = {
'url': '<schema.registry.url>',
Project Setup
Databricks 利用時には不要な手順であるため省略。
Add Required Imports
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from config import config, sr_config
import time
Define our class and schema
Databricks のノートブック上で下記のコードを実行します。
class Temperature(object):
def __init__(self, city, reading, unit, timestamp):
self.city = city
self.reading = reading
self.unit = unit
self.timestamp = timestamp
schema_str = """{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Temperature",
"description": "Temperature sensor reading",
"type": "object",
"properties": {
"city": {
"description": "City name",
"type": "string"
"reading": {
"description": "Current temperature reading",
"type": "number"
"unit": {
"description": "Temperature unit (C/F)",
"type": "string"
"timestamp": {
"description": "Time of reading in ms since epoch",
"type": "number"
def temp_to_dict(temp, ctx):
return {"city":temp.city,
Create some test data
Databricks のノートブック上で下記のコードを実行します。
data = [Temperature('London', 12, 'C', round(time.time()*1000)),
Temperature('Chicago', 63, 'F', round(time.time()*1000)),
Temperature('Berlin', 14, 'C', round(time.time()*1000)),
Temperature('Madrid', 18, 'C', round(time.time()*1000)),
Temperature('Phoenix', 78, 'F', round(time.time()*1000))]
Create a producer callback function
Databricks のノートブック上で下記のコードを実行します。
def delivery_report(err, event):
if err is not None:
print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
print(f'Temp reading for {event.key().decode("utf8")} produced to {event.topic()}')
Add Main Block
Databricks のノートブック上で下記のコードを記述します。
if __name__ == '__main__':
topic = 'temp_readings'
schema_registry_client = SchemaRegistryClient(sr_config)
json_serializer = JSONSerializer(schema_str,
producer = Producer(config)
for temp in data:
producer.produce(topic=topic, key=str(temp.city),
SerializationContext(topic, MessageField.VALUE)),
Execute the program
前の手順のコードを実行後、 Confluent 上でデータが書きこまれたことを確認します。
Hands On: Use the Python Consumer Class with Schemas
Project Setup
Databricks 利用時には不要な手順であるため省略。
Add Required Imports
Databricks のノートブック上で下記のコードを実行します。
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from config import config
Add consumer specific configuration properties
Databricks のノートブック上で下記のコードを実行します。
def set_consumer_configs():
config['group.id'] = 'temp_group'
config['auto.offset.reset'] = 'earliest'
Define our class and schema
Databricks のノートブック上で下記のコードを実行します。
class Temperature(object):
def __init__(self, city, reading, unit, timestamp):
self.city = city
self.reading = reading
self.unit = unit
self.timestamp = timestamp
schema_str = """{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Temperature",
"description": "Temperature sensor reading",
"type": "object",
"properties": {
"city": {
"description": "City name",
"type": "string"
"reading": {
"description": "Current temperature reading",
"type": "number"
"unit": {
"description": "Temperature unit (C/F)",
"type": "string"
"timestamp": {
"description": "Time of reading in ms since epoch",
"type": "number"
def dict_to_temp(dict, ctx):
return Temperature(dict['city'], dict['reading'], dict['unit'], dict['timestamp'])
Add Main Block
Databricks のノートブック上で下記のコードを記述します。
if __name__ == '__main__':
topic = 'temp_readings'
json_deserializer = JSONDeserializer(schema_str, from_dict=dict_to_temp)
consumer = Consumer(config)
while True:
event = consumer.poll(1.0)
if event is None:
temp = json_deserializer(event.value(),
SerializationContext(topic, MessageField.VALUE))
if temp is not None:
print(f'Latest temp in {temp.city} is {temp.reading} {temp.unit}.')
except KeyboardInterrupt:
Execute the program
Hands On: Use the Python AdminClient Class
Use AdminClient to Create a Topic and Alter its Configuration
Databricks のノートブック上で下記のコードを実行します。
from confluent_kafka.admin import (AdminClient, NewTopic,
from config import config
Project Setup
Databricks 利用時には不要な手順であるため省略。
Add Required Imports
Databricks のノートブック上で下記のコードを実行します。
# return True if topic exists and False if not
def topic_exists(admin, topic):
metadata = admin.list_topics()
for t in iter(metadata.topics.values()):
if t.topic == topic:
return True
return False
Check if Kafka Topic Exists
Databricks のノートブック上で下記のコードを実行します。
# return True if topic exists and False if not
def topic_exists(admin, topic):
metadata = admin.list_topics()
for t in iter(metadata.topics.values()):
if t.topic == topic:
return True
return False
Create a New Kafka Topic
Databricks のノートブック上で下記のコードを実行します。
# create new topic and return results dictionary
def create_topic(admin, topic):
new_topic = NewTopic(topic, num_partitions=6, replication_factor=3)
result_dict = admin.create_topics([new_topic])
for topic, future in result_dict.items():
future.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
Describe the New Kafka Topic
Databricks のノートブック上で下記のコードを実行します。
# get max.message.bytes property
def get_max_size(admin, topic):
resource = ConfigResource('topic', topic)
result_dict = admin.describe_configs([resource])
config_entries = result_dict[resource].result()
max_size = config_entries['max.message.bytes']
return max_size.value
Set a Kafka Topic Configuration Property Value
Databricks のノートブック上で下記のコードを実行します。
# set max.message.bytes for topic
def set_max_size(admin, topic, max_k):
config_dict = {'max.message.bytes': str(max_k*1024)}
resource = ConfigResource('topic', topic, config_dict)
result_dict = admin.alter_configs([resource])
Add Main Block
Databricks のノートブック上で下記のコードを記述します。
if __name__ == '__main__':
# Create Admin client
admin = AdminClient(config)
topic_name = 'my_topic'
max_msg_k = 50
# Create topic if it doesn't exist
if not topic_exists(admin, topic_name):
create_topic(admin, topic_name)
# Check max.message.bytes config and set if needed
current_max = get_max_size(admin, topic_name)
if current_max != str(max_msg_k * 1024):
print(f'Topic, {topic_name} max.message.bytes is {current_max}.')
set_max_size(admin, topic_name, max_msg_k)
# Verify config was set
new_max = get_max_size(admin, topic_name)
print(f'Now max.message.bytes for topic {topic_name} is {new_max}')
Test admin.py
KafkaException: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
topic 作成後に1秒待機することで正常終了しました。
if __name__ == '__main__':
# Create Admin client
admin = AdminClient(config)
topic_name = 'my_topic'
max_msg_k = 50
# Create topic if it doesn't exist
if not topic_exists(admin, topic_name):
create_topic(admin, topic_name)
# TOPIC が作成されるまで1秒待機
# Check max.message.bytes config and set if needed
current_max = get_max_size(admin, topic_name)
if current_max != str(max_msg_k * 1024):
print(f'Topic, {topic_name} max.message.bytes is {current_max}.')
set_max_size(admin, topic_name, max_msg_k)
# Verify config was set
new_max = get_max_size(admin, topic_name)
print(f'Now max.message.bytes for topic {topic_name} is {new_max}')
Exercise Environment Teardown
environment を削除します。