0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】- 18日目: 処理したデータをAmazon DynamoDBに書き込む

Last updated at Posted at 2025-08-14

はじめに

「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」18日目です。前回は、処理したデータを長期保存に適したAWS S3に保存する方法を学びました。

今日は、そのデータをさらに活用し、Amazon DynamoDBに書き込む方法を学びます。DynamoDBは、高速な読み書きが求められるリアルタイムアプリケーションに最適なデータベースです。


1. なぜデータをDynamoDBに書き込むのか?

S3は長期保存やバッチ分析には優れていますが、個別のデータに対する高速な読み書きには向きません。一方で、DynamoDBは以下のような特徴を持っています。

  • 高速なパフォーマンス: ミリ秒単位のレイテンシでデータを読み書きできるため、リアルタイムなユーザーの問い合わせやアプリケーションのデータストアとして最適です。
  • フルマネージド: サーバーの運用やパッチ適用はAWSがすべて行います。
  • スケーラビリティ: データ量やリクエスト数に応じて、自動的かつシームレスにスケールします。

例えば、Webサイトのリアルタイムなユーザー行動を追跡し、パーソナライズされたコンテンツを提供する場合、S3にデータを保存しつつ、最新のユーザー情報をDynamoDBに書き込むというハイブリッドな構成が一般的です。


2. DynamoDBへのデータ書き込みを実装する

今回は、Webログからフィルタリングした購入イベントを、DynamoDBテーブルに書き込むスクリプトを作成します。

EC2インスタンスにSSHで接続し、dynamodb_processor.pyというファイルを作成して、以下のコードを記述してください。

注意: このスクリプトを実行する前に、DynamoDBテーブルを作成しておく必要があります。AWSコンソールから、purchase_eventsという名前でテーブルを作成しておきましょう。この際、プライマリキーとしてuser_idを設定してください。また、EC2インスタンスにアタッチしたIAMロールにDynamoDBへの書き込み権限(dynamodb:PutItem)が付与されていることを確認してください。

# dynamodb_processor.py
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
import sys
import boto3

# AWS DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('purchase_events') # 作成したDynamoDBテーブル名に置き換える

# Kafkaコンシューマーの設定
conf = {
    'bootstrap.servers': '<your-msk-bootstrap-brokers>',
    'group.id': 'dynamodb-processor-group',
    'auto.offset.reset': 'earliest'
}

# Consumerインスタンスの作成
consumer = Consumer(conf)

# 購読するトピック
topic = "web-logs"

def put_item_to_dynamodb(item):
    """
    DynamoDBにデータを書き込む関数
    """
    try:
        # DynamoDBにデータを挿入
        table.put_item(Item=item)
        print(f"Successfully wrote item to DynamoDB: {item}")
    except Exception as e:
        print(f"Failed to write to DynamoDB: {e}")

try:
    consumer.subscribe([topic])

    print("Starting DynamoDB processor...")
    
    while True:
        msg = consumer.poll(1.0)
        
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() != KafkaError.PARTITION_EOF:
                raise KafkaException(msg.error())
        else:
            try:
                data = json.loads(msg.value().decode('utf-8'))
                
                # フィルタリング: 'purchase'イベントのみを抽出
                if data.get('event_type') == 'purchase':
                    # DynamoDBに書き込むデータ形式に変換
                    item = {
                        'user_id': data['user_id'],
                        'timestamp': data['timestamp'],
                        'page_id': data['page_id'],
                        'ip_address': data['ip_address']
                    }
                    put_item_to_dynamodb(item)
                    
            except Exception as e:
                print(f"Error processing message: {e}")
                
except KeyboardInterrupt:
    print("\nShutting down processor...")
finally:
    consumer.close()

3. コードの解説と実行

(1) コードの解説

  • boto3.resource('dynamodb'): DynamoDBに接続するためのリソースオブジェクトを作成します。
  • dynamodb.Table('purchase_events'): 作成したDynamoDBテーブル名を取得します。
  • put_item_to_dynamodb()関数:
    • table.put_item(Item=item)で、DynamoDBテーブルにデータを書き込みます。Itemのキーと値は、テーブルのスキーマと一致させる必要があります。
    • S3へのアップロードと同様に、EC2にアタッチしたIAMロールが自動的に認証情報を引き継ぎます。
  • フィルタリングとデータ変換: event_typepurchaseのログを抽出し、DynamoDBに書き込むために必要なフィールドだけをitem辞書に格納しています。

(2) 実行方法

  1. DynamoDBテーブルを作成: AWSコンソールで、プライマリキーにuser_idを持つpurchase_eventsというテーブルを作成します。
  2. 必要なライブラリをインストール: EC2インスタンス上でboto3がインストールされていることを確認します(17日目でインストール済み)。
  3. プロデューサーの実行: 別のSSHセッションでweb_log_producer.pyを起動し、Kafkaにログデータを流し続けます。
    python3 web_log_producer.py
    
  4. DynamoDBプロセッサーの実行: dynamodb_processor.pyを実行します。
    python3 dynamodb_processor.py
    
    実行すると、purchaseイベントが検出されるたびにログが出力され、そのデータがDynamoDBテーブルに書き込まれるのが確認できます。AWSコンソールからテーブルのアイテムを確認してみましょう。

まとめと次回予告

今日は、Pythonで処理したストリーミングデータを、高速な読み書きが可能なAmazon DynamoDBに書き込む方法を学びました。これで、リアルタイム分析の基盤がさらに強化され、ユーザーの行動に即座に反応するアプリケーションを構築する準備が整いました。

明日からは、このデータ分析パイプラインをより堅牢で、見やすいものにするためのステップに進みます。

19日目: AWS CloudWatchでKafkaとアプリケーションをモニタリングする

お楽しみに!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?