今回はストリーム処理について学んでいきます。
一言サマリー
リアルタイム性を求めるならストリーム処理、
以下からAIが作成しています。
ストリーム処理とは何か
ストリーム処理(Stream Processing)は、データが生成されるたびに、リアルタイムまたはニアリアルタイムで処理する方式です。データを「流れ(ストリーム)」として扱い、次々と到着するデータを連続的に処理します。
例えば、株価の監視システムでは、価格が更新されるたびに即座に処理し、一定の条件を満たしたらアラートを発します。バッチ処理のように「まとめて後で」ではなく、「来たらすぐ」処理するのがストリーム処理の特徴です。
ストリーム処理の特徴
ストリーム処理には、バッチ処理とは異なる特徴があります。
低レイテンシ
データの発生から処理完了までの時間(レイテンシ)が数秒以内と非常に短いです。場合によってはミリ秒単位での処理も可能です。
連続処理
24時間365日、常に動作し続けます。バッチのように「開始と終了」がなく、継続的にデータを受け付けて処理します。
イベント駆動
データの到着がトリガーとなって処理が実行されます。スケジュールではなく、イベントに反応する形で動作します。
ストリーム処理のユースケース
実際のビジネスでよく使われるストリーム処理のパターンを見ていきましょう。
リアルタイム監視とアラート
異常を即座に検知して通知する用途に最適です。
from kafka import KafkaConsumer
import json
def monitor_transactions():
"""不正取引の監視"""
consumer = KafkaConsumer(
'transactions',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
transaction = message.value
# 不正パターンのチェック
if is_suspicious(transaction):
send_alert(transaction)
print(f"Alert: Suspicious transaction detected - {transaction['id']}")
def is_suspicious(transaction):
"""不正取引の判定"""
# 短時間に高額の複数取引
if transaction['amount'] > 100000:
return True
# 通常と異なる場所からのアクセス
if transaction['location'] != transaction['usual_location']:
return True
return False
リアルタイム集計
現在進行中のデータを継続的に集計します。
from collections import defaultdict
from datetime import datetime, timedelta
class RealTimeAggregator:
"""リアルタイム集計エンジン"""
def __init__(self, window_minutes=5):
self.window_minutes = window_minutes
self.metrics = defaultdict(lambda: {
'count': 0,
'total': 0,
'window_start': None
})
def process_event(self, event):
"""イベントを処理して集計"""
category = event['category']
amount = event['amount']
timestamp = datetime.fromisoformat(event['timestamp'])
metric = self.metrics[category]
# ウィンドウの開始時刻を設定
if metric['window_start'] is None:
metric['window_start'] = timestamp
# ウィンドウ期間を超えたらリセット
window_end = metric['window_start'] + timedelta(minutes=self.window_minutes)
if timestamp > window_end:
self.output_result(category, metric)
metric['count'] = 0
metric['total'] = 0
metric['window_start'] = timestamp
# 集計更新
metric['count'] += 1
metric['total'] += amount
# リアルタイムで現在の状態を返す
return {
'category': category,
'count': metric['count'],
'total': metric['total'],
'average': metric['total'] / metric['count'] if metric['count'] > 0 else 0
}
def output_result(self, category, metric):
"""集計結果の出力"""
print(f"Window closed for {category}:")
print(f" Count: {metric['count']}")
print(f" Total: {metric['total']}")
print(f" Average: {metric['total'] / metric['count'] if metric['count'] > 0 else 0}")
リアルタイムデータ変換
データを受け取り次第、変換して次の処理に渡します。
import json
from kafka import KafkaConsumer, KafkaProducer
def stream_transformer():
"""ストリームデータの変換"""
consumer = KafkaConsumer(
'raw_events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for message in consumer:
raw_event = message.value
# データ変換
transformed = transform_event(raw_event)
# エンリッチメント
enriched = enrich_with_metadata(transformed)
# 次のトピックに送信
producer.send('processed_events', enriched)
print(f"Processed event: {enriched['id']}")
def transform_event(event):
"""イベントの変換"""
return {
'id': event['event_id'],
'type': event['event_type'],
'timestamp': event['created_at'],
'user_id': event['user']['id'],
'amount': float(event['data']['amount'])
}
def enrich_with_metadata(event):
"""メタデータの付加"""
event['processed_at'] = datetime.now().isoformat()
event['processor_version'] = '1.0'
return event
ウィンドウ処理
ストリーム処理で重要な概念が「ウィンドウ」です。無限に続くストリームを、一定の区切りで処理する仕組みです。
タンブリングウィンドウ(固定時間)
重複しない固定サイズの時間窓で区切ります。
from datetime import datetime, timedelta
class TumblingWindow:
"""タンブリングウィンドウ処理"""
def __init__(self, window_size_seconds=60):
self.window_size = timedelta(seconds=window_size_seconds)
self.current_window = []
self.window_start = None
def add_event(self, event):
"""イベントを追加"""
event_time = datetime.fromisoformat(event['timestamp'])
if self.window_start is None:
self.window_start = event_time
window_end = self.window_start + self.window_size
if event_time >= window_end:
# ウィンドウを閉じて処理
result = self.process_window()
# 新しいウィンドウを開始
self.current_window = [event]
self.window_start = event_time
return result
else:
# 現在のウィンドウに追加
self.current_window.append(event)
return None
def process_window(self):
"""ウィンドウの処理"""
if not self.current_window:
return None
total = sum(e['amount'] for e in self.current_window)
count = len(self.current_window)
return {
'window_start': self.window_start.isoformat(),
'window_end': (self.window_start + self.window_size).isoformat(),
'event_count': count,
'total_amount': total,
'average_amount': total / count
}
スライディングウィンドウ(移動窓)
一定間隔で移動する時間窓で処理します。
from collections import deque
class SlidingWindow:
"""スライディングウィンドウ処理"""
def __init__(self, window_size_seconds=300):
self.window_size = timedelta(seconds=window_size_seconds)
self.events = deque()
def add_event(self, event):
"""イベントを追加して集計"""
event_time = datetime.fromisoformat(event['timestamp'])
# 新しいイベントを追加
self.events.append((event_time, event))
# ウィンドウ外の古いイベントを削除
cutoff_time = event_time - self.window_size
while self.events and self.events[0][0] < cutoff_time:
self.events.popleft()
# 現在のウィンドウ内のデータで集計
return self.calculate_metrics()
def calculate_metrics(self):
"""現在のウィンドウの集計"""
if not self.events:
return None
amounts = [e[1]['amount'] for e in self.events]
return {
'count': len(amounts),
'total': sum(amounts),
'average': sum(amounts) / len(amounts),
'max': max(amounts),
'min': min(amounts)
}
ストリーム処理のフレームワーク
実際のストリーム処理では、専用のフレームワークを使うことが一般的です。
Apache Kafka Streams
from kafka import KafkaConsumer, KafkaProducer
import json
class SimpleStreamProcessor:
"""シンプルなストリームプロセッサ"""
def __init__(self):
self.consumer = KafkaConsumer(
'input-topic',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='stream-processor-group'
)
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def start(self):
"""処理開始"""
print("Stream processor started")
for message in self.consumer:
try:
# メッセージ処理
result = self.process_message(message.value)
# 結果を出力トピックに送信
self.producer.send('output-topic', result)
except Exception as e:
print(f"Error processing message: {e}")
def process_message(self, data):
"""メッセージの処理ロジック"""
# データ変換
processed = {
'id': data['id'],
'value': data['value'] * 2,
'processed_at': datetime.now().isoformat()
}
return processed
ストリーム処理の課題と対策
ストリーム処理特有の課題と、その解決方法を見ていきます。
順序の保証
ネットワーク遅延などで、イベントの到着順序が入れ替わることがあります。
from sortedcontainers import SortedList
class OrderedStreamProcessor:
"""順序を保証するストリームプロセッサ"""
def __init__(self, buffer_seconds=5):
self.buffer = SortedList(key=lambda x: x['timestamp'])
self.buffer_duration = timedelta(seconds=buffer_seconds)
self.last_processed_time = None
def add_event(self, event):
"""イベントを追加"""
self.buffer.add(event)
# バッファから処理可能なイベントを取り出す
return self.process_buffered_events()
def process_buffered_events(self):
"""バッファ内の古いイベントを順序通りに処理"""
results = []
if not self.buffer:
return results
# 最新イベントの時刻を基準に、バッファ期間を超えたイベントを処理
latest_time = datetime.fromisoformat(self.buffer[-1]['timestamp'])
cutoff_time = latest_time - self.buffer_duration
while self.buffer and datetime.fromisoformat(self.buffer[0]['timestamp']) < cutoff_time:
event = self.buffer.pop(0)
result = self.process_event(event)
results.append(result)
return results
def process_event(self, event):
"""個別イベントの処理"""
print(f"Processing event in order: {event['id']}")
return event
状態管理
ストリーム処理でも、過去の情報を保持する必要がある場合があります。
class StatefulStreamProcessor:
"""状態を持つストリームプロセッサ"""
def __init__(self):
self.user_states = {}
def process_event(self, event):
"""ユーザーの状態を更新しながら処理"""
user_id = event['user_id']
# 現在の状態を取得
state = self.user_states.get(user_id, {
'total_purchases': 0,
'total_amount': 0,
'last_purchase': None
})
# 状態を更新
state['total_purchases'] += 1
state['total_amount'] += event['amount']
state['last_purchase'] = event['timestamp']
# 状態を保存
self.user_states[user_id] = state
# 状態に基づいた判定
if state['total_amount'] > 100000:
return {
'user_id': user_id,
'alert': 'VIP customer',
'lifetime_value': state['total_amount']
}
return None
エラーハンドリングとリトライ
ストリーム処理では、エラーが発生してもシステムを止めずに動作し続ける必要があります。
from tenacity import retry, stop_after_attempt, wait_exponential
class ResilientStreamProcessor:
"""エラーに強いストリームプロセッサ"""
def __init__(self):
self.error_count = 0
self.processed_count = 0
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
def process_with_retry(self, event):
"""リトライ付き処理"""
try:
result = self.process_event(event)
self.processed_count += 1
return result
except Exception as e:
self.error_count += 1
print(f"Error processing event {event['id']}: {e}")
raise
def process_event(self, event):
"""イベント処理(失敗する可能性がある)"""
# 外部APIを呼び出すなど、失敗する可能性のある処理
if event.get('corrupted'):
raise ValueError("Corrupted event")
return {'status': 'processed', 'id': event['id']}
まとめと次回予告
ストリーム処理は、データが到着次第リアルタイムで処理する方式で、低レイテンシ、連続処理、イベント駆動という特徴があります。ウィンドウ処理、順序保証、状態管理などの技術を活用することで、信頼性の高いストリームパイプラインを構築できます。
次回は、「バッチ vs ストリーム - 使い分けの判断基準」について学びます。2つの処理方式をどのような場合に選択すべきか、実践的な判断基準を解説します。
感想
ストリーム処理とバッチ処理の関係はNoSQLとRDBに近いなぁと感じました。また、記載があった課題に関連してデバッグが大変そうだなと思いました。バッチ処理に比べて要件も実装難度も厳しそうですね。