はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」25日目です。前回、Amazon MSKとKinesis Data Analyticsの連携設定が完了しました。
今日は、その連携を活かし、Amazon Kinesis Data AnalyticsのSQLエディタを使って、MSKに流れてくるデータをリアルタイムに分析してみましょう。これにより、ストリーム処理の力を実感し、ビジネスに即座に役立つインサイトを得る方法を学びます。
1. Kinesis Data AnalyticsのSQLエディタ
Kinesis Data AnalyticsのSQLエディタは、ストリーム処理に特化した標準SQLを使って、アプリケーションを簡単に構築できるインタラクティブな開発環境です。
-
入力ストリームの参照:
SELECT文のFROM句で、入力ストリーム(WEB_LOG_STREAMなど)を直接参照できます。 -
ストリーム処理関数:
SUM(),COUNT(),AVG()といった集計関数に加え、ウィンドウ関数(TUMBLING WINDOWやSLIDING WINDOW)を使って、一定時間内のデータを効率的に集計できます。 -
出力ストリームへの書き込み:
INSERT INTO文を使って、処理結果を別の出力ストリームに書き込めます。
このエディタを使うことで、Pythonで書いた複雑なコードと同じような処理を、よりシンプルかつスケーラブルに実現できます。
2. SQLエディタでリアルタイム分析を実行する
それでは、Kinesis Data AnalyticsのSQLエディタを開き、MSKから受信したWebログデータを分析してみましょう。
ステップ1:アプリケーションの起動
AWSコンソールでKinesis Data Analyticsのアプリケーション(WebLogAnalyticsなど)を開きます。
「ランタイム」 タブに移動し、「アプリケーションの開始」 をクリックします。アプリケーションが起動するまで数分かかります。
ステップ2:SQLエディタを開く
アプリケーションのステータスが「実行中」になったら、**「Apache Flink Studio」タブに移動し、「Apache Flink Studio の開始」**をクリックします。
ステップ3:入力ストリームの確認
SQLエディタが開くと、左側に「WEB_LOG_STREAM」という名前の入力ストリームが表示されているはずです。このストリームのプレビュー(「データの検出」)をクリックすると、MSKからリアルタイムに流れてくる生のJSONデータを確認できます。
ステップ4:SQLクエリを実行する
それでは、以下のクエリをエディタに入力し、実行してみましょう。
① ページビュー数のリアルタイム集計
このクエリは、過去1分間(タンブリングウィンドウ)のページビュー数を集計します。
CREATE OR REPLACE TABLE PageViewCount (
window_end TIMESTAMP,
page_count BIGINT
);
CREATE OR REPLACE VIEW TempPageView AS
SELECT STREAM *
FROM "WEB_LOG_STREAM";
INSERT INTO PageViewCount
SELECT
TUMBLE_END(rowtime, INTERVAL '1' MINUTE),
COUNT(*)
FROM TempPageView
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE);
-
CREATE TABLE: 処理結果を保存するための一時的なテーブルを作成します。 -
TUMBLE_END(): タンブリングウィンドウ(重複のない時間枠)の終了時刻を返します。 -
GROUP BY TUMBLE(): 1分ごとの時間枠でデータをグループ化し、集計を行います。
このクエリを実行すると、結果タブに1分ごとに更新されるページビュー数が表示されます。
② 最も人気のあるページのリアルタイム特定
このクエリは、過去5分間で最もアクセス数が多いページをリアルタイムで特定します。
CREATE OR REPLACE TABLE TopPages (
page_id VARCHAR(255),
access_count BIGINT
);
CREATE OR REPLACE VIEW TopPagesView AS
SELECT
page_id,
COUNT(*) AS access_count
FROM "WEB_LOG_STREAM"
GROUP BY page_id
ORDER BY access_count DESC
LIMIT 10;
INSERT INTO TopPages SELECT * FROM TopPagesView;
-
GROUP BY:page_idごとにアクセス数を集計します。 -
ORDER BY ... DESC LIMIT 10: アクセス数が多い順に並べ、上位10件を抽出します。
3. Kinesis Data Analyticsの出力先への連携
SQLエディタで書いたクエリは、INSERT INTO文を使って、処理結果を別の出力先(別のKinesisストリーム、S3、Lambdaなど)に送信できます。これにより、リアルタイム分析の結果を、さらに別のアプリケーションで活用したり、長期保存したりすることが可能になります。
まとめと次回予告
今日は、Amazon Kinesis Data AnalyticsのSQLエディタを使い、MSKから受信したデータをリアルタイムにフィルタリング・集計する方法を学びました。Pythonコードを書くことなく、強力なストリーム処理アプリケーションを構築できることを実感できたと思います。
これで、データ収集からリアルタイム分析までの一連の流れが完成しました。明日からは、データ分析結果をより見やすくするための可視化に挑戦します。
26日目: ロギングとアラート設定で障害に備える
お楽しみに!