さてではアプリからKafkaを使っていきたいと思います。
具体的に何をするかというと、POSTメソッドでTODOを保存する処理を非同期化したいと思います。
今日はそのイベント送信部分をみていきます。
uw-labs/substrate
メッセージの送信・受信を抽象化するGoライブラリも弊社から公開されています。
Kafka以外のストリームプロセスを使う際でも同じ使い勝手で実装することができます。
今回はこのライブラリの使い方も一緒にみていきましょう。
substrate.NewSynchronousMessageSink()
このsubstrateライブラリではメッセージ処理完了のシグナルをデフォルトで非同期化してパフォーマンスを上げているのですが、今回は同期バージョンであるsubstrate.NewSynchronousMessageSink()
のAPIを利用したいと思います。
main.go
内でメッセージシンクを設定するinitialiseKafkaSink
メソッドを以下のように定義します。
func initialiseKafkaSink(version, brokers, topic *string, keyFunc func(substrate.Message) []byte) (substrate.SynchronousMessageSink, error) {
sink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
Brokers: strings.Split(*brokers, ","),
Topic: *topic,
KeyFunc: keyFunc,
Version: *version,
})
if err != nil {
return nil, err
}
return substrate.NewSynchronousMessageSink(sink), nil
}
呼び出し側は以下のような感じです。
unc main() {
...
app.Action = func() {
...
gSrv := initialiseGRPCServer(newServer(store))
actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
if err != nil {
log.Fatalln("init action event kafka sink:", err)
}
defer actionSink.Close()
errCh := make(chan error, 2)
...
}
...
}
Kafkaのバージョンなど、必要な環境変数は以下のように定義しています。
sinkKafkaVersion := app.String(cli.StringOpt{
Name: "sink-kafka-version",
Desc: "sink kafka version",
EnvVar: "SINK_KAFKA_VERSION",
})
sinkBrokers := app.String(cli.StringOpt{
Name: "sink-brokers",
Desc: "kafka sink brokers",
EnvVar: "SINK_BROKERS",
Value: "localhost:9092",
})
actionTopic := app.String(cli.StringOpt{
Name: "action-topic",
Desc: "action topic",
EnvVar: "ACTION_TOPIC",
Value: "qiita.action",
})
残るはinitialiseKafkaSinkに渡している、actionKeyFunc
という関数ですが、後ほど触れたいと思います。
イベントの定義
では実際に送信するイベント、CreateTodoActionEvent
を定義したいと思います。
syntax = "proto3";
package event;
message CreateTodoActionEvent {
string id = 1;
string title = 2;
string description = 3;
}
proto/service.proto
内でTodo
メッセージを定義しているのでそれを使い回してもよかったのですが、コンパイルが少し厄介なのでリファクタリング要素として残しておきます。
make protos
タスクは以下のようになりました。
.PHONY: protos
protos:
mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_EVENT_DIR) $(GENERATED_ENVELOPE_DIR)
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
service.proto
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=paths=source_relative:$(GENERATED_EVENT_DIR) \
event.proto
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
envelope.proto
keyFuncの実装
先ほど後回しにした、actionKeyFunc
という関数です。
これはイベントのキーを定義する関数です。
Kafkaでは同じキーを共有するメッセージは同じパーティションに入るという大事な性質があるため何をキーとして使うかという問題が無視できません。
先ほどイベント定義の際にid
フィールドを足したので、今回はこれを使いたいと思います。
func actionKeyFunc(msg substrate.Message) []byte {
var env envelope.Event
if err := proto.Unmarshal(msg.Data(), &env); err != nil {
panic(err)
}
if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
var ev event.CreateTodoActionEvent
if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
panic(err)
}
return []byte(ev.Id)
}
panic("unknown event")
}
ここでは
の2つのパッケージを使って、envelopeからイベントを取り出している様子も確認できます。
substrate.Messageの実装
メッセージ送信部分の実装に入りたいのですが、その前に1つ必要なことがあります。
メッセージ送信のAPIPublishMessage()
は引数としてsubstrate.Message
インターフェイスを受け取ります。
ということで、このインターフェイスを実装する型を用意する必要があります。
必要なメソッドは1つだけ。
type Message interface {
Data() []byte
}
以下のように型を定義します。
type message []byte
func (m message) Data() []byte {
return m
}
メッセージの送信
よし、じゃあメッセージの送信処理です。
server.CreateTodo()
メソッドは以下のようになります。
func (s *server) CreateTodo(ctx context.Context, req *service.CreateTodoRequest) (*service.CreateTodoResponse, error) {
todoID := uuid.New().String()
ev := &event.CreateTodoActionEvent{
Id: todoID,
Title: req.Todo.Title,
Description: req.Todo.Description,
}
any, err := types.MarshalAny(ev)
if err != nil {
return nil, err
}
env := envelope.Event{
Id: uuid.New().String(),
Timestamp: types.TimestampNow(),
Payload: any,
}
b, err := proto.Marshal(&env)
if err != nil {
return nil, err
}
if err := s.sink.PublishMessage(ctx, message(b)); err != nil {
return nil, err
}
return &service.CreateTodoResponse{
Success: true,
Id: todoID,
}, nil
}
イベントをペイロードとしてenvelopeに包み込む部分、それから全体を[]byte
にマーシャルする処理に注目してください。
サーバー構造体はsubstrate.SynchrounousMessageSink
への依存を持ちます。
type (
...
server struct {
sink substrate.SynchronousMessageSink
}
)
メッセージを送信するコードのテストも書くべきですが、今回はおサボり。後日触れられたらと思います。
依存関係の注入
main.go
内で以下のように依存関係を注入してあげます。
diff --git a/main.go b/main.go
index be109e8..907cd41 100644
--- a/main.go
+++ b/main.go
@@ -104,8 +104,6 @@ func main() {
}
defer lis.Close()
- gSrv := initialiseGRPCServer(newServer(nil))
-
actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
if err != nil {
log.Fatalln("init payment account kafka sink:", err)
@@ -122,6 +120,9 @@ func main() {
}()
var wg sync.WaitGroup
+
+ gSrv := initialiseGRPCServer(newServer(actionSink))
+
wg.Add(1)
go func() {
defer wg.Done()
実際にはメッセージを送信する前にバリデーションなどを通して本当に保存していいのかをチェックする必要があるかなと思いますが、とりあえずこれでメッセージの送信部分は完了です。
思ったよりやることが多くて飛ばし飛ばしになってしまいました。すいません。
メッセージの送信処理が完了したので、次はこれを受信し、DBに保存する処理をみていきたいと思います。