0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Saga Patternに関してGolangでまとめてみた!

Last updated at Posted at 2025-09-16

こんにちは!株式会社GROWTH VERSEに所属しております川田剣士です。最近分散システムについて学んでいくうちにSagaPatternについて興味を持ちまして、SagaPatternに関してしっかり理解したいなと思い、個人的に学習し技術記事で体系的にまとめようと思いました。分散システムの基礎に関しては分散システムの基本:CAP定理・シャーディング・レプリケーション・マスタースレーブを体系的に解説を執筆したため、興味ある方は是非ご覧ください。

近年、多くのシステムがマイクロサービスアーキテクチャを採用しています。機能ごとにサービスを分割することで、開発チームの独立性やスケーラビリティを高められる一方で、新たな課題が生じます。その代表例が分散トランザクションです。

たとえば EC サイトの注文処理を考えてみます。
ユーザーが商品を購入するとき、注文の登録、決済の確定、在庫の更新といった処理が複数のサービスにまたがって行われます。もし途中で決済処理が失敗した場合、すでに登録された注文や引き当てられた在庫を元に戻さなければなりません。しかし、マイクロサービスの世界では、従来のデータベースが提供するACIDトランザクションをそのまま適用することはできません。

このように「すべての処理を成功させるか、または失敗させるか」という一貫性の確保が難しい状況で登場するのが Saga Patternです。Saga Pattern は、一連の処理を小さなローカルトランザクションに分割し、必要に応じて補償処理を実行することで、分散システムにおける整合性を実現します。

本記事では、このSaga Patternについて、基本概念から実装方法、実際のユースケースまでを体系的に解説します。記事に対する質問・コメント大歓迎です!

目次

分散トランザクションの課題

モノリシックなシステムでは、1つのデータベースに対する更新をACIDトランザクションでまとめることにより、一貫性を容易に保証できます。たとえば「注文を登録し、決済を確定し、在庫を減算する」という一連の処理も、1つのRDB内であればBEGINとCOMMITの間で完結します。

さらに、モノリシックな構成で同じ種類のデータベースをシャーディングやレプリケーションで複数サーバーに分散している場合でも、各ノードが2PC(Two-Phase Commit)をサポートしていれば、トランザクションマネージャーを介して一貫性を担保することは可能です。PostgreSQL はPREPARE TRANSACTION/COMMIT PREPAREDによる2PC を備えているため、このような「単一システム内の分散DB」では2PCが現実的な選択肢となり得ます。

しかし、マイクロサービス環境では事情が異なります。サービスごとに独立したデータベースや外部システムを利用することが多く、次のようなフローが典型的です。

  1. 注文サービス が注文を PostgreSQL に保存する
  2. 決済サービス が外部決済 API にリクエストを送る
  3. 在庫サービス が DynamoDB で在庫数を更新する

この場合、すべてのリソースを 2PC に参加させることはできません。理由は以下の通りです。

  • PostgreSQL は 2PC をサポートしているが、DynamoDB は非対応
  • 外部決済 API は「prepare → commit/rollback」の操作モデルを提供しておらず、2PCに対応できない
  • 例えばMySQLとPostgreSQLはそれぞれ2PCに対応しているが、MySQLとPostgreSQLなど異なるDBをまたいで 2PCを実現すること自体、可用性、性能劣化の観点で厳しい。理論上は可能である

つまり、2PCが現実的に使えるのは「モノリシック構成で、同種のDBが2PCに対応している場合」に限られ、マイクロサービスの世界では2PCを適用することはほぼ不可能です。

そのため分散システムでは、2PCに頼らずに一貫性を確保する仕組みとして Saga Patternが注目されているのです。

メッセージキューの基本整理

