0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】- 15日目: リアルタイムデータ収集:EC2上でWebログをKafkaに送信する

Posted at

はじめに

「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」15日目です。これまでの2週間で、私たちはローカルからAWSへと開発環境を移行し、PythonアプリケーションがMSKクラスターで動作することを確認しました。

今日からは、いよいよ 「リアルタイムデータ分析」 というテーマに本格的に取り組みます。今回は、Webサイトのアクセスログを模倣したデータをEC2インスタンス上で生成し、それをKafkaに送信するスクリプトを作成します。これが、ストリーミングデータ分析パイプラインの最初のステップ「データ収集」となります。


1. なぜログデータをKafkaに送信するのか?

多くの企業にとって、Webサイトのアクセスログは非常に貴重な情報源です。しかし、従来のバッチ処理では、以下の課題がありました。

  • 分析の遅延: ログファイルを夜間にまとめて処理するため、リアルタイムなユーザーの行動を把握できません。
  • システムの複雑さ: ログファイルは増え続けるため、ファイルシステムやデータベースの管理が複雑になります。

Kafkaを使うことで、これらの課題を解決できます。

  • リアルタイム処理: ログが生成された瞬間にKafkaに送信し、即座に分析できます。
  • スケーラブルなデータハブ: 複数のWebサーバーから生成される膨大なログを、Kafkaが効率的に集約・管理します。

2. EC2上でWebログプロデューサーを実装

今回は、EC2インスタンス上で動かすための、より実践的なプロデューサーのスクリプトを作成します。このスクリプトは、Webサイトのアクセスログを模倣したJSONデータを連続的に生成し、Kafkaに送信します。

EC2インスタンスにSSHで接続し、web_log_producer.pyというファイルを作成して、以下のコードを記述してください。

# web_log_producer.py
from confluent_kafka import Producer
import json
import time
import random
from datetime import datetime
import uuid

# Kafkaブローカーの設定
conf = {'bootstrap.servers': '<your-msk-bootstrap-brokers>'}

# Producerインスタンスの作成
producer = Producer(conf)

# 配信レポートコールバック
def delivery_report(err, msg):
    if err is not None:
        print(f"Failed to deliver message: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# 送信先のトピック
topic = "web-logs"

# ログデータを生成して連続的に送信
def generate_log_data():
    while True:
        user_id = str(uuid.uuid4())
        page_id = random.choice(['home', 'product_A', 'product_B', 'about', 'contact'])
        ip_address = f"192.168.{random.randint(1, 255)}.{random.randint(1, 255)}"
        timestamp = datetime.now().isoformat()
        
        log_data = {
            'user_id': user_id,
            'timestamp': timestamp,
            'event_type': 'page_view',
            'page_id': page_id,
            'ip_address': ip_address
        }
        
        json_data = json.dumps(log_data)
        
        try:
            producer.produce(
                topic,
                key=user_id.encode('utf-8'),
                value=json_data.encode('utf-8'),
                callback=delivery_report
            )
            print(f"Sent log: {json_data}")
        except Exception as e:
            print(f"An error occurred: {e}")
        
        # 1秒間に約5件のログを生成
        time.sleep(random.uniform(0.1, 0.3))

# スクリプトの実行
try:
    generate_log_data()
except KeyboardInterrupt:
    print("\nShutting down producer...")
finally:
    producer.flush()
    print("All messages flushed. Producer is shut down.")

コードのポイント

  • uuid.uuid4(): ユニークなユーザーIDを生成し、実際の匿名ユーザーを模倣しています。
  • datetime.now().isoformat(): ISO 8601形式でタイムスタンプを生成します。
  • generate_log_data(): 無限ループでログデータを連続的に生成し続けます。これにより、ストリーミングデータの特徴である「継続的なデータの流れ」を再現しています。
  • time.sleep(): データの生成間隔をランダムに設定し、よりリアルなトラフィックを模倣しています。

3. アプリケーションの実行と確認

ステップ1:トピックの作成

このスクリプトを実行する前に、web-logsという新しいトピックをMSKクラスターに作成する必要があります。EC2インスタンス上で以下のコマンドを実行してください。

# MSKのブートストラップブローカーのアドレスを環境変数に設定していると仮定
# BOOTSTRAP_BROKERS=$(aws msk describe-cluster --cluster-arn <your-cluster-arn> --query 'ClusterInfo.BrokerNodeGroupInfo.ClientSubnetInfo.BrokerEndpoints' --output text)

/usr/bin/kafka-topics --create --topic web-logs --bootstrap-server $BOOTSTRAP_BROKERS

ステップ2:プロデューサーの実行

トピックが作成されたら、web_log_producer.pyを実行します。

python3 web_log_producer.py

実行すると、EC2インスタンスのコンソールに、ログデータがKafkaに送信されるたびに、その内容と配信レポートが表示されるはずです。

ステップ3:データの確認

別のSSHセッションでコンシューマーを起動し、データが正常に受信されているか確認しましょう。

# 別のターミナルでEC2に接続
ssh -i "kafka-challenge-key.pem" ec2-user@<your-ec2-public-ip>

# コンソールコンシューマーを起動
/usr/bin/kafka-console-consumer --topic web-logs --bootstrap-server $BOOTSTRAP_BROKERS

コンソールに、プロデューサーが送信したログデータがリアルタイムで表示されれば成功です。


まとめと次回予告

今日は、EC2インスタンス上でWebログデータを生成し、それをMSKクラスターに送信するリアルタイムデータ収集の仕組みを構築しました。これにより、ストリーミングデータ分析パイプラインの最も重要な入り口が完成しました。

明日からは、このKafkaに集められたデータをどう活用していくかを学んでいきます。

16日目: ストリーム処理の基本:フィルタリングと集計を実装

お楽しみに!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?