0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kafka・Outbox Pattern・CloudEventsで設計するイベント駆動アーキテクチャとEvent Sourcing実践ガイド

0
Last updated at Posted at 2026-05-11

Kafka・Outbox Pattern・CloudEventsで設計するイベント駆動アーキテクチャとEvent Sourcing実践ガイド

マイクロサービスの普及に伴い、サービス間の疎結合なデータ連携を実現するイベント駆動アーキテクチャ(EDA)は、2026年現在の分散システム設計において標準的なアプローチとなっています。さらに、状態変更の履歴をイベントとして永続化するEvent Sourcingを組み合わせることで、監査ログ・時間旅行クエリ・障害からの正確な復元が可能になります。

しかし、EDAとEvent Sourcingを「なんとなく」導入すると、Dual Write問題・スキーマ進化の破綻・プロジェクション再構築の負荷増大といった運用課題に直面します。本記事では、これらの設計パターンの全体像を整理し、Outbox Pattern・CloudEvents・スナップショット戦略など、本番運用で必要な設計判断をPythonのコード例とともに解説します。

関連記事: Event Sourcingの本番運用設計については「EventStoreDB・PostgreSQLで実装するEvent Sourcing本番運用設計ガイド」、CQRSのRust実装については「RustとAxumで実装するCQRSパターン」もあわせてご参照ください。

この記事でわかること

  • イベント駆動アーキテクチャの4つのパターン(Event Notification / Event-Carried State Transfer / Event Sourcing / CQRS)の使い分け基準
  • Dual Write問題を解決するOutbox Patternの設計と、Polling方式・CDC方式の選定判断
  • CloudEvents v1.0を使ったイベントフォーマットの標準化とサービス間の相互運用性確保
  • Event Sourcingのスナップショット戦略・スキーマ進化(Upcasting)の実装パターン
  • Pythonによるイベントストア・Outboxテーブル・Kafkaプロデューサー/コンシューマーの実装例

対象読者

  • 想定読者: マイクロサービスアーキテクチャに取り組む中級者のバックエンドエンジニア
  • 必要な前提知識:
    • Python 3.12+の基本文法とasyncioの基礎
    • SQLの基本操作(トランザクション、JSONBカラム)
    • Apache Kafkaの基本概念(トピック、パーティション、コンシューマーグループ)
    • REST APIやマイクロサービスの基本的な設計経験

結論・成果

