バッチ処理とストリーム処理について学びました。今回はこの2つの使い分けについて学びます。
一言サマリ
判断基準は、レイテンシ、データ性質、処理の複雑さ、コストの4つ
.
以下からAIによる記事です。
バッチとストリームの比較
前回までに、バッチ処理とストリーム処理の基本を学びました。今回は、どちらを選択すべきか、実践的な判断基準を解説します。
まず、両者の特徴を比較表で整理しましょう。
| 特徴 | バッチ処理 | ストリーム処理 |
|---|---|---|
| 処理タイミング | 定期的(時間・日単位) | リアルタイム(秒・ミリ秒単位) |
| データ量 | 大量データを一括 | 小さなデータを連続 |
| レイテンシ | 高い(分〜時間) | 低い(秒以下) |
| 複雑さ | シンプル | やや複雑 |
| リソース使用 | ピーク時に集中 | 常時一定 |
| コスト | 比較的低い | 比較的高い |
判断基準1: レイテンシ要件
どれくらい早く結果が必要かが最も重要な判断基準です。
バッチを選ぶケース
- 数時間〜1日の遅延が許容される
- 夜間に処理して、翌朝見られれば十分
- 過去データの分析や集計
例: 日次売上レポート
# バッチ処理で十分なケース
def generate_daily_sales_report():
"""
前日の売上を集計
翌朝9時に見られれば良いので、深夜2時にバッチ実行
"""
yesterday = datetime.now() - timedelta(days=1)
query = f"""
SELECT
DATE(order_date) as date,
category,
SUM(amount) as total_sales
FROM orders
WHERE DATE(order_date) = '{yesterday.date()}'
GROUP BY DATE(order_date), category
"""
df = pd.read_sql(query, con=engine)
df.to_csv(f'reports/sales_{yesterday.date()}.csv')
ストリームを選ぶケース
- 秒単位、ミリ秒単位の応答が必要
- 異常を即座に検知して対応したい
- リアルタイムダッシュボードの更新
例: 不正取引の検知
# ストリーム処理が必要なケース
def detect_fraud_realtime(transaction):
"""
取引発生から数秒以内に不正判定し、
疑わしい場合は即座にブロック
"""
if transaction['amount'] > 100000:
# 高額取引は即座にチェック
if is_suspicious_pattern(transaction):
block_transaction(transaction['id'])
send_alert(transaction)
return 'BLOCKED'
return 'APPROVED'
判断基準2: データの性質
データがどのように発生するかも重要な要素です。
バッチを選ぶケース
- データが定期的にまとまって生成される
- 外部システムから定期的にファイルが届く
- 完全なデータセットが必要
例: 月次の財務レポート
def monthly_financial_report(year, month):
"""
月末締めの財務データを集計
全てのデータが揃ってから処理する必要がある
"""
# 月の最終日を取得
last_day = calendar.monthrange(year, month)[1]
start_date = f'{year}-{month:02d}-01'
end_date = f'{year}-{month:02d}-{last_day}'
query = f"""
SELECT
account_type,
SUM(debit) as total_debit,
SUM(credit) as total_credit
FROM transactions
WHERE transaction_date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY account_type
"""
return pd.read_sql(query, con=engine)
ストリームを選ぶケース
- データが継続的に生成される
- イベントが散発的に発生する
- 各イベントを個別に処理したい
例: IoTセンサーデータの監視
class SensorMonitor:
"""
センサーからデータが来るたびに処理
異常値を即座に検知
"""
def __init__(self):
self.thresholds = {
'temperature': (0, 50),
'humidity': (20, 80),
'pressure': (900, 1100)
}
def process_sensor_data(self, data):
"""センサーデータを受信次第処理"""
sensor_type = data['type']
value = data['value']
min_val, max_val = self.thresholds[sensor_type]
if value < min_val or value > max_val:
# 異常値を即座に通知
alert = {
'sensor_id': data['sensor_id'],
'type': sensor_type,
'value': value,
'threshold': f'{min_val}-{max_val}',
'timestamp': data['timestamp']
}
send_immediate_alert(alert)
判断基準3: 処理の複雑さ
処理がどれくらい複雑かによっても選択が変わります。
バッチを選ぶケース
- 複雑な集計や結合が必要
- 全データを見渡す必要がある
- 複数ステップの変換が必要
例: 顧客セグメント分析
def customer_segmentation_analysis():
"""
全顧客の購買履歴を分析してセグメント分け
複雑な計算が必要なのでバッチ処理
"""
# ステップ1: 顧客ごとの購買サマリー
customer_summary = """
SELECT
customer_id,
COUNT(*) as purchase_count,
SUM(amount) as total_amount,
MAX(order_date) as last_purchase,
MIN(order_date) as first_purchase
FROM orders
GROUP BY customer_id
"""
df = pd.read_sql(customer_summary, con=engine)
# ステップ2: RFM分析
df['recency'] = (datetime.now() - pd.to_datetime(df['last_purchase'])).dt.days
df['frequency'] = df['purchase_count']
df['monetary'] = df['total_amount']
# ステップ3: セグメント判定(複雑なロジック)
df['segment'] = df.apply(lambda row: determine_segment(
row['recency'],
row['frequency'],
row['monetary']
), axis=1)
return df
def determine_segment(recency, frequency, monetary):
"""複雑なセグメント判定ロジック"""
if recency < 30 and frequency > 10 and monetary > 100000:
return 'VIP'
elif recency < 90 and frequency > 5:
return 'Regular'
elif recency > 180:
return 'Dormant'
else:
return 'Occasional'
ストリームを選ぶケース
- シンプルな変換やフィルタリング
- イベント単位で完結する処理
- 過去データへの参照が少ない
例: ログのフィルタリングと転送
def stream_log_filter(log_event):
"""
ログをフィルタして重要なものだけ保存
シンプルな処理なのでストリーム向き
"""
# エラーレベル以上のログのみ処理
if log_event['level'] in ['ERROR', 'CRITICAL']:
# 簡単な変換
formatted = {
'timestamp': log_event['timestamp'],
'level': log_event['level'],
'message': log_event['message'],
'service': log_event['service_name']
}
# 外部システムに送信
send_to_monitoring_system(formatted)
return formatted
return None
判断基準4: コストとリソース
運用コストとリソースも重要な判断要素です。
バッチを選ぶケース
- 予算が限られている
- 既存のインフラを活用したい
- リソースを時間帯で調整できる
# バッチ処理のリソース使用パターン
"""
00:00 - 02:00: ピーク(バッチ処理実行)
02:00 - 09:00: アイドル
09:00 - 18:00: 低負荷(通常業務)
"""
def optimize_batch_execution():
"""
夜間の安い時間帯にリソースを集中使用
日中はリソースを開放してコスト削減
"""
# 夜間に大量のCPU/メモリを使用
if datetime.now().hour in range(0, 3):
process_large_dataset_batch()
ストリームを選ぶケース
- リアルタイム性に価値がある
- 運用の複雑さを許容できる
- 24時間安定稼働が必要
# ストリーム処理のリソース使用パターン
"""
常時一定のリソースを使用
高可用性が必要(99.9%以上)
"""
class HighAvailabilityStreamProcessor:
"""
24時間365日稼働するストリームプロセッサ
コストは高いが、リアルタイム性を確保
"""
def __init__(self):
self.health_check_interval = 60 # 秒
self.last_health_check = time.time()
def process(self):
"""継続的な処理"""
while True:
try:
# イベント処理
event = self.get_next_event()
self.process_event(event)
# 定期的なヘルスチェック
if time.time() - self.last_health_check > self.health_check_interval:
self.perform_health_check()
self.last_health_check = time.time()
except Exception as e:
self.handle_error(e)
# エラーが発生しても処理を継続
ハイブリッドアプローチ: Lambda Architecture
実際には、バッチとストリームを組み合わせることもあります。
class HybridDataPipeline:
"""
バッチとストリームを組み合わせたパイプライン
"""
def __init__(self):
self.batch_layer = BatchLayer()
self.stream_layer = StreamLayer()
def query(self, query_params):
"""
バッチ層: 過去の全データ(正確だが遅い)
ストリーム層: 最近のデータ(速いが暫定的)
両方の結果をマージして返す
"""
# バッチ層から過去データを取得(昨日まで)
batch_result = self.batch_layer.query(query_params, until='yesterday')
# ストリーム層から最新データを取得(今日分)
stream_result = self.stream_layer.query(query_params, from_='today')
# 結果をマージ
return self.merge_results(batch_result, stream_result)
def merge_results(self, batch_result, stream_result):
"""バッチとストリームの結果を統合"""
combined = batch_result.copy()
for key, value in stream_result.items():
if key in combined:
combined[key] += value
else:
combined[key] = value
return combined
実践的な意思決定フロー
以下のフローチャートで判断できます。
def choose_processing_mode(requirements):
"""処理方式の選択フロー"""
# 1. レイテンシ要件をチェック
if requirements['latency_seconds'] <= 10:
return 'STREAM'
# 2. データ発生パターンをチェック
if requirements['data_pattern'] == 'continuous':
if requirements['budget'] == 'high':
return 'STREAM'
else:
return 'MICRO_BATCH' # 数分間隔の小さなバッチ
# 3. 処理の複雑さをチェック
if requirements['complexity'] == 'high':
if requirements['latency_hours'] <= 1:
return 'MICRO_BATCH'
else:
return 'BATCH'
# 4. デフォルトはバッチ
return 'BATCH'
# 使用例
requirements = {
'latency_seconds': 60,
'latency_hours': 24,
'data_pattern': 'periodic',
'complexity': 'medium',
'budget': 'medium'
}
mode = choose_processing_mode(requirements)
print(f"Recommended mode: {mode}")
ケーススタディ
実際のビジネスシナリオで判断してみましょう。
ケース1: ECサイトのレコメンデーション
# 判断: ハイブリッド
# バッチ層: 全ユーザーの購買履歴を分析
def batch_recommendation_model():
"""
夜間バッチで協調フィルタリングモデルを更新
計算量が多いのでバッチ処理
"""
# 全ユーザー x 全商品のマトリックスを計算
pass
# ストリーム層: リアルタイムの閲覧履歴を反映
def stream_recommendation_adjustment(user_id, viewed_items):
"""
ユーザーの今の閲覧行動を即座に反映
レスポンスが重要なのでストリーム処理
"""
base_recommendations = get_batch_recommendations(user_id)
adjusted = adjust_for_recent_behavior(base_recommendations, viewed_items)
return adjusted
ケース2: 在庫管理システム
# 判断: ストリーム
def inventory_management():
"""
在庫の増減をリアルタイムで追跡
在庫切れを即座に検知する必要があるのでストリーム
"""
def process_inventory_event(event):
current_stock = get_current_stock(event['product_id'])
if event['type'] == 'sale':
new_stock = current_stock - event['quantity']
else: # 'restock'
new_stock = current_stock + event['quantity']
update_stock(event['product_id'], new_stock)
# 在庫切れアラート
if new_stock <= 0:
send_out_of_stock_alert(event['product_id'])
ケース3: 月次財務レポート
# 判断: バッチ
def monthly_financial_close():
"""
月次決算処理
全データが確定してから処理するのでバッチ
複雑な会計処理が必要
"""
# 月末締め後に実行
# 複数のテーブルを結合
# 複雑な計算とチェック
# 最終レポート生成
pass
まとめと次回予告
バッチとストリームの選択は、レイテンシ要件、データの性質、処理の複雑さ、コストという4つの観点で判断します。絶対的な正解はなく、要件に応じて最適な選択をすることが重要です。場合によっては、両方を組み合わせたハイブリッドアプローチも有効です。
次回は、「データの不変性」について学びます。データパイプラインの信頼性を高めるための重要な概念である、データの不変性について詳しく解説します。
感想
使い分けの基準はレイテンシ要件の占める割合が多いように感じました。使いどころがはっきりしているのでどちらにしようか迷うことは少なそうと思いながらも選択肢がない分、実装難度が上がることもありそうだと思いました。