0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kafka Consumer をゼロから書いて学ぶ Lambda アーキテクチャの Speed Layer

Posted at

前回の記事では、Lambda アーキテクチャの設計思想と実装について紹介しました。その中で、Lambda Architecture の 3 層について触れました:

  • Batch Layer:遅いが正確。全履歴データを処理し、最終的な一貫性を保証する
  • Speed Layer:増分データを高速に処理する
  • Serving Layer:前の 2 層のデータを統合し、外部に統一クエリインターフェースを提供する

今日からは Speed Layer の実装 に深く入り込みます。手書きの Kafka Consumer から始めて、ストリーム処理の核心メカニズムを段階的に探っていきましょう。

本日は、Kafka Consumer という Speed Layer の重要コンポーネントを書きます。これは即時に注文データを受け取り、Serving DB に送信する役割を担います(Batch Layer は毎日の全量履歴データの処理に専念します)。

なぜ「知其然よりも、知其所以然」なのか?

普段は強力なフレームワークを使って問題を解決していますが、次のように考えたことはありませんか?

  • 「なぜこのフレームワークはこう設計されているのか?」
  • 「もし自分が設計するならどうするだろう?」
  • 「奇妙なバグに遭遇したとき、実際には何が起きているのか?」

多くの場合、私たちは 知其然(=使い方を知っている)に留まっており、知其所以然(=なぜそうなのか)を理解していません。

最も基本的なコードを自分で書くことで、ツールの背後にあるロジックを本当に理解できます。原理を理解すれば、どんなツールでもより自在に扱え、デバッグの際にも方向性が明確になります。

これこそが本シリーズの狙いです:最もシンプルなコードから始めて、複雑なシステムの設計思想を一歩ずつ理解すること

重要な注意

本記事のすべてのコードは 教育・概念解説用の疑似コード(Pseudo Code) です。実際のシナリオ理解や問題解決に役立ちますが、ここに掲載しているコードをそのまま実運用に使うことはできません。目的はストリーム処理の設計思想と核心概念を解説することにあります。読む際は全体アーキテクチャと設計ロジックの理解に重点を置き、コードの細部は軽く流す程度で構いません。

Step 1:Speed Layer Kafka Consumer → DB 書き込み

まずは最もシンプルなバージョンを見てみましょう:

from kafka import KafkaConsumer
import json

# orders トピックを購読
consumer = KafkaConsumer('orders')

print("[Speed Layer] Waiting for fresh orders...")

for message in consumer:
    order = json.loads(message.value.decode('utf-8'))

    insert_db(order)
    conn.commit()
    print(f"[Speed Layer] Inserted order {order['id']}")

このロジックはとても直感的です:

  • Kafka がデータソースとして即時注文ストリームを提供
  • Consumer がデータを消費し処理する
  • Serving DB が処理結果を保存
  • 核心タスク:データを素早くストレージ層に書き込む

Step 2:Serving DB 設計

ここでは 2 つの重要なテーブルがあります:

  • orders_batch_summary - Batch Layer が毎日計算した履歴統計
  • orders_realtime - Speed Layer が即時に送信する注文明細

ダッシュボードでのクエリ時には、両方のテーブルを統合して計算し、status = 'removed' の無効注文をフィルタリングします:

SELECT status, SUM(count) AS total
FROM (
    SELECT status, count(*)
    FROM orders_batch_summary
    WHERE status != 'removed'

    UNION ALL

    SELECT status, COUNT(*) AS count
    FROM orders_realtime
    WHERE status != 'removed'
    GROUP BY status
) t
GROUP BY status;

しかし、会社の成長に伴い、Consumer の数は増え続け、各自が独自にロジックを書いた結果、コードは錯綜し、保守が困難になってしまいました。

ここで必要になるのがリファクタリングです――統一されたストリーム処理アーキテクチャを構築し、バラバラな実装をやめることです。

