0
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

gRPCでKafkaにアクセス

ある日こう思ったとします。
「あ、Kafka飽きた。NATS使お。」

ところがもしKafkaに他のチームからのアクセスがあった場合、彼らに変更をお願いする必要があります。
彼らは大体忙しいでしょう。そう簡単にNATSに変更することはできなくなります。

そんなとき、Kafkaに直接アクセスする代わりに、メッセージシステムを抽象化したインターフェイスを通してアクセスしてもらうことで内部の実装を隠蔽してしまうと便利です。

今回紹介するのはまたもや弊社謹製の uw-labs/proximo というライブラリ(とGoのサーバー実装)です。
このライブラリでは以下のように、メッセージングシステムのインターフェイスをprotocol bufferの形で定義しています。

proximo.proto
syntax = "proto3";

package proximo;

message Message {
  bytes data = 1;
  string id = 2;
}

// Consumer types
service MessageSource {
  rpc Consume(stream ConsumerRequest) returns (stream Message) {
  }
}

message ConsumerRequest {
  // expected if this is a start request
  StartConsumeRequest startRequest = 2;
  // expected if this is a confirmation
  Confirmation confirmation = 3;
}

message StartConsumeRequest {
  string topic = 1;
  string consumer = 2;
  Offset initial_offset = 3;
}

enum Offset {
  OFFSET_DEFAULT = 0;
  OFFSET_NEWEST = 1;
  OFFSET_OLDEST = 2;
}

message Confirmation {
  string msgID = 1;
}

// Producer types
service MessageSink {
  rpc Publish(stream PublisherRequest) returns (stream Confirmation) {
  }
}

message PublisherRequest {
  // expected if this is a start request
  StartPublishRequest startRequest = 2;
  // expected if this is a message
  Message msg = 3;
}

message StartPublishRequest {
  string topic = 1;
}

送信・受信ともに双方向ストリームRPCを使用しているのにご注目ください。
このインターフェイスを実装するgRPCサーバーを用意することで、Kafkaを使っていることをユーザーから隠すことができます。
レポジトリにはGoによるサーバー実装およびパブリックなDockerイメージが用意されているため、これを使います。

Proximoのデプロイ

以下のマニフェストファイルでKafkaをバックエンドとして使用するproximoをデプロイします。

kubernetes/proximo.yaml
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/scrape: 'true'
    prometheus.io/path:   /__/metrics
    prometheus.io/port:   '8080'
  name: &app proximo
  namespace: &ns qiita
  labels:
    app: *app
spec:
  ports:
    - name: app
      port: 6868
      protocol: TCP
      targetPort: 6868
    - name: http
      port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    app: *app
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: &app proximo
  name: *app
  namespace: &ns qiita
spec:
  replicas: 1
  selector:
    matchLabels:
      app: *app
  template:
    metadata:
      labels:
        app: *app
      namespace: *ns
    spec:
      containers:
        - name: *app
          image: quay.io/utilitywarehouse/proximo:latest
          args:
            - /proximo-server
            - kafka
          env:
            - name: PROXIMO_KAFKA_VERSION
              valueFrom:
                configMapKeyRef:
                  name: kafka-brokers
                  key: internal.kafka.broker.version
            - name: PROXIMO_KAFKA_BROKERS
              valueFrom:
                configMapKeyRef:
                  name: kafka-brokers
                  key: internal.kafka.brokers
            - name: PROXIMO_PROBE_PORT
              value: "8080"
            - name: PROXIMO_PORT
              value: "6868"
          imagePullPolicy: Always
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /__/ready
              port: 8080
              scheme: HTTP
            initialDelaySeconds: 15
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          ports:
            - containerPort: 6868
              name: proximo
              protocol: TCP
            - containerPort: 8080
              name: proximo-probe
              protocol: TCP
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /__/ready
              port: 8080
              scheme: HTTP
            initialDelaySeconds: 15
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          resources:
            limits:
              memory: 512Mi
---

このマニフェストをもとにデプロイします。

$ kubectl apply -f kubernetes/proximo.yaml
service/proximo created
deployment.apps/proximo created

$ kubectl -n qiita get pods proximo-684b45f898-9vvvr
NAME                       READY   STATUS    RESTARTS   AGE
proximo-684b45f898-9vvvr   1/1     Running   0          36s

Proximo経由のメッセージ受信

ユースケースとしては別ネームスペースからの使用を想定していますが、今回は前回実装したアプリ内での受信をProximoに変更したいと思います。

どのようにProximoにアクセスするかですが、ここでもsubstrateライブラリが役に立ちます。
substrate/proximoパッケージを使い、initialiseProximoSource()関数を以下のように定義します。

main.go
func initialiseProximoSource(addr, consumerID, topic *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
    var proximoOffset proximo.Offset
    if *offsetOldest {
        proximoOffset = proximo.OffsetOldest
    } else {
        proximoOffset = proximo.OffsetNewest
    }

    source, err := proximo.NewAsyncMessageSource(proximo.AsyncMessageSourceConfig{
        ConsumerGroup: *consumerID,
        Topic:         *topic,
        Broker:        *addr,
        Offset:        proximoOffset,
        Insecure:      true,
    })
    if err != nil {
        return nil, err
    }
    return substrate.NewSynchronousMessageSource(source), nil
}

呼び出し側は以下のように変更します。

main.go
proximoAddr := app.String(cli.StringOpt{
    Desc:   "proximo endpoint",
    Name:   "proximo-addr",
    EnvVar: "PROXIMO_ADDR",
    Value:  "proximo:6868",
})
proximoOffsetOldest := app.Bool(cli.BoolOpt{
    Name:   "proximo-offset-oldest",
    Desc:   "If set to true, will start consuming from the oldest available messages",
    EnvVar: "PROXIMO_OFFSET_OLDEST",
    Value:  true,
})

...

actionSource, err := initialiseProximoSource(proximoAddr, consumerID, actionTopic, proximoOffsetOldest)
if err != nil {
    log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()

以上の部分にはKafkaへの依存は含まれておらず、バックエンドシステムがなんであるか気をせず実装できています。
デプロイしたらデバッグしてみましょう。

Screenshot 2019-12-17 at 14.54.05.png

gRPCは無事成功、新たなTodoのIDを返しました。
ではDBを覗いてみます。

$ kubectl -n qiita exec -ti cockroachdb-0 -- /cockroach/cockroach sql --url postgresql://root@localhost:26257/qiita_advent_calendar_2019_db?sslmode=disable
...
root@localhost:26257/qiita_advent_calendar_2019_db> select * from todo where id = '71face5d-404e-4cf3-b831-dd33da5147a2';
                   id                  |   title   |                  description                   
+--------------------------------------+-----------+-----------------------------------------------+
  71face5d-404e-4cf3-b831-dd33da5147a2 | Holidays! | Pack your items and prepare for the holidays!  
(1 row)

Time: 1.328365ms

DBへの保存も無事できています:)


というわけで、ここまでマイクロサービスの立ち上げからgRPC、DBと辿りKafkaを使った非同期コミュニーケーションとみてきました。
アドベントカレンダーはまだ残り日数ありますが、一番紹介したかったことはここまでなのでひとまず本編これにて終了としたいと思います。

他にもいくつか触れたいトピックはあるのであとはマイペースに更新していきたいと思います。
ここまで読んでくださりありがとうございました:)

それではみなさまメリークリスマス&よいお年を!

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
0
Help us understand the problem. What are the problem?