はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」21日目です。昨日、Kafka Connectを使ってKafkaからS3へのデータ連携を自動化しました。これにより、リアルタイムに集まったログデータが、S3にファイルとして蓄積されるようになりました。
今日は、そのS3に保存されたデータを、いよいよ分析に使えるようにするための準備を行います。AWSのデータカタログサービスGlue Data Catalogにデータを登録し、SQLで分析できるようにする手順を学びましょう。
1. なぜGlue Data Catalogが必要なのか?
S3に保存されたファイルは、そのままでは単なる「データ」の集合体に過ぎません。データ分析を行うためには、そのデータのスキーマ(データの構造、列名、データ型など)を定義し、データベースのテーブルのように扱えるようにする必要があります。
AWS Glue Data Catalogは、その役割を担うフルマネージドサービスです。
- 中央集中管理: 複数のAWSサービス(S3、RDSなど)にまたがるデータのメタデータ(スキーマ情報)を一元管理します。
- 分析ツールの統合: Athena、Redshift Spectrum、SageMakerなど、様々なAWSの分析・機械学習ツールが、Glue Data Catalogに登録されたメタデータを参照してデータを分析します。
- クローラー機能: S3に保存されたファイルを自動でスキャンし、スキーマを推論してくれます。これにより、手動でスキーマを定義する手間が省けます。
Glue Data Catalogを使うことで、S3上のデータが「データレイク」として機能し、SQLを使って手軽に分析できる環境が整います。
2. Glue Data CatalogにS3のデータを登録する
S3に蓄積されたWebログデータをGlue Data Catalogに登録し、Athenaでクエリを実行できるようにする手順を解説します。
ステップ1:S3バケットへのデータ蓄積
まずは、Kafka Connectを使って、web-logs
トピックのデータがS3バケットに蓄積されていることを確認します。
ステップ2:Glueクローラーの作成
AWSマネジメントコンソールで「Glue」と検索し、サービスを開きます。
- 「クローラー」 を選択し、「クローラーを作成」をクリックします。
-
クローラー名:
web-logs-crawler
など、わかりやすい名前を付けます。 -
データソースの指定:
- データソースの追加をクリックします。
- データソース: 「S3」を選択します。
-
S3パス: Kafka Connectがデータを保存しているS3バケット内のパス(例:
s3://your-s3-bucket-name/web-logs/
)を指定します。 - IAMロール: GlueクローラーがS3にアクセスするためのIAMロールを作成します。新しいロール名を入力して、作成することも可能です。このロールには、S3の読み取り権限とGlue Data Catalogへの書き込み権限が必要です。
- スケジュールの設定: 「オンデマンドで実行」を選択します。これにより、手動でクローラーを実行できます。本番環境では、定期的に実行するスケジュールを設定することが一般的です。
-
データベースとテーブルの設定:
-
データベース: 「新しいデータベースを追加」を選択し、
kafka_analysis
などのデータベース名を入力します。 -
プレフィックス: テーブル名のプレフィックスを付けることで、複数のテーブルを整理しやすくなります。今回は
web_logs_
とします。
-
データベース: 「新しいデータベースを追加」を選択し、
すべての設定を確認し、「クローラーを作成」をクリックします。
ステップ3:クローラーの実行
作成したクローラーを選択し、「クローラーを実行」をクリックします。
クローラーがS3のパスをスキャンし、データのスキーマを推論します。数分後、ステータスが「完了」に変わります。
ステップ4:Glueデータベースの確認
ナビゲーションメニューから「データベース」→「テーブル」を選択します。
kafka_analysis
データベースの中に、web_logs_
プレフィックスが付いたテーブルが作成されていることを確認しましょう。このテーブルのスキーマ(列名とデータ型)が、S3のJSONデータから正しく推論されているはずです。
3. AthenaでのSQL分析準備
Glue Data Catalogにデータが登録されると、Amazon Athenaを使って、S3上のデータをSQLで直接クエリできるようになります。
-
AWSマネジメントコンソールで「Athena」サービスを開きます。
-
データソース: 「
awsdatacatalog
」を選択します。 -
データベース: 「
kafka_analysis
」を選択します。 -
左側に作成されたテーブルが表示されているはずです。
-
クエリエディタで以下のSQLクエリを実行してみましょう。
SELECT * FROM "kafka_analysis"."web_logs_web_logs" LIMIT 10;
これで、S3に保存されたデータが、まるでリレーショナルデータベースのテーブルのように扱えることが確認できます。
まとめと次回予告
今日は、S3に蓄積されたストリーミングデータを、Glue Data Catalogに登録し、Amazon AthenaでSQL分析する準備を整えました。これにより、リアルタイム処理のデータとバッチ分析のデータをシームレスに統合できる基盤が完成しました。
明日は、このAthenaを使って、S3上のデータを実際に分析してみましょう。
22日目: Athenaを使ってS3上のストリーミングデータをSQLで分析する
お楽しみに!