前回の記事では、Lambda アーキテクチャの設計思想と実装について紹介しました。その中で、Lambda Architecture の 3 層について触れました:
- Batch Layer:遅いが正確。全履歴データを処理し、最終的な一貫性を保証する
- Speed Layer:増分データを高速に処理する
- Serving Layer:前の 2 層のデータを統合し、外部に統一クエリインターフェースを提供する
今日からは Speed Layer の実装 に深く入り込みます。手書きの Kafka Consumer から始めて、ストリーム処理の核心メカニズムを段階的に探っていきましょう。
本日は、Kafka Consumer という Speed Layer の重要コンポーネントを書きます。これは即時に注文データを受け取り、Serving DB に送信する役割を担います(Batch Layer は毎日の全量履歴データの処理に専念します)。
なぜ「知其然よりも、知其所以然」なのか?
普段は強力なフレームワークを使って問題を解決していますが、次のように考えたことはありませんか?
- 「なぜこのフレームワークはこう設計されているのか?」
- 「もし自分が設計するならどうするだろう?」
- 「奇妙なバグに遭遇したとき、実際には何が起きているのか?」
多くの場合、私たちは 知其然(=使い方を知っている)に留まっており、知其所以然(=なぜそうなのか)を理解していません。
最も基本的なコードを自分で書くことで、ツールの背後にあるロジックを本当に理解できます。原理を理解すれば、どんなツールでもより自在に扱え、デバッグの際にも方向性が明確になります。
これこそが本シリーズの狙いです:最もシンプルなコードから始めて、複雑なシステムの設計思想を一歩ずつ理解すること。
重要な注意
本記事のすべてのコードは 教育・概念解説用の疑似コード(Pseudo Code) です。実際のシナリオ理解や問題解決に役立ちますが、ここに掲載しているコードをそのまま実運用に使うことはできません。目的はストリーム処理の設計思想と核心概念を解説することにあります。読む際は全体アーキテクチャと設計ロジックの理解に重点を置き、コードの細部は軽く流す程度で構いません。
Step 1:Speed Layer Kafka Consumer → DB 書き込み
まずは最もシンプルなバージョンを見てみましょう:
from kafka import KafkaConsumer
import json
# orders トピックを購読
consumer = KafkaConsumer('orders')
print("[Speed Layer] Waiting for fresh orders...")
for message in consumer:
order = json.loads(message.value.decode('utf-8'))
insert_db(order)
conn.commit()
print(f"[Speed Layer] Inserted order {order['id']}")
このロジックはとても直感的です:
- Kafka がデータソースとして即時注文ストリームを提供
- Consumer がデータを消費し処理する
- Serving DB が処理結果を保存
- 核心タスク:データを素早くストレージ層に書き込む
Step 2:Serving DB 設計
ここでは 2 つの重要なテーブルがあります:
-
orders_batch_summary
- Batch Layer が毎日計算した履歴統計 -
orders_realtime
- Speed Layer が即時に送信する注文明細
ダッシュボードでのクエリ時には、両方のテーブルを統合して計算し、status = 'removed' の無効注文をフィルタリングします:
SELECT status, SUM(count) AS total
FROM (
SELECT status, count(*)
FROM orders_batch_summary
WHERE status != 'removed'
UNION ALL
SELECT status, COUNT(*) AS count
FROM orders_realtime
WHERE status != 'removed'
GROUP BY status
) t
GROUP BY status;
しかし、会社の成長に伴い、Consumer の数は増え続け、各自が独自にロジックを書いた結果、コードは錯綜し、保守が困難になってしまいました。
ここで必要になるのがリファクタリングです――統一されたストリーム処理アーキテクチャを構築し、バラバラな実装をやめることです。
Source 抽象層設計
チーム開発においては、各自が独自の Kafka Consumer を実装するため、コードスタイルが統一されず、統合が難しくなります。
その解決策は、統一的な「Source インターフェース」を定義することです。
Source アーキテクチャ設計
┌─────────────┐
│ BaseSource │ ◄── 抽象インターフェース
│ │
│ + run() │
└─────────────┘
△
│ implements
┌─────────────┐
│KafkaSource │ ◄── 具体的な実装
│ │
│ + run() │
└─────────────┘
Source コアコード逐次解説
Step 1:BaseSource 抽象インターフェースの定義
from abc import ABC, abstractmethod
class BaseSource(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def run(self):
pass
核心概念:
- 各 Source は識別のため
name
を持つ -
run()
は抽象メソッドで、サブクラスに実装を強制する
Step 2:SimpleKafkaSource 初期化
class SimpleKafkaSource(BaseSource):
def __init__(self, name: str, topic: str, broker_address: str = "localhost:9092"):
super().__init__(name)
self.topic = topic
self.broker_address = broker_address
self.consumer = None
self.message_handler = self._default_handler
設計ポイント:
- BaseSource を継承し、統一インターフェースに準拠
-
message_handler
は差し替え可能で、処理ロジックに柔軟性を持たせる
Step 3:Kafka Consumer の設定
def _setup_consumer(self):
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.broker_address,
group_id=f"simple-source-{self.name}",
auto_offset_reset='latest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)
技術的要点:
-
group_id
は自動生成し、衝突を回避 -
auto_offset_reset='latest'
により最新メッセージから消費を開始 - 自動 JSON デシリアライズ
Step 4:コア実行ロジック
def run(self):
self._setup_consumer() # まず Consumer をセットアップ
for message in self.consumer: # メッセージを継続的に監視
self.message_handler({
'key': message.key,
'value': message.value,
'topic': message.topic,
'offset': message.offset
})
実行フロー:
- Kafka Consumer を初期化
- topic からメッセージを継続的に読み取る
- メッセージを標準フォーマットにラップ
-
message_handler
を呼び出して処理
核心設計:Source はあくまでデータを受信する責務だけを担い、実際のメッセージ処理は外部から注入された message_handler
に任せる。この設計により処理ロジックの柔軟性が確保される。
import logging
import json
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from kafka import KafkaConsumer
logger = logging.getLogger(__name__)
class BaseSource(ABC):
"""
基本的な Source 抽象クラス
"""
def __init__(self, name: str):
self.name = name # Source の一意な名称
self._running = False # 実行状態フラグ
@abstractmethod
def run(self):
"""
メイン実行メソッド。サブクラスで実装が必要
"""
pass
def stop(self):
"""
Source を停止する
"""
self._running = False
logger.info(f"Source {self.name} stopped")
class SimpleKafkaSource(BaseSource):
"""
シンプルな Kafka Source 実装
"""
def __init__(
self,
name: str,
topic: str,
broker_address: str = "localhost:9092",
consumer_group: Optional[str] = None,
message_handler: Optional[Callable[[Any], None]] = None
):
super().__init__(name)
self.topic = topic # 消費対象の topic
self.broker_address = broker_address # Kafka broker アドレス
self.consumer_group = consumer_group or f"simple-source-{name}" # コンシューマグループ
self.message_handler = message_handler or self._default_handler # メッセージ処理関数
self.consumer: Optional[KafkaConsumer] = None # Kafka コンシューマ
def _default_handler(self, message):
"""
デフォルトのメッセージ処理関数
"""
print(f"[{self.name}] Received message: {message}")
def _setup_consumer(self):
"""
Kafka コンシューマのセットアップ
"""
try:
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.broker_address,
group_id=self.consumer_group,
auto_offset_reset='latest', # 最新メッセージから消費開始
value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
key_deserializer=lambda m: m.decode('utf-8') if m else None
)
logger.info(f"Consumer setup for topic: {self.topic}, group: {self.consumer_group}")
except Exception as e:
logger.error(f"Failed to setup consumer: {e}")
raise
def run(self):
"""
Kafka topic からデータを読み込む
"""
logger.info(f"Starting Source {self.name} for topic {self.topic}")
# コンシューマをセットアップ
self._setup_consumer()
# 実行状態を設定
self._running = True
try:
# メインの消費ループ
while self._running:
# 新しいメッセージをポーリング(1秒タイムアウト)
message_batch = self.consumer.poll(timeout_ms=1000)
# 各パーティションのメッセージを処理
for topic_partition, messages in message_batch.items():
for message in messages:
if not self._running:
break
try:
# メッセージを処理
self.message_handler({
'key': message.key,
'value': message.value,
'topic': message.topic,
'partition': message.partition,
'offset': message.offset,
'timestamp': message.timestamp
})
except Exception as e:
logger.error(f"Error processing message: {e}")
except KeyboardInterrupt:
logger.info("Received interrupt signal")
except Exception as e:
logger.error(f"Error in run loop: {e}")
finally:
# リソースを解放
if self.consumer:
self.consumer.close()
logger.info(f"Source {self.name} finished")
def stop(self):
"""
Source を停止
"""
super().stop()
if self.consumer:
self.consumer.close()
Sink 抽象層設計
Speed Layer アーキテクチャにおいて、Source がデータ入力を担当するのに対し、Sink はデータ出力を担当します。
様々なバラバラな出力実装を避けるため、統一された Sink の仕様を定義します。
Sink アーキテクチャ設計
┌─────────────┐
│ BaseSink │ ◄── 抽象インターフェース
│ │
│ + write() │
└─────────────┘
△
│ implements
┌──────────────────┐
│SimplePostgreSQL │ ◄── 具体的な実装
│Sink │
│ + write() │
└──────────────────┘
Sink コアコード逐次解説
Step 1:Base Sink 抽象インターフェースの定義
from abc import ABC, abstractmethod
class BaseSink(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def write(self, message):
pass
def setup(self):
pass # デフォルトは空実装
核心概念:
- 各 Sink は識別のため
name
を持つ -
write()
がコアメソッドで、実際のデータ書き込みを処理する -
setup()
はデフォルト実装を提供し、必要に応じてサブクラスでオーバーライド可能
Step 2:Simple PostgreSQL Sink 初期化
class SimplePostgreSQLSink(BaseSink):
def __init__(self, name: str, host: str, dbname: str, table_name: str):
super().__init__(name)
self.host = host
self.dbname = dbname
self.table_name = table_name
self.connection = None
設計ポイント:
- BaseSink を継承し、統一インターフェースに準拠
- データベース接続情報を保持
-
connection = None
:必要なときに接続を確立する
Step 3:write メソッドのコアロジック
def write(self, message):
# カラムを自動検出して DB に書き込む
data = message.get('value', {})
# ... 動的に SQL を生成して挿入を実行
核心特徴:message['value']
のカラム構造を自動検出し、動的に INSERT SQL を生成して DB に書き込む。
これにより、異なるデータ構造にも自動で対応できる Sink が実現される。
Sink 完整コード
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict
try:
import psycopg2
from psycopg2.extras import Json
from psycopg2 import sql
except ImportError:
psycopg2 = None
print("Warning: psycopg2 not installed. Run: pip install psycopg2-binary")
logger = logging.getLogger(__name__)
class BaseSink(ABC):
"""
基本的な Sink 抽象クラス
"""
def __init__(self, name: str):
self.name = name
@abstractmethod
def write(self, message: Dict[str, Any]):
"""
1 件のメッセージを書き込む
"""
pass
def setup(self):
"""
接続を設定
"""
pass
def close(self):
"""
接続を閉じる
"""
pass
class SimplePostgreSQLSink(BaseSink):
"""
カラム自動検出対応の PostgreSQL Sink
"""
def __init__(
self,
name: str,
host: str,
port: int,
dbname: str,
user: str,
password: str,
table_name: str
):
super().__init__(name)
self.host = host
self.port = port
self.dbname = dbname
self.user = user
self.password = password
self.table_name = table_name
self.connection = None
def setup(self):
"""
データベース接続を確立
"""
if psycopg2 is None:
raise ImportError("psycopg2 is required")
self.connection = psycopg2.connect(
host=self.host,
port=self.port,
dbname=self.dbname,
user=self.user,
password=self.password
)
logger.info(f"Connected to PostgreSQL: {self.host}:{self.port}/{self.dbname}")
def write(self, message: Dict[str, Any]):
"""
message のカラムを自動検出して PostgreSQL に書き込む
"""
# カラム構造を自動検出し、動的に INSERT SQL を生成して書き込み
data = message.get('value', {})
# ... 実際のカラム検出と SQL 実行ロジックをここに実装
def close(self):
"""
接続を閉じる
"""
if self.connection:
self.connection.close()
logger.info("PostgreSQL connection closed")
Simple Streaming Engine 統一管理層
Source(データ入力)と Sink(データ出力)の間には、統一的な管理層が必要です。この層はスケジューリング、モニタリング、ライフサイクル管理を担当します。その役割を果たすのが SimpleStreamingEngine です。
Simple Streaming Engine アーキテクチャ設計
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── 中央管理者
│ │
│ +add_source() │
│ +add_sink() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Sink │
│ │ │ │
│ KafkaSource │ │PostgreSQLSink│
└──────────────┘ └──────────────┘
Simple Streaming Engine コアコード逐次解説
Step 1:Simple Streaming Engine 初期化
class SimpleStreamingEngine:
def __init__(self, name: str = "simple-streaming-app"):
self.name = name
self._sources = [] # Source リスト
self._sinks = [] # Sink リスト
核心概念:
- SimpleStreamingEngine は Source と Sink の 2 つのリストを管理
- 統一的な登録インターフェースを提供
Step 2:Source と Sink の登録
def add_source(self, source: BaseSource):
self._sources.append(source)
def add_sink(self, sink: BaseSink):
self._sinks.append(sink)
設計ポイント:
- シンプルなリスト管理で、複数の Source と Sink をサポート
- BaseSource / BaseSink を実装していれば、どんなクラスでも登録可能
Step 3:コア実行ロジック
def run(self):
# すべての Sink の接続をセットアップ
for sink in self._sinks:
sink.setup()
# 各 Source にメッセージハンドラを設定し、データを Sink へ転送
for source in self._sources:
source.message_handler = self._create_message_handler()
source.run() # データ受信開始
コアフロー:
- すべての Sink を初期化
- Source にメッセージハンドラを注入
- Source を起動してデータ受信を開始
Step 4:メッセージハンドラのコア設計
def _create_message_handler(self):
def handler(message):
# すべての Sink へメッセージを送信
for sink in self._sinks:
sink.write(message)
return handler
データ伝達フロー詳細
- SimpleStreamingEngine 起動時:
# SimpleStreamingEngine.run() 内
for source in self._sources:
source.message_handler = self._create_message_handler() # ハンドラを注入
source.run() # Source を起動
- Source がデータを受信:
# SimpleKafkaSource.run() 内
for message in self.consumer: # Kafka からメッセージを取得
formatted_message = {
'key': message.key,
'value': message.value # メッセージ内容
}
self.message_handler(formatted_message) # Engine に注入されたハンドラを呼び出す
- ハンドラがデータを転送:
# _create_message_handler() の戻り値
def handler(message): # message は Source から渡されたフォーマット済みメッセージ
for sink in self._sinks: # 登録されている各 Sink へ送信
sink.write(message) # Sink が処理を実行
データの流れ:Kafka
→ Source.run()
→ message_handler()
→ Sink.write()
設計の精髄:SimpleStreamingEngine は「関数注入」によって、Source が Sink の存在を知らなくてもよい構造を実現。完全な疎結合を達成。
なぜ Simple Streaming Engine が必要か?
- 疎結合設計:Source と Sink を完全分離し、独立に置換可能
- 拡張性:複数 Sink(例:PostgreSQL + Elasticsearch)の同時サポート
- 統一管理:一貫した登録・実行インターフェースを提供
Simple Streaming Engine 完整コード
import logging
from typing import List
from .source import BaseSource
from .sink import BaseSink
logger = logging.getLogger(__name__)
class SimpleStreamingEngine:
"""
シンプルなストリーミング処理エンジン
"""
def __init__(self, name: str = "simple-streaming-engine"):
self.name = name
self._sources: List[BaseSource] = [] # Source リスト
self._sinks: List[BaseSink] = [] # Sink リスト
def add_source(self, source: BaseSource):
"""
Source を追加
"""
self._sources.append(source)
def add_sink(self, sink: BaseSink):
"""
Sink を追加
"""
self._sinks.append(sink)
def run(self):
"""
エンジンを起動し、データ処理を開始
"""
# Sink をセットアップ
for sink in self._sinks:
sink.setup()
# 各 Source にメッセージハンドラを設定
for source in self._sources:
source.message_handler = self._create_message_handler()
source.run() # データ受信開始
def _create_message_handler(self):
"""
メッセージハンドラを生成し、すべての Sink へ転送
"""
def handler(message):
for sink in self._sinks:
sink.write(message)
return handler
実際の実行:部品を組み合わせればデータは自動で流れる
ここまでで、すでに以下が揃いました:
- Source:Kafka からデータを取得
- Sink:データを PostgreSQL に書き込む
- SimpleStreamingEngine:Source と Sink を接続し、監視・管理を行う
あとはこの 3 つを組み合わせ、「起動」するだけでデータが自動的に流れ始めます。
# 1. SimpleStreamingEngine を作成
engine = SimpleStreamingEngine(...)
# 2. Kafka Source を作成
orders_source = SimpleKafkaSource(...)
# 3. PostgreSQL Sink を作成
pg_sink = SimplePostgreSQLSink(...)
# 4. 組み合わせて起動
engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run() # 実行開始:Kafka → PostgreSQL
まとめ
今日は Speed Layer のコア実装 について深掘りしました:
- Batch Layer:堅牢な履歴データ処理の基盤を提供
- Speed Layer:即時データストリームを処理し、迅速に応答
- Serving Layer:両者を統合し、透過的なクエリ体験を提供
Lambda Architecture において Speed Layer がなければ、本当の意味でのリアルタイム性は実現できません。
今回紹介した Source–Sink–Simple Streaming Engine の設計によって、私たちは以下を構築しました:
- 統一的なデータ処理インターフェース
- 拡張可能なストリーム処理フレームワーク
- 完全な Speed Layer の実装モデル
Day 5 予告:パフォーマンスボトルネックへの挑戦
最初のうちはシステムは順調に動作し、Consumer は注文データを問題なく処理できていました。
しかしトラフィックのピークに直面すると:
- コンソールに遅延警告が表示され始める
- Consumer の処理能力が限界に達する
- 注文がキューに滞留し、処理待ちになる
次回は、こうした Speed Layer におけるパフォーマンス課題 を掘り下げ、高スループット環境で問題を解決するための最適化技術について解説します。