はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」16日目です。昨日、EC2インスタンスからWebログをKafkaに送信するデータ収集の仕組みを構築しました。
今日は、そのKafkaに流れてくる膨大なログデータから、「本当に必要な情報だけを抽出し、意味のある形に変換する」 ストリーム処理の基本に挑戦します。具体的には、Pythonでコンシューマーを実装し、データのフィルタリングと集計を行います。
1. なぜストリーム処理が必要なのか?
Kafkaに流れてくる生データは、そのままでは分析に使いにくいことがほとんどです。Webログの例では、以下のような課題があります。
- ノイズの多さ: サイトにアクセスするすべてのユーザーのログが含まれており、特定の行動(例: 商品購入)に絞りたい場合、不要なデータが多くなります。
- データの非構造化: ログは1件ずつバラバラに流れてくるため、全体の傾向を把握するには集計が必要です。
ストリーム処理を行うことで、これらの課題を解決し、データを「生きた情報」に変えることができます。
-
フィルタリング: 特定の条件(例:
event_typeがpurchase)を満たすデータだけを抽出します。 - 集計: 決められた時間枠(例: 過去1分間)でデータを集計し、ビジネス指標(例: 1分間の購入回数)を計算します。
2. Pythonでストリーム処理を実装する
今回は、Webログから「商品購入」イベントだけをフィルタリングし、さらに過去1分間の購入数を集計するコンシューマーを実装します。
EC2インスタンスにSSHで接続し、stream_processor.pyというファイルを作成して、以下のコードを記述してください。
# stream_processor.py
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
import sys
from collections import deque
import time
# Kafkaコンシューマーの設定
conf = {
'bootstrap.servers': '<your-msk-bootstrap-brokers>',
'group.id': 'log-processor-group',
'auto.offset.reset': 'earliest'
}
# Consumerインスタンスの作成
consumer = Consumer(conf)
# 購読するトピック
topic = "web-logs"
# 過去1分間のデータを保持するキュー
purchase_events = deque()
window_size_sec = 60
try:
consumer.subscribe([topic])
print("Starting stream processor...")
while True:
msg = consumer.poll(1.0)
if msg is None:
# 1秒間メッセージがなければ次のループへ
continue
if msg.error():
if msg.error().code() == KafkaError.PARTITION_EOF:
# 終端に達したが、これは正常な状態
continue
else:
raise KafkaException(msg.error())
else:
try:
# データのデコードとパース
data = json.loads(msg.value().decode('utf-8'))
event_type = data.get('event_type')
# フィルタリング: 'purchase'イベントのみを抽出
if event_type == 'purchase':
current_time = time.time()
purchase_events.append(current_time)
print(f"Filtered purchase event: {data}")
# データの集計: 過去1分間の購入数を計算
# 古いイベントをキューから削除
while purchase_events and purchase_events[0] < current_time - window_size_sec:
purchase_events.popleft()
# 現在の購入数をログに出力
print(f"Current purchase count in the last {window_size_sec} seconds: {len(purchase_events)}")
except Exception as e:
print(f"Error processing message: {e}")
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
finally:
consumer.close()
3. コードの解説と実行
(1) コードの解説
-
group.id: 'log-processor-group': コンシューマーグループIDを新しく設定しました。これにより、ログを収集するコンシューマーと処理するコンシューマーを分離できます。 -
deque:collectionsモジュールのdeque(デック)は、高速な追加・削除が可能なキュー(両端キュー)です。今回は、過去1分間のタイムスタンプを効率的に管理するために使用しています。 -
フィルタリング:
if event_type == 'purchase'というシンプルな条件で、必要なデータだけを抽出しています。 -
ウィンドウ集計:
-
purchase_events.append(current_time):購入イベントが発生した時刻をキューに追加します。 -
while purchase_events and purchase_events[0] < current_time - window_size_sec::キューの先頭にある最も古いイベントのタイムスタンプが、現在の時刻から60秒以上前であれば、そのイベントをキューから削除します。 -
len(purchase_events):キューに残っているイベントの数が、そのまま過去1分間の購入数になります。
-
(2) 実行方法
-
プロデューサーの実行:
まず、昨日作成したweb_log_producer.pyを別のSSHセッションで実行し、web-logsトピックにデータを送信し続けます。python3 web_log_producer.py -
ストリームプロセッサーの実行:
新しいSSHセッションを開き、stream_processor.pyを実行します。python3 stream_processor.py
コンソールには、フィルタリングされた購入イベントと、過去1分間の購入数がリアルタイムで表示されるはずです。
まとめと次回予告
今日は、Pythonを使ってKafkaからのデータをリアルタイムにフィルタリングし、時間ベースで集計するストリーム処理の基本を学びました。これにより、膨大な生データからビジネスに役立つ指標を抽出する、本格的なデータ分析の第一歩を踏み出しました。
明日は、この処理したデータをAWSのストレージサービスに保存する方法を学びます。
17日目: 処理したデータをAWS S3に保存する
お楽しみに!