LoginSignup
13
7

More than 3 years have passed since last update.

[golang] オーケストレーションベースのサーガパターンに優しく入門する

Last updated at Posted at 2020-12-17

この記事は Voicy Advent Calendar 2020 の 18 日目の記事です。
先日は, @tamo_hory さんの AndroidアプリにVisual Regression Test導入を目指す 第一回 Instrumented testで自動でスクショを撮る でした。明日は, @miyukiaizawa さんの ~ です。

はじめに

マイクロサービスパターンの db は, サービス毎のモジュール性を担保するため, database per service というパターンを当てはめ 「1サービスあたり1db」 となっています。
そのため従来の単一 db に対する 2PC (begin...commit) だけだとデータの整合性の面で問題が残ります。
その時, 結果整合性を保つのに有効となるパターンが Saga と呼ばれるパターンです。
今回はそんな Saga パターンに優しく入門してみました。

参考資料

前提

マイクロサービスパターン本ではモバイルオーダーのオーダーの作成をサンプルとしています。
今回はそちらを golang によって実装します。

できたもの

97939cdbd56c3fe519a76b6a75975d5e.gif

サーガとは

サーガとは, サービスにまたがるトランザクションを実装するメカニズムのです。
サーガにはおよそ 2 パターンのソリューションがあります。

  1. Choreography-based saga
  2. Orchestration-based saga

※ 正確なサーガはメッセージブローカーを通したイベントソーシングにより実現されます。今回は優しく概念に入門というていでそこは省いています。

Choreography-based saga

コレオグラフィ(振り付け)。個々のサービスが個別で踊るイメージ。
サーガに参加しているサービスが個々に次のステップを判断する方法。

Orchestration-based saga

オーケストレーション(指揮)。個々のサービスの指揮者が指揮するイメージ。
サーガオーケストレータが各種サービスのトランザクションをオーケストレーションする方法。


マイクロサービスパターン本ではコレオグラフィのデメリットとして, 「サービス間の循環依存の可能性が高井。」「サービス間の依存関係が密になる。」などが上がっていて, 基本的にはオーケストレーションサーガによる実装を推奨しています。
オーケストレーションサーガのデメリットとして, 「オーケストレーション記述が太る」といった点が上がっていますが, オーケストレーションをサービスディスパッチに集中させるのはアーキテクチャの問題ではなくアプリケーションの問題となってくるとのことです。詳しく知りたい方は本をどうぞ!


以下実装に移りますが我流を多分に含みます。

実装するサンプルのシーケンス

細かいところはともかく本で紹介されているシーケンスは以下のような感じです。

lLNDRjf04BxxALOvbnT0LHMfnnvQgUq3k6014XXacmCtxpeb9XGfLGHLAUaF8GaXWHP5hKXAseSPRCCfhz3PNInCBJXLbHmhcldcc-_xnb5sJKirh78XhwWfgsYGbwPcA2KivLAvagfRP0VO09mfE3P_i-_YzoDWDpj4DSbJ8nt3Imox4e3dQaPTxo--d7kZRepfSTcqYeKusH0S1nmApdLiv4akbTNqzO7Pj3tlkp7e2qDJ.png

オーダー発生時, 実際に他に連携するサービスへは非同期にサーガが行うようパブリッシュされます。
ユーザーは注文発生結果をすぐに受け取ることができて, その裏で認証やら実際の注文, 検証やらが行われる仕組みです。

データ整合性を保つため, サーガの方でロールバックのオーケストレーションも受け持ちます。

オーケストレーションベースのサーガ実装

オーケストレーションは, 宣言的なステップのスタックです。
オーケストレーターは, オーケストレーション定義を順に処理します。

ステップは以下のように定義しました。

type Step struct {
    // トランザクションの実行
    invokeParticipant func() (interface{}, error)

    // サービスリプライに対する処理
    onReply           func(reply interface{})

    // トランザクション失敗時のロールバック処理
    withCompensation  func() error
}

宣言的なステップのスタックなのでサーガは以下のように表現します。

