0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RustとAxumで実装するCQRSパターン:マイクロサービスのデータ整合性を最適化する実践ガイド

0
Last updated at Posted at 2026-03-28

RustとAxumで実装するCQRSパターン:マイクロサービスのデータ整合性を最適化する実践ガイド

この記事でわかること

  • CQRS(Command Query Responsibility Segregation)パターンの基本概念と、マイクロサービスにおけるデータ整合性の課題
  • Rust の型システムを活用して、無効な状態遷移をコンパイル時に検出する CQRS 実装の設計手法
  • Axum 0.8 + cqrs-es クレートを用いた、コマンド/クエリ分離型 API の具体的な実装方法
  • PostgreSQL をイベントストアとして利用する永続化戦略と、読み取りモデルの最適化手法
  • 結果整合性(Eventual Consistency)のトレードオフを理解し、Saga パターンで分散トランザクションを管理する方法

対象読者

  • 想定読者: Rust で Web API を構築した経験があり、マイクロサービスアーキテクチャに関心のある中級者
  • 必要な前提知識:
    • Rust の基礎文法(traitenumasync/await
    • Web API の基本概念(REST、HTTP メソッド)
    • データベースの基本操作(SQL、テーブル設計)
    • AI/ML エンジニアの方へ: CQRS は ML パイプラインにおける学習(Write)と推論(Read)の分離と類似した概念です

結論・成果

CQRS パターンを Rust と Axum で実装することで、以下の効果が報告されています。

  • 読み取りスループット: 非正規化ビューの導入により、複雑な JOIN クエリと比較して読み取りレスポンスタイムが 5〜10 倍改善される事例が Microsoft の CQRS パターンガイド で紹介されています
  • 書き込みのデータ整合性: コマンドモデルでのトランザクション管理により、同時書き込み時の競合を排除
  • 独立スケーリング: 読み取り専用インスタンスの水平スケーリングにより、読み取り負荷が 10 倍増加しても書き込み性能への影響なし
  • 型安全性: Rust のコンパイル時チェックにより、不正なコマンド/イベント遷移がデプロイ前に検出可能

ただし、CQRS は CRUD 主体の単純なアプリケーションには過剰な設計です。読み書きの負荷比率が大きく異なるシステムや、複数のマイクロサービスをまたぐ複雑なクエリが必要な場合に適用を検討してください。

CQRSパターンの基礎を理解する

CQRS は「Command Query Responsibility Segregation」の略で、日本語では「コマンドクエリ責務分離」と訳されます。アプリケーションの書き込み操作(Command)と読み取り操作(Query)を別々のモデルに分離するアーキテクチャパターンです。

ML エンジニアの方には、学習パイプライン(大量のデータを書き込み、モデルを更新する処理)と推論パイプライン(学習済みモデルから高速に結果を返す処理)を分離するアーキテクチャに似ている、と考えるとイメージしやすいでしょう。

なぜマイクロサービスでCQRSが必要になるのか

マイクロサービスアーキテクチャでは、Database per Service パターンにより各サービスが独自のデータベースを持ちます。この設計では、複数サービスにまたがるデータを取得する際に従来の SQL JOIN が使えません。

microservices.io の CQRS パターン解説によると、CQRS はこの課題を解決するために、読み取り専用の非正規化ビューを構築します。書き込み側が発行するドメインイベントを購読し、クエリに最適化されたスキーマでデータを保持します。

CQRSのメリットとデメリット

実際に CQRS の採用を検討する際には、メリットとデメリットを正確に把握する必要があります。

観点 メリット デメリット
スケーラビリティ 読み書きを独立スケーリング可能 インフラ構成が複雑化
パフォーマンス 読み取りモデルを非正規化で最適化 書き込み→読み取りの同期ラグ
データモデル 用途別に最適なスキーマを選択 モデルの二重管理コスト
整合性 書き込み側でトランザクション保証 結果整合性の許容が必要
保守性 関心の分離によるコード可読性向上 コードベース全体の規模増大

注意: Microsoft の CQRS パターンガイドでは、「CQRSはシステムの特定の限定的な部分(Bounded Context)にのみ適用すべきであり、システム全体に適用すべきではない」と述べられています。単純な CRUD で十分な領域にまで CQRS を導入すると、不必要な複雑性を生みます。

Rustの型システムを活かしたCQRS設計を実装する

Rust の強力な型システムは、CQRS パターンの実装に大きな利点をもたらします。enum による網羅的パターンマッチ、trait による振る舞いの抽象化、そしてコンパイル時の型チェックにより、不正な状態遷移をデプロイ前に検出できます。

ドメインモデルの定義

まず、注文管理ドメインを例に、コマンド・イベント・エラーを型として定義してみましょう。cqrs-es クレート(公式ドキュメント)のパターンに沿って実装します。

// domain/commands.rs
// コマンド: 「何をしたいか」を表現する型
// Pythonの dataclass に相当する構造体定義
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
pub enum OrderCommand {
    CreateOrder {
        customer_id: String,
        items: Vec<OrderItem>,
    },
    AddItem {
        item: OrderItem,
    },
    ConfirmOrder,
    CancelOrder {
        reason: String,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderItem {
    pub product_id: String,
    pub name: String,
    pub quantity: u32,
    pub unit_price: u64, // 金額は整数で管理(浮動小数点の丸め誤差回避)
}
// domain/events.rs
// イベント: 「何が起きたか」を表現する型
// コマンドが成功した結果として発行される
use serde::{Deserialize, Serialize};
use cqrs_es::DomainEvent;
use crate::domain::commands::OrderItem;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum OrderEvent {
    OrderCreated {
        customer_id: String,
        items: Vec<OrderItem>,
    },
    ItemAdded {
        item: OrderItem,
    },
    OrderConfirmed {
        confirmed_at: String,
    },
    OrderCancelled {
        reason: String,
        cancelled_at: String,
    },
}

impl DomainEvent for OrderEvent {
    fn event_type(&self) -> String {
        match self {
            OrderEvent::OrderCreated { .. } => "OrderCreated".to_string(),
            OrderEvent::ItemAdded { .. } => "ItemAdded".to_string(),
            OrderEvent::OrderConfirmed { .. } => "OrderConfirmed".to_string(),
            OrderEvent::OrderCancelled { .. } => "OrderCancelled".to_string(),
        }
    }

    fn event_version(&self) -> String {
        "1.0".to_string()
    }
}

なぜこの設計を選んだか:

  • enum でコマンドとイベントを定義することで、match 式での網羅性チェックが有効になります。新しいコマンドやイベントを追加した際、対応するハンドラの実装漏れがコンパイルエラーとして検出されます
  • 金額を u64(整数)で管理している理由は、f64 の浮動小数点演算では 0.1 + 0.2 ≠ 0.3 のような丸め誤差が発生するためです

Aggregate(集約)の実装

Aggregate はドメインロジックの中核です。コマンドを受け取り、ビジネスルールを検証し、イベントを発行します。

// domain/aggregate.rs
use async_trait::async_trait;
use cqrs_es::Aggregate;
use crate::domain::commands::{OrderCommand, OrderItem};
use crate::domain::events::OrderEvent;

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Order {
    customer_id: String,
    items: Vec<OrderItem>,
    status: OrderStatus,
    total_amount: u64,
}

// 状態遷移を型で表現
// Pythonの Enum に相当
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub enum OrderStatus {
    #[default]
    Draft,
    Confirmed,
    Cancelled,
}

#[derive(Debug, thiserror::Error)]
pub enum OrderError {
    #[error("注文が空です。1つ以上の商品を追加してください")]
    EmptyOrder,
    #[error("確定済みの注文は変更できません")]
    AlreadyConfirmed,
    #[error("キャンセル済みの注文は操作できません")]
    AlreadyCancelled,
    #[error("合計金額が上限(100万円)を超えています: {0}円")]
    AmountExceeded(u64),
}

#[async_trait]
impl Aggregate for Order {
    type Command = OrderCommand;
    type Event = OrderEvent;
    type Error = OrderError;
    type Services = (); // 外部サービス依存なし

    fn aggregate_type() -> String {
        "Order".to_string()
    }

    // コマンドを検証し、イベントを生成する
    // ここがビジネスルールの実装箇所
    async fn handle(
        &self,
        command: Self::Command,
        _services: &Self::Services,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            OrderCommand::CreateOrder { customer_id, items } => {
                if items.is_empty() {
                    return Err(OrderError::EmptyOrder);
                }
                Ok(vec![OrderEvent::OrderCreated { customer_id, items }])
            }
            OrderCommand::AddItem { item } => {
                // 状態チェック: 確定済み・キャンセル済みなら拒否
                match self.status {
                    OrderStatus::Confirmed => return Err(OrderError::AlreadyConfirmed),
                    OrderStatus::Cancelled => return Err(OrderError::AlreadyCancelled),
                    OrderStatus::Draft => {}
                }
                let new_total = self.total_amount
                    + (item.unit_price * item.quantity as u64);
                if new_total > 1_000_000 {
                    return Err(OrderError::AmountExceeded(new_total));
                }
                Ok(vec![OrderEvent::ItemAdded { item }])
            }
            OrderCommand::ConfirmOrder => {
                match self.status {
                    OrderStatus::Confirmed => return Err(OrderError::AlreadyConfirmed),
                    OrderStatus::Cancelled => return Err(OrderError::AlreadyCancelled),
                    OrderStatus::Draft => {}
                }
                if self.items.is_empty() {
                    return Err(OrderError::EmptyOrder);
                }
                Ok(vec![OrderEvent::OrderConfirmed {
                    confirmed_at: chrono::Utc::now().to_rfc3339(),
                }])
            }
            OrderCommand::CancelOrder { reason } => {
                if self.status == OrderStatus::Cancelled {
                    return Err(OrderError::AlreadyCancelled);
                }
                Ok(vec![OrderEvent::OrderCancelled {
                    reason,
                    cancelled_at: chrono::Utc::now().to_rfc3339(),
                }])
            }
        }
    }

    // イベントを適用して状態を更新する
    // イベントソーシングの核心: 過去のイベントを再生して現在の状態を復元
    fn apply(&mut self, event: Self::Event) {
        match event {
            OrderEvent::OrderCreated { customer_id, items } => {
                self.customer_id = customer_id;
                self.total_amount = items
                    .iter()
                    .map(|i| i.unit_price * i.quantity as u64)
                    .sum();
                self.items = items;
                self.status = OrderStatus::Draft;
            }
            OrderEvent::ItemAdded { item } => {
                self.total_amount += item.unit_price * item.quantity as u64;
                self.items.push(item);
            }
            OrderEvent::OrderConfirmed { .. } => {
                self.status = OrderStatus::Confirmed;
            }
            OrderEvent::OrderCancelled { .. } => {
                self.status = OrderStatus::Cancelled;
            }
        }
    }
}

ポイント: handle メソッドはコマンドの検証とイベント生成のみを行い、状態の変更は apply メソッドで行います。この分離により、テスト時にはイベントの発行結果だけを検証すれば十分です。

ドメインロジックのテスト

cqrs-es フレームワークの大きな利点は、データベースなしでドメインロジックをテストできることです。公式ドキュメントのテストガイドに沿って、Given-When-Then パターンでテストを記述します。

// domain/tests.rs
#[cfg(test)]
mod tests {
    use cqrs_es::test::TestFramework;
    use crate::domain::aggregate::Order;
    use crate::domain::commands::*;
    use crate::domain::events::*;

    type OrderTestFramework = TestFramework<Order>;

    #[test]
    fn test_create_order_success() {
        let items = vec![OrderItem {
            product_id: "PROD-001".to_string(),
            name: "Rustプログラミング入門".to_string(),
            quantity: 1,
            unit_price: 3000,
        }];

        OrderTestFramework::with(())
            // Given: 初期状態(イベントなし)
            .given_no_previous_events()
            // When: 注文作成コマンド
            .when(OrderCommand::CreateOrder {
                customer_id: "CUST-001".to_string(),
                items: items.clone(),
            })
            // Then: OrderCreated イベントが発行される
            .then_expect_events(vec![OrderEvent::OrderCreated {
                customer_id: "CUST-001".to_string(),
                items,
            }]);
    }

    #[test]
    fn test_empty_order_rejected() {
        OrderTestFramework::with(())
            .given_no_previous_events()
            .when(OrderCommand::CreateOrder {
                customer_id: "CUST-001".to_string(),
                items: vec![], // 空の注文
            })
            // Then: エラーが返る
            .then_expect_error_message(
                "注文が空です。1つ以上の商品を追加してください"
            );
    }

    #[test]
    fn test_cannot_add_item_after_confirmation() {
        let items = vec![OrderItem {
            product_id: "PROD-001".to_string(),
            name: "Rustプログラミング入門".to_string(),
            quantity: 1,
            unit_price: 3000,
        }];

        OrderTestFramework::with(())
            // Given: 注文が作成済み&確定済み
            .given(vec![
                OrderEvent::OrderCreated {
                    customer_id: "CUST-001".to_string(),
                    items,
                },
                OrderEvent::OrderConfirmed {
                    confirmed_at: "2026-03-20T00:00:00Z".to_string(),
                },
            ])
            // When: 商品追加を試みる
            .when(OrderCommand::AddItem {
                item: OrderItem {
                    product_id: "PROD-002".to_string(),
                    name: "追加商品".to_string(),
                    quantity: 1,
                    unit_price: 2000,
                },
            })
            // Then: 確定済みエラー
            .then_expect_error_message(
                "確定済みの注文は変更できません"
            );
    }
}

なぜ Given-When-Then パターンを選んだか:

  • Aggregate のテストはデータベース不要で実行可能です。ML エンジニアの方には、ユニットテストでモデルの入出力を検証するのと同じ感覚と捉えてください
  • then_expect_events発行されるイベントの内容を検証するため、内部実装の変更に対して脆くないテストが書けます

Axum APIサーバーでCQRSを統合する

ドメインロジックが完成したら、Axum の HTTP ハンドラと統合します。Axum 0.8 の Extractor パターンを活用して、コマンド API とクエリ API を明確に分離しましょう。

プロジェクト構成

order-service/
├── Cargo.toml
├── src/
│   ├── main.rs              # エントリポイント
│   ├── domain/
│   │   ├── mod.rs
│   │   ├── aggregate.rs     # Order Aggregate
│   │   ├── commands.rs       # コマンド定義
│   │   ├── events.rs         # イベント定義
│   │   └── tests.rs          # ドメインテスト
│   ├── api/
│   │   ├── mod.rs
│   │   ├── command_handlers.rs  # 書き込みAPI
│   │   └── query_handlers.rs    # 読み取りAPI
│   ├── infrastructure/
│   │   ├── mod.rs
│   │   ├── event_store.rs    # PostgreSQL イベントストア
│   │   └── read_model.rs     # 読み取りモデル
│   └── config.rs             # 設定管理

コマンドAPI(書き込み側)の実装

// api/command_handlers.rs
use axum::{
    extract::{Path, State},
    http::StatusCode,
    Json,
    response::IntoResponse,
};
use cqrs_es::persist::PersistedEventStore;
use cqrs_es::CqrsFramework;
use crate::domain::aggregate::Order;
use crate::domain::commands::{OrderCommand, OrderItem};

// アプリケーション状態の共有
// Python の FastAPI における Depends() に相当
pub type OrderCqrs = CqrsFramework<Order, PersistedEventStore<Order>>;

#[derive(Clone)]
pub struct AppState {
    pub order_cqrs: std::sync::Arc<OrderCqrs>,
}

// 注文作成ハンドラ
// POST /orders
pub async fn create_order(
    State(state): State<AppState>,
    Json(payload): Json<CreateOrderRequest>,
) -> impl IntoResponse {
    let aggregate_id = uuid::Uuid::new_v4().to_string();
    let command = OrderCommand::CreateOrder {
        customer_id: payload.customer_id,
        items: payload.items,
    };

    match state
        .order_cqrs
        .execute(&aggregate_id, command)
        .await
    {
        Ok(_) => (
            StatusCode::CREATED,
            Json(serde_json::json!({
                "order_id": aggregate_id,
                "status": "created"
            })),
        ).into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": e.to_string()
            })),
        ).into_response(),
    }
}

