ある日こう思ったとします。
「あ、Kafka飽きた。NATS使お。」
ところがもしKafkaに他のチームからのアクセスがあった場合、彼らに変更をお願いする必要があります。
彼らは大体忙しいでしょう。そう簡単にNATSに変更することはできなくなります。
そんなとき、Kafkaに直接アクセスする代わりに、メッセージシステムを抽象化したインターフェイスを通してアクセスしてもらうことで内部の実装を隠蔽してしまうと便利です。
今回紹介するのはまたもや弊社謹製の uw-labs/proximo というライブラリ(とGoのサーバー実装)です。
このライブラリでは以下のように、メッセージングシステムのインターフェイスをprotocol bufferの形で定義しています。
syntax = "proto3";
package proximo;
message Message {
bytes data = 1;
string id = 2;
}
// Consumer types
service MessageSource {
rpc Consume(stream ConsumerRequest) returns (stream Message) {
}
}
message ConsumerRequest {
// expected if this is a start request
StartConsumeRequest startRequest = 2;
// expected if this is a confirmation
Confirmation confirmation = 3;
}
message StartConsumeRequest {
string topic = 1;
string consumer = 2;
Offset initial_offset = 3;
}
enum Offset {
OFFSET_DEFAULT = 0;
OFFSET_NEWEST = 1;
OFFSET_OLDEST = 2;
}
message Confirmation {
string msgID = 1;
}
// Producer types
service MessageSink {
rpc Publish(stream PublisherRequest) returns (stream Confirmation) {
}
}
message PublisherRequest {
// expected if this is a start request
StartPublishRequest startRequest = 2;
// expected if this is a message
Message msg = 3;
}
message StartPublishRequest {
string topic = 1;
}
送信・受信ともに双方向ストリームRPCを使用しているのにご注目ください。
このインターフェイスを実装するgRPCサーバーを用意することで、Kafkaを使っていることをユーザーから隠すことができます。
レポジトリにはGoによるサーバー実装およびパブリックなDockerイメージが用意されているため、これを使います。
Proximoのデプロイ
以下のマニフェストファイルでKafkaをバックエンドとして使用するproximoをデプロイします。
---
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/scrape: 'true'
prometheus.io/path: /__/metrics
prometheus.io/port: '8080'
name: &app proximo
namespace: &ns qiita
labels:
app: *app
spec:
ports:
- name: app
port: 6868
protocol: TCP
targetPort: 6868
- name: http
port: 80
protocol: TCP
targetPort: 8080
selector:
app: *app
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: &app proximo
name: *app
namespace: &ns qiita
spec:
replicas: 1
selector:
matchLabels:
app: *app
template:
metadata:
labels:
app: *app
namespace: *ns
spec:
containers:
- name: *app
image: quay.io/utilitywarehouse/proximo:latest
args:
- /proximo-server
- kafka
env:
- name: PROXIMO_KAFKA_VERSION
valueFrom:
configMapKeyRef:
name: kafka-brokers
key: internal.kafka.broker.version
- name: PROXIMO_KAFKA_BROKERS
valueFrom:
configMapKeyRef:
name: kafka-brokers
key: internal.kafka.brokers
- name: PROXIMO_PROBE_PORT
value: "8080"
- name: PROXIMO_PORT
value: "6868"
imagePullPolicy: Always
livenessProbe:
failureThreshold: 3
httpGet:
path: /__/ready
port: 8080
scheme: HTTP
initialDelaySeconds: 15
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 10
ports:
- containerPort: 6868
name: proximo
protocol: TCP
- containerPort: 8080
name: proximo-probe
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /__/ready
port: 8080
scheme: HTTP
initialDelaySeconds: 15
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 10
resources:
limits:
memory: 512Mi
---
このマニフェストをもとにデプロイします。
$ kubectl apply -f kubernetes/proximo.yaml
service/proximo created
deployment.apps/proximo created
$ kubectl -n qiita get pods proximo-684b45f898-9vvvr
NAME READY STATUS RESTARTS AGE
proximo-684b45f898-9vvvr 1/1 Running 0 36s
Proximo経由のメッセージ受信
ユースケースとしては別ネームスペースからの使用を想定していますが、今回は前回実装したアプリ内での受信をProximoに変更したいと思います。
どのようにProximoにアクセスするかですが、ここでもsubstrateライブラリが役に立ちます。
substrate/proximoパッケージを使い、initialiseProximoSource()
関数を以下のように定義します。
func initialiseProximoSource(addr, consumerID, topic *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
var proximoOffset proximo.Offset
if *offsetOldest {
proximoOffset = proximo.OffsetOldest
} else {
proximoOffset = proximo.OffsetNewest
}
source, err := proximo.NewAsyncMessageSource(proximo.AsyncMessageSourceConfig{
ConsumerGroup: *consumerID,
Topic: *topic,
Broker: *addr,
Offset: proximoOffset,
Insecure: true,
})
if err != nil {
return nil, err
}
return substrate.NewSynchronousMessageSource(source), nil
}
呼び出し側は以下のように変更します。
proximoAddr := app.String(cli.StringOpt{
Desc: "proximo endpoint",
Name: "proximo-addr",
EnvVar: "PROXIMO_ADDR",
Value: "proximo:6868",
})
proximoOffsetOldest := app.Bool(cli.BoolOpt{
Name: "proximo-offset-oldest",
Desc: "If set to true, will start consuming from the oldest available messages",
EnvVar: "PROXIMO_OFFSET_OLDEST",
Value: true,
})
...
actionSource, err := initialiseProximoSource(proximoAddr, consumerID, actionTopic, proximoOffsetOldest)
if err != nil {
log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()
以上の部分にはKafkaへの依存は含まれておらず、バックエンドシステムがなんであるか気をせず実装できています。
デプロイしたらデバッグしてみましょう。
gRPCは無事成功、新たなTodoのIDを返しました。
ではDBを覗いてみます。
$ kubectl -n qiita exec -ti cockroachdb-0 -- /cockroach/cockroach sql --url postgresql://root@localhost:26257/qiita_advent_calendar_2019_db?sslmode=disable
...
root@localhost:26257/qiita_advent_calendar_2019_db> select * from todo where id = '71face5d-404e-4cf3-b831-dd33da5147a2';
id | title | description
+--------------------------------------+-----------+-----------------------------------------------+
71face5d-404e-4cf3-b831-dd33da5147a2 | Holidays! | Pack your items and prepare for the holidays!
(1 row)
Time: 1.328365ms
DBへの保存も無事できています:)
というわけで、ここまでマイクロサービスの立ち上げからgRPC、DBと辿りKafkaを使った非同期コミュニーケーションとみてきました。
アドベントカレンダーはまだ残り日数ありますが、一番紹介したかったことはここまでなのでひとまず本編これにて終了としたいと思います。
他にもいくつか触れたいトピックはあるのであとはマイペースに更新していきたいと思います。
ここまで読んでくださりありがとうございました:)
それではみなさまメリークリスマス&よいお年を!