はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」17日目です。昨日、Pythonを使ってKafkaからデータをフィルタリング・集計するストリーム処理の基本を学びました。
今日は、その処理したデータを永続的なストレージに保存するステップに進みます。ストリーミングデータ分析パイプラインの最終目的地の一つであるAWS S3に、処理済みのデータを保存する方法を学びましょう。
1. なぜデータをS3に保存するのか?
Kafkaはリアルタイムのデータストリームを扱うのに優れていますが、データの長期保存には向きません。また、バッチ分析や機械学習モデルの学習データとして利用する場合、データをまとめてアクセスできる場所に保存する必要があります。
AWS S3(Amazon Simple Storage Service)は、その要件を満たす最適なサービスです。
- 耐久性: 99.999999999%という非常に高い耐久性でデータを安全に保存します。
- スケーラビリティ: 無制限のデータを保存でき、データ量が増えてもパフォーマンスが低下しません。
- コスト効率: データを安価に保存できるため、長期的なデータアーカイブに適しています。
- 他のサービスとの連携: AWS Glue, Amazon Athena, SageMakerなど、多くのAWSサービスがS3をデータソースとして利用します。
2. AWS S3へのデータ保存を実装する
今回は、昨日作成したstream_processor.pyをベースに、購入イベントをフィルタリングした後、そのデータを一定時間ごとにS3にアップロードするスクリプトを作成します。
EC2インスタンスにSSHで接続し、s3_processor.pyというファイルを作成して、以下のコードを記述してください。
注意: このスクリプトを実行する前に、S3バケットを作成しておく必要があります。AWSコンソールから任意のリージョンでバケットを作成しておきましょう。また、EC2インスタンスにアタッチしたIAMロールにS3への書き込み権限(s3:PutObject)が付与されていることを確認してください。
# s3_processor.py
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
import sys
import boto3
import time
from datetime import datetime
# AWS S3クライアントの初期化
s3_client = boto3.client('s3')
s3_bucket_name = 'your-unique-s3-bucket-name' # 作成したS3バケット名に置き換える
# Kafkaコンシューマーの設定
conf = {
'bootstrap.servers': '<your-msk-bootstrap-brokers>',
'group.id': 's3-processor-group',
'auto.offset.reset': 'earliest'
}
# Consumerインスタンスの作成
consumer = Consumer(conf)
# 購読するトピック
topic = "web-logs"
# S3にアップロードするデータを一時的に保持するリスト
processed_data = []
last_upload_time = time.time()
upload_interval_sec = 60 # 60秒ごとにS3にアップロード
def upload_to_s3(data):
"""
処理したデータをS3にアップロードする関数。
"""
if not data:
return
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
s3_key = f"web-logs/purchase_events_{timestamp}.json"
# データをJSONL(JSON Lines)形式に変換
json_lines = "\n".join([json.dumps(d) for d in data])
try:
s3_client.put_object(
Bucket=s3_bucket_name,
Key=s3_key,
Body=json_lines.encode('utf-8')
)
print(f"Successfully uploaded {len(data)} records to s3://{s3_bucket_name}/{s3_key}")
except Exception as e:
print(f"Failed to upload to S3: {e}")
try:
consumer.subscribe([topic])
print("Starting S3 processor...")
while True:
msg = consumer.poll(1.0)
if msg is None:
# 受信データがない場合、アップロードのタイミングをチェック
if time.time() - last_upload_time > upload_interval_sec:
upload_to_s3(processed_data)
processed_data = [] # アップロード後、リストをクリア
last_upload_time = time.time()
continue
if msg.error():
if msg.error().code() != KafkaError.PARTITION_EOF:
raise KafkaException(msg.error())
else:
try:
data = json.loads(msg.value().decode('utf-8'))
# フィルタリング: 'purchase'イベントのみを抽出
if data.get('event_type') == 'purchase':
processed_data.append(data)
print(f"Filtered purchase event: {data}")
except Exception as e:
print(f"Error processing message: {e}")
except KeyboardInterrupt:
print("\nShutting down processor...")
finally:
# 終了前に残っているデータをアップロード
upload_to_s3(processed_data)
consumer.close()
3. コードの解説と実行
(1) コードの解説
-
import boto3: AWS SDK for Pythonであるboto3をインポートします。これにより、PythonからAWSサービスを操作できます。pip3 install boto3でインストールが必要です。 -
s3_client = boto3.client('s3'): S3に接続するためのクライアントを作成します。EC2にアタッチしたIAMロールが自動的に認証情報を引き継ぐため、認証情報をコードに書く必要はありません。 -
processed_data: フィルタリングしたデータを一時的に保持するリストです。 -
upload_interval_sec: S3へのアップロード間隔を秒単位で設定します。今回は60秒に設定しました。 -
upload_to_s3()関数:- データをJSON Lines(JSONL)形式に変換しています。これは、1行に1つのJSONオブジェクトが記述された形式で、データ分析でよく使われます。
-
s3_client.put_object()でS3バケットにファイルをアップロードします。
-
while Trueループ内:-
consumer.poll()がNoneを返した場合、データがない状態です。このタイミングで、upload_interval_secが経過していればupload_to_s3関数を呼び出します。
-
(2) 実行方法
-
必要なライブラリをインストール:
EC2インスタンス上でboto3をインストールします。pip3 install boto3 -
S3バケットを作成:
AWSコンソールで一意の名前のS3バケットを作成します。 -
プロデューサーの実行:
別のSSHセッションで、web_log_producer.pyを起動し、Kafkaにログデータを流し続けます。python3 web_log_producer.py -
S3プロセッサーの実行:
s3_processor.pyを実行します。<your-unique-s3-bucket-name>をあなたが作成したバケット名に置き換えるのを忘れないでください。実行すると、python3 s3_processor.pypurchaseイベントが検出されるたびにログが出力され、60秒ごとにS3にファイルがアップロードされるのが確認できます。
まとめと次回予告
今日は、Pythonで処理したストリーミングデータを、長期保存に適したAWS S3に保存する方法を学びました。これにより、リアルタイム処理のデータを、後からバッチ分析や機械学習に活用できる基盤が完成しました。
明日からは、このS3に保存したデータをどうやって分析するかを学んでいきます。
18日目: 処理したデータをAmazon DynamoDBに書き込む
お楽しみに!