Saga Patternを理解する上で欠かせないのがメッセージキューです。マイクロサービス間の非同期連携を実現する基盤として、多くの場合Kafka、RabbitMQ、Amazon SQSなどが利用されます。メッセージキューは、サービス間の直接的な呼び出しを避け、「送信者」と「受信者」を疎結合にする仕組みを提供します。送信者はイベントをキューに発行するだけでよく、受信者は必要なイベントを購読して処理します。この構造により、次のような利点があります。

  • 非同期処理: 送信者は受信者の処理完了を待たずにレスポンスできる
  • 耐障害性: 受信側でサービスが落ちていて処理できない場合も復旧後に処理可能
  • スケーラビリティ: 受信者サービスをスケールアウトして並列に処理できる
  • 疎結合: 送信者は「誰が購読するか」を知らなくてもよい

一方で、メッセージキューを使う際には以下のような注意点も存在します。

  • 冪等性の担保による実装の複雑さ:受信側で重複メッセージに備える必要がある。例えば「同じ決済完了イベントが2回届いても、注文状態を2重に更新しない」ように設計しなければなりません。
  • 順序を正確に保証していない:ネットワーク障害などによりメッセージの順序が乱れる場合がある。
  • 待機時間がボトルネックになる可能性あり:キューの状態によってはイベントが待機状態になり、それがパフォーマンス低下につながる。

Saga Patternとは

Saga Patternは、マイクロサービスにおける分散トランザクションを扱うためのデザインパターンです。その基本的な考え方は、一連の大きなトランザクションを複数の小さなローカルトランザクションに分割し、各サービスごとに処理を完結させることにあります。そして、途中で失敗が発生した場合には、成功済みの処理を「補償トランザクション」で取り消すことで、全体としての一貫性を保ちます。

処理の流れ

  1. あるサービスがローカルトランザクションを実行し、自身のデータベースに確定する
  2. 成功したら「次の処理を実行してよい」というイベントを発行する
  3. 別のサービスがそれを受け取って処理を実行し、また確定する
  4. どこかで失敗が起きた場合、すでに実行済みのサービスに対して補償トランザクションを呼び出し、状態を巻き戻す

このように Saga Pattern では、「全員が同時にコミット」するのではなく、小さな成功と補償の積み重ね によって「最終的な整合性」を実現します。イメージとして、先ほどのECサイトの注文処理を例にすると以下のようになります。

  1. 注文サービス が注文を登録(失敗したら補償不要)
  2. 決済サービス がクレジット決済を確定(失敗したら注文をキャンセルする補償トランザクションを実行)
  3. 在庫サービス が在庫を減算(失敗したら決済を払い戻し、注文をキャンセルする)

このように、Saga Pattern は「必ず全員成功させる」ことを保証するのではなく、「失敗したら元に戻す手段をあらかじめ用意しておく」ことでシステム全体の整合性を担保する点が特徴です。

Saga Patternにはコレオグラフィ(Choreography)とオーケストレーション(Orchestration)の2種類のタイプがあります。両者についてそれぞれ以下に説明します。

コレオグラフィ(Choreography)

コレオグラフィ型の Saga は、オーケストレーターを置かず、各サービスがイベントを介して自律的に連携する方式 です。全体の流れを「指揮者(オーケストレーター)」が制御するのではなく、各サービスが「自分の番が来たら踊る(処理する)」イメージです。処理の流れは次のようになります。

  1. 注文サービス が注文を登録し、メッセージキュー(例: SQS, Kafka, RabbitMQ)に「注文作成イベント」を発行する。
  2. 決済サービス がそのイベントを購読し、決済処理を実行。成功すれば「決済完了イベント」を発行し、失敗すれば「決済失敗イベント」を発行する。
  3. 在庫サービスが「決済完了イベント」を購読し、在庫を減算する。もし在庫更新に失敗した場合は「在庫更新失敗イベント」を発行し、他サービスが補償処理を行う。

つまり、各サービスは「イベントを受け取ったら処理する」「結果を次のイベントとして発行する」という責務を持ち、これをリレー形式で繋げていくことで全体のフローが進みます。

