0
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Kafkaメッセージの受信

今日は昨日送信したメッセージを受信する処理をみていきます。

【バグ修正】

その前に1つバグを修正します。
substrate.Messageを以下の型で実装していました。

main.go
type message []byte

func (m message) Data() []byte {
    return m
}

しかし、substrateライブラリ内で以下の記述があり、sliceは==/!=の演算子で比較できない仕様からパニックを発生してしまうことになります。

sync_adapter_sink.go#L98
if msg.Message != req.m {
    panic(fmt.Sprintf("wrong message expected: %s got: %s", req.m, msg.Message))
}

というわけでmessage型を以下のように修正します。

main.go
type message struct{ data []byte }

func (m *message) Data() []byte {
    return m.data
}

呼び出し側もちょっとした変更が必要なのでレポジトリを確認してください。
修正終わり。

substrate.SynchronousMessageSourceの作成

メッセージを受信するSourceオブジェクトのインターフェイスもsubstrateライブラリから提供されています。
initialiseKafkaSource()メソッドを以下のように定義し、Sourceオブジェクトを作成します。

main.go
func initialiseKafkaSource(version, brokers, topic, consumer *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
    var kafkaOffset int64
    if *offsetOldest {
        kafkaOffset = kafka.OffsetOldest
    } else {
        kafkaOffset = kafka.OffsetNewest
    }

    source, err := kafka.NewAsyncMessageSource(kafka.AsyncMessageSourceConfig{
        ConsumerGroup: *consumer,
        Topic:         *topic,
        Brokers:       strings.Split(*brokers, ","),
        Offset:        kafkaOffset,
        Version:       *version,
    })
    if err != nil {
        return nil, err
    }

    return substrate.NewSynchronousMessageSource(source), nil
}

呼び出し側はこんな感じです。

main.go
sourceKafkaVersion := app.String(cli.StringOpt{
    Name:   "source-kafka-version",
    Desc:   "source kafka version",
    EnvVar: "SOURCE_KAFKA_VERSION",
})
sourceBrokers := app.String(cli.StringOpt{
    Name:   "source-brokers",
    Desc:   "kafka source brokers",
    EnvVar: "SOURCE_BROKERS",
    Value:  "localhost:9092",
})
consumerID := app.String(cli.StringOpt{
    Name:   "consumer-id",
    Desc:   "consumer id to connect to source",
    EnvVar: "CONSUMER_ID",
    Value:  appName,
})
kafkaOffsetOldest := app.Bool(cli.BoolOpt{
    Name:   "kafka-offset-oldest",
    Desc:   "If set to true, will start consuming from the oldest available messages",
    EnvVar: "KAFKA_OFFSET_OLDEST",
    Value:  true,
})

...

actionSource, err := initialiseKafkaSource(sourceKafkaVersion, sourceBrokers, actionTopic, consumerID, kafkaOffsetOldest)
if err != nil {
    log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()

メッセージハンドラの作成

substrate.SynchronousMessageSourceインターフェイスには以下のメソッドが定義されています。

substrate.go
type SynchronousMessageSource interface {
    ...
    // ConsumeMessages calls the `handler` function for each messages
    // available to consume.  If the handler returns no error, an
    // acknowledgement will be sent to the broker.  If an error is returned
    // by the handler, it will be propogated and returned from this
    // function.  This function will block until `ctx` is done or until an
    // error occurs.
    ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error
    ...
}

この引数となっているsubstrate.ConsumerMessageHandlerは以下のように定義されており、メッセージを処理するハンドラとしてこれを実装します。

substrate.go
// ConsumerMessageHandler is the callback function type that synchronous
// message consumers must implement.
type ConsumerMessageHandler func(context.Context, Message) error

というわけでハンドラとなるactionEventHandlerは以下の通り。

action_event_handler.go
type actionEventHandler struct {
    todoMgr todoManager
}

func newActionEventHandler(todoMgr todoManager) actionEventHandler {
    return actionEventHandler{todoMgr: todoMgr}
}

func (h actionEventHandler) handle(ctx context.Context, msg substrate.Message) error {
    var env envelope.Event
    if err := proto.Unmarshal(msg.Data(), &env); err != nil {
        return errors.Wrap(err, "failed to unmarshal message")
    }

    if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
        var ev event.CreateTodoActionEvent
        if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
            return errors.Wrap(err, "failed to unmarshal payload")
        }

        if err := h.todoMgr.projectTodo(todo{
            id:          ev.Id,
            title:       ev.Title,
            description: ev.Description,
        }); err != nil {
            return errors.Wrap(err, "failed to project a todo")
        }
    }

    return nil
}

msg.Data()からイベントまでアンマーシャルする処理は先日keyFuncの項目でみたものとよく似ていますね。
とりだしたイベントを保存する処理は同期処理時にサーバー構造体で実装したものと全く一緒です。
テストはごめんなさい割愛です。

なお、todoのidの取り扱いに関して加えた変更に従って、データ型やtodoManagerのインターフェイスも一部変更しているので合わせてご確認ください。

メッセージ受信プロセスの開始

main.go内で新たにメッセージを受信するgoroutineを開始します。

main.go
wg.Add(1)
go func() {
    defer wg.Done()

    h := newActionEventHandler(store)
    if err := actionSource.ConsumeMessages(context.Background(), h.handle); err != nil {
        errCh <- errors.Wrap(err, "failed to consume action event")
    }
}()

これでもいいのですが、少し改良を加えて平和に終了できるようにしましょう。

main.go
ctx, cancel := context.WithCancel(context.Background())

wg.Add(1)
go func() {
    defer wg.Done()

    h := newActionEventHandler(store)
    if err := actionSource.ConsumeMessages(ctx, h.handle); err != nil {
        errCh <- errors.Wrap(err, "failed to consume action event")
    }
}()

...

gSrv.GracefulStop()
cancel()
wg.Wait()

これでgRPCサーバと同時にKafkaメッセージの受信プロセスが開始されます。


よしじゃあKubernetesマニフェストファイルを更新してデバッグしていこう、というところですが、サボります😇
気になる方はGitHubレポジトリを確認しておいてください。

これでTodoを保存する処理の非同期化ができました。
明日はKafkaにgRPCインターフェイスを付けてみるなんてちょっと面白いことをしたいなーと思います。では。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
0
Help us understand the problem. What are the problem?