Source 抽象層設計

チーム開発においては、各自が独自の Kafka Consumer を実装するため、コードスタイルが統一されず、統合が難しくなります。

その解決策は、統一的な「Source インターフェース」を定義することです。

Source アーキテクチャ設計

    ┌─────────────┐
     BaseSource    ◄── 抽象インターフェース
                 
     + run()     
    └─────────────┘
           
            implements
    ┌─────────────┐
    KafkaSource    ◄── 具体的な実装
                 
     + run()     
    └─────────────┘

Source コアコード逐次解説

Step 1:BaseSource 抽象インターフェースの定義

from abc import ABC, abstractmethod

class BaseSource(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self):
        pass

核心概念

  • 各 Source は識別のため name を持つ
  • run() は抽象メソッドで、サブクラスに実装を強制する

Step 2:SimpleKafkaSource 初期化

class SimpleKafkaSource(BaseSource):
    def __init__(self, name: str, topic: str, broker_address: str = "localhost:9092"):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer = None
        self.message_handler = self._default_handler

設計ポイント

  • BaseSource を継承し、統一インターフェースに準拠
  • message_handler は差し替え可能で、処理ロジックに柔軟性を持たせる

Step 3:Kafka Consumer の設定

def _setup_consumer(self):
    self.consumer = KafkaConsumer(
        self.topic,
        bootstrap_servers=self.broker_address,
        group_id=f"simple-source-{self.name}",
        auto_offset_reset='latest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
    )

技術的要点

  • group_id は自動生成し、衝突を回避
  • auto_offset_reset='latest' により最新メッセージから消費を開始
  • 自動 JSON デシリアライズ

Step 4:コア実行ロジック

def run(self):
    self._setup_consumer() # まず Consumer をセットアップ

    for message in self.consumer: # メッセージを継続的に監視
        self.message_handler({
            'key': message.key,
            'value': message.value,
            'topic': message.topic,
            'offset': message.offset
        })

実行フロー

  1. Kafka Consumer を初期化
  2. topic からメッセージを継続的に読み取る
  3. メッセージを標準フォーマットにラップ
  4. message_handler を呼び出して処理

核心設計:Source はあくまでデータを受信する責務だけを担い、実際のメッセージ処理は外部から注入された message_handler に任せる。この設計により処理ロジックの柔軟性が確保される。

import logging
import json
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from kafka import KafkaConsumer

logger = logging.getLogger(__name__)

class BaseSource(ABC):
    """
    基本的な Source 抽象クラス
    """

    def __init__(self, name: str):
        self.name = name  # Source の一意な名称
        self._running = False  # 実行状態フラグ

    @abstractmethod
    def run(self):
        """
        メイン実行メソッド。サブクラスで実装が必要
        """
        pass

    def stop(self):
        """
        Source を停止する
        """
        self._running = False
        logger.info(f"Source {self.name} stopped")

