Day 12 で議論した stateful operations(状態を持つ操作)の概念を覚えていますか?また、以前に実装した Database Lookup Join についてはどうでしょうか?
今日はさらに一歩進んで、ストリーミング処理の真髄である Stream-to-Stream Join(ストリーム间结合)について深く掘り下げます。外部データベースのルックアップに頼るのではなく、2つのストリーム間で直接イベントをマッチングさせます。これこそが、ストリーミングシステムの真の力が発揮される場面です。
JOIN を再考する
リレーショナルデータベース(RDB)において、JOIN は特定の条件に基づいて2つのテーブルを組み合わせる操作です。
しかし、ストリーミング処理では「テーブル」は 制限のないイベントのストリーム(unbounded stream of events) になります。これにより JOIN の性質は根本的に変わり、継続的で、状態を持ち、時間を意識したもの(time-aware)になります。
従来の RDB Join:Build–Probe モデル
Step 1: Build Phase
Table A
(小さい方) ────────────────────> Hash Table
│ (key→value index)
│ │
└─ すべてのデータをメモリにロード │
│
Step 2: Probe Phase │
Table B │
(大きい方) │
│ │
├─ row1 ──────────┐ │
├─ row2 ──────────┼─ Lookup ───────────►│
├─ row3 ──────────┼─ Hash Table │
├─ ... ──────────┘ │
│ │
│ Match Found ◄───────┘
│ │
▼ │
Join Result ◄────────────┘
特徴:
- 片方のサイドがインデックスを構築(Build)し、もう片方が一致するものを探索(Probe)します。
- 小さい方のテーブルは、ハッシュテーブルを構築するためにメモリに完全にロードされます。
- 大きい方のテーブルは行ごとにスキャンされ、ハッシュテーブルに対して照合されます。
- これは、明確な開始と終了がある 有限の一回限りの操作 です。
Streaming Join:双方向状態管理(Bidirectional State Management)
Stream A Processing Stream B Processing
┌─────────────────┐ ┌─────────────────┐
│ │ │ │
│ Event A │ │ Event B │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Store in │ │ Store in │
│ State A │ │ State B │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Query State B ──┼────────────────┼── Query State A │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Emit Result │ │ Emit Result │
│ │ │ │
└─────────────────┘ └─────────────────┘
│ │
│ State Management │
│ ┌─────────────────────────┐ │
└───►│ State Store A │◄────┘
│ Key → [timestamp,value] │
│ │
│ State Store B │
│ Key → [timestamp,value] │
└─────────────────────────┘
主な違い:
- 双方向の Build + Probe: 両方のストリームが状態(state)を保持し、相手側を探索(probe)する必要があります。
- 継続的な操作: 明確な終了はなく、データは無期限に流れ続けます。
- 時間セマンティクス: イベントのタイムスタンプと到着順序が重要になります。
- 状態管理(State management): 過去のデータは保持され、インテリジェントにクリーンアップされる必要があります。
Simple Streaming における Streaming Join の実装
重要な注意点
この記事に含まれるすべてのコードは、教育目的の疑似コード です。ストリーミング処理の背後にあるアーキテクチャのアイデアとコア設計コンセプトを説明するためのものであり、本番環境用ではありません。
分かりやすくするために、以前の実装の詳細を簡略化し、今日のコアメカニズムに焦点を当てています。
ステップ 1:State Store の設計
class SimpleDataFrame:
def __init__(self, name: str = "dataframe", sink_callback=None):
self.name = name
# Streaming Join の核心:双方向の状態ストレージ
self._join_state = defaultdict(list) # key -> [events...]
self._join_partner = None # ペアとなる DataFrame
self._join_key = None # join フィールド名
_join_state の内部構造
{
"O001": [event1, event2, ...], # 1つのキーが複数のイベントに対応する場合がある
"O002": [event3, event4, ...],
}
設計上の考慮事項
-
defaultdict(list)は 1対多の関係をサポートします。 - 単一の
order_idに複数の関連する詳細イベントが紐付く可能性があります。
ステップ 2:join() における双方向の関連付けの確立
def join(self, other: 'SimpleDataFrame', on: str) -> 'SimpleDataFrame':
# 1. join 結果を収集するための新しい DataFrame を作成
joined_df = SimpleDataFrame(f"{self.name}_join_{other.name}")
# 2. 双方向の関連付けを確立
self._join_partner = other
other._join_partner = self
self._join_key = on
other._join_key = on
# 3. 元の処理ロジックをインターセプトして join 機能を注入
self_original_process = self.process_message
def enhanced_process(message):
# 最初に元のロジック(フィルタ、変換など)を実行
result = self_original_process(message)
# 次に join ロジックを適用
join_results = self._process_join_event(message)
for join_result in join_results:
joined_df.process_message(join_result)
return result
self.process_message = enhanced_process
return joined_df
コア設計原則
- 双方向参照(Bidirectional references): 各 DataFrame は自身の join パートナーを知っています。
- イベントインターセプト(Event interception): 既存の動作を壊すことなく join ロジックが注入されます。
- 後方互換性(Backward compatibility): 既存の DataFrame 機能はそのまま維持されます。
設計上の利点
- 後方互換性:フィルタやシンク(sink)は引き続き動作します。
- 関心の分離(Separation of concerns):join 結果は独立して処理されます。
- 連結可能な操作(Chainable operations):join された結果をさらに処理できます。
ステップ 3:コア Join アルゴリズム
def _process_join_event(self, event) -> List[Dict]:
if not self._join_partner:
return []
join_value = str(event.get(self._join_key))
results = []
# 1. Store: 現在のイベントを状態(state)に保存
self._join_state[join_value].append(event)
# 2. Query: パートナーの状態から一致するイベントを検索
partner_events = self._join_partner._join_state.get(join_value, [])
# 3. Merge: 一致した各イベントと結合
for partner_event in partner_events:
merged = {**event, **partner_event}
results.append(merged)
return results
アルゴリズムのステップ
- Store: 現在のイベントを join key に基づいてローカル状態に永続化します。
- Query: パートナーの state store で一致するイベントを検索します。
- Merge: 一致したイベントを組み合わせて結果を出力します。
ステップ 4:エンドツーエンドの実行フロー
完全な例を見てみましょう:
# join 関係を定義
orders_df = app.dataframe(source=orders_source)
details_df = app.dataframe(source=details_source)
joined_df = orders_df.join(details_df, on="order_id")
# イベントの到着順序
1. detail = {"order_id": "O001", "product": "咖啡", "qty": 2}
2. order = {"order_id": "O001", "user_id": "U123", "total": 500}
イベント 1:Detail が先に到着した場合
# details_df が detail イベントを受信:
join_value = "O001"
# 1. ローカル状態に保存
details_df._join_state["O001"] = [{"order_id": "O001", "product": "咖啡", "qty": 2}]
# 2. パートナー (orders_df) を照会
orders_df._join_state.get("O001", []) # [] を返す。order はまだ存在しない
# 3. 一致なし、出力なし
results = []
イベント 2:Order が後から到着した場合
# orders_df が order イベントを受信:
join_value = "O001"
# 1. ローカル状態に保存
orders_df._join_state["O001"] = [{"order_id": "O001", "user_id": "U123", "total": 500}]
# 2. パートナー (details_df) を照会
details_df._join_state.get("O001", [])
# 見つかった! [{"order_id": "O001", "product": "咖啡", "qty": 2}]
# 3. イベントをマージ
merged = {
**{"order_id": "O001", "user_id": "U123", "total": 500}, # 現在のイベント
**{"order_id": "O001", "product": "咖啡", "qty": 2} # 一致したイベント
}
# 出力:
# {"order_id": "O001", "user_id": "U123", "total": 500, "product": "咖啡", "qty": 2}
results = [merged]
到着順序に関係なく、双方が状態を保持しているため、最終的に join は成功します。
設計上の利点のまとめ
-
クリーンな API:
df1.join(df2, on="key")は直感的で表現力に富んでいます。 - 自動的な状態処理: 状態管理とルーティングが透過的に行われます。
- 非侵入的な設計: 既存の DataFrame 操作に影響を与えません。
- 構成可能(Composable): join 結果をさらに別の変換処理と繋げることができます。
- 順序不同(Out-of-order)への耐性: 到着順序に関係なくマッチングが機能します。
この実装は、複雑な Streaming Join のロジックをシンプルな API の背後にカプセル化(encapsulated)し、開発者が低レベルの状態メカニズムではなくビジネスロジックに集中できるようにする方法を示しています。
アーキテクチャのまとめ
Day 13: SimpleStreamingEngine with Streaming JOIN
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ KafkaSource │───►│DataFrame │───►│PostgreSQLSink │
│ │ │ │ │ │
│ • Topic consume │ │ • Filter │ │ • Batch insert │
│ • Consumer pause│ │ • Lookup Join │ │ • Timer trigger │
│ • Offset resume │ │ • Streaming Join │ │ • Overload detect│
└─────────────────┘ └──────────────────┘ └──────────────────┘
▲ │
│ ┌─────────────────────────────────────┘
│ │
│ ▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleStreamingEngine │
│ • Backpressure │
│ • Checkpoint │
│ • State Storage │
└─────────────────────────────────────────────────────────────┘
新たに導入された機能
Day 10
- Lookup Join: データベースを照会してデータを補完する DataFrame 機能。
Day 13
- Streaming Join: 2つのストリーム間の直接結合。
- 状態ストレージ(State storage): イベントマッチングと順序不同なデータ処理のための双方向状態管理。
まとめ
Streaming Join は、単なるイベントの転送から、インテリジェントなストリーム相関へと大きく前進したことを意味します。
双方向の状態管理を導入することで、2つの独立したストリームが互いを認識し、一致する相手を待ち、最終的にストリーミングレイヤー自体の中で出会うことが可能になります。これは到着順序の課題を解決するだけでなく、さらに重要なことに、外部データベースへの依存を排除します。データ処理は、完全にストリーミングドメイン内で完結できるようになりました。
次回予告:Day 14 – Streaming GroupBy
JOIN が「ストリーム間の出会い」であるなら、GROUP BY は「ストリーム自身の整理」です。
Day 14 では、Streaming GroupBy の実装方法を探ります。お楽しみに。