// 注文確定ハンドラ
// PUT /orders/:id/confirm
pub async fn confirm_order(
    State(state): State<AppState>,
    Path(order_id): Path<String>,
) -> impl IntoResponse {
    match state
        .order_cqrs
        .execute(&order_id, OrderCommand::ConfirmOrder)
        .await
    {
        Ok(_) => (StatusCode::OK, Json(serde_json::json!({
            "order_id": order_id,
            "status": "confirmed"
        }))).into_response(),
        Err(e) => (StatusCode::BAD_REQUEST, Json(serde_json::json!({
            "error": e.to_string()
        }))).into_response(),
    }
}

#[derive(Debug, serde::Deserialize)]
pub struct CreateOrderRequest {
    pub customer_id: String,
    pub items: Vec<OrderItem>,
}

クエリAPI(読み取り側)の実装

クエリ側は、イベントストアではなく非正規化された読み取りモデルにアクセスします。これにより、複雑な集約処理なしに高速なレスポンスを実現できます。

// api/query_handlers.rs
use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    Json,
    response::IntoResponse,
};
use sqlx::PgPool;

#[derive(Clone)]
pub struct QueryState {
    pub read_db: PgPool,
}

// 注文詳細取得
// GET /orders/:id
pub async fn get_order(
    State(state): State<QueryState>,
    Path(order_id): Path<String>,
) -> impl IntoResponse {
    // 読み取りモデル(非正規化ビュー)から直接取得
    // JOIN 不要で高速
    let result = sqlx::query_as!(
        OrderView,
        r#"
        SELECT
            order_id,
            customer_id,
            status,
            total_amount,
            item_count,
            created_at,
            updated_at
        FROM order_summary_view
        WHERE order_id = $1
        "#,
        order_id
    )
    .fetch_optional(&state.read_db)
    .await;

    match result {
        Ok(Some(order)) => (StatusCode::OK, Json(order)).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({"error": e.to_string()})),
        ).into_response(),
    }
}

