今日からは話題がガラリと変わりまして、Kafkaを使った非同期処理についてみていきます。
これまでgRPCを使って同期的にコミュニケーションし、データベースにデータを保存するという言ってしまえばよくあるパターンをみてきました。
一方で弊社ではKafkaに代表されるようなストリーム処理基盤を使ったイベントドリブンなアーキテクチャを採用しています。
ログとしてキューにプッシュしたデータをSource of Truthとし、たとえDBが飛んでもログを初めから読み直せば元に戻せるような設計を心がけています。
またマイクロサービス間のコミュニケーションとしてもイベントキューを介した非同期な方法を非常に多く用います。
基盤とするソフトウェアはApache Kafkaを使うチームが多いですがこれも特に決まりはなく、Benthosやその他のツールを使うチームもあります。
ログやそれを応用したイベントドリブンアーキテクチャについてはKafkaの開発元でもあるLinkedInのこちらの記事がとても参考になるので、少し長いですが興味のある方は読んでみてください。ログの考え方が変わるとてもいい記事です。
イベントフォーマット
さてではイベントキューにどういったフォーマットでデータを残すかですがここでもprotocol bufferを使いたいと思います。
protocol bufferを使うと言っても、フォーマットがチームごとにバラバラだと、"イベントが送信された時間"などドメインに関わらず共通で残したいデータがあるときなどあまり利便性がよくありません。
そんなとき各チームが自由に定義する実際のイベントを包みこみ、メタデータなどを与える共通のフォーマットがあると便利です。
弊社で使っているフォーマットはオープンソース化されていないのでそのまま紹介することができませんが、以下のような雰囲気です。
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
message Event {
string id = 1;
google.protobuf.Timestamp timestamp = 2;
google.protobuf.Any payload = 3;
}
実際のイベントはpayloadとして渡します。
今回はこちらを使っていきたいと思います。
Envelopeのコンパイル
以下の記述をMakefileのprotosタスクに追加してコンパイルします。
diff --git a/Makefile b/Makefile
index 0510df4..f5b531d 100644
--- a/Makefile
+++ b/Makefile
@@ -6,15 +6,22 @@ LINKFLAGS := -X main.gitHash=$(GIT_HASH)
PROTO_DIR := ./proto
GENERATED_DIR := ./internal/pb
GENERATED_SERVICE_DIR := $(GENERATED_DIR)/service
+GENERATED_ENVELOPE_DIR := $(GENERATED_DIR)/envelope
+ENVELOPE_PROTO_MAPPINGS := Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types
.PHONY: protos
protos:
- mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR)
+ mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_ENVELOPE_DIR)
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
service.proto
+ protoc \
+ -I $(PROTO_DIR) \
+ -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
+ --gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
+ envelope.proto
mockgen-install:
無事envelope.pb.go
ファイルがinternal/pb/envelope
以下に作成されていればOKです。
今日は導入ということでここまで。
明日はKafkaをデプロイしたいと思います。