0
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Kafkaメッセージの送信

さてではアプリからKafkaを使っていきたいと思います。
具体的に何をするかというと、POSTメソッドでTODOを保存する処理を非同期化したいと思います。

今日はそのイベント送信部分をみていきます。

uw-labs/substrate

メッセージの送信・受信を抽象化するGoライブラリも弊社から公開されています。
Kafka以外のストリームプロセスを使う際でも同じ使い勝手で実装することができます。
今回はこのライブラリの使い方も一緒にみていきましょう。

substrate.NewSynchronousMessageSink()

このsubstrateライブラリではメッセージ処理完了のシグナルをデフォルトで非同期化してパフォーマンスを上げているのですが、今回は同期バージョンであるsubstrate.NewSynchronousMessageSink()のAPIを利用したいと思います。
main.go内でメッセージシンクを設定するinitialiseKafkaSinkメソッドを以下のように定義します。

main.go
func initialiseKafkaSink(version, brokers, topic *string, keyFunc func(substrate.Message) []byte) (substrate.SynchronousMessageSink, error) {
    sink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
        Brokers: strings.Split(*brokers, ","),
        Topic:   *topic,
        KeyFunc: keyFunc,
        Version: *version,
    })
    if err != nil {
        return nil, err
    }

    return substrate.NewSynchronousMessageSink(sink), nil
}

呼び出し側は以下のような感じです。

main.go
unc main() {
    ...

    app.Action = func() {
        ...

        gSrv := initialiseGRPCServer(newServer(store))

        actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
        if err != nil {
            log.Fatalln("init action event kafka sink:", err)
        }
        defer actionSink.Close()

        errCh := make(chan error, 2)

        ...
    }

    ...
}

Kafkaのバージョンなど、必要な環境変数は以下のように定義しています。

main.go
sinkKafkaVersion := app.String(cli.StringOpt{
    Name:   "sink-kafka-version",
    Desc:   "sink kafka version",
    EnvVar: "SINK_KAFKA_VERSION",
})
sinkBrokers := app.String(cli.StringOpt{
    Name:   "sink-brokers",
    Desc:   "kafka sink brokers",
    EnvVar: "SINK_BROKERS",
    Value:  "localhost:9092",
})

actionTopic := app.String(cli.StringOpt{
    Name:   "action-topic",
    Desc:   "action topic",
    EnvVar: "ACTION_TOPIC",
    Value:  "qiita.action",
})

残るはinitialiseKafkaSinkに渡している、actionKeyFuncという関数ですが、後ほど触れたいと思います。

イベントの定義

では実際に送信するイベント、CreateTodoActionEventを定義したいと思います。

proto/event.proto
syntax = "proto3";

package event;

message CreateTodoActionEvent {
    string id = 1;
    string title = 2;
    string description = 3;
}

proto/service.proto内でTodoメッセージを定義しているのでそれを使い回してもよかったのですが、コンパイルが少し厄介なのでリファクタリング要素として残しておきます。

make protosタスクは以下のようになりました。

.PHONY: protos
protos:
    mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_EVENT_DIR) $(GENERATED_ENVELOPE_DIR)
    protoc \
        -I $(PROTO_DIR) \
        -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
        --gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
        service.proto
    protoc \
        -I $(PROTO_DIR) \
        -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
        --gogoslick_out=paths=source_relative:$(GENERATED_EVENT_DIR) \
        event.proto
    protoc \
        -I $(PROTO_DIR) \
        -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
        --gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
        envelope.proto

keyFuncの実装

先ほど後回しにした、actionKeyFuncという関数です。
これはイベントのキーを定義する関数です。
Kafkaでは同じキーを共有するメッセージは同じパーティションに入るという大事な性質があるため何をキーとして使うかという問題が無視できません。
先ほどイベント定義の際にidフィールドを足したので、今回はこれを使いたいと思います。

main.go
func actionKeyFunc(msg substrate.Message) []byte {
    var env envelope.Event
    if err := proto.Unmarshal(msg.Data(), &env); err != nil {
        panic(err)
    }

    if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
        var ev event.CreateTodoActionEvent
        if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
            panic(err)
        }

        return []byte(ev.Id)
    }

    panic("unknown event")
}

ここでは
- github.com/gogo/protobuf/proto
- github.com/gogo/protobuf/types

の2つのパッケージを使って、envelopeからイベントを取り出している様子も確認できます。

substrate.Messageの実装

メッセージ送信部分の実装に入りたいのですが、その前に1つ必要なことがあります。
メッセージ送信のAPIPublishMessage()は引数としてsubstrate.Messageインターフェイスを受け取ります。
ということで、このインターフェイスを実装する型を用意する必要があります。

必要なメソッドは1つだけ。

type Message interface {
    Data() []byte
}

以下のように型を定義します。

main.go
type message []byte

func (m message) Data() []byte {
    return m
}

メッセージの送信

よし、じゃあメッセージの送信処理です。
server.CreateTodo()メソッドは以下のようになります。

server.go
func (s *server) CreateTodo(ctx context.Context, req *service.CreateTodoRequest) (*service.CreateTodoResponse, error) {
    todoID := uuid.New().String()
    ev := &event.CreateTodoActionEvent{
        Id:          todoID,
        Title:       req.Todo.Title,
        Description: req.Todo.Description,
    }

    any, err := types.MarshalAny(ev)
    if err != nil {
        return nil, err
    }

    env := envelope.Event{
        Id:        uuid.New().String(),
        Timestamp: types.TimestampNow(),
        Payload:   any,
    }

    b, err := proto.Marshal(&env)
    if err != nil {
        return nil, err
    }

    if err := s.sink.PublishMessage(ctx, message(b)); err != nil {
        return nil, err
    }

    return &service.CreateTodoResponse{
        Success: true,
        Id:      todoID,
    }, nil
}

イベントをペイロードとしてenvelopeに包み込む部分、それから全体を[]byteにマーシャルする処理に注目してください。

サーバー構造体はsubstrate.SynchrounousMessageSinkへの依存を持ちます。

server.go
type (
    ...

    server struct {
        sink substrate.SynchronousMessageSink
    }
)

メッセージを送信するコードのテストも書くべきですが、今回はおサボり。後日触れられたらと思います。

依存関係の注入

main.go内で以下のように依存関係を注入してあげます。

diff --git a/main.go b/main.go
index be109e8..907cd41 100644
--- a/main.go
+++ b/main.go
@@ -104,8 +104,6 @@ func main() {
        }
        defer lis.Close()

-       gSrv := initialiseGRPCServer(newServer(nil))
-
        actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
        if err != nil {
            log.Fatalln("init payment account kafka sink:", err)
@@ -122,6 +120,9 @@ func main() {
        }()

        var wg sync.WaitGroup
+
+       gSrv := initialiseGRPCServer(newServer(actionSink))
+
        wg.Add(1)
        go func() {
            defer wg.Done()

実際にはメッセージを送信する前にバリデーションなどを通して本当に保存していいのかをチェックする必要があるかなと思いますが、とりあえずこれでメッセージの送信部分は完了です。


思ったよりやることが多くて飛ばし飛ばしになってしまいました。すいません。

メッセージの送信処理が完了したので、次はこれを受信し、DBに保存する処理をみていきたいと思います。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
0
Help us understand the problem. What are the problem?