はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」5日目: PythonからKafkaに接続するライブラリconfluent-kafka-pythonをセットアップ!です。これまでの4日間で、Kafkaの基本概念を理解し、Dockerを使ってローカル環境で基本的な操作を試しました。
今日からはいよいよ、Pythonを使ってKafkaを操作していきます。PythonでKafkaに接続するためのライブラリ confluent-kafka-python をセットアップし、その使い方を学んでいきましょう。
1. なぜconfluent-kafka-pythonを使うのか?
PythonにはKafkaに接続するためのライブラリがいくつか存在しますが、その中でも confluent-kafka-python は最も広く使われており、信頼性が高いライブラリです。このライブラリは、Kafkaを開発したConfluent社が提供しており、以下の特徴があります。
-
高いパフォーマンス: C言語で書かれたKafkaの公式クライアントライブラリ
librdkafkaをベースにしており、高いパフォーマンスと安定性を誇ります。 - 豊富な機能: プロデューサーとコンシューマーの基本的な機能はもちろん、認証やセキュリティ設定、エラーハンドリングなど、本番環境で必要となる機能が充実しています。
2. 環境構築:仮想環境とライブラリのインストール
プロジェクトごとに環境を分離するため、venvを使ってPythonの仮想環境を構築します。これは、他のプロジェクトのライブラリとの競合を防ぐためのベストプラクティスです。
-
プロジェクトディレクトリを作成
まず、新しいプロジェクト用のディレクトリを作成し、移動します。mkdir kafka-python-challenge cd kafka-python-challenge -
仮想環境を作成・有効化
以下のコマンドで仮想環境を作成し、有効化します。python3 -m venv venv source venv/bin/activate(Windowsの場合は
venv\Scripts\activate)
ターミナルの行頭に(venv)と表示されていれば成功です。 -
confluent-kafka-pythonをインストール
仮想環境が有効な状態で、以下のコマンドを実行してライブラリをインストールします。pip install confluent-kafka
これで、PythonからKafkaに接続するための準備が整いました。
3. 簡単な接続テスト
インストールが成功したか確認するために、簡単なPythonスクリプトを書いてみましょう。
producer.pyというファイルを作成し、以下のコードを記述してください。
from confluent_kafka import Producer
import json
# Kafkaブローカーの設定
conf = {'bootstrap.servers': '127.0.0.1:9092'}
# Producerインスタンスの作成
producer = Producer(conf)
# 送信するデータ
data = {'message': 'Hello from Python!'}
json_data = json.dumps(data)
# 指定したトピックにデータを送信
topic = "test-topic"
producer.produce(topic, key="key1", value=json_data.encode('utf-8'))
# バッファ内のすべてのメッセージを送信
producer.flush()
print(f"Message sent to topic: {topic}")
コードの解説
-
from confluent_kafka import Producer:Producerクラスをインポートします。 -
conf = {'bootstrap.servers': '127.0.0.1:9092'}:接続先のKafkaブローカーのアドレスを設定します。docker-compose.ymlで設定したアドレスと同じです。 -
producer = Producer(conf):設定を使ってProducerのインスタンスを作成します。 -
producer.produce(...):produce()メソッドでデータを送信します。-
topic: 送信先のトピック名です。 -
key: データのキーを指定します。同じキーのデータは同じパーティションに書き込まれます。 -
value: 送信するデータです。Kafkaはバイトデータを扱うため、json.dumps()でJSON形式の文字列に変換し、さらに.encode('utf-8')でバイト列に変換しています。
-
-
producer.flush():バッファに溜まったデータをすべて送信します。これを実行しないと、データが送信されない場合があります。
実行方法
このスクリプトを実行する前に、DockerでKafkaコンテナが起動していることを確認してください。
docker-compose ps
コンテナが起動していることを確認したら、以下のコマンドでスクリプトを実行します。
python producer.py
Message sent to topic: test-topicと表示されれば成功です。
データの確認
4日目で使ったコンソールコンシューマーでデータが送信されているか確認してみましょう。別のターミナルでコンテナに接続し、以下のコマンドを実行してください。
docker exec -it kafka /bin/bash
kafka-console-consumer.sh --topic test-topic --bootstrap-server 127.0.0.1:9092 --from-beginning
{"message": "Hello from Python!"}というメッセージが表示されれば、PythonからKafkaへのデータ送信が成功しています。
まとめと次回予告
今日は、PythonでKafkaを操作するためのライブラリconfluent-kafka-pythonのインストールと、簡単なプロデューサーの作成方法を学びました。
これで、PythonからKafkaにデータを送信する準備が整いました。明日は、これとは逆の操作、つまりPythonでKafkaからデータを受信する方法を学んでいきます。
6日目: PythonでシンプルなProducerを実装してみよう
このままプロデューサーの実装をもう少し詳しく見ていくことにします。明日もお楽しみに!