class SimpleKafkaSource(BaseSource):
    """
    シンプルな Kafka Source 実装
    """

    def __init__(
        self,
        name: str,
        topic: str,
        broker_address: str = "localhost:9092",
        consumer_group: Optional[str] = None,
        message_handler: Optional[Callable[[Any], None]] = None
    ):
        super().__init__(name)
        self.topic = topic  # 消費対象の topic
        self.broker_address = broker_address  # Kafka broker アドレス
        self.consumer_group = consumer_group or f"simple-source-{name}"  # コンシューマグループ
        self.message_handler = message_handler or self._default_handler  # メッセージ処理関数
        self.consumer: Optional[KafkaConsumer] = None  # Kafka コンシューマ

    def _default_handler(self, message):
        """
        デフォルトのメッセージ処理関数
        """
        print(f"[{self.name}] Received message: {message}")

    def _setup_consumer(self):
        """
        Kafka コンシューマのセットアップ
        """
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=self.broker_address,
                group_id=self.consumer_group,
                auto_offset_reset='latest',  # 最新メッセージから消費開始
                value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
                key_deserializer=lambda m: m.decode('utf-8') if m else None
            )
            logger.info(f"Consumer setup for topic: {self.topic}, group: {self.consumer_group}")
        except Exception as e:
            logger.error(f"Failed to setup consumer: {e}")
            raise

    def run(self):
        """
        Kafka topic からデータを読み込む
        """
        logger.info(f"Starting Source {self.name} for topic {self.topic}")

        # コンシューマをセットアップ
        self._setup_consumer()

        # 実行状態を設定
        self._running = True

        try:
            # メインの消費ループ
            while self._running:
                # 新しいメッセージをポーリング(1秒タイムアウト)
                message_batch = self.consumer.poll(timeout_ms=1000)

                # 各パーティションのメッセージを処理
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        if not self._running:
                            break

                        try:
                            # メッセージを処理
                            self.message_handler({
                                'key': message.key,
                                'value': message.value,
                                'topic': message.topic,
                                'partition': message.partition,
                                'offset': message.offset,
                                'timestamp': message.timestamp
                            })
                        except Exception as e:
                            logger.error(f"Error processing message: {e}")

        except KeyboardInterrupt:
            logger.info("Received interrupt signal")
        except Exception as e:
            logger.error(f"Error in run loop: {e}")
        finally:
            # リソースを解放
            if self.consumer:
                self.consumer.close()
            logger.info(f"Source {self.name} finished")

    def stop(self):
        """
        Source を停止
        """
        super().stop()
        if self.consumer:
            self.consumer.close()

Sink 抽象層設計

Speed Layer アーキテクチャにおいて、Source がデータ入力を担当するのに対し、Sink はデータ出力を担当します。
様々なバラバラな出力実装を避けるため、統一された Sink の仕様を定義します。

Sink アーキテクチャ設計

    ┌─────────────┐
      BaseSink     ◄── 抽象インターフェース
                 
     + write()   
    └─────────────┘
           
            implements
    ┌──────────────────┐
    SimplePostgreSQL    ◄── 具体的な実装
    Sink              
     + write()        
    └──────────────────┘

Sink コアコード逐次解説

Step 1:Base Sink 抽象インターフェースの定義

from abc import ABC, abstractmethod

class BaseSink(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message):
        pass

    def setup(self):
        pass # デフォルトは空実装

核心概念

  • 各 Sink は識別のため name を持つ
  • write() がコアメソッドで、実際のデータ書き込みを処理する
  • setup() はデフォルト実装を提供し、必要に応じてサブクラスでオーバーライド可能

Step 2:Simple PostgreSQL Sink 初期化

class SimplePostgreSQLSink(BaseSink):
    def __init__(self, name: str, host: str, dbname: str, table_name: str):
        super().__init__(name)
        self.host = host
        self.dbname = dbname
        self.table_name = table_name
        self.connection = None

設計ポイント

  • BaseSink を継承し、統一インターフェースに準拠
  • データベース接続情報を保持
  • connection = None:必要なときに接続を確立する

Step 3:write メソッドのコアロジック

def write(self, message):
    # カラムを自動検出して DB に書き込む
    data = message.get('value', {})
    # ... 動的に SQL を生成して挿入を実行

核心特徴message['value'] のカラム構造を自動検出し、動的に INSERT SQL を生成して DB に書き込む。

これにより、異なるデータ構造にも自動で対応できる Sink が実現される。

Sink 完整コード

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

try:
    import psycopg2
    from psycopg2.extras import Json
    from psycopg2 import sql
except ImportError:
    psycopg2 = None
    print("Warning: psycopg2 not installed. Run: pip install psycopg2-binary")

logger = logging.getLogger(__name__)