メリット

  • 疎結合: 各サービスはオーケストレーターに依存せず、イベント駆動で動くため独立性が高い。
  • スケーラビリティ: サービスごとに水平スケーリング可能で、イベント基盤を利用するため高い並行性を実現できる。
  • 単一障害点がない: 中央の制御役が不要なので、オーケストレーターのダウンによるシステム全体停止を回避できる。

デメリット

  • フローが見えにくい: 各サービスが独自にイベントを発行するため、処理の全体像を把握しづらい。
  • 複雑性の増加: サービスが増えるほどイベントの種類や補償フローが複雑化し、追跡やデバッグが難しくなる。
  • 依存関係の隠れた結合: 「誰がこのイベントを購読しているか」を把握しづらく、サービス間の暗黙の結合が生じやすい。

フローは以下の通りです。

オーケストレーション(Orchestration)

オーケストレーション型の Saga では、処理の流れを オーケストレーター(調整役) が集中管理します。各サービスが勝手にイベントを発行してバトンを渡していくコレオグラフィ型とは異なり、オーケストレーターが「次はどのサービスが何を実行するか」を明示的に指示します。まさに「指揮者が楽団をまとめる」イメージです。

処理の流れ

  1. オーケストレーターが「注文作成」リクエストを受け取り、注文サービスに処理を依頼
  2. 注文サービスが成功すると、オーケストレーターに結果を返す
  3. オーケストレーターは続けて決済サービスに「決済処理」を依頼
  4. 成功した場合は在庫サービスに「在庫更新」を依頼し、Saga 全体を完了
  5. もし途中で失敗が起きれば、オーケストレーターが補償トランザクションを各サービスに依頼してロールバック

メリット

  • 全体の流れが見やすい: どのサービスがどの順番で処理されるかをオーケストレーターが一元的に管理するため、処理フローを追いやすい
  • 制御が明示的: 各サービスは「指示された処理」を実行するだけなので、実装がシンプル
  • エラーハンドリングが集約できる: 失敗時の補償処理をオーケストレーターがまとめて制御できる

デメリット

  • 中央集権的な設計: オーケストレーターが単一障害点やボトルネックになりやすい
  • 結合度の上昇: 各サービスはオーケストレーターに依存するため、サービスの独立性が弱まる

フローは以下の通りです。

コレオグラフィ(Choreography)実装

コレオグラフィ型の実装をGolangを使用して以下に提示します。

