はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」22日目です。前回、Glue Data Catalogを使ってS3上のデータをテーブルとして登録する準備が整いました。
今日は、その準備を活かし、Amazon Athenaを使ってS3に蓄積されたストリーミングデータをSQLで分析してみましょう。これにより、リアルタイムに収集されたデータから、ビジネスに役立つインサイト(洞察)を得る方法を学びます。
1. Amazon Athenaとは?
Amazon Athenaは、サーバーレスのインタラクティブなクエリサービスです。S3上のデータを、標準的なSQLを使って直接分析できます。
- サーバーレス: クラスターのプロビジョニングや管理が不要で、すぐにクエリを開始できます。
- 従量課金: スキャンしたデータ量に基づいて料金が発生するため、コスト効率に優れています。
- オープンなフォーマット: CSV、JSON、Parquetなど、様々なファイル形式をサポートしています。
Glue Data Catalogに登録されたメタデータを参照することで、S3上のデータがまるでデータベースのテーブルであるかのように扱えるため、データ分析者はインフラの複雑さを気にすることなく、SQLクエリの記述に集中できます。
2. S3上のWebログデータをSQLで分析する
それでは、Athenaを使って、S3に蓄積されたWebログデータから「過去24時間のページビュー数」や「人気のある商品ページ」といった情報を抽出してみましょう。
ステップ1:Athenaコンソールを開く
AWSマネジメントコンソールで「Athena」と検索し、サービスを開きます。
ステップ2:クエリを実行する
「クエリエディタ」に移動し、前回Glue Data Catalogで作成したデータベースとテーブルが選択されていることを確認します。
① 全体のレコード数を確認する
まずは、テーブルにどれくらいのデータが保存されているかを確認してみましょう。
SELECT count(*)
FROM "kafka_analysis"."web_logs_web_logs";
このクエリを実行すると、Kafka ConnectがS3に書き込んだJSONレコードの総数が返されます。
② ページビュー数を集計する
次に、過去24時間(または任意の日付)のページビュー数を日別に集計してみましょう。timestampフィールドを使って、日付ごとにグループ化します。
SELECT
CAST(substr("timestamp", 1, 10) AS DATE) AS event_date,
count(*) AS page_views
FROM "kafka_analysis"."web_logs_web_logs"
GROUP BY 1
ORDER BY event_date DESC;
-
substr("timestamp", 1, 10): ISO 8601形式(YYYY-MM-DDTHH:MM:SS)のタイムスタンプ文字列から、日付部分(YYYY-MM-DD)を抽出しています。 -
CAST(... AS DATE): 抽出した文字列を日付型にキャストしています。
③ 最も人気の高いページを特定する
Webサイトで最もアクセスが多いページはどれか、page_idごとに集計してみましょう。
SELECT
page_id,
count(*) AS access_count
FROM "kafka_analysis"."web_logs_web_logs"
WHERE event_type = 'page_view'
GROUP BY page_id
ORDER BY access_count DESC
LIMIT 10;
このクエリは、event_typeがpage_viewのレコードのみを抽出し、page_idごとにアクセス数を集計して、上位10件を表示します。
④ 特定ユーザーの行動履歴を追跡する
最後に、特定のuser_idを持つユーザーがどのような行動を取ったかを追跡してみましょう。
SELECT
timestamp,
event_type,
page_id
FROM "kafka_analysis"."web_logs_web_logs"
WHERE user_id = 'your-specific-user-id' -- 任意のuser_idに置き換える
ORDER BY timestamp ASC;
これにより、そのユーザーがどのページをいつ閲覧したか、といった一連の行動履歴を時系列で把握できます。
3. Athenaのコスト管理
Athenaはスキャンしたデータ量に基づいて課金されます。今回のデータ量が少ないうちは問題ありませんが、本番環境でデータ量が増えるとコストが膨大になる可能性があります。
コストを最適化するためには、以下の対策が有効です。
-
パーティションの活用: S3のフォルダ構造を日付(例:
year=2024/month=08/day=15/)で分割し、クエリ時にWHERE句でパーティションを指定することで、スキャン対象のデータを減らせます。 - データ形式の変更: JSONよりも、ParquetやORCといったカラムナー形式のファイルに変換することで、スキャン効率が向上し、データ量を大幅に削減できます。
まとめと次回予告
今日は、Amazon Athenaを使ってS3上のストリーミングデータをSQLで分析する方法を学びました。これにより、リアルタイムに収集されたデータから、ビジネスの意思決定に役立つ情報を抽出できる基盤が完成しました。
明日からは、このデータパイプラインをさらに発展させ、より高度な分析を可能にするサービスに触れていきます。
23日目: AWS Lambdaを使ってKafkaのイベントをトリガーにする
お楽しみに!