class BaseSink(ABC):
    """
    基本的な Sink 抽象クラス
    """

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message: Dict[str, Any]):
        """
        1 件のメッセージを書き込む
        """
        pass

    def setup(self):
        """
        接続を設定
        """
        pass

    def close(self):
        """
        接続を閉じる
        """
        pass

class SimplePostgreSQLSink(BaseSink):
    """
    カラム自動検出対応の PostgreSQL Sink
    """

    def __init__(
        self,
        name: str,
        host: str,
        port: int,
        dbname: str,
        user: str,
        password: str,
        table_name: str
    ):
        super().__init__(name)
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password
        self.table_name = table_name
        self.connection = None

    def setup(self):
        """
        データベース接続を確立
        """
        if psycopg2 is None:
            raise ImportError("psycopg2 is required")

        self.connection = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=self.user,
            password=self.password
        )
        logger.info(f"Connected to PostgreSQL: {self.host}:{self.port}/{self.dbname}")

    def write(self, message: Dict[str, Any]):
        """
        message のカラムを自動検出して PostgreSQL に書き込む
        """
        # カラム構造を自動検出し、動的に INSERT SQL を生成して書き込み
        data = message.get('value', {})
        # ... 実際のカラム検出と SQL 実行ロジックをここに実装

    def close(self):
        """
        接続を閉じる
        """
        if self.connection:
            self.connection.close()
            logger.info("PostgreSQL connection closed")

Simple Streaming Engine 統一管理層

Source(データ入力)と Sink(データ出力)の間には、統一的な管理層が必要です。この層はスケジューリング、モニタリング、ライフサイクル管理を担当します。その役割を果たすのが SimpleStreamingEngine です。

Simple Streaming Engine アーキテクチャ設計

    ┌─────────────────────┐
    SimpleStreamingEngine  ◄── 中央管理者
                         
        +add_source()    
        +add_sink()      
        + run()          
    └─────────────────────┘
           
            manages
           
    ┌──────────────┐    ┌──────────────┐
        Source    │───▶│     Sink     
                                    
     KafkaSource      PostgreSQLSink
    └──────────────┘    └──────────────┘

Simple Streaming Engine コアコード逐次解説

Step 1:Simple Streaming Engine 初期化

class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-app"):
        self.name = name
        self._sources = []  # Source リスト
        self._sinks = []    # Sink リスト

核心概念

  • SimpleStreamingEngine は Source と Sink の 2 つのリストを管理
  • 統一的な登録インターフェースを提供

Step 2:Source と Sink の登録

def add_source(self, source: BaseSource):
    self._sources.append(source)

def add_sink(self, sink: BaseSink):
    self._sinks.append(sink)

設計ポイント

  • シンプルなリスト管理で、複数の Source と Sink をサポート
  • BaseSource / BaseSink を実装していれば、どんなクラスでも登録可能

Step 3:コア実行ロジック

def run(self):
    # すべての Sink の接続をセットアップ
    for sink in self._sinks:
        sink.setup()

    # 各 Source にメッセージハンドラを設定し、データを Sink へ転送
    for source in self._sources:
        source.message_handler = self._create_message_handler()
        source.run()  # データ受信開始

コアフロー

  1. すべての Sink を初期化
  2. Source にメッセージハンドラを注入
  3. Source を起動してデータ受信を開始

Step 4:メッセージハンドラのコア設計

def _create_message_handler(self):
    def handler(message):
        # すべての Sink へメッセージを送信
        for sink in self._sinks:
            sink.write(message)
    return handler

データ伝達フロー詳細

  1. SimpleStreamingEngine 起動時
# SimpleStreamingEngine.run() 内
for source in self._sources:
    source.message_handler = self._create_message_handler()  # ハンドラを注入
    source.run()  # Source を起動
  1. Source がデータを受信
# SimpleKafkaSource.run() 内
for message in self.consumer:  # Kafka からメッセージを取得
    formatted_message = {
        'key': message.key,
        'value': message.value  # メッセージ内容
    }
    self.message_handler(formatted_message)  # Engine に注入されたハンドラを呼び出す
  1. ハンドラがデータを転送