-- outbox: 送信用イベントの冪等性担保のためのイベントステータス管理テーブル
CREATE TABLE outbox (
  id           BIGSERIAL PRIMARY KEY,
  message_id   TEXT NOT NULL UNIQUE,
  topic        TEXT NOT NULL,
  payload      JSONB NOT NULL, -- リカバリなど再実行時に必要なパラメータなどをjsonで格納
  status       TEXT NOT NULL,        -- pending / sending / sent / failed
  attempts     INT  NOT NULL DEFAULT 0,
  updated_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
  created_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ON outbox (status, next_at);

-- inbox_jobs: 受信イベントの冪等性担保のためのイベントステータス管理テーブル
CREATE TABLE inbox_jobs (
  id           BIGSERIAL PRIMARY KEY,
  message_id   TEXT NOT NULL UNIQUE,  -- 元メッセージIDで去重
  topic        TEXT NOT NULL,
  payload      JSONB NOT NULL, -- リカバリなど再実行時に必要なパラメータなどをjsonで格納
  status       TEXT NOT NULL,         -- pending / processing / done / failed
  attempts     INT  NOT NULL DEFAULT 0,
  updated_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
  created_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ON inbox_jobs (status, next_at);

システム障害などでstatusがprocessingの状態のままになった場合、一定時間を過ぎるとpendingに戻すような処理をすることで自動で処理を再開するという実装が一般的です。
実装を全て出すと膨大になるので、Inventoryに絞って実装を提示します。

// events.go
// Inventory serviceがpublish,subscribeするイベントを定義
package inv

type EventHeader struct {
    MessageID     string
    CorrelationID string // 直前に実行されていたイベントのmessage_idが入る。
}

type PaymentSucceeded struct {
    Header    EventHeader
    OrderID   string // 注文idと紐づけることでどの注文のイベントかわかるようにしている
    PaymentID string
    SkuID     string // 商品やサービスごとに割り当てられた一意の識別コード
    Qty       int64 // 注文個数
}

type InventoryReserved struct {
    Header  EventHeader
    OrderID string
    SkuID   string
    Qty     int64
}

type InventoryFailed struct {
    Header  EventHeader
    OrderID string
    SkuID   string
    Reason  string
}
// message_que.go
package inv

type Consumer interface {
    Consume(topic, group string, handler func(msg []byte, attrs map[string]string) error) error
}

type Publisher interface {
    Publish(topic, key string, payload []byte) error
}
// repo.go
package inv

import (
    "context"
    "database/sql"
    "time"
)

// outbox,inboxへの操作を定義
type Repo interface {
    WithTx(ctx context.Context, fn func(tx *sql.Tx) error) error

    // === INBOX JOBS ===
    UpsertInboxPending(ctx context.Context, msgID, topic string, payload []byte) error
    PickInboxPending(ctx context.Context, now time.Time, limit int, lease time.Duration, workerID string) ([]Job, error)
    CompleteInboxJob(ctx context.Context, id int64) error
    FailInboxJob(ctx context.Context, id int64, backoff time.Duration) error
    RequeueExpiredInbox(ctx context.Context, now time.Time) (int64, error) // processing 期限切れ→pending

    // === OUTBOX ===
    EnqueueOutboxPendingTx(tx *sql.Tx, msgID, topic string, payload []byte) error
    PickOutboxPending(ctx context.Context, now time.Time, limit int, lease time.Duration, workerID string) ([]Job, error)
    MarkOutboxSent(ctx context.Context, id int64) error
    RetryOutbox(ctx context.Context, id int64, backoff time.Duration) error
    RequeueExpiredOutbox(ctx context.Context, now time.Time) (int64, error) // sending 期限切れ→pending
}

type Job struct {
    ID         int64
    Topic      string
    Payload    []byte
    Attempts   int
}
// 処理が成功した時はinboxのeventのstatusをdoneにして、outboxに成功後のイベントを登録する。
// 処理に失敗した時はリトライ対象としたい場合はattemptsをインクリメント。
// リトライ最大許容回数までリトライできるようにする。
// リトライ対象としたくない場合はinboxのeventのstatusをfailedにして、outboxに失敗後のイベントを登録する。
func (s *InventoryService) handleJob(ctx context.Context, j inboxJob) error {
    // Topicの中身(イベント種別)から処理をコントロールしている。
	switch j.Topic {
	case "payment.succeeded":
		var evt PaymentSucceeded
		if err := json.Unmarshal(j.Payload, &evt); err != nil {
            // inboxのeventをfailにしてoutboxに失敗イベントとして登録
		}

		// === 在庫ビジネス処理(省略) ===
		// ここでは Qty が負なら「在庫不足」とみなす
		if evt.Qty < 0 {
			// 業務的失敗: InventoryFailed を Outbox に積む
			failEvt := InventoryFailed{
				Header: EventHeader{
					MessageID:     uuid.NewString(),
					CorrelationID: evt.Header.CorrelationID,
					OccurredAtMs:  time.Now().UnixMilli(),
					Type:          "inventory.failed",
				},
				OrderID: evt.OrderID,
				SkuID:   evt.SkuID,
				Reason:  "insufficient stock",
			}
			payload, _ := json.Marshal(failEvt)
            // inboxでstatusをfailedに更新し、outboxにInventoryFailedを追加
		}

		// 業務成功: InventoryReserved を Outbox に積む
		resEvt := InventoryReserved{
			Header: EventHeader{
				MessageID:     uuid.NewString(),
				CorrelationID: evt.Header.CorrelationID,
				OccurredAtMs:  time.Now().UnixMilli(),
				Type:          "inventory.reserved",
			},
			OrderID: evt.OrderID,
			SkuID:   evt.SkuID,
			Qty:     evt.Qty,
		}
		payload, _ := json.Marshal(resEvt)
        // inboxでstatusをdoneに更新し、outboxにInventoryReservedを追加

	default:
		// 未知トピック → 技術的失敗扱い
		return fmt.Errorf("unhandled topic: %s", j.Topic)
	}
}

オーケストレーション(Orchestration)実装

オーケストレーション型の実装をGolangを使用して以下に提示します。

-- 受信ジョブ(lease管理)
  CREATE TABLE inbox_jobs(
    id BIGSERIAL PRIMARY KEY,
    message_id TEXT UNIQUE,
    topic TEXT NOT NULL,
    payload BYTEA NOT NULL,
    status TEXT NOT NULL,               -- pending/processing/done/failed
    attempts INT NOT NULL DEFAULT 0,
    next_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    lease_until TIMESTAMPTZ,
    worker_id TEXT,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
  );
  CREATE INDEX ON inbox_jobs(status, next_at);

  -- 送信(コマンド/通知)
  CREATE TABLE outbox(
    id BIGSERIAL PRIMARY KEY,
    message_id TEXT UNIQUE,
    topic TEXT NOT NULL,                -- e.g. "cmd.create.order"
    payload BYTEA NOT NULL,
    status TEXT NOT NULL,               -- pending/sending/sent/failed
    attempts INT NOT NULL DEFAULT 0,
    next_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    lease_until TIMESTAMPTZ,
    worker_id TEXT,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
  );
  CREATE INDEX ON outbox(status, next_at);

  -- サガ状態(最小)
  -- オーケストレーション型ではeventをオーケストレーターがトレースするために必ず必要
  -- コレオグラフィ型でも監視のために用意しても問題ない。
  -- ただ、そのためにはマイクロサービスを跨いだ共通databaseが必要になる。
  CREATE TABLE saga_instances(
    saga_id TEXT PRIMARY KEY,
    state TEXT NOT NULL, -- PENDING/ORDERED/PAID/RESERVED/COMPLETED/COMPENSATED/ABORTED
    order_id TEXT,
    payment_id TEXT,
    correlation_id TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
  );

// orchestrator/orchestrator_worker.go
package orchestrator

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"

	"github.com/google/uuid"
)

// オーケストレーターは全てのイベントを保持する必要がある。
// これらのイベントは各サービスでも同様に定義されている。

/*
  想定スキーマ(抜粋)
*/

type EventHeader struct {
	MessageID     string `json:"message_id"`
	CorrelationID string `json:"correlation_id"`
	OccurredAtMs  int64  `json:"occurred_at_ms"`
	Type          string `json:"type"`
}

// --- 受信イベント(reply/event queue 側) ---

// サガ開始(UIやAPIが発行した開始イベントを想定)
type SagaStartOrder struct {
	Header  EventHeader `json:"header"`
	OrderID string      `json:"order_id"`
	Amount  int64       `json:"amount"`
	UserID  string      `json:"user_id"`
	SkuID   string      `json:"sku_id"`
	Qty     int64       `json:"qty"`
}

// CreateOrder の実行結果
type OrderCreated struct {
	Header  EventHeader `json:"header"`
	OrderID string      `json:"order_id"`
	OK      bool        `json:"ok"`
	Reason  string      `json:"reason,omitempty"`
}

// CapturePayment の結果
type PaymentCaptured struct {
	Header    EventHeader `json:"header"`
	OrderID   string      `json:"order_id"`
	PaymentID string      `json:"payment_id"`
	OK        bool        `json:"ok"`
	Reason    string      `json:"reason,omitempty"`
}

// ReserveInventory の結果
type InventoryReserved struct {
	Header  EventHeader `json:"header"`
	OrderID string      `json:"order_id"`
	OK      bool        `json:"ok"`
	Reason  string      `json:"reason,omitempty"`
}

// --- 送信コマンド(command queue 側に流す) ---

type CmdCreateOrder struct {
	Header  EventHeader `json:"header"`
	OrderID string      `json:"order_id"`
	UserID  string      `json:"user_id"`
	Amount  int64       `json:"amount"`
}

type CmdCapturePayment struct {
	Header   EventHeader `json:"header"`
	OrderID  string      `json:"order_id"`
	Amount   int64       `json:"amount"`
}

type CmdReserveInventory struct {
	Header  EventHeader `json:"header"`
	OrderID string      `json:"order_id"`
	SkuID   string      `json:"sku_id"`
	Qty     int64       `json:"qty"`
}

type CmdRefundPayment struct {
	Header    EventHeader `json:"header"`
	OrderID   string      `json:"order_id"`
	PaymentID string      `json:"payment_id"`
}

type CmdCancelOrder struct {
	Header  EventHeader `json:"header"`
	OrderID string      `json:"order_id"`
}

type OrchestratorService struct {
	DB *sql.DB
}

type inboxJob struct {
	ID       int64
	Topic    string
	Payload  []byte
	Attempts int
}

// 流れとしては、
// 1. inbox.Topic= "saga.start.order"を指定してこの関数をcallする。
// 2. StartOrderイベントをメッセージキュに登録される
// 3. StartOrderイベントがOrder Serviceで実施され、処理完了後OrderServiceがorder.createdをイベントとしてメッセージキューに登録する。
// 4. この関数がinbox.Topic= "order.created"で呼ばれる。
// 5. 上記の流れを繰り返す
// 上記の流れによってOrchestratorがSagaの処理をコントロールする。
func (s *OrchestratorService) handleInboxJob(ctx context.Context, j inboxJob) error {
	switch j.Topic {

	case "saga.start.order":
		var evt SagaStartOrder
		if err := json.Unmarshal(j.Payload, &evt); err != nil { return err }
		return s.withTx(ctx, func(tx *sql.Tx) error {
			// サガをPENDINGで作成
			if err := s.ensureSagaTx(tx, evt.Header.CorrelationID, "PENDING", evt.OrderID, ""); err != nil { return err }
			// 次のコマンド:CreateOrder
			cmd := CmdCreateOrder{
				Header: hdr(evt.Header.CorrelationID, "cmd.create.order"),
				OrderID: evt.OrderID, UserID: evt.UserID, Amount: evt.Amount,
			}
			return s.enqueueCmdTx(tx, "cmd.create.order", cmd)
		})

	case "order.created":
		var evt OrderCreated
		if err := json.Unmarshal(j.Payload, &evt); err != nil { return err }
		if evt.OK {
			return s.withTx(ctx, func(tx *sql.Tx) error {
				if err := s.updateSagaStateTx(tx, evt.Header.CorrelationID, "ORDERED"); err != nil { return err }
				// 次のコマンド:CapturePayment
				cmd := CmdCapturePayment{
					Header: hdr(evt.Header.CorrelationID, "cmd.capture.payment"),
					OrderID: evt.OrderID, Amount: 0, // 金額は本来サガに保持、ここでは省略
				}
				return s.enqueueCmdTx(tx, "cmd.capture.payment", cmd)
			})
		}
		// 注文作成NG → サガ中止
		return s.withTx(ctx, func(tx *sql.Tx) error {
			return s.updateSagaStateTx(tx, evt.Header.CorrelationID, "ABORTED")
		})

	case "payment.captured":
		var evt PaymentCaptured
		if err := json.Unmarshal(j.Payload, &evt); err != nil { return err }
		if evt.OK {
			return s.withTx(ctx, func(tx *sql.Tx) error {
				// 支払いIDを保持し、状態をPAIDへ
				if err := s.updateSagaPaidTx(tx, evt.Header.CorrelationID, evt.PaymentID); err != nil { return err }
				// 次のコマンド:ReserveInventory
				cmd := CmdReserveInventory{
					Header: hdr(evt.Header.CorrelationID, "cmd.reserve.inventory"),
					OrderID: evt.OrderID, SkuID: "", Qty: 0, // 本来はサガ情報から取得
				}
				return s.enqueueCmdTx(tx, "cmd.reserve.inventory", cmd)
			})
		}
		// 決済NG → 注文キャンセル(補償)してCOMPENSATED
		return s.withTx(ctx, func(tx *sql.Tx) error {
			if err := s.updateSagaStateTx(tx, evt.Header.CorrelationID, "COMPENSATING"); err != nil { return err }
			if err := s.enqueueCmdTx(tx, "cmd.cancel.order",
				CmdCancelOrder{Header: hdr(evt.Header.CorrelationID, "cmd.cancel.order"), OrderID: evt.OrderID}); err != nil { return err }
			return s.updateSagaStateTx(tx, evt.Header.CorrelationID, "COMPENSATED")
		})

	case "inventory.reserved":
		var evt InventoryReserved
		if err := json.Unmarshal(j.Payload, &evt); err != nil { return err }
		if evt.OK {
			// 全工程成功
			return s.withTx(ctx, func(tx *sql.Tx) error {
				return s.updateSagaStateTx(tx, evt.Header.CorrelationID, "COMPLETED")
			})
		}
		// 在庫NG → 返金 + 注文キャンセル(補償)→ COMPENSATED
		return s.withTx(ctx, func(tx *sql.Tx) error {
			// サガから payment_id を引き、そのIDで返金(コマンド送信)
			pid, err := s.getPaymentIDTx(tx, evt.Header.CorrelationID)
			if err != nil { return err }
			if err := s.updateSagaStateTx(tx, evt.Header.CorrelationID, "COMPENSATING"); err != nil { return err }
			if err := s.enqueueCmdTx(tx, "cmd.refund.payment",
				CmdRefundPayment{Header: hdr(evt.Header.CorrelationID, "cmd.refund.payment"), OrderID: evt.OrderID, PaymentID: pid}); err != nil { return err }
			if err := s.enqueueCmdTx(tx, "cmd.cancel.order",
				CmdCancelOrder{Header: hdr(evt.Header.CorrelationID, "cmd.cancel.order"), OrderID: evt.OrderID}); err != nil { return err }
			return s.updateSagaStateTx(tx, evt.Header.CorrelationID, "COMPENSATED")
		})
	}

	// 未対応トピック → リトライ対象(技術的失敗扱い)
	return fmt.Errorf("unhandled topic: %s", j.Topic)
}

cronプロセスとSagaPatternとの比較

SagaPatternに関して以下のような疑問を持った方もいると思います。(自分がそうでした。)

SagaPatternではなくcronプロセスを各処理ごとに用意しても実現できないだろうか。例えば注文、決済処理、請求書作成がマイクロサービスを跨いであった時に、それらは別々のcornプロセスで拾い、もし何らかの原因で処理が失敗した場合はstatusをfailedにしたり、決済までしていた場合は決済処理をキャンセルするような処理を実施する(SagaPatternのcompensating transactionとも捉えられる)ことによって整合性担保をしても問題ないと思う。

以下がChatGPTから返ってきたレスポンスです。

その感覚はかなり現実的です。多くの業務システムでは 「まずは単純な非同期+整合性監査(cron やバッチ)」 の方がコスト対効果が高く、Saga を最初から入れる必要はありません。
いつSagaを入れないか(=cron監査で十分)

  • 処理のスパンが長い/遅延が許容:数秒〜数分の遅れで困らない。
  • 依存サービスが少ない/疎結合:3〜4サービス程度で、強い同時性要件がない。
  • 高頻度の同時更新が少ない:同一リソースに対する競合更新が稀。

いつ Saga を検討すべきか

  • 強い体験整合性が必要:ユーザー操作直後に「全サービスが一貫した結果」を見せたい(例:決済+在庫+予約)。
  • 並行更新・分岐が多い:同一エンティティに対して高スループットで更新が走る。
  • 障害波及を最小化したい:各段での失敗を即時に止め、補償で巻き戻したい(会計・在庫等)。

つまり、強い同時性,強整合性要件がなく処理の遅延を許容する場合はcronによるシステム設計で問題ないです。例えば、反例として出すと、あるホテルの予約サイトがあるとします。この時に部屋が1つ空いている状態(その部屋をroomAと以下に記載)でuserAがroomAを予約したとします。この時にroomAは空いていないことになりますが、userAのクレジットカードに問題があり、決済処理に失敗したとします。その後userBがサイトに訪れて部屋が空いているか確認します。もしそれぞれのcron処理間隔を1分おきなどにした場合、注文失敗として部屋の空き状況を空室に戻すための時間が1分かかってしまうため、その間にuserBがroomAを予約をせずにサイトから離れてしまうと売り上げの低下につながります。ですが、SagaPatternの場合だと失敗したら即時でイベントをメッセージキューに送り、補償トランザクションが走るため、処理速度が速くすぐにroomAを空室状態に更新し、userBがroomAの予約を行うことに成功します。これらの事象は共通のエンティティの更新、参照を並列実行しているから起きている問題であり、例えば、Netflixのサービスなどはあるuserの行動が他userに影響を与えるような事象は仕様として発生しないので、こうした問題は考えなくても良いと思います。その場合は、同時実行性も要件として優先度が低くなるため、SagaPatternではなくcronによるシステム構築で良いかなと思います。

また、SagaPatternの場合だとモニタリングが強くなるというのもSRE視点だと大きなメリットです。

  • 明示的な状態機械:outbox,inbox,saga_instancesテーブルの導入により、pending → processing → completed / compensating → compensated / failed をDBで持つため、今どこで止まっているかがダッシュボード一発。
  • step別 SLI/SLO:例:S2(在庫確保)の失敗率/レイテンシ、補償率、ボトルネックが特定しやすい。
  • 分散トレーシングが可能:発火イベント→各サービス→補償イベントまで trace_id が貫通。1注文の旅路をログ/トレースで復元可。
  • バックプレッシャー指標が自然に生まれる:キュー長、待ち時間、ステップ滞留、再試行回数が即座に数値化。オートスケールと連動しやすい。
  • 補償も一次指標:補償は“例外”ではなく設計の一部。補償率・補償所要時間が健全性KPIになる。

サービスの特性や求められている要件、監視レベルを整理することでcronによるシステム構築で良いか、SagaPatternを採用するべきか考えると良いですね。

まとめ

本記事では、マイクロサービスにおける分散トランザクションの課題と、それを解決するアプローチとしての Saga Pattern について解説しました。特に、2PCが適用できるのは同一DB内のような限定的なケースであり、異種サービス(PostgreSQL、外部決済 API、DynamoDB など)が連携するマイクロサービス環境では現実的ではないことを確認しました。そのため、補償トランザクションを前提とした Saga Pattern が実用的な解決策になります。

Saga Patternには以下の2つの実装方式が存在します。

  • コレオグラフィ型:各サービスがイベントを購読し、自律的に処理と補償を行う方式。疎結合でシンプルですが、フローが分かりにくくなりやすい。
  • オーケストレーション型:専用のオーケストレーターが全体の流れを制御する方式。処理フローが明確で追いやすい一方、オーケストレーターに依存が集中する。

どちらの方式においても、Inbox/Outboxパターンによる冪等性担保と、「成功」「失敗」「リトライで成功可能性がある失敗」それぞれの場合でハンドリングすることが実装の安定性を支えるポイントになります。最終的に、どのアプローチを採用するかはシステムの規模や特性次第ですが、イベント駆動で疎結合を重視するなら「コレオグラフィ型」可観測性やフロー制御を重視するなら「オーケストレーション型」という棲み分けが有効です。
最後まで読んでいただきありがとうございました!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?