0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】- 16日目: ストリーム処理の基本:フィルタリングと集計を実装

Last updated at Posted at 2025-08-14

はじめに

「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」16日目です。昨日、EC2インスタンスからWebログをKafkaに送信するデータ収集の仕組みを構築しました。

今日は、そのKafkaに流れてくる膨大なログデータから、「本当に必要な情報だけを抽出し、意味のある形に変換する」 ストリーム処理の基本に挑戦します。具体的には、Pythonでコンシューマーを実装し、データのフィルタリングと集計を行います。


1. なぜストリーム処理が必要なのか?

Kafkaに流れてくる生データは、そのままでは分析に使いにくいことがほとんどです。Webログの例では、以下のような課題があります。

  • ノイズの多さ: サイトにアクセスするすべてのユーザーのログが含まれており、特定の行動(例: 商品購入)に絞りたい場合、不要なデータが多くなります。
  • データの非構造化: ログは1件ずつバラバラに流れてくるため、全体の傾向を把握するには集計が必要です。

ストリーム処理を行うことで、これらの課題を解決し、データを「生きた情報」に変えることができます。

  • フィルタリング: 特定の条件(例: event_typepurchase)を満たすデータだけを抽出します。
  • 集計: 決められた時間枠(例: 過去1分間)でデータを集計し、ビジネス指標(例: 1分間の購入回数)を計算します。

2. Pythonでストリーム処理を実装する

今回は、Webログから「商品購入」イベントだけをフィルタリングし、さらに過去1分間の購入数を集計するコンシューマーを実装します。

EC2インスタンスにSSHで接続し、stream_processor.pyというファイルを作成して、以下のコードを記述してください。

# stream_processor.py
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
import sys
from collections import deque
import time

# Kafkaコンシューマーの設定
conf = {
    'bootstrap.servers': '<your-msk-bootstrap-brokers>',
    'group.id': 'log-processor-group',
    'auto.offset.reset': 'earliest'
}

# Consumerインスタンスの作成
consumer = Consumer(conf)

# 購読するトピック
topic = "web-logs"

# 過去1分間のデータを保持するキュー
purchase_events = deque()
window_size_sec = 60

try:
    consumer.subscribe([topic])

    print("Starting stream processor...")

    while True:
        msg = consumer.poll(1.0)
        
        if msg is None:
            # 1秒間メッセージがなければ次のループへ
            continue
        if msg.error():
            if msg.error().code() == KafkaError.PARTITION_EOF:
                # 終端に達したが、これは正常な状態
                continue
            else:
                raise KafkaException(msg.error())
        else:
            try:
                # データのデコードとパース
                data = json.loads(msg.value().decode('utf-8'))
                event_type = data.get('event_type')

                # フィルタリング: 'purchase'イベントのみを抽出
                if event_type == 'purchase':
                    current_time = time.time()
                    purchase_events.append(current_time)
                    print(f"Filtered purchase event: {data}")

                # データの集計: 過去1分間の購入数を計算
                # 古いイベントをキューから削除
                while purchase_events and purchase_events[0] < current_time - window_size_sec:
                    purchase_events.popleft()
                
                # 現在の購入数をログに出力
                print(f"Current purchase count in the last {window_size_sec} seconds: {len(purchase_events)}")

            except Exception as e:
                print(f"Error processing message: {e}")

except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    consumer.close()

3. コードの解説と実行

(1) コードの解説

  • group.id: 'log-processor-group': コンシューマーグループIDを新しく設定しました。これにより、ログを収集するコンシューマーと処理するコンシューマーを分離できます。
  • deque: collectionsモジュールのdeque(デック)は、高速な追加・削除が可能なキュー(両端キュー)です。今回は、過去1分間のタイムスタンプを効率的に管理するために使用しています。
  • フィルタリング: if event_type == 'purchase'というシンプルな条件で、必要なデータだけを抽出しています。
  • ウィンドウ集計:
    • purchase_events.append(current_time):購入イベントが発生した時刻をキューに追加します。
    • while purchase_events and purchase_events[0] < current_time - window_size_sec::キューの先頭にある最も古いイベントのタイムスタンプが、現在の時刻から60秒以上前であれば、そのイベントをキューから削除します。
    • len(purchase_events):キューに残っているイベントの数が、そのまま過去1分間の購入数になります。

(2) 実行方法

  1. プロデューサーの実行:
    まず、昨日作成したweb_log_producer.pyを別のSSHセッションで実行し、web-logsトピックにデータを送信し続けます。

    python3 web_log_producer.py
    
  2. ストリームプロセッサーの実行:
    新しいSSHセッションを開き、stream_processor.pyを実行します。

    python3 stream_processor.py
    

コンソールには、フィルタリングされた購入イベントと、過去1分間の購入数がリアルタイムで表示されるはずです。


まとめと次回予告

今日は、Pythonを使ってKafkaからのデータをリアルタイムにフィルタリングし、時間ベースで集計するストリーム処理の基本を学びました。これにより、膨大な生データからビジネスに役立つ指標を抽出する、本格的なデータ分析の第一歩を踏み出しました。

明日は、この処理したデータをAWSのストレージサービスに保存する方法を学びます。

17日目: 処理したデータをAWS S3に保存する

お楽しみに!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?