Help us understand the problem. What is going on with this article?

ProtocolBuffersでRDBのトランザクションを簡単にできるようにした(マイクロサービスではない)

プラハ大関です。
今回はよくマイクロサービスで使われるProtocolBuffersをマイクロサービスではなく単なるAPIとして使用して、
RDBのトランザクションをインターフェース時に意識せずに実装する方法を書きました。
ProtocolBuffersはフロントとサーバで同じenumを共有できたり、型ありのインターフェースを定義することである程度バリデーションの役割を果たしてくれたりと、マイクロサービスとしてだけでなくRESTの代わりとなるような使い方することもできるツールです。

記事の対象者

  • ProtocolBuffersやgRPCを多少なりとも理解していて、使ったことのある人
  • 何かしらのRDBを使用したことがある人
  • 今回の話す内容のコードはこちら

環境

使用技術 バージョン
libprotoc 3.11.4
Go 1.14
protobuf v1.23.0
macOS Catalina

ProtocolBuffersとは

データをバイト列にシリアライズするためのツールで、RPCなどに使われます。

他にもバイト列にシリアライズするツールはありますが、これのメリットとして
- 拡張がしやすい
- コンパクトなバイナリ形式を採用している(通信量がXMLと比べると約3~10倍程度小さいとか)
- プログラム言語に依存しない(マイクロサービス向き)
- データアクセスコードを自動生成してくれる

などがあります。

今回はここの話は他の記事にいっぱいあるので他の記事を参照。

RDBのトランザクションを仕込む

RDBのトランザクションをInterceptorに仕込むことで、トランザクションを意識せずにアプリケーションの実装に集中できます。

Create, Update, Deleteのプレフィックスインターフェース時はトランザクションを開始する

HTTPメソッドだとPOST, PATCH, PUT, DELETEでトランザクションを開始するようにすれば、トランザクション開始の処理を共通化できますが、
gRPCはすべてのメソッドがPOSTのため、それはできません。

そこでGoogleがProtocolBuffersを作成する際のコード規約を定義していて、それに従ってインターフェースは作成していきます。
例: CreateBook, DeleteBook, UpdateBook etc
(ただし我々のプロジェクトではGet系のインターフェースは表現力を落とさないようにするため、Get+Resource名以外にも使っています)

このようなコード規約に従うことでトランザクション開始処理を共通化できます。

var (
    transactionMethods = []string{
        "Create",
        "Update",
        "Delete",
    }
)

func isTransactionMethod(method string) bool {
    for _, tMethod := range transactionMethods {
        if strings.HasPrefix(method, tMethod) {
            return true
        }
    }
    return false
}

transactionMethodsでトランザクションを開始するメソッドのプレフィックスを定義します。
isTransactionMethodでトランザクションするかどうかを判定しています。

UnaryのInterceptor

func DBInterceptor(conn *sql.DB) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        log.Println("start interceptor")
        fullMethods := strings.Split(info.FullMethod, "/")
        method := fullMethods[len(fullMethods)-1]
        var tx *sql.Tx
        if isTransactionMethod(method) {
            log.Println("start transaction")
            transaction, err := conn.BeginTx(ctx, nil)
            if err != nil {
                return nil, xerrors.New("transaction error")
            }
            tx = transaction

            ctx = context.WithValue(ctx, config.DBKey, transaction)
        } else {
            ctx = context.WithValue(ctx, config.DBKey, conn)
        }
                // handlerの前に書かれているコードはServiceインターフェースが実行される前に処理され、handlerの後の処理はインターフェース後に実行される
        resp, err := handler(ctx, req)
        if err != nil {
            if tx != nil {
                tx.Rollback()
            }
            log.Println("rollback")
            return nil, xerrors.Errorf("fail to handle transaction: %w", err)
        }

        if tx != nil {
            if err := tx.Commit(); err != nil {
                return nil, xerrors.Errorf("fail to handle transaction commit: %w", err)
            }
            log.Println("commited")
        }

        return resp, nil
    }
}

Interceptorではhandler(ctx, req)の前に書かれているコードはServiceインターフェースが実行される前に処理され、handlerの後の処理はインターフェース後に実行される。
この処理ではまずisTransactionMethodでトランザクション処理をするかどうかをチェックして、する場合は、contextにトランザクションのポインタを詰めるようにして、handler後にエラーが返っている場合はrollbackそうでない場合は、commitするようになっています。

StreamのInterceptor

func DBInterceptorForStream(conn *sql.DB) grpc.StreamServerInterceptor {
    return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        fullMethods := strings.Split(info.FullMethod, "/")
        method := fullMethods[len(fullMethods)-1]
        var tx *sql.Tx
        ctx := ss.Context()
        wrappedStream := grpc_middleware.WrapServerStream(ss)
        if isTransactionMethod(method) {
            transaction, err := conn.BeginTx(ss.Context(), nil)
            if err != nil {
                return xerrors.Errorf("fail to begin transaction: %w", err)
            }
            tx = transaction

            ctx = context.WithValue(ctx, config.DBKey, transaction)
        } else {
            ctx = context.WithValue(ctx, config.DBKey, conn)
        }
        wrappedStream.WrappedContext = ctx
        err := handler(srv, wrappedStream)
        if err != nil {
            if tx != nil {
                tx.Rollback()
            }
            return xerrors.Errorf("fail to handle transaction: %w", err)
        }

        if tx != nil {
            if err := tx.Commit(); err != nil {
                return xerrors.Errorf("fail to handle transaction commit: %w", err)
            }
        }

        return nil
    }
}

Streamの方も基本的には同じですが、Unaryとの違いとしてはStreamだとcontextがポインタで取得できないため、WithValueをすることができないです。
なので
wrappedStream := grpc_middleware.WrapServerStream(ss)
でコンテキストをラップして

wrappedStream.WrappedContext = ctx
err := handler(srv, wrappedStream)

で生成したコンテキストを詰めるようにしている。

まとめ

  • Interceptorでトランザクションを仕込むことで、トランザクションを意識せずに実装できる
  • ProtocolBuffersだとインターフェース名のルールが必要になる

今回のサンプルコードはこちらにあります。

https://github.com/revenue-hack/protocolbuf-transaction-sample

参考文献

revenue-hack
サーバサイドメインで、インフラ、アプリ、フロントとなぜか手広くやってます
praha-inc
受託開発と自社開発を並走する「自給自足型スタートアップ」
https://www.praha-inc.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした