はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」15日目です。これまでの2週間で、私たちはローカルからAWSへと開発環境を移行し、PythonアプリケーションがMSKクラスターで動作することを確認しました。
今日からは、いよいよ 「リアルタイムデータ分析」 というテーマに本格的に取り組みます。今回は、Webサイトのアクセスログを模倣したデータをEC2インスタンス上で生成し、それをKafkaに送信するスクリプトを作成します。これが、ストリーミングデータ分析パイプラインの最初のステップ「データ収集」となります。
1. なぜログデータをKafkaに送信するのか?
多くの企業にとって、Webサイトのアクセスログは非常に貴重な情報源です。しかし、従来のバッチ処理では、以下の課題がありました。
- 分析の遅延: ログファイルを夜間にまとめて処理するため、リアルタイムなユーザーの行動を把握できません。
- システムの複雑さ: ログファイルは増え続けるため、ファイルシステムやデータベースの管理が複雑になります。
Kafkaを使うことで、これらの課題を解決できます。
- リアルタイム処理: ログが生成された瞬間にKafkaに送信し、即座に分析できます。
- スケーラブルなデータハブ: 複数のWebサーバーから生成される膨大なログを、Kafkaが効率的に集約・管理します。
2. EC2上でWebログプロデューサーを実装
今回は、EC2インスタンス上で動かすための、より実践的なプロデューサーのスクリプトを作成します。このスクリプトは、Webサイトのアクセスログを模倣したJSONデータを連続的に生成し、Kafkaに送信します。
EC2インスタンスにSSHで接続し、web_log_producer.py
というファイルを作成して、以下のコードを記述してください。
# web_log_producer.py
from confluent_kafka import Producer
import json
import time
import random
from datetime import datetime
import uuid
# Kafkaブローカーの設定
conf = {'bootstrap.servers': '<your-msk-bootstrap-brokers>'}
# Producerインスタンスの作成
producer = Producer(conf)
# 配信レポートコールバック
def delivery_report(err, msg):
if err is not None:
print(f"Failed to deliver message: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
# 送信先のトピック
topic = "web-logs"
# ログデータを生成して連続的に送信
def generate_log_data():
while True:
user_id = str(uuid.uuid4())
page_id = random.choice(['home', 'product_A', 'product_B', 'about', 'contact'])
ip_address = f"192.168.{random.randint(1, 255)}.{random.randint(1, 255)}"
timestamp = datetime.now().isoformat()
log_data = {
'user_id': user_id,
'timestamp': timestamp,
'event_type': 'page_view',
'page_id': page_id,
'ip_address': ip_address
}
json_data = json.dumps(log_data)
try:
producer.produce(
topic,
key=user_id.encode('utf-8'),
value=json_data.encode('utf-8'),
callback=delivery_report
)
print(f"Sent log: {json_data}")
except Exception as e:
print(f"An error occurred: {e}")
# 1秒間に約5件のログを生成
time.sleep(random.uniform(0.1, 0.3))
# スクリプトの実行
try:
generate_log_data()
except KeyboardInterrupt:
print("\nShutting down producer...")
finally:
producer.flush()
print("All messages flushed. Producer is shut down.")
コードのポイント
-
uuid.uuid4()
: ユニークなユーザーIDを生成し、実際の匿名ユーザーを模倣しています。 -
datetime.now().isoformat()
: ISO 8601形式でタイムスタンプを生成します。 -
generate_log_data()
: 無限ループでログデータを連続的に生成し続けます。これにより、ストリーミングデータの特徴である「継続的なデータの流れ」を再現しています。 -
time.sleep()
: データの生成間隔をランダムに設定し、よりリアルなトラフィックを模倣しています。
3. アプリケーションの実行と確認
ステップ1:トピックの作成
このスクリプトを実行する前に、web-logs
という新しいトピックをMSKクラスターに作成する必要があります。EC2インスタンス上で以下のコマンドを実行してください。
# MSKのブートストラップブローカーのアドレスを環境変数に設定していると仮定
# BOOTSTRAP_BROKERS=$(aws msk describe-cluster --cluster-arn <your-cluster-arn> --query 'ClusterInfo.BrokerNodeGroupInfo.ClientSubnetInfo.BrokerEndpoints' --output text)
/usr/bin/kafka-topics --create --topic web-logs --bootstrap-server $BOOTSTRAP_BROKERS
ステップ2:プロデューサーの実行
トピックが作成されたら、web_log_producer.py
を実行します。
python3 web_log_producer.py
実行すると、EC2インスタンスのコンソールに、ログデータがKafkaに送信されるたびに、その内容と配信レポートが表示されるはずです。
ステップ3:データの確認
別のSSHセッションでコンシューマーを起動し、データが正常に受信されているか確認しましょう。
# 別のターミナルでEC2に接続
ssh -i "kafka-challenge-key.pem" ec2-user@<your-ec2-public-ip>
# コンソールコンシューマーを起動
/usr/bin/kafka-console-consumer --topic web-logs --bootstrap-server $BOOTSTRAP_BROKERS
コンソールに、プロデューサーが送信したログデータがリアルタイムで表示されれば成功です。
まとめと次回予告
今日は、EC2インスタンス上でWebログデータを生成し、それをMSKクラスターに送信するリアルタイムデータ収集の仕組みを構築しました。これにより、ストリーミングデータ分析パイプラインの最も重要な入り口が完成しました。
明日からは、このKafkaに集められたデータをどう活用していくかを学んでいきます。
16日目: ストリーム処理の基本:フィルタリングと集計を実装
お楽しみに!