今日は昨日送信したメッセージを受信する処理をみていきます。
【バグ修正】
その前に1つバグを修正します。
substrate.Message
を以下の型で実装していました。
type message []byte
func (m message) Data() []byte {
return m
}
しかし、substrateライブラリ内で以下の記述があり、sliceは==
/!=
の演算子で比較できない仕様からパニックを発生してしまうことになります。
if msg.Message != req.m {
panic(fmt.Sprintf("wrong message expected: %s got: %s", req.m, msg.Message))
}
というわけでmessage
型を以下のように修正します。
type message struct{ data []byte }
func (m *message) Data() []byte {
return m.data
}
呼び出し側もちょっとした変更が必要なのでレポジトリを確認してください。
修正終わり。
substrate.SynchronousMessageSourceの作成
メッセージを受信するSourceオブジェクトのインターフェイスもsubstrateライブラリから提供されています。
initialiseKafkaSource()
メソッドを以下のように定義し、Sourceオブジェクトを作成します。
func initialiseKafkaSource(version, brokers, topic, consumer *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
var kafkaOffset int64
if *offsetOldest {
kafkaOffset = kafka.OffsetOldest
} else {
kafkaOffset = kafka.OffsetNewest
}
source, err := kafka.NewAsyncMessageSource(kafka.AsyncMessageSourceConfig{
ConsumerGroup: *consumer,
Topic: *topic,
Brokers: strings.Split(*brokers, ","),
Offset: kafkaOffset,
Version: *version,
})
if err != nil {
return nil, err
}
return substrate.NewSynchronousMessageSource(source), nil
}
呼び出し側はこんな感じです。
sourceKafkaVersion := app.String(cli.StringOpt{
Name: "source-kafka-version",
Desc: "source kafka version",
EnvVar: "SOURCE_KAFKA_VERSION",
})
sourceBrokers := app.String(cli.StringOpt{
Name: "source-brokers",
Desc: "kafka source brokers",
EnvVar: "SOURCE_BROKERS",
Value: "localhost:9092",
})
consumerID := app.String(cli.StringOpt{
Name: "consumer-id",
Desc: "consumer id to connect to source",
EnvVar: "CONSUMER_ID",
Value: appName,
})
kafkaOffsetOldest := app.Bool(cli.BoolOpt{
Name: "kafka-offset-oldest",
Desc: "If set to true, will start consuming from the oldest available messages",
EnvVar: "KAFKA_OFFSET_OLDEST",
Value: true,
})
...
actionSource, err := initialiseKafkaSource(sourceKafkaVersion, sourceBrokers, actionTopic, consumerID, kafkaOffsetOldest)
if err != nil {
log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()
メッセージハンドラの作成
substrate.SynchronousMessageSource
インターフェイスには以下のメソッドが定義されています。
type SynchronousMessageSource interface {
...
// ConsumeMessages calls the `handler` function for each messages
// available to consume. If the handler returns no error, an
// acknowledgement will be sent to the broker. If an error is returned
// by the handler, it will be propogated and returned from this
// function. This function will block until `ctx` is done or until an
// error occurs.
ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error
...
}
この引数となっているsubstrate.ConsumerMessageHandler
は以下のように定義されており、メッセージを処理するハンドラとしてこれを実装します。
// ConsumerMessageHandler is the callback function type that synchronous
// message consumers must implement.
type ConsumerMessageHandler func(context.Context, Message) error
というわけでハンドラとなるactionEventHandler
は以下の通り。
type actionEventHandler struct {
todoMgr todoManager
}
func newActionEventHandler(todoMgr todoManager) actionEventHandler {
return actionEventHandler{todoMgr: todoMgr}
}
func (h actionEventHandler) handle(ctx context.Context, msg substrate.Message) error {
var env envelope.Event
if err := proto.Unmarshal(msg.Data(), &env); err != nil {
return errors.Wrap(err, "failed to unmarshal message")
}
if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
var ev event.CreateTodoActionEvent
if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
return errors.Wrap(err, "failed to unmarshal payload")
}
if err := h.todoMgr.projectTodo(todo{
id: ev.Id,
title: ev.Title,
description: ev.Description,
}); err != nil {
return errors.Wrap(err, "failed to project a todo")
}
}
return nil
}
msg.Data()
からイベントまでアンマーシャルする処理は先日keyFuncの項目でみたものとよく似ていますね。
とりだしたイベントを保存する処理は同期処理時にサーバー構造体で実装したものと全く一緒です。
テストはごめんなさい割愛です。
なお、todoのidの取り扱いに関して加えた変更に従って、データ型やtodoManager
のインターフェイスも一部変更しているので合わせてご確認ください。
メッセージ受信プロセスの開始
main.go内で新たにメッセージを受信するgoroutineを開始します。
wg.Add(1)
go func() {
defer wg.Done()
h := newActionEventHandler(store)
if err := actionSource.ConsumeMessages(context.Background(), h.handle); err != nil {
errCh <- errors.Wrap(err, "failed to consume action event")
}
}()
これでもいいのですが、少し改良を加えて平和に終了できるようにしましょう。
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
h := newActionEventHandler(store)
if err := actionSource.ConsumeMessages(ctx, h.handle); err != nil {
errCh <- errors.Wrap(err, "failed to consume action event")
}
}()
...
gSrv.GracefulStop()
cancel()
wg.Wait()
これでgRPCサーバと同時にKafkaメッセージの受信プロセスが開始されます。
よしじゃあKubernetesマニフェストファイルを更新してデバッグしていこう、というところですが、サボります😇
気になる方はGitHubレポジトリを確認しておいてください。
これでTodoを保存する処理の非同期化ができました。
明日はKafkaにgRPCインターフェイスを付けてみるなんてちょっと面白いことをしたいなーと思います。では。