// 注文一覧取得(ページネーション付き)
// GET /orders?customer_id=xxx&page=1&per_page=20
pub async fn list_orders(
    State(state): State<QueryState>,
    Query(params): Query<ListOrdersParams>,
) -> impl IntoResponse {
    let per_page = params.per_page.unwrap_or(20).min(100);
    let offset = (params.page.unwrap_or(1) - 1) * per_page;

    let orders = sqlx::query_as!(
        OrderView,
        r#"
        SELECT
            order_id,
            customer_id,
            status,
            total_amount,
            item_count,
            created_at,
            updated_at
        FROM order_summary_view
        WHERE ($1::text IS NULL OR customer_id = $1)
        ORDER BY created_at DESC
        LIMIT $2 OFFSET $3
        "#,
        params.customer_id,
        per_page as i64,
        offset as i64
    )
    .fetch_all(&state.read_db)
    .await;

    match orders {
        Ok(list) => (StatusCode::OK, Json(list)).into_response(),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({"error": e.to_string()})),
        ).into_response(),
    }
}

#[derive(Debug, serde::Serialize, sqlx::FromRow)]
pub struct OrderView {
    pub order_id: String,
    pub customer_id: String,
    pub status: String,
    pub total_amount: i64,
    pub item_count: i32,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub updated_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug, serde::Deserialize)]
