プラハの大関です。
今回はよくマイクロサービスで使われるProtocolBuffersをマイクロサービスではなく単なるAPIとして使用して、
RDBのトランザクションをインターフェース時に意識せずに実装する方法を書きました。
ProtocolBuffersはフロントとサーバで同じenumを共有できたり、型ありのインターフェースを定義することである程度バリデーションの役割を果たしてくれたりと、マイクロサービスとしてだけでなくRESTの代わりとなるような使い方することもできるツールです。
記事の対象者
- ProtocolBuffersやgRPCを多少なりとも理解していて、使ったことのある人
- 何かしらのRDBを使用したことがある人
- 今回の話す内容のコードはこちら
環境
使用技術 | バージョン |
---|---|
libprotoc | 3.11.4 |
Go | 1.14 |
protobuf | v1.23.0 |
macOS | Catalina |
ProtocolBuffersとは
データをバイト列にシリアライズするためのツールで、RPCなどに使われます。
他にもバイト列にシリアライズするツールはありますが、これのメリットとして
- 拡張がしやすい
- コンパクトなバイナリ形式を採用している(通信量がXMLと比べると約3~10倍程度小さいとか)
- プログラム言語に依存しない(マイクロサービス向き)
- データアクセスコードを自動生成してくれる
などがあります。
今回はここの話は他の記事にいっぱいあるので他の記事を参照。
RDBのトランザクションを仕込む
RDBのトランザクションをInterceptorに仕込むことで、トランザクションを意識せずにアプリケーションの実装に集中できます。
Create, Update, Deleteのプレフィックスインターフェース時はトランザクションを開始する
HTTPメソッドだとPOST, PATCH, PUT, DELETE
でトランザクションを開始するようにすれば、トランザクション開始の処理を共通化できますが、
gRPCはすべてのメソッドがPOST
のため、それはできません。
そこでGoogleがProtocolBuffersを作成する際のコード規約を定義していて、それに従ってインターフェースは作成していきます。
例: CreateBook, DeleteBook, UpdateBook etc
(ただし我々のプロジェクトではGet系のインターフェースは表現力を落とさないようにするため、Get+Resource名
以外にも使っています)
このようなコード規約に従うことでトランザクション開始処理を共通化できます。
var (
transactionMethods = []string{
"Create",
"Update",
"Delete",
}
)
func isTransactionMethod(method string) bool {
for _, tMethod := range transactionMethods {
if strings.HasPrefix(method, tMethod) {
return true
}
}
return false
}
transactionMethods
でトランザクションを開始するメソッドのプレフィックスを定義します。
isTransactionMethod
でトランザクションするかどうかを判定しています。
UnaryのInterceptor
func DBInterceptor(conn *sql.DB) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Println("start interceptor")
fullMethods := strings.Split(info.FullMethod, "/")
method := fullMethods[len(fullMethods)-1]
var tx *sql.Tx
if isTransactionMethod(method) {
log.Println("start transaction")
transaction, err := conn.BeginTx(ctx, nil)
if err != nil {
return nil, xerrors.New("transaction error")
}
tx = transaction
ctx = context.WithValue(ctx, config.DBKey, transaction)
} else {
ctx = context.WithValue(ctx, config.DBKey, conn)
}
// handlerの前に書かれているコードはServiceインターフェースが実行される前に処理され、handlerの後の処理はインターフェース後に実行される
resp, err := handler(ctx, req)
if err != nil {
if tx != nil {
tx.Rollback()
}
log.Println("rollback")
return nil, xerrors.Errorf("fail to handle transaction: %w", err)
}
if tx != nil {
if err := tx.Commit(); err != nil {
return nil, xerrors.Errorf("fail to handle transaction commit: %w", err)
}
log.Println("commited")
}
return resp, nil
}
}
Interceptorではhandler(ctx, req)
の前に書かれているコードはServiceインターフェースが実行される前に処理され、handlerの後の処理はインターフェース後に実行される。
この処理ではまずisTransactionMethod
でトランザクション処理をするかどうかをチェックして、する場合は、contextにトランザクションのポインタを詰めるようにして、handler後にエラーが返っている場合はrollbackそうでない場合は、commitするようになっています。
StreamのInterceptor
func DBInterceptorForStream(conn *sql.DB) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fullMethods := strings.Split(info.FullMethod, "/")
method := fullMethods[len(fullMethods)-1]
var tx *sql.Tx
ctx := ss.Context()
wrappedStream := grpc_middleware.WrapServerStream(ss)
if isTransactionMethod(method) {
transaction, err := conn.BeginTx(ss.Context(), nil)
if err != nil {
return xerrors.Errorf("fail to begin transaction: %w", err)
}
tx = transaction
ctx = context.WithValue(ctx, config.DBKey, transaction)
} else {
ctx = context.WithValue(ctx, config.DBKey, conn)
}
wrappedStream.WrappedContext = ctx
err := handler(srv, wrappedStream)
if err != nil {
if tx != nil {
tx.Rollback()
}
return xerrors.Errorf("fail to handle transaction: %w", err)
}
if tx != nil {
if err := tx.Commit(); err != nil {
return xerrors.Errorf("fail to handle transaction commit: %w", err)
}
}
return nil
}
}
Streamの方も基本的には同じですが、Unaryとの違いとしてはStreamだとcontextがポインタで取得できないため、WithValue
をすることができないです。
なので
wrappedStream := grpc_middleware.WrapServerStream(ss)
でコンテキストをラップして
wrappedStream.WrappedContext = ctx
err := handler(srv, wrappedStream)
で生成したコンテキストを詰めるようにしている。
まとめ
- Interceptorでトランザクションを仕込むことで、トランザクションを意識せずに実装できる
- ProtocolBuffersだとインターフェース名のルールが必要になる
今回のサンプルコードはこちらにあります。