LoginSignup
1
0

KEDAとEventHubsを使用したKafka ScaledJobsのサンプル

Last updated at Posted at 2024-01-21

以下は、Kafka ScalerとScaledJobのテスト用サンプルを作成するためのブログの日本語訳です。シンプルなGoクライアントが必要で、メッセージを消費したら終了するものです。Goを使用する理由は、KEDAのKafka ScalerがKafka Scalerの実装にsaramを使用しているからです。そのため、Kafka Scalerの実装を理解するのに役立つと思いました。

Kafka Broker

Kafka Brokerは、ブローカーを簡単に作成するためにEventHubsとして使用します。EventHubsはKafka APIで使用することができます。

設定

Sarama

EventHubsでのSaramaの設定は以下のようになります。config.Net.SASL.PasswordとしてEventHubs接続文字列を渡す必要があります。もう一つの考慮点は、config.Consumer.Group.Session.Timeoutです。この値を設定しないと、クライアントはpanic: Error from consumer: read tcp 172.26.24.68:34288->40.78.250.71:9093: i/o timeoutというエラーを簡単に返します。この値を設定することで、クライアントは正常に動作します。

config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = os.Getenv(saslPassword)
config.Consumer.Group.Session.Timeout = 60 * time.Second
config.Net.TLS.Enable = true

consumer := Consumer{
	ready: make(chan bool),
}

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
	log.Panicf("Error creating consumer group client: %v", err)
}

Scaled Jobについては、アプリがメッセージを受信したらポッドを終了させたいと思います。メッセージ処理のためのコールバックStructを実装する必要があります。これはサンプルのための大まかな実装ですが、チャネルによってメッセージの消費を通知します。

// Consumer struct
type Consumer struct {
	ready chan bool
}

// Setup method
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready)
	return nil
}

// Cleanup method
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim method
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
		consumed <- true
		break
	}
	return nil
}

Scaled Job

Secret

Kafka Scaled Jobのために、2つのシークレットを作成します。一つはKafka Scalerの認証用、もう一つはポッドの環境変数用です。YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64の値をあなたのEventHubs接続文字列に置き換えてください。base64に変換することを忘れないでください。

secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: default
data:
  sasl: "plaintext as base64"
  username: "$ConnectionString as base64"
  password: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
  tls: "enable as base64"

pod-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-pod-secrets
  namespace: default
data:
  SASL_PASSWORD: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
  BROKER_LIST: "BROKER_LIST_BASE64"
  TOPICS: "TOPIC_BASE64"

ScaledJob

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata: 
  name: kafka-consumer
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
        - name: kafka-consumer
          image: tsuyoshiushio/kafka-consumer:0.2
          imagePullPolicy: Always
          envFrom:
          - secretRef:
              name: keda-kafka-pod-secrets
        restartPolicy: Never
  pollingInterval: 5
  maxReplicaCount: 10
  successfulJobsHistoryLimit: 10
  failedJobsHistoryLimit: 10
  scalingStrategy:
    strategy: accurate
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: MY_EVENT_HUB_NAMESPACE.servicebus.windows.net:9093
      consumerGroup: $Default
      topic: multi-topic
      lagThreshold: '5'
    authenticationRef:
      name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: kafka-trigger-auth
  namespace: default
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password    
  - parameter: tls
    name: keda-kafka-secrets
    key: tls

Kubernetes上で実行する

KEDAをインストールして、2つのkedaポッドが動作していることを確認します。この例では、kubectl logs -f keda-operator-6b546dc696-gzc72でオペレーターのログを見ることができます。これは、kedaがジョブをどのようにスケールするかを理解するのに役立ちます。

kubectl get pods -n keda
NAME                                      READY   STATUS    RESTARTS   AGE
keda-metrics-apiserver-6dff8c4f7f-z9szt   1/1     Running   0          5d1h
keda-operator-6b546dc696-gzc72            1/1     Running   0          51m

サンプルのイメージをクローンしてビルドし、公開します。KafkaScaledJobWithGo

$ docker build -f tsuyoshiushio/kafka-consumer:0.2
$ docker push tsuyoshiushio/kafka-consumer:0.2

バージョンや名前を更新する場合は、scaledjob.yamlimageセクションを変更してください。

SecretとScaledJobを適用します。

$ kubectl apply -f secret.yaml
$ kubectl apply -f pod-secret.yaml
$ kubectl apply -f scaledjob.yaml

その後、対象のEventHubにメッセージを送信すると、メッセージの数に応じてジョブが作成されるのを見ることができます。

$ kubectl get pods
NAME                                   READY   STATUS      RESTARTS   AGE
func-deployment-http-7dffc56bc-6494h   1/1     Running     0          4d8h
kafka-consumer-5bmdd-dwb44             0/1     Completed   0          73s
kafka-consumer-n4vvx-gwrhf             0/1     Completed   0          82s
kafka-consumer-t2hth-cthlh             1/1     Running     0          64s
kafka-consumer-vzh4d-rjjt9             0/1     Completed   0          91s

リソース

Translated this blog post powered by BlogBabel.

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0