本記事で解説するパターンを適用することで、以下の効果が期待できます。

  • データ整合性: Outbox Patternにより、ビジネスデータとイベント発行の原子性を保証し、Dual Write問題によるデータ不整合を排除
  • 読み取り性能: CQRS + プロジェクションにより、読み取り専用モデルを最適化し、従来のCRUDと比較して読み取り操作で最大30%の高速化が報告されている(Event Sourcing and CQRS with Databases
  • 相互運用性: CloudEvents v1.0準拠により、異なる言語・フレームワーク間でのイベントフォーマット統一を実現
  • 運用復元力: Event Sourcingにより、任意の時点へのシステム状態復元と、プロジェクションの再構築が可能

イベント駆動アーキテクチャの4パターンを理解する

イベント駆動アーキテクチャと一口に言っても、実際には目的の異なる4つのパターンがあります。Martin Fowlerの分類に基づき、それぞれの特徴と適用場面を整理しましょう。

Event Notificationパターン

サービスが「何かが起きた」という事実だけを通知し、受信側が独自に判断して処理するパターンです。イベントには最小限の情報(識別子と種別)のみを含めます。

# event_notification.py
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import uuid4

@dataclass(frozen=True)
class OrderPlacedNotification:
    event_id: str
    order_id: str
    occurred_at: str

    @classmethod
    def create(cls, order_id: str) -> "OrderPlacedNotification":
        return cls(
            event_id=str(uuid4()),
            order_id=order_id,
            occurred_at=datetime.now(timezone.utc).isoformat(),
        )

受信サービスはorder_idを使って注文サービスにAPIコールし、必要な詳細情報を取得します。このパターンはサービス間の結合度が低い反面、受信側から送信側へのコールバックが必要になるため、送信サービスの負荷が増大する点がトレードオフです。

Event-Carried State Transferパターン

イベント自体に十分なデータを含め、受信側が送信サービスへの問い合わせなしに処理できるパターンです。

# event_carried_state_transfer.py
@dataclass(frozen=True)
class OrderPlacedWithDetails:
    event_id: str
    order_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    shipping_address: dict
    occurred_at: str

受信サービスはイベントだけで処理を完結できるため、サービス間の同期呼び出しが不要になります。ただし、イベントサイズが大きくなり、送信側の内部データ構造が受信側に漏洩するリスクがあります。個人情報を含むデータをイベントに載せる場合は、GDPR等のコンプライアンス要件にも注意が必要です。

Event SourcingとCQRSの関係

Event Sourcingは「現在の状態」ではなく「状態に至るまでのイベント履歴」を永続化するパターンです。CQRSは書き込みモデル(コマンド側)と読み取りモデル(クエリ側)を分離するパターンです。

この2つは独立したパターンですが、実践上はほぼ常にセットで使われます。Event Sourcingでイベントを永続化し、CQRSの読み取り側でプロジェクション(イベントから構築された読み取り専用ビュー)を生成する構成が標準的です。

パターン データの持ち方 適用場面 複雑度
Event Notification 最小限の識別子 疎結合な通知(メール送信トリガー等)
Event-Carried State Transfer 完全なデータコピー コールバック不要な非同期処理
Event Sourcing イベント履歴が正(Source of Truth) 監査ログ・時間旅行・金融取引
CQRS 読み書き分離 読み取り負荷が書き込みの10倍以上

よくある間違い: 「すべてのサービスをEvent Sourcingにする」というアプローチは過剰設計です。Event Sourcingは監査要件のある領域(決済・在庫・医療記録)に限定し、それ以外は従来のCRUDで十分です。Azure Architecture Centerのドキュメントでも「Event Sourcingは全か無かの決定ではなく、最も恩恵を受ける部分に選択的に適用すべき」と記載されています(Event Sourcing pattern - Azure Architecture Center)。

Outbox PatternでDual Write問題を解決する

Event Sourcingやイベント駆動アーキテクチャで最も頻繁に遭遇する問題がDual Write問題です。これは「データベースへの書き込み」と「メッセージブローカーへのイベント発行」を両方確実に成功させることが、分散システムでは本質的に困難であるという問題です。

Dual Write問題の具体例

データベースへの書き込みは成功したが、Kafkaへのイベント発行前にプロセスがクラッシュすると、注文は存在するのに下流サービス(在庫引当・メール送信等)が動かないという不整合が発生します。

Outbox Patternの設計

Outbox Patternは、イベントをKafkaに直接発行するのではなく、ビジネスデータと同じデータベースのOutboxテーブルに同一トランザクションで書き込むことで、この問題を解決します。

# outbox_pattern.py
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from uuid import uuid4

import asyncpg


@dataclass(frozen=True)
class OutboxEntry:
    id: str
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    created_at: str
    published_at: str | None = None


async def place_order_with_outbox(
    conn: asyncpg.Connection,
    order_id: str,
    customer_id: str,
    items: list[dict],
    total_amount: float,
) -> None:
    """注文作成とOutboxイベント書き込みを同一トランザクションで実行"""
    async with conn.transaction():
        # 1. ビジネスデータの書き込み
        await conn.execute(
            """
            INSERT INTO orders (id, customer_id, items, total_amount, status)
            VALUES ($1, $2, $3, $4, 'placed')
            """,
            order_id,
            customer_id,
            json.dumps(items),
            total_amount,
        )

        # 2. 同一トランザクションでOutboxテーブルに書き込み
        event_payload = {
            "order_id": order_id,
            "customer_id": customer_id,
            "items": items,
            "total_amount": total_amount,
        }
        await conn.execute(
            """
            INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload, created_at)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            str(uuid4()),
            "Order",
            order_id,
            "OrderPlaced",
            json.dumps(event_payload),
            datetime.now(timezone.utc).isoformat(),
        )

Outboxテーブルのスキーマは以下の通りです。

-- outbox_schema.sql
CREATE TABLE outbox (
    id UUID PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ NULL
);

CREATE INDEX idx_outbox_unpublished
    ON outbox (created_at)
    WHERE published_at IS NULL;

Polling方式 vs CDC方式の選定

Outboxテーブルに書き込まれたイベントをKafkaに転送する方式は、Polling方式CDC(Change Data Capture)方式の2つがあります。

Polling方式の実装例:

# outbox_poller.py
import asyncio
import json

import asyncpg
from aiokafka import AIOKafkaProducer


async def poll_and_publish(pool: asyncpg.Pool, producer: AIOKafkaProducer) -> int:
    """未発行のOutboxイベントを取得してKafkaに発行する"""
    async with pool.acquire() as conn:
        # FOR UPDATE SKIP LOCKEDで複数Pollerの競合を防止
        rows = await conn.fetch(
            """
            SELECT id, aggregate_type, aggregate_id, event_type, payload
            FROM outbox
            WHERE published_at IS NULL
            ORDER BY created_at
            LIMIT 100
            FOR UPDATE SKIP LOCKED
            """,
        )

        for row in rows:
            topic = f"events.{row['aggregate_type'].lower()}"
            await producer.send_and_wait(
                topic,
                key=str(row["aggregate_id"]).encode(),
                value=json.dumps(row["payload"]).encode(),
            )
            await conn.execute(
                "UPDATE outbox SET published_at = now() WHERE id = $1",
                row["id"],
            )

        return len(rows)


async def run_poller(dsn: str, kafka_bootstrap: str) -> None:
    """Pollerのメインループ"""
    pool = await asyncpg.create_pool(dsn)
    producer = AIOKafkaProducer(bootstrap_servers=kafka_bootstrap)
    await producer.start()

    try:
        while True:
            published = await poll_and_publish(pool, producer)
            # イベントがなければ待機間隔を長くする
            wait_seconds = 1.0 if published > 0 else 5.0
            await asyncio.sleep(wait_seconds)
    finally:
        await producer.stop()
        await pool.close()
観点 Polling方式 CDC方式(Debezium)
レイテンシ 1〜5秒(ポーリング間隔依存) 数ミリ秒(WALからリアルタイム)
インフラ複雑度 低(アプリ内で完結) 高(Debezium + Kafka Connect必要)
DB負荷 クエリ負荷あり WAL読み取りのみ
順序保証 ORDER BY created_at で保証 WALの物理順序で保証
運用コスト 中〜高(Connectクラスタの運用)
推奨場面 イベント量が少ない / 小規模チーム 高スループット / レイテンシ要件が厳しい

ハマりポイント: Polling方式でFOR UPDATE SKIP LOCKEDを使わないと、複数のPollerインスタンスが同じイベントを重複発行します。また、Pollerがクラッシュした場合に備え、コンシューマー側のべき等性は必須です。イベントのidフィールドを使った重複排除をコンシューマー側で実装してください。

CloudEvents v1.0でイベントフォーマットを標準化する

マイクロサービスが増えると、チームごとに異なるイベントフォーマットが乱立する問題が発生します。CloudEventsは、CNCF(Cloud Native Computing Foundation)が策定し2024年1月にgraduatedとなったイベントデータの標準仕様です(CloudEvents.io)。

CloudEventsの必須属性

CloudEvents v1.0は以下の必須属性を定義しています。

# cloudevents_envelope.py
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from uuid import uuid4
import json


@dataclass
class CloudEvent:
    """CloudEvents v1.0準拠のイベントエンベロープ"""

    # 必須属性
    specversion: str = "1.0"
    id: str = field(default_factory=lambda: str(uuid4()))
    source: str = ""          # イベント発生元(例: "/orders/service")
    type: str = ""            # イベント種別(例: "com.example.order.placed")

    # オプション属性
    time: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat(),
    )
    datacontenttype: str = "application/json"
    subject: str | None = None  # イベント対象の識別子

    # データ本体
    data: dict = field(default_factory=dict)

    def to_json(self) -> str:
        envelope = {k: v for k, v in asdict(self).items() if v is not None}
        return json.dumps(envelope, ensure_ascii=False)

    @classmethod
    def from_json(cls, raw: str) -> "CloudEvent":
        d = json.loads(raw)
        return cls(**d)


# 使用例
def create_order_placed_event(order_id: str, customer_id: str, total: float) -> CloudEvent:
    return CloudEvent(
        source="/orders/service",
        type="com.example.order.placed.v1",
        subject=order_id,
        data={
            "order_id": order_id,
            "customer_id": customer_id,
            "total_amount": total,
        },
    )

CloudEvents導入の判断基準

CloudEventsは万能ではありません。以下の基準で導入を判断します。

条件 CloudEvents推奨 独自フォーマットで十分
サービス数 5以上 3以下
開発チーム 複数チーム / 多言語 単一チーム / 単一言語
外部連携 あり(AWS EventBridge等) なし
イベントスキーマ管理 Schema Registry使用 アプリ内で完結

制約条件: CloudEventsはエンベロープ(メタデータ)の標準化であり、dataフィールドの中身(ペイロード)のスキーマは規定していません。ペイロードのスキーマ管理には、別途Confluent Schema RegistryやAsyncAPI 3.0の導入が必要です。

Event Sourcingの実装パターンを設計する

ここでは、Event Sourcingの核となるイベントストア・アグリゲート・プロジェクションの実装パターンを解説します。

イベントストアの設計

イベントストアはappend-onlyなストレージで、アグリゲート(ビジネスエンティティ)ごとにイベントストリームを管理します。

-- event_store_schema.sql
CREATE TABLE event_store (
    global_position BIGSERIAL PRIMARY KEY,
    stream_id UUID NOT NULL,
    stream_position INTEGER NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (stream_id, stream_position)
);

CREATE INDEX idx_event_store_stream ON event_store (stream_id, stream_position);

stream_idはアグリゲートのID、stream_positionはストリーム内の連番です。楽観的排他制御にはUNIQUE (stream_id, stream_position)制約を利用します。

アグリゲートとイベントの実装

# aggregate.py
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4
import json

import asyncpg


# --- イベント定義 ---
@dataclass(frozen=True)
class OrderCreated:
    order_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    event_type: str = "OrderCreated"


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str
    confirmed_at: str
    event_type: str = "OrderConfirmed"


@dataclass(frozen=True)
class OrderCancelled:
    order_id: str
    reason: str
    cancelled_at: str
    event_type: str = "OrderCancelled"


# --- アグリゲート ---
@dataclass
class OrderAggregate:
    id: str = ""
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    status: str = "draft"
    version: int = 0
    _pending_events: list = field(default_factory=list, repr=False)

    def create(self, customer_id: str, items: list[dict], total_amount: float) -> None:
        if self.status != "draft":
            raise ValueError(f"Cannot create order in status: {self.status}")
        event = OrderCreated(
            order_id=str(uuid4()),
            customer_id=customer_id,
            items=items,
            total_amount=total_amount,
        )
        self._apply(event)
        self._pending_events.append(event)

    def confirm(self) -> None:
        if self.status != "placed":
            raise ValueError(f"Cannot confirm order in status: {self.status}")
        event = OrderConfirmed(
            order_id=self.id,
            confirmed_at=datetime.now(timezone.utc).isoformat(),
        )
        self._apply(event)
        self._pending_events.append(event)

    def cancel(self, reason: str) -> None:
        if self.status in ("cancelled", "draft"):
            raise ValueError(f"Cannot cancel order in status: {self.status}")
        event = OrderCancelled(
            order_id=self.id,
            reason=reason,
            cancelled_at=datetime.now(timezone.utc).isoformat(),
        )
        self._apply(event)
        self._pending_events.append(event)

    def _apply(self, event: Any) -> None:
        """イベントをアグリゲートの状態に反映する"""
        match event:
            case OrderCreated():
                self.id = event.order_id
                self.customer_id = event.customer_id
                self.items = event.items
                self.total_amount = event.total_amount
                self.status = "placed"
            case OrderConfirmed():
                self.status = "confirmed"
            case OrderCancelled():
                self.status = "cancelled"
        self.version += 1

    def collect_pending_events(self) -> list:
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events

ポイントは、アグリゲートの状態変更が常に「イベントの生成→_applyによる状態反映」という2段階で行われることです。_applyメソッドはイベントの再生(リプレイ)時にも使われるため、副作用を持たない純粋な状態遷移でなければなりません。

イベントストアへの永続化

# event_store.py
import json
from dataclasses import asdict
from typing import Any

import asyncpg


EVENT_TYPE_MAP: dict[str, type] = {
    "OrderCreated": OrderCreated,
    "OrderConfirmed": OrderConfirmed,
    "OrderCancelled": OrderCancelled,
}


async def save_events(
    conn: asyncpg.Connection,
    stream_id: str,
    events: list[Any],
    expected_version: int,
) -> None:
    """楽観的排他制御付きでイベントを保存する"""
    async with conn.transaction():
        for i, event in enumerate(events):
            position = expected_version + i + 1
            try:
                await conn.execute(
                    """
                    INSERT INTO event_store (stream_id, stream_position, event_type, data, metadata)
                    VALUES ($1, $2, $3, $4, $5)
                    """,
                    stream_id,
                    position,
                    event.event_type,
                    json.dumps(asdict(event), ensure_ascii=False),
                    json.dumps({"timestamp": event.__dict__.get("confirmed_at", "")}),
                )
            except asyncpg.UniqueViolationError:
                raise ConcurrencyError(
                    f"Stream {stream_id}: expected version {expected_version}, "
                    f"but position {position} already exists"
                )


async def load_aggregate(conn: asyncpg.Connection, stream_id: str) -> OrderAggregate:
    """イベントをリプレイしてアグリゲートを復元する"""
    rows = await conn.fetch(
        """
        SELECT event_type, data FROM event_store
        WHERE stream_id = $1
        ORDER BY stream_position
        """,
        stream_id,
    )

    aggregate = OrderAggregate()
    for row in rows:
        event_cls = EVENT_TYPE_MAP[row["event_type"]]
        data = json.loads(row["data"])
        data.pop("event_type", None)
        event = event_cls(**data)
        aggregate._apply(event)
    return aggregate


class ConcurrencyError(Exception):
    pass

注意点: UniqueViolationErrorによる楽観的排他制御は、同じアグリゲートに対する同時書き込みを検出する仕組みです。2つのプロセスが同じstream_positionに書き込もうとすると、後から書き込んだ方がConcurrencyErrorを受け取ります。この場合、アグリゲートを再読み込みしてリトライする必要があります。

スナップショット戦略とスキーマ進化を設計する

Event Sourcingの運用で最も課題となるのが、イベント数の増加によるリプレイ時間の増大と、ビジネス要件変更に伴うイベントスキーマの進化です。

スナップショット戦略

スナップショットは、特定時点でのアグリゲートの状態を保存し、その時点以降のイベントだけをリプレイすることで復元時間を短縮する最適化手法です。

# snapshot.py
import json
from dataclasses import asdict

import asyncpg


async def save_snapshot(
    conn: asyncpg.Connection,
    stream_id: str,
    aggregate: OrderAggregate,
) -> None:
    """アグリゲートのスナップショットを保存する"""
    state = {
        "id": aggregate.id,
        "customer_id": aggregate.customer_id,
        "items": aggregate.items,
        "total_amount": aggregate.total_amount,
        "status": aggregate.status,
    }
    await conn.execute(
        """
        INSERT INTO snapshots (stream_id, version, state)
        VALUES ($1, $2, $3)
        ON CONFLICT (stream_id) DO UPDATE
        SET version = $2, state = $3, updated_at = now()
        """,
        stream_id,
        aggregate.version,
        json.dumps(state, ensure_ascii=False),
    )


async def load_aggregate_with_snapshot(
    conn: asyncpg.Connection,
    stream_id: str,
) -> OrderAggregate:
    """スナップショットがあればそこから、なければ全イベントからリプレイ"""
    snapshot = await conn.fetchrow(
        "SELECT version, state FROM snapshots WHERE stream_id = $1",
        stream_id,
    )

    aggregate = OrderAggregate()
    start_position = 0

    if snapshot:
        state = json.loads(snapshot["state"])
        aggregate.id = state["id"]
        aggregate.customer_id = state["customer_id"]
        aggregate.items = state["items"]
        aggregate.total_amount = state["total_amount"]
        aggregate.status = state["status"]
        aggregate.version = snapshot["version"]
        start_position = snapshot["version"]

    # スナップショット以降のイベントをリプレイ
    rows = await conn.fetch(
        """
        SELECT event_type, data FROM event_store
        WHERE stream_id = $1 AND stream_position > $2
        ORDER BY stream_position
        """,
        stream_id,
        start_position,
    )
    for row in rows:
        event_cls = EVENT_TYPE_MAP[row["event_type"]]
        data = json.loads(row["data"])
        data.pop("event_type", None)
        aggregate._apply(event_cls(**data))

    return aggregate

スナップショットの作成タイミングは以下の基準で判断します。

戦略 実装 適用場面
N件ごと if aggregate.version % 100 == 0 イベントが均等に発生するアグリゲート
時間ベース 最終スナップショットから1時間以上経過 低頻度だがリプレイが重いアグリゲート
閾値ベース リプレイ時間が100msを超えたら作成 性能SLAが厳密な場面

トレードオフ: スナップショットは書き込み時のオーバーヘッド(スナップショット保存処理)と引き換えに、読み取り時のリプレイ時間を短縮します。イベント数が1,000件未満のアグリゲートではスナップショットは不要です。不要な最適化はコードの複雑性を増すだけです。

スキーマ進化(Upcasting)の実装

ビジネス要件の変化でイベントのスキーマを変更する必要が生じた場合、イベントストアのデータは不変(immutable)であるため、保存済みイベントを直接書き換えてはいけません。代わりに、デシリアライズ時に古いスキーマを新しいスキーマに変換するUpcastingパターンを使います。

# upcasting.py
from typing import Callable

# Upcaster: 古いイベントデータ(dict) → 新しいイベントデータ(dict)
Upcaster = Callable[[dict], dict]

# Upcasterレジストリ
_upcasters: dict[tuple[str, int], Upcaster] = {}


def register_upcaster(event_type: str, from_version: int, upcaster: Upcaster) -> None:
    _upcasters[(event_type, from_version)] = upcaster


def upcast(event_type: str, data: dict) -> dict:
    """イベントデータを最新バージョンにアップキャストする"""
    version = data.get("schema_version", 1)
    while (event_type, version) in _upcasters:
        data = _upcasters[(event_type, version)](data)
        version += 1
        data["schema_version"] = version
    return data


# --- 使用例 ---
# v1 → v2: shipping_address フィールドを追加(デフォルト値で補完)
def upcast_order_created_v1_to_v2(data: dict) -> dict:
    data["shipping_address"] = data.get("shipping_address", {"country": "JP", "detail": ""})
    return data


# v2 → v3: items配列の各要素にtax_rateを追加
def upcast_order_created_v2_to_v3(data: dict) -> dict:
    for item in data.get("items", []):
        if "tax_rate" not in item:
            item["tax_rate"] = 0.10  # デフォルト10%
    return data


register_upcaster("OrderCreated", 1, upcast_order_created_v1_to_v2)
register_upcaster("OrderCreated", 2, upcast_order_created_v2_to_v3)

Upcasterはチェーン可能で、v1→v2→v3と順次変換されます。アプリケーションコードは常に最新バージョンのスキーマだけを扱えばよく、古いバージョンの知識はUpcasterに閉じ込められます。

制約条件: Upcastingはデシリアライズ時に毎回変換が走るため、イベント数が多いストリームではパフォーマンスに影響します。変換コストが無視できない場合は、スナップショットと組み合わせて変換回数を削減してください。また、Upcasterには破壊的変更(フィールドの削除やセマンティクスの変更)を含めるべきではありません。そのような変更が必要な場合は、新しいイベント型(例: OrderCreatedV2)を定義し、補償イベントで対応します。

よくある問題と解決方法

問題 原因 解決方法
イベントが重複発行される Pollerのクラッシュ後にリトライで同じイベントを再送 コンシューマー側でイベントIDによるべき等性チェックを実装
プロジェクション再構築が遅い 全イベントの再リプレイに時間がかかる パーティション単位の並列再構築 + チェックポイント保存
アグリゲートの読み込みが遅い イベント数が多すぎてリプレイに時間がかかる スナップショット戦略の導入(N件ごと or 閾値ベース)
スキーマ変更でデシリアライズ失敗 イベントストアに古いスキーマのイベントが残っている Upcastingパターンの導入。古いスキーマ→新しいスキーマの変換関数を登録
Kafkaパーティション間の順序不整合 異なるアグリゲートのイベントが異なるパーティションに分散 アグリゲートIDをKafkaのメッセージキーに使い、同一アグリゲートのイベントを同一パーティションに集約
Outboxテーブルの肥大化 発行済みイベントが削除されない 発行済みイベントの定期アーカイブ/削除バッチを実装(7日以上前の発行済みレコードを削除)

まとめと次のステップ

まとめ:

  • イベント駆動アーキテクチャには4つのパターンがあり、要件に応じた使い分けが重要。Event Sourcingは監査・追跡性が求められる領域に限定して適用する
  • Dual Write問題はOutbox Patternで解決する。小規模ならPolling方式、高スループット要件ならDebeziumによるCDC方式を選択する
  • CloudEvents v1.0はイベントフォーマットの標準仕様として定着しており、マルチチーム・マルチ言語環境では導入を検討すべき
  • スナップショットはイベント数1,000件以上のホットアグリゲートにのみ適用する。不要な最適化は避ける
  • スキーマ進化はUpcastingパターンで対応し、保存済みイベントを直接書き換えない

次にやるべきこと:

  • 自プロジェクトのドメインで、Event Sourcingが必要な領域(監査・時間旅行・正確な復元が必要な箇所)とCRUDで十分な領域を洗い出す
  • Outbox Patternのプロトタイプを構築し、Polling方式で動作検証した後、スループット要件に応じてCDC方式への移行を検討する
  • 既存のイベント駆動システムがある場合は、CloudEvents v1.0準拠のエンベロープへの段階的な移行計画を策定する

参考


注意: この記事はAI(Claude Code)により自動生成されました。内容の正確性については複数の情報源で検証していますが、実際の利用時は公式ドキュメントもご確認ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?