1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】- 25日目: CloudWatchを使ってMSKとEC2のメトリクスを監視する

Last updated at Posted at 2025-08-14

はじめに

「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」25日目です。前回、Amazon MSKとKinesis Data Analyticsの連携設定が完了しました。

今日は、その連携を活かし、Amazon Kinesis Data AnalyticsのSQLエディタを使って、MSKに流れてくるデータをリアルタイムに分析してみましょう。これにより、ストリーム処理の力を実感し、ビジネスに即座に役立つインサイトを得る方法を学びます。


1. Kinesis Data AnalyticsのSQLエディタ

Kinesis Data AnalyticsのSQLエディタは、ストリーム処理に特化した標準SQLを使って、アプリケーションを簡単に構築できるインタラクティブな開発環境です。

  • 入力ストリームの参照: SELECT文のFROM句で、入力ストリーム(WEB_LOG_STREAMなど)を直接参照できます。
  • ストリーム処理関数: SUM(), COUNT(), AVG()といった集計関数に加え、ウィンドウ関数TUMBLING WINDOWSLIDING WINDOW)を使って、一定時間内のデータを効率的に集計できます。
  • 出力ストリームへの書き込み: INSERT INTO文を使って、処理結果を別の出力ストリームに書き込めます。

このエディタを使うことで、Pythonで書いた複雑なコードと同じような処理を、よりシンプルかつスケーラブルに実現できます。


2. SQLエディタでリアルタイム分析を実行する

それでは、Kinesis Data AnalyticsのSQLエディタを開き、MSKから受信したWebログデータを分析してみましょう。

ステップ1:アプリケーションの起動

AWSコンソールでKinesis Data Analyticsのアプリケーション(WebLogAnalyticsなど)を開きます。
「ランタイム」 タブに移動し、「アプリケーションの開始」 をクリックします。アプリケーションが起動するまで数分かかります。

ステップ2:SQLエディタを開く

アプリケーションのステータスが「実行中」になったら、**「Apache Flink Studio」タブに移動し、「Apache Flink Studio の開始」**をクリックします。

ステップ3:入力ストリームの確認

SQLエディタが開くと、左側に「WEB_LOG_STREAM」という名前の入力ストリームが表示されているはずです。このストリームのプレビュー(「データの検出」)をクリックすると、MSKからリアルタイムに流れてくる生のJSONデータを確認できます。

ステップ4:SQLクエリを実行する

それでは、以下のクエリをエディタに入力し、実行してみましょう。


① ページビュー数のリアルタイム集計

このクエリは、過去1分間(タンブリングウィンドウ)のページビュー数を集計します。

CREATE OR REPLACE TABLE PageViewCount (
    window_end TIMESTAMP,
    page_count BIGINT
);

CREATE OR REPLACE VIEW TempPageView AS
SELECT STREAM *
FROM "WEB_LOG_STREAM";

INSERT INTO PageViewCount
SELECT
  TUMBLE_END(rowtime, INTERVAL '1' MINUTE),
  COUNT(*)
FROM TempPageView
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE);
  • CREATE TABLE: 処理結果を保存するための一時的なテーブルを作成します。
  • TUMBLE_END(): タンブリングウィンドウ(重複のない時間枠)の終了時刻を返します。
  • GROUP BY TUMBLE(): 1分ごとの時間枠でデータをグループ化し、集計を行います。

このクエリを実行すると、結果タブに1分ごとに更新されるページビュー数が表示されます。


② 最も人気のあるページのリアルタイム特定

このクエリは、過去5分間で最もアクセス数が多いページをリアルタイムで特定します。

CREATE OR REPLACE TABLE TopPages (
    page_id VARCHAR(255),
    access_count BIGINT
);

CREATE OR REPLACE VIEW TopPagesView AS
SELECT
  page_id,
  COUNT(*) AS access_count
FROM "WEB_LOG_STREAM"
GROUP BY page_id
ORDER BY access_count DESC
LIMIT 10;

INSERT INTO TopPages SELECT * FROM TopPagesView;
  • GROUP BY: page_idごとにアクセス数を集計します。
  • ORDER BY ... DESC LIMIT 10: アクセス数が多い順に並べ、上位10件を抽出します。

3. Kinesis Data Analyticsの出力先への連携

SQLエディタで書いたクエリは、INSERT INTO文を使って、処理結果を別の出力先(別のKinesisストリーム、S3、Lambdaなど)に送信できます。これにより、リアルタイム分析の結果を、さらに別のアプリケーションで活用したり、長期保存したりすることが可能になります。


まとめと次回予告

今日は、Amazon Kinesis Data AnalyticsのSQLエディタを使い、MSKから受信したデータをリアルタイムにフィルタリング・集計する方法を学びました。Pythonコードを書くことなく、強力なストリーム処理アプリケーションを構築できることを実感できたと思います。

これで、データ収集からリアルタイム分析までの一連の流れが完成しました。明日からは、データ分析結果をより見やすくするための可視化に挑戦します。

26日目: ロギングとアラート設定で障害に備える

お楽しみに!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?