はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」18日目です。前回は、処理したデータを長期保存に適したAWS S3に保存する方法を学びました。
今日は、そのデータをさらに活用し、Amazon DynamoDBに書き込む方法を学びます。DynamoDBは、高速な読み書きが求められるリアルタイムアプリケーションに最適なデータベースです。
1. なぜデータをDynamoDBに書き込むのか?
S3は長期保存やバッチ分析には優れていますが、個別のデータに対する高速な読み書きには向きません。一方で、DynamoDBは以下のような特徴を持っています。
- 高速なパフォーマンス: ミリ秒単位のレイテンシでデータを読み書きできるため、リアルタイムなユーザーの問い合わせやアプリケーションのデータストアとして最適です。
- フルマネージド: サーバーの運用やパッチ適用はAWSがすべて行います。
- スケーラビリティ: データ量やリクエスト数に応じて、自動的かつシームレスにスケールします。
例えば、Webサイトのリアルタイムなユーザー行動を追跡し、パーソナライズされたコンテンツを提供する場合、S3にデータを保存しつつ、最新のユーザー情報をDynamoDBに書き込むというハイブリッドな構成が一般的です。
2. DynamoDBへのデータ書き込みを実装する
今回は、Webログからフィルタリングした購入イベントを、DynamoDBテーブルに書き込むスクリプトを作成します。
EC2インスタンスにSSHで接続し、dynamodb_processor.pyというファイルを作成して、以下のコードを記述してください。
注意: このスクリプトを実行する前に、DynamoDBテーブルを作成しておく必要があります。AWSコンソールから、purchase_eventsという名前でテーブルを作成しておきましょう。この際、プライマリキーとしてuser_idを設定してください。また、EC2インスタンスにアタッチしたIAMロールにDynamoDBへの書き込み権限(dynamodb:PutItem)が付与されていることを確認してください。
# dynamodb_processor.py
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
import sys
import boto3
# AWS DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('purchase_events') # 作成したDynamoDBテーブル名に置き換える
# Kafkaコンシューマーの設定
conf = {
'bootstrap.servers': '<your-msk-bootstrap-brokers>',
'group.id': 'dynamodb-processor-group',
'auto.offset.reset': 'earliest'
}
# Consumerインスタンスの作成
consumer = Consumer(conf)
# 購読するトピック
topic = "web-logs"
def put_item_to_dynamodb(item):
"""
DynamoDBにデータを書き込む関数
"""
try:
# DynamoDBにデータを挿入
table.put_item(Item=item)
print(f"Successfully wrote item to DynamoDB: {item}")
except Exception as e:
print(f"Failed to write to DynamoDB: {e}")
try:
consumer.subscribe([topic])
print("Starting DynamoDB processor...")
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError.PARTITION_EOF:
raise KafkaException(msg.error())
else:
try:
data = json.loads(msg.value().decode('utf-8'))
# フィルタリング: 'purchase'イベントのみを抽出
if data.get('event_type') == 'purchase':
# DynamoDBに書き込むデータ形式に変換
item = {
'user_id': data['user_id'],
'timestamp': data['timestamp'],
'page_id': data['page_id'],
'ip_address': data['ip_address']
}
put_item_to_dynamodb(item)
except Exception as e:
print(f"Error processing message: {e}")
except KeyboardInterrupt:
print("\nShutting down processor...")
finally:
consumer.close()
3. コードの解説と実行
(1) コードの解説
-
boto3.resource('dynamodb'): DynamoDBに接続するためのリソースオブジェクトを作成します。 -
dynamodb.Table('purchase_events'): 作成したDynamoDBテーブル名を取得します。 -
put_item_to_dynamodb()関数:-
table.put_item(Item=item)で、DynamoDBテーブルにデータを書き込みます。Itemのキーと値は、テーブルのスキーマと一致させる必要があります。 - S3へのアップロードと同様に、EC2にアタッチしたIAMロールが自動的に認証情報を引き継ぎます。
-
-
フィルタリングとデータ変換:
event_typeがpurchaseのログを抽出し、DynamoDBに書き込むために必要なフィールドだけをitem辞書に格納しています。
(2) 実行方法
-
DynamoDBテーブルを作成: AWSコンソールで、プライマリキーに
user_idを持つpurchase_eventsというテーブルを作成します。 -
必要なライブラリをインストール: EC2インスタンス上で
boto3がインストールされていることを確認します(17日目でインストール済み)。 -
プロデューサーの実行: 別のSSHセッションで
web_log_producer.pyを起動し、Kafkaにログデータを流し続けます。python3 web_log_producer.py -
DynamoDBプロセッサーの実行:
dynamodb_processor.pyを実行します。実行すると、python3 dynamodb_processor.pypurchaseイベントが検出されるたびにログが出力され、そのデータがDynamoDBテーブルに書き込まれるのが確認できます。AWSコンソールからテーブルのアイテムを確認してみましょう。
まとめと次回予告
今日は、Pythonで処理したストリーミングデータを、高速な読み書きが可能なAmazon DynamoDBに書き込む方法を学びました。これで、リアルタイム分析の基盤がさらに強化され、ユーザーの行動に即座に反応するアプリケーションを構築する準備が整いました。
明日からは、このデータ分析パイプラインをより堅牢で、見やすいものにするためのステップに進みます。
19日目: AWS CloudWatchでKafkaとアプリケーションをモニタリングする
お楽しみに!