steps := []*Step{}
steps = append(steps,
    &Step{
        ...
        ...

実際のステップの処理は順に行われます。また, 失敗時の処理は基本的に失敗時点以前のステップのものを実行します。

failedIndex := -1
for i, step := range steps {
    if step.invokeParticipant == nil {
        continue
    }
    reply, err := step.invokeParticipant()
    if err != nil {
        failedIndex = i
        break
    }
    if step.onReply == nil {
        continue
    }
    step.onReply(reply)
}

for i := failedIndex; i >= 0; i-- {
    if steps[i].withCompensation == nil {
        continue
    }
    if err := steps[i].withCompensation(); err != nil {
        return nil, err
    }
}

具体的な CreateOrderSaga は以下のようになります。

        steps = append(steps,
        // step1.
        //  - ロールバック: 注文作成の中断
        &Step{
            withCompensation: func() error {
                _, err = orderClient.RejectOrder(ctx, &proto.RejectOrderRequest{OrderId: state.OrderID})
                return err
            },
        },
        // step2.
        //   - トランザクション: 顧客の検証
        &Step{
            invokeParticipant: func() (interface{}, error) {
                return consumerClient.VerifyConsumer(ctx, &proto.VerifyConsumerRequest{ConsumerId: consumerID})
            },
            onReply: func(reply interface{}) {
                state.PaymentServiceID = reply.(*proto.VerifyConsumerReply).Consumer.Card.PaymentServiceId
            },
        },
        // step3.
        //   - トランザクション: チケット作成
        //   - ロールバック: チケット作成の中断
        &Step{
            invokeParticipant: func() (interface{}, error) {
                return kitchenClient.CreateTicket(ctx, &proto.CreateTicketRequest{OrderId: state.OrderID})
            },
            onReply: func(reply interface{}) {
                state.TicketID = reply.(*proto.CreateTicketReply).Ticket.Id
            },
            withCompensation: func() error {
                _, err = kitchenClient.RejectTicket(ctx, &proto.RejectTicketRequest{TicketId: state.TicketID})
                return err
            },
        },
        // step4.
        //   - トランザクション: カード認証
        &Step{
            invokeParticipant: func() (interface{}, error) {
                return treasureClient.AuthorizeCard(ctx, &proto.AuthorizeCardRequest{PaymentServiceId: state.PaymentServiceID})
            },
        },
        // step5.
        //   - トランザクション: 注文承認
        &Step{
            invokeParticipant: func() (interface{}, error) {
                return orderClient.ApproveOrder(ctx, &proto.ApproveOrderRequest{OrderId: state.OrderID})
            },
        },
    )

これでサーガができました!

サーガを動かすためのサーバー側工夫例

単純に上記をまとめたメソッドを CreateOrder 中にゴルーチンを飛ばすなどをしても context.Canceled が発生し上手くいきません。
CreateOrder はレスポンスを返した時点でコンテキストが完了するためです。
これを単に回避するためには, ゴルーチンではなく同期的に実行すれば問題ないですが, これだとユーザーはクライアント上で認証完了まで結果を待ち続けることになってしまいます。

サーガの実行を非同期に行うためには, k8s のキューイングが便利です。
本家 k8s のコントローラー実装にバリバリ使われているキューで信頼性バッチリです。

import (
    ...
    ...
    "k8s.io/client-go/util/workqueue"
)

type Worker struct {
    queue *workqueue.Type

    sagaConn *grpc.ClientConn
}

使い方はとても簡単です。キューのアイテムとしては interface{} を受け取るようになっているため, とりあえずキューイングしたいアイテムを Add すればいいです。

(q *Type) Add(item interface{})

キューの待機は, Get() から行えます。

    log.Print("queue wait...")

    q, quit := w.queue.Get()
    if quit {
        return false
    }
    log.Print("queue get")

あとは実際のキューイングしたアイテムを処理するだけです。
今回は, Request をアイテムとしてキューイングし, Request 型によって特定のサーガ処理実行という風にしてみました。

func PublishCreateOrderSaga(req *proto.CreateOrderSagaRequest) {
    defaultWorker.queue.Add(req)
    log.Print("PublishCreateOrderSaga")
}
...
...
    switch q.(type) {
    case *proto.CreateOrderSagaRequest:
        req := q.(*proto.CreateOrderSagaRequest)
        defer w.queue.Done(req)

        client := proto.NewSagaClient(w.sagaConn)
        client.CreateOrderSaga(context.TODO(), req)

        log.Print("CreateOrderSagaRequest")
    default:
        panic(ErrNotImplemented)
    }

動作を見る

まずは正常系です。
初回 client からの create order により, 速 ORDER_CREATED のステータスでレスポンスが帰ります。
その後, saga により各種サービスが叩かれ, 一定時間後 ORDER_APPROVED のステータスに変わります。

97939cdbd56c3fe519a76b6a75975d5e.gif

次に異常系を見てみるため, あえてカード認証で例外を発生させます。

 func AuthorizeCard(ctx context.Context, in *proto.AuthorizeCardRequest) (*proto.AuthorizeCardReply, error) {
+       return nil, errors.New("認証できなかったよ")
+

いい感じにサーガがエラーを検知してロールバック、最終オーダーステータスが ORDER_CREATE_REJECTED へ変わりました。

7d80c9e864f5ff57c1092d0b61229bc9.gif

終わりに

というわけでサーガパターンに優しく入門する。でした!
実際やってみる前は、サーガパターン...??何をどうするの...??:thinking:
という状態でしたが, 実際試すことで理解できたように思います。

にしてもマイクロサービス周辺のパターン覚えること、やることが多くて大変ですね。。。

マイクロサービスアーキテクチャという流行りから「とりあえずサービス分ければいいでしょ。」みたいに安易にやってしまいがちですが, こういった周辺にあるパターンを正しく扱ってよりよいアーキテクチャをシステムに当てはめれるようになりたいですね!

13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7