3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ProtocolBuffersでRDBのトランザクションを簡単にできるようにした(マイクロサービスではない)

Posted at

プラハ大関です。
今回はよくマイクロサービスで使われるProtocolBuffersをマイクロサービスではなく単なるAPIとして使用して、
RDBのトランザクションをインターフェース時に意識せずに実装する方法を書きました。
ProtocolBuffersはフロントとサーバで同じenumを共有できたり、型ありのインターフェースを定義することである程度バリデーションの役割を果たしてくれたりと、マイクロサービスとしてだけでなくRESTの代わりとなるような使い方することもできるツールです。

記事の対象者

  • ProtocolBuffersやgRPCを多少なりとも理解していて、使ったことのある人
  • 何かしらのRDBを使用したことがある人
  • 今回の話す内容のコードはこちら

環境

使用技術 バージョン
libprotoc 3.11.4
Go 1.14
protobuf v1.23.0
macOS Catalina

ProtocolBuffersとは

データをバイト列にシリアライズするためのツールで、RPCなどに使われます。

他にもバイト列にシリアライズするツールはありますが、これのメリットとして

  • 拡張がしやすい
  • コンパクトなバイナリ形式を採用している(通信量がXMLと比べると約3~10倍程度小さいとか)
  • プログラム言語に依存しない(マイクロサービス向き)
  • データアクセスコードを自動生成してくれる

などがあります。

今回はここの話は他の記事にいっぱいあるので他の記事を参照。

RDBのトランザクションを仕込む

RDBのトランザクションをInterceptorに仕込むことで、トランザクションを意識せずにアプリケーションの実装に集中できます。

Create, Update, Deleteのプレフィックスインターフェース時はトランザクションを開始する

HTTPメソッドだとPOST, PATCH, PUT, DELETEでトランザクションを開始するようにすれば、トランザクション開始の処理を共通化できますが、
gRPCはすべてのメソッドがPOSTのため、それはできません。

そこでGoogleがProtocolBuffersを作成する際のコード規約を定義していて、それに従ってインターフェースは作成していきます。
例: CreateBook, DeleteBook, UpdateBook etc
(ただし我々のプロジェクトではGet系のインターフェースは表現力を落とさないようにするため、Get+Resource名以外にも使っています)

このようなコード規約に従うことでトランザクション開始処理を共通化できます。

var (
	transactionMethods = []string{
		"Create",
		"Update",
		"Delete",
	}
)

func isTransactionMethod(method string) bool {
	for _, tMethod := range transactionMethods {
		if strings.HasPrefix(method, tMethod) {
			return true
		}
	}
	return false
}

transactionMethodsでトランザクションを開始するメソッドのプレフィックスを定義します。
isTransactionMethodでトランザクションするかどうかを判定しています。

UnaryのInterceptor

func DBInterceptor(conn *sql.DB) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		log.Println("start interceptor")
		fullMethods := strings.Split(info.FullMethod, "/")
		method := fullMethods[len(fullMethods)-1]
		var tx *sql.Tx
		if isTransactionMethod(method) {
			log.Println("start transaction")
			transaction, err := conn.BeginTx(ctx, nil)
			if err != nil {
				return nil, xerrors.New("transaction error")
			}
			tx = transaction

			ctx = context.WithValue(ctx, config.DBKey, transaction)
		} else {
			ctx = context.WithValue(ctx, config.DBKey, conn)
		}
                // handlerの前に書かれているコードはServiceインターフェースが実行される前に処理され、handlerの後の処理はインターフェース後に実行される
		resp, err := handler(ctx, req)
		if err != nil {
			if tx != nil {
				tx.Rollback()
			}
			log.Println("rollback")
			return nil, xerrors.Errorf("fail to handle transaction: %w", err)
		}

		if tx != nil {
			if err := tx.Commit(); err != nil {
				return nil, xerrors.Errorf("fail to handle transaction commit: %w", err)
			}
			log.Println("commited")
		}

		return resp, nil
	}
}

Interceptorではhandler(ctx, req)の前に書かれているコードはServiceインターフェースが実行される前に処理され、handlerの後の処理はインターフェース後に実行される。
この処理ではまずisTransactionMethodでトランザクション処理をするかどうかをチェックして、する場合は、contextにトランザクションのポインタを詰めるようにして、handler後にエラーが返っている場合はrollbackそうでない場合は、commitするようになっています。

StreamのInterceptor

func DBInterceptorForStream(conn *sql.DB) grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		fullMethods := strings.Split(info.FullMethod, "/")
		method := fullMethods[len(fullMethods)-1]
		var tx *sql.Tx
		ctx := ss.Context()
		wrappedStream := grpc_middleware.WrapServerStream(ss)
		if isTransactionMethod(method) {
			transaction, err := conn.BeginTx(ss.Context(), nil)
			if err != nil {
				return xerrors.Errorf("fail to begin transaction: %w", err)
			}
			tx = transaction

			ctx = context.WithValue(ctx, config.DBKey, transaction)
		} else {
			ctx = context.WithValue(ctx, config.DBKey, conn)
		}
		wrappedStream.WrappedContext = ctx
		err := handler(srv, wrappedStream)
		if err != nil {
			if tx != nil {
				tx.Rollback()
			}
			return xerrors.Errorf("fail to handle transaction: %w", err)
		}

		if tx != nil {
			if err := tx.Commit(); err != nil {
				return xerrors.Errorf("fail to handle transaction commit: %w", err)
			}
		}

		return nil
	}
}

Streamの方も基本的には同じですが、Unaryとの違いとしてはStreamだとcontextがポインタで取得できないため、WithValueをすることができないです。
なので
wrappedStream := grpc_middleware.WrapServerStream(ss)
でコンテキストをラップして

wrappedStream.WrappedContext = ctx
err := handler(srv, wrappedStream)

で生成したコンテキストを詰めるようにしている。

まとめ

  • Interceptorでトランザクションを仕込むことで、トランザクションを意識せずに実装できる
  • ProtocolBuffersだとインターフェース名のルールが必要になる

今回のサンプルコードはこちらにあります。

参考文献

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?