LoginSignup
0
0

More than 3 years have passed since last update.

今日は昨日送信したメッセージを受信する処理をみていきます。

【バグ修正】

その前に1つバグを修正します。
substrate.Messageを以下の型で実装していました。

main.go
type message []byte

func (m message) Data() []byte {
    return m
}

しかし、substrateライブラリ内で以下の記述があり、sliceは==/!=の演算子で比較できない仕様からパニックを発生してしまうことになります。

sync_adapter_sink.go#L98
if msg.Message != req.m {
    panic(fmt.Sprintf("wrong message expected: %s got: %s", req.m, msg.Message))
}

というわけでmessage型を以下のように修正します。

main.go
type message struct{ data []byte }

func (m *message) Data() []byte {
    return m.data
}

呼び出し側もちょっとした変更が必要なのでレポジトリを確認してください。
修正終わり。

substrate.SynchronousMessageSourceの作成

メッセージを受信するSourceオブジェクトのインターフェイスもsubstrateライブラリから提供されています。
initialiseKafkaSource()メソッドを以下のように定義し、Sourceオブジェクトを作成します。

main.go
func initialiseKafkaSource(version, brokers, topic, consumer *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
    var kafkaOffset int64
    if *offsetOldest {
        kafkaOffset = kafka.OffsetOldest
    } else {
        kafkaOffset = kafka.OffsetNewest
    }

    source, err := kafka.NewAsyncMessageSource(kafka.AsyncMessageSourceConfig{
        ConsumerGroup: *consumer,
        Topic:         *topic,
        Brokers:       strings.Split(*brokers, ","),
        Offset:        kafkaOffset,
        Version:       *version,
    })
    if err != nil {
        return nil, err
    }

    return substrate.NewSynchronousMessageSource(source), nil
}

呼び出し側はこんな感じです。

main.go
sourceKafkaVersion := app.String(cli.StringOpt{
    Name:   "source-kafka-version",
    Desc:   "source kafka version",
    EnvVar: "SOURCE_KAFKA_VERSION",
})
sourceBrokers := app.String(cli.StringOpt{
    Name:   "source-brokers",
    Desc:   "kafka source brokers",
    EnvVar: "SOURCE_BROKERS",
    Value:  "localhost:9092",
})
consumerID := app.String(cli.StringOpt{
    Name:   "consumer-id",
    Desc:   "consumer id to connect to source",
    EnvVar: "CONSUMER_ID",
    Value:  appName,
})
kafkaOffsetOldest := app.Bool(cli.BoolOpt{
    Name:   "kafka-offset-oldest",
    Desc:   "If set to true, will start consuming from the oldest available messages",
    EnvVar: "KAFKA_OFFSET_OLDEST",
    Value:  true,
})

...

actionSource, err := initialiseKafkaSource(sourceKafkaVersion, sourceBrokers, actionTopic, consumerID, kafkaOffsetOldest)
if err != nil {
    log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()

メッセージハンドラの作成

substrate.SynchronousMessageSourceインターフェイスには以下のメソッドが定義されています。

substrate.go
type SynchronousMessageSource interface {
    ...
    // ConsumeMessages calls the `handler` function for each messages
    // available to consume.  If the handler returns no error, an
    // acknowledgement will be sent to the broker.  If an error is returned
    // by the handler, it will be propogated and returned from this
    // function.  This function will block until `ctx` is done or until an
    // error occurs.
    ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error
    ...
}

この引数となっているsubstrate.ConsumerMessageHandlerは以下のように定義されており、メッセージを処理するハンドラとしてこれを実装します。

substrate.go
// ConsumerMessageHandler is the callback function type that synchronous
// message consumers must implement.
type ConsumerMessageHandler func(context.Context, Message) error

というわけでハンドラとなるactionEventHandlerは以下の通り。

action_event_handler.go
type actionEventHandler struct {
    todoMgr todoManager
}

func newActionEventHandler(todoMgr todoManager) actionEventHandler {
    return actionEventHandler{todoMgr: todoMgr}
}

func (h actionEventHandler) handle(ctx context.Context, msg substrate.Message) error {
    var env envelope.Event
    if err := proto.Unmarshal(msg.Data(), &env); err != nil {
        return errors.Wrap(err, "failed to unmarshal message")
    }

    if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
        var ev event.CreateTodoActionEvent
        if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
            return errors.Wrap(err, "failed to unmarshal payload")
        }

        if err := h.todoMgr.projectTodo(todo{
            id:          ev.Id,
            title:       ev.Title,
            description: ev.Description,
        }); err != nil {
            return errors.Wrap(err, "failed to project a todo")
        }
    }

    return nil
}

msg.Data()からイベントまでアンマーシャルする処理は先日keyFuncの項目でみたものとよく似ていますね。
とりだしたイベントを保存する処理は同期処理時にサーバー構造体で実装したものと全く一緒です。
テストはごめんなさい割愛です。

なお、todoのidの取り扱いに関して加えた変更に従って、データ型やtodoManagerのインターフェイスも一部変更しているので合わせてご確認ください。

メッセージ受信プロセスの開始

main.go内で新たにメッセージを受信するgoroutineを開始します。

main.go
wg.Add(1)
go func() {
    defer wg.Done()

    h := newActionEventHandler(store)
    if err := actionSource.ConsumeMessages(context.Background(), h.handle); err != nil {
        errCh <- errors.Wrap(err, "failed to consume action event")
    }
}()

これでもいいのですが、少し改良を加えて平和に終了できるようにしましょう。

main.go
ctx, cancel := context.WithCancel(context.Background())

wg.Add(1)
go func() {
    defer wg.Done()

    h := newActionEventHandler(store)
    if err := actionSource.ConsumeMessages(ctx, h.handle); err != nil {
        errCh <- errors.Wrap(err, "failed to consume action event")
    }
}()

...

gSrv.GracefulStop()
cancel()
wg.Wait()

これでgRPCサーバと同時にKafkaメッセージの受信プロセスが開始されます。


よしじゃあKubernetesマニフェストファイルを更新してデバッグしていこう、というところですが、サボります😇
気になる方はGitHubレポジトリを確認しておいてください。

これでTodoを保存する処理の非同期化ができました。
明日はKafkaにgRPCインターフェイスを付けてみるなんてちょっと面白いことをしたいなーと思います。では。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0