# _create_message_handler() の戻り値
def handler(message):  # message は Source から渡されたフォーマット済みメッセージ
    for sink in self._sinks:  # 登録されている各 Sink へ送信
        sink.write(message)   # Sink が処理を実行

データの流れKafkaSource.run()message_handler()Sink.write()

設計の精髄:SimpleStreamingEngine は「関数注入」によって、Source が Sink の存在を知らなくてもよい構造を実現。完全な疎結合を達成。

なぜ Simple Streaming Engine が必要か?

  • 疎結合設計:Source と Sink を完全分離し、独立に置換可能
  • 拡張性:複数 Sink(例:PostgreSQL + Elasticsearch)の同時サポート
  • 統一管理:一貫した登録・実行インターフェースを提供

Simple Streaming Engine 完整コード

import logging
from typing import List
from .source import BaseSource
from .sink import BaseSink

logger = logging.getLogger(__name__)

class SimpleStreamingEngine:
    """
    シンプルなストリーミング処理エンジン
    """

    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []  # Source リスト
        self._sinks: List[BaseSink] = []      # Sink リスト

    def add_source(self, source: BaseSource):
        """
        Source を追加
        """
        self._sources.append(source)

    def add_sink(self, sink: BaseSink):
        """
        Sink を追加
        """
        self._sinks.append(sink)

    def run(self):
        """
        エンジンを起動し、データ処理を開始
        """
        # Sink をセットアップ
        for sink in self._sinks:
            sink.setup()

        # 各 Source にメッセージハンドラを設定
        for source in self._sources:
            source.message_handler = self._create_message_handler()
            source.run()  # データ受信開始

    def _create_message_handler(self):
        """
        メッセージハンドラを生成し、すべての Sink へ転送
        """
        def handler(message):
            for sink in self._sinks:
                sink.write(message)
        return handler

実際の実行:部品を組み合わせればデータは自動で流れる

ここまでで、すでに以下が揃いました:

  • Source:Kafka からデータを取得
  • Sink:データを PostgreSQL に書き込む
  • SimpleStreamingEngine:Source と Sink を接続し、監視・管理を行う

あとはこの 3 つを組み合わせ、「起動」するだけでデータが自動的に流れ始めます。

# 1. SimpleStreamingEngine を作成
engine = SimpleStreamingEngine(...)

# 2. Kafka Source を作成
orders_source = SimpleKafkaSource(...)

# 3. PostgreSQL Sink を作成
pg_sink = SimplePostgreSQLSink(...)

# 4. 組み合わせて起動
engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run()  # 実行開始:Kafka → PostgreSQL

まとめ

今日は Speed Layer のコア実装 について深掘りしました:

  • Batch Layer:堅牢な履歴データ処理の基盤を提供
  • Speed Layer:即時データストリームを処理し、迅速に応答
  • Serving Layer:両者を統合し、透過的なクエリ体験を提供

Lambda Architecture において Speed Layer がなければ、本当の意味でのリアルタイム性は実現できません。

今回紹介した Source–Sink–Simple Streaming Engine の設計によって、私たちは以下を構築しました:

  • 統一的なデータ処理インターフェース
  • 拡張可能なストリーム処理フレームワーク
  • 完全な Speed Layer の実装モデル

Day 5 予告:パフォーマンスボトルネックへの挑戦

最初のうちはシステムは順調に動作し、Consumer は注文データを問題なく処理できていました。
しかしトラフィックのピークに直面すると:

  • コンソールに遅延警告が表示され始める
  • Consumer の処理能力が限界に達する
  • 注文がキューに滞留し、処理待ちになる

次回は、こうした Speed Layer におけるパフォーマンス課題 を掘り下げ、高スループット環境で問題を解決するための最適化技術について解説します。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?