pub struct ListOrdersParams {
    pub customer_id: Option<String>,
    pub page: Option<i64>,
    pub per_page: Option<i64>,
}

ルーティング設定

// main.rs
use axum::{routing::{get, post, put}, Router};

#[tokio::main]
async fn main() {
    // 環境変数から設定読み込み
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    // PostgreSQL 接続プール
    let pool = sqlx::PgPool::connect(&database_url)
        .await
        .expect("Failed to connect to database");

    // イベントストア設定(postgres-es クレート使用)
    let event_store = postgres_es::default_postgress_pool(&database_url)
        .await;

    // CQRS フレームワーク初期化
    let order_cqrs = cqrs_es::CqrsFramework::new(
        event_store,
        vec![], // クエリプロセッサ(後述)
        (),
    );

    let cmd_state = command_handlers::AppState {
        order_cqrs: std::sync::Arc::new(order_cqrs),
    };
    let query_state = query_handlers::QueryState {
        read_db: pool,
    };

    // コマンドAPIとクエリAPIを分離したルーティング
    let app = Router::new()
        // Command API(書き込み)
        .route("/orders", post(command_handlers::create_order))
        .route("/orders/{id}/confirm",
            put(command_handlers::confirm_order))
        .with_state(cmd_state)
        // Query API(読み取り)
        .route("/orders", get(query_handlers::list_orders))
        .route("/orders/{id}", get(query_handlers::get_order))
        .with_state(query_state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .unwrap();

    println!("Server running on http://0.0.0.0:3000");
    axum::serve(listener, app).await.unwrap();
}

ハマりポイント:

Axum 0.8 では with_state を呼ぶと Router の型が変わるため、異なる State 型を持つルートを1つの Router にマージする際は Router::mergeRouter::nest を活用してください。上記のコードは概念を示す疑似コードであり、実際の実装では State の共有方法を検討する必要があります。

PostgreSQLでイベントストアと読み取りモデルを構築する

永続化層は CQRS の実用性を左右する重要な要素です。postgres-es クレート(serverlesstechnology/cqrs に含まれるサブクレート)を使うと、PostgreSQL をイベントストアとして利用できます。

イベントストアのスキーマ

cqrs-es フレームワークが使用するイベントストアのテーブル構造は以下のとおりです。

-- イベントストアテーブル(cqrs-es が自動管理)
CREATE TABLE events (
    aggregate_type TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    sequence BIGINT NOT NULL,
    event_type TEXT NOT NULL,
    event_version TEXT NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (aggregate_type, aggregate_id, sequence)
);

-- スナップショットテーブル(大量イベントの再生を高速化)
CREATE TABLE snapshots (
    aggregate_type TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    last_sequence BIGINT NOT NULL,
    current_snapshot JSONB NOT NULL,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (aggregate_type, aggregate_id)
);

読み取りモデル(Projection)の実装

読み取りモデルは、イベントストアのイベントを購読し、クエリに最適化されたテーブルを更新します。

-- 読み取りモデル用テーブル
-- 非正規化されたビュー: JOIN 不要で高速
CREATE TABLE order_summary_view (
    order_id TEXT PRIMARY KEY,
    customer_id TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'Draft',
    total_amount BIGINT NOT NULL DEFAULT 0,
    item_count INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 顧客別の注文一覧を高速取得するためのインデックス
CREATE INDEX idx_order_summary_customer
    ON order_summary_view (customer_id, created_at DESC);

-- ステータス別フィルタリング用
CREATE INDEX idx_order_summary_status
    ON order_summary_view (status);
// infrastructure/read_model.rs
use cqrs_es::persist::GenericQuery;
use cqrs_es::{EventEnvelope, Query};
use async_trait::async_trait;
use sqlx::PgPool;
use crate::domain::aggregate::Order;
use crate::domain::events::OrderEvent;

pub struct OrderSummaryQuery {
    pool: PgPool,
}

impl OrderSummaryQuery {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

#[async_trait]
impl Query<Order> for OrderSummaryQuery {
    async fn dispatch(
        &self,
        aggregate_id: &str,
        events: &[EventEnvelope<Order>],
    ) {
        for event in events {
            match &event.payload {
                OrderEvent::OrderCreated { customer_id, items } => {
                    let total: i64 = items
                        .iter()
                        .map(|i| (i.unit_price * i.quantity as u64) as i64)
                        .sum();
                    let item_count = items.len() as i32;

                    sqlx::query!(
                        r#"
                        INSERT INTO order_summary_view
                            (order_id, customer_id, status,
                             total_amount, item_count)
                        VALUES ($1, $2, 'Draft', $3, $4)
                        ON CONFLICT (order_id) DO UPDATE SET
                            customer_id = $2,
                            total_amount = $3,
                            item_count = $4,
                            updated_at = NOW()
                        "#,
                        aggregate_id,
                        customer_id,
                        total,
                        item_count
                    )
                    .execute(&self.pool)
                    .await
                    .expect("Failed to upsert order summary");
                }
                OrderEvent::ItemAdded { item } => {
                    let amount = (item.unit_price * item.quantity as u64) as i64;
                    sqlx::query!(
                        r#"
                        UPDATE order_summary_view
                        SET total_amount = total_amount + $2,
                            item_count = item_count + 1,
                            updated_at = NOW()
                        WHERE order_id = $1
                        "#,
                        aggregate_id,
                        amount
                    )
                    .execute(&self.pool)
                    .await
                    .expect("Failed to update order summary");
                }
                OrderEvent::OrderConfirmed { .. } => {
                    sqlx::query!(
                        r#"
                        UPDATE order_summary_view
                        SET status = 'Confirmed', updated_at = NOW()
                        WHERE order_id = $1
                        "#,
                        aggregate_id
                    )
                    .execute(&self.pool)
                    .await
                    .expect("Failed to confirm order");
                }
                OrderEvent::OrderCancelled { .. } => {
                    sqlx::query!(
                        r#"
                        UPDATE order_summary_view
                        SET status = 'Cancelled', updated_at = NOW()
                        WHERE order_id = $1
                        "#,
                        aggregate_id
                    )
                    .execute(&self.pool)
                    .await
                    .expect("Failed to cancel order");
                }
            }
        }
    }
}

よくある間違い:

最初は「イベントストアから直接クエリすればよいのでは」と考えがちですが、イベントストアのデータはイベントの時系列であり、「現在の注文の合計金額」を取得するには全イベントを再生する必要があります。読み取りモデルを別途用意することで、1 回の SELECT で結果を取得できます。

スナップショット戦略

イベント数が増大すると、Aggregate の復元(イベント再生)に時間がかかります。スナップショットを活用して、N 件ごとに現在の状態を保存しましょう。

// スナップショット設定の概念
// 100イベントごとにスナップショットを作成
// これにより、Aggregate復元時に最大100件のイベント再生で済む
let snapshot_config = SnapshotConfig {
    snapshot_every: 100,
};

制約条件: スナップショットはイベントの再生を高速化しますが、スナップショット自体の保存にも I/O コストが発生します。1 Aggregate あたりのイベント数が 50 件未満の場合、スナップショットの導入は不要です。

結果整合性とSagaパターンでデータ整合性を管理する

CQRS の最大の課題は結果整合性(Eventual Consistency)です。コマンドモデルで変更が確定してから、読み取りモデルに反映されるまでにタイムラグが発生します。

結果整合性への対処パターン

パターン 説明 適用場面
Read-your-writes 書き込み直後のリクエストはコマンドモデルから読む ユーザーが自身の操作結果を即座に確認する場合
Polling クライアントが一定間隔で再取得 リアルタイム性が低い管理画面
WebSocket/SSE サーバーからプッシュ通知 リアルタイムダッシュボード
バージョンヘッダ レスポンスにイベントバージョンを含め、クライアント側で判定 API クライアント間の一貫性保証

Sagaパターンによる分散トランザクション

複数のマイクロサービスにまたがるトランザクション(例: 注文作成 → 在庫引当 → 決済処理)では、Saga パターンを組み合わせます。microservices.io の Saga パターンによると、オーケストレーション型とコレオグラフィ型の2つのアプローチがあります。

// Saga オーケストレーター(概念実装)
// 注文作成 → 在庫確認 → 決済 の3ステップ
pub enum OrderSagaStep {
    CreateOrder,
    ReserveInventory,
    ProcessPayment,
}

pub enum SagaAction {
    // 前進: 次のステップへ
    Proceed(OrderSagaStep),
    // 補償: 前のステップを取り消す
    Compensate(OrderSagaStep),
}

pub struct OrderSaga {
    order_id: String,
    current_step: OrderSagaStep,
}

impl OrderSaga {
    /// ステップ成功時: 次のステップへ進む
    pub fn on_success(&mut self) -> Option<SagaAction> {
        match self.current_step {
            OrderSagaStep::CreateOrder => {
                self.current_step = OrderSagaStep::ReserveInventory;
                Some(SagaAction::Proceed(OrderSagaStep::ReserveInventory))
            }
            OrderSagaStep::ReserveInventory => {
                self.current_step = OrderSagaStep::ProcessPayment;
                Some(SagaAction::Proceed(OrderSagaStep::ProcessPayment))
            }
            OrderSagaStep::ProcessPayment => {
                None // 全ステップ完了
            }
        }
    }

    /// ステップ失敗時: 補償トランザクションを実行
    pub fn on_failure(&self) -> Vec<SagaAction> {
        match self.current_step {
            OrderSagaStep::ProcessPayment => vec![
                SagaAction::Compensate(OrderSagaStep::ReserveInventory),
                SagaAction::Compensate(OrderSagaStep::CreateOrder),
            ],
            OrderSagaStep::ReserveInventory => vec![
                SagaAction::Compensate(OrderSagaStep::CreateOrder),
            ],
            OrderSagaStep::CreateOrder => vec![],
        }
    }
}

トレードオフ:

  • オーケストレーション型は中央制御で見通しがよいが、オーケストレーターが単一障害点になりうる
  • コレオグラフィ型はサービス間の疎結合を維持できるが、フロー全体の把握が困難になる
  • 実務ではオーケストレーション型から始め、サービス数が増えたら部分的にコレオグラフィ型に移行するアプローチが現実的です

よくある問題と解決方法

問題 原因 解決方法
読み取りモデルにデータが反映されない Projection のイベントハンドラでパニック発生 expect ではなく Result でエラーハンドリングし、リトライキューに入れる
Aggregate 復元が遅い イベント数が膨大(1万件以上) スナップショットを 100〜500 件ごとに作成
同一 Aggregate への同時書き込みでエラー 楽観的ロック(シーケンス番号衝突) クライアント側でリトライ。指数バックオフ + ジッタ推奨
イベントスキーマの変更が困難 既存イベントとの後方互換性 イベントアップキャスター(cqrs-es のイベントアップキャスト機能)を使用
テスト時にデータベース接続が必要 Projection のテストで実 DB が必要 ドメインテストは TestFramework で DB 不要。Projection テストのみ testcontainers を使用

まとめと次のステップ

まとめ:

  • CQRS パターンは、読み書きのモデルを分離することで、マイクロサービスにおけるデータ整合性とパフォーマンスを両立させるアーキテクチャパターンです
  • Rust の型システム(enum + match の網羅性チェック)は、コマンド・イベントの状態遷移を安全に管理するのに適しています
  • cqrs-es クレートと postgres-es を組み合わせることで、PostgreSQL ベースのイベントストアと読み取りモデルを構築できます
  • 結果整合性は避けられないトレードオフであり、Read-your-writes やバージョンヘッダなどのパターンで対処します
  • 複数サービスにまたがるトランザクションには、Saga パターン(オーケストレーション型推奨)を組み合わせます

次にやるべきこと:

  • cqrs-es 公式チュートリアルで銀行アプリのハンズオンを完了する
  • postgres-es クレートで実際に PostgreSQL イベントストアを構築してみる
  • 読み取りモデルに複数の Projection(注文一覧ビュー、顧客別集計ビュー、売上ダッシュボードビューなど)を追加し、用途別のクエリ最適化を実践する

参考


注意: この記事はAI(Claude Code)により自動生成されました。内容の正確性については複数の情報源で検証していますが、実際の利用時は公式ドキュメントもご確認ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?