以下は、Kafka ScalerとScaledJobのテスト用サンプルを作成するためのブログの日本語訳です。シンプルなGoクライアントが必要で、メッセージを消費したら終了するものです。Goを使用する理由は、KEDAのKafka ScalerがKafka Scalerの実装にsaramを使用しているからです。そのため、Kafka Scalerの実装を理解するのに役立つと思いました。
Kafka Broker
Kafka Brokerは、ブローカーを簡単に作成するためにEventHubsとして使用します。EventHubsはKafka APIで使用することができます。
設定
Sarama
EventHubsでのSaramaの設定は以下のようになります。config.Net.SASL.Password
としてEventHubs接続文字列
を渡す必要があります。もう一つの考慮点は、config.Consumer.Group.Session.Timeout
です。この値を設定しないと、クライアントはpanic: Error from consumer: read tcp 172.26.24.68:34288->40.78.250.71:9093: i/o timeout
というエラーを簡単に返します。この値を設定することで、クライアントは正常に動作します。
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = os.Getenv(saslPassword)
config.Consumer.Group.Session.Timeout = 60 * time.Second
config.Net.TLS.Enable = true
consumer := Consumer{
ready: make(chan bool),
}
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}
Scaled Jobについては、アプリがメッセージを受信したらポッドを終了させたいと思います。メッセージ処理のためのコールバックStructを実装する必要があります。これはサンプルのための大まかな実装ですが、チャネルによってメッセージの消費を通知します。
// Consumer struct
type Consumer struct {
ready chan bool
}
// Setup method
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
close(consumer.ready)
return nil
}
// Cleanup method
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim method
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
consumed <- true
break
}
return nil
}
Scaled Job
Secret
Kafka Scaled Jobのために、2つのシークレットを作成します。一つはKafka Scalerの認証用、もう一つはポッドの環境変数用です。YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64
の値をあなたのEventHubs接続文字列に置き換えてください。base64に変換することを忘れないでください。
secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: keda-kafka-secrets
namespace: default
data:
sasl: "plaintext as base64"
username: "$ConnectionString as base64"
password: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
tls: "enable as base64"
pod-secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: keda-kafka-pod-secrets
namespace: default
data:
SASL_PASSWORD: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
BROKER_LIST: "BROKER_LIST_BASE64"
TOPICS: "TOPIC_BASE64"
ScaledJob
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: kafka-consumer
namespace: default
spec:
jobTargetRef:
template:
spec:
containers:
- name: kafka-consumer
image: tsuyoshiushio/kafka-consumer:0.2
imagePullPolicy: Always
envFrom:
- secretRef:
name: keda-kafka-pod-secrets
restartPolicy: Never
pollingInterval: 5
maxReplicaCount: 10
successfulJobsHistoryLimit: 10
failedJobsHistoryLimit: 10
scalingStrategy:
strategy: accurate
triggers:
- type: kafka
metadata:
bootstrapServers: MY_EVENT_HUB_NAMESPACE.servicebus.windows.net:9093
consumerGroup: $Default
topic: multi-topic
lagThreshold: '5'
authenticationRef:
name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: kafka-trigger-auth
namespace: default
spec:
secretTargetRef:
- parameter: sasl
name: keda-kafka-secrets
key: sasl
- parameter: username
name: keda-kafka-secrets
key: username
- parameter: password
name: keda-kafka-secrets
key: password
- parameter: tls
name: keda-kafka-secrets
key: tls
Kubernetes上で実行する
KEDAをインストールして、2つのkedaポッドが動作していることを確認します。この例では、kubectl logs -f keda-operator-6b546dc696-gzc72
でオペレーターのログを見ることができます。これは、kedaがジョブをどのようにスケールするかを理解するのに役立ちます。
kubectl get pods -n keda
NAME READY STATUS RESTARTS AGE
keda-metrics-apiserver-6dff8c4f7f-z9szt 1/1 Running 0 5d1h
keda-operator-6b546dc696-gzc72 1/1 Running 0 51m
サンプルのイメージをクローンしてビルドし、公開します。KafkaScaledJobWithGo
$ docker build -f tsuyoshiushio/kafka-consumer:0.2
$ docker push tsuyoshiushio/kafka-consumer:0.2
バージョンや名前を更新する場合は、scaledjob.yaml
のimage
セクションを変更してください。
SecretとScaledJobを適用します。
$ kubectl apply -f secret.yaml
$ kubectl apply -f pod-secret.yaml
$ kubectl apply -f scaledjob.yaml
その後、対象のEventHubにメッセージを送信すると、メッセージの数に応じてジョブが作成されるのを見ることができます。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
func-deployment-http-7dffc56bc-6494h 1/1 Running 0 4d8h
kafka-consumer-5bmdd-dwb44 0/1 Completed 0 73s
kafka-consumer-n4vvx-gwrhf 0/1 Completed 0 82s
kafka-consumer-t2hth-cthlh 1/1 Running 0 64s
kafka-consumer-vzh4d-rjjt9 0/1 Completed 0 91s
リソース
Translated this blog post powered by BlogBabel.