1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksAdvent Calendar 2024

Day 12

Confluent の Python 公式チュートリアルを Databricks で実行してみた

Last updated at Posted at 2024-12-12

概要

Confluent のサイトにて紹介されているGetting Started with Apache Kafka and Pythonをノートブック型 python 環境である Databricks 上で実行する方法を紹介します。

image.png

引用元:Apache Kafka and Python - Getting Started Tutorial

実施手順では左側の目次に合わせて実施内容を説明します。細かい手順は紹介しないため、必要に応じて下記ページを参照してください。

  1. Introduction
  2. Prerequisites
  3. Create Project
  4. Kafka Setup
  5. Create Topic
  6. Build Producer
  7. Build Consumer
  8. Produce Events
  9. Consume Events
  10. Where next?

image.png

引用元:Introduction

実施手順

Introduction

Confluent の無償トライアルに申し込みます。Google アカウントでもうしこんだところ、すぐにアカウントが払い出されました。

image.png

引用元:無料版の Confluent をお試しください:あらゆるクラウドでお使いいただけるマネージド型 Kafka です | JP

Prerequisites

Databricks 上でシングルノードのクラスターを作成します。

image.png

Create Project

Databricks 上でconfluent-kafkaをインストールします。

%pip install confluent-kafka -q
dbutils.library.restartPython()

image.png

Kafka Setup

Confuent にて kafka 環境の作成と認証情報を取得します。

kafka 環境の作成については、ドキュメント通りの手順を実施しました。クレジットカードの入力が求められた場合には、CONFLUENTDEV1というプロモーションコードを入力することでスキップできます。

image.png

引用元:Apache Kafka and Python - Getting Started Tutorial

認証情報についてはBASICの手順を実施しました。APIのキーとシークレットが表示されますが、Download and continueを選択してダウンロードしたテキストファイルを参照することをおすすめします。接続先に関する情報を確認できるためです。

image.png

引用元:Apache Kafka and Python - Getting Started Tutorial

image.png

image.png

Databricks のノートブック上で、接続情報と認証情報を変数にセットします。シークレットなどの機微なデータは、通常であれば Databricks シークレットに格納しますが、動作確認であるため平文でセットしています。

bootstrap_servers = ""
sasl_username = ""
sasl_password = ""

image.png

Create Topic

Confluet 上でpurchasesという topic を作成します。

image.png

image.png

Build Producer

Databricks ノートブックに、ノートブック上で動作することを考慮した修正済みコードを記述します。

#!/usr/bin/env python
from random import choice
from confluent_kafka import Producer


config = {
    # User-specific properties that you must set
    'bootstrap.servers': bootstrap_servers,
    'sasl.username':     sasl_username,
    'sasl.password':     sasl_password,

    # Fixed properties
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms':   'PLAIN',
    'acks':              'all'
}

# Create Producer instance
producer = Producer(config)

# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
    if err:
        print('ERROR: Message failed delivery: {}'.format(err))
    else:
        print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
            topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

# Produce data by selecting random values from these lists.
topic = "purchases"
user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

count = 0
for _ in range(10):
    user_id = choice(user_ids)
    product = choice(products)
    producer.produce(topic, product, user_id, callback=delivery_callback)
    count += 1

# Block until the messages are sent.
producer.poll(10000)
producer.flush()

image.png

記事を書く前の検証時にはコードを修正したのですが、修正しなくても動作はするようです。

if __name__ == '__main__':
    print(__name__)

image.png

Build Consumer

Databricks ノートブックに、ノートブック上で動作することを考慮した修正済みコードを記述します。

#!/usr/bin/env python
from confluent_kafka import Consumer


config = {
    # User-specific properties that you must set
    'bootstrap.servers': bootstrap_servers,
    'sasl.username':     sasl_username,
    'sasl.password':     sasl_password,

    # Fixed properties
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms':   'PLAIN',
    'group.id':          'kafka-python-getting-started',
    'auto.offset.reset': 'earliest'
}

# Create Consumer instance
consumer = Consumer(config)

# Subscribe to topic
topic = "purchases"
consumer.subscribe([topic])

# Poll for new messages from Kafka and print them.
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Initial message consumption may take up to
            # `session.timeout.ms` for the consumer group to
            # rebalance and start consuming
            print("Waiting...")
        elif msg.error():
            print("ERROR: %s".format(msg.error()))
        else:
            # Extract the (optional) key and value, and print.
            print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # Leave group and commit final offsets
    consumer.close()

image.png

Produce Events

Build Producerの手順で記述したコードを実行します。Confluent 上においてもデータが書きこまれたことを確認できます。

Produced event to topic purchases: key = jbernard     value = batteries   
Produced event to topic purchases: key = jbernard     value = book        
Produced event to topic purchases: key = awalther     value = batteries   
Produced event to topic purchases: key = eabara       value = batteries   
Produced event to topic purchases: key = jbernard     value = alarm clock 
Produced event to topic purchases: key = htanaka      value = gift card   
Produced event to topic purchases: key = htanaka      value = book        
Produced event to topic purchases: key = eabara       value = book        
Produced event to topic purchases: key = jsmith       value = book        
Produced event to topic purchases: key = awalther     value = batteries   
0

image.png

image.png

Consume Events

Build Consumerの手順で記述したコードを実行します。処理が継続実行されるため、セルの実行をとめることを忘れないようでしてください。

Consumed event from topic purchases: key = jbernard     value = batteries   
Consumed event from topic purchases: key = jbernard     value = book        
Consumed event from topic purchases: key = awalther     value = batteries   
Consumed event from topic purchases: key = eabara       value = batteries   
Consumed event from topic purchases: key = jbernard     value = alarm clock 
Consumed event from topic purchases: key = htanaka      value = gift card   
Consumed event from topic purchases: key = htanaka      value = book        
Consumed event from topic purchases: key = eabara       value = book        
Consumed event from topic purchases: key = jsmith       value = book        
Consumed event from topic purchases: key = awalther     value = batteries   
Waiting...

image.png

セルを再実行した際には、想定通りに過去に取得したデータを取得しませんでした。

image.png

Where next?

Confluent のサイトでは無償のトレーニングが動画が公開されているためおすすめです。

image.png

引用元:Learn Apache Kafka® & Flink®

Python 開発者向けのコンテンツもあります。

image.png

引用元:Introduction to Python for Kafka

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?