0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksAdvent Calendar 2024

Day 22

Databricks にて Confluent からデータを取得する際の Failed to create new KafkaAdminClient エラーへの対応方法

Posted at

概要

Databricks 上で Confluent(Kafka)からデータを取得しようとすると、まれに以下のようなエラーが発生することがあります。本記事では、よくあるエラーパターンや、その再現方法・対応策を紹介します。

kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient

エラーパターンと対応方法

1. 認証情報が適切でない場合

エラーの再現方法

以下のように、誤った Confluent の認証情報を設定して Spark のストリーミング読み込みを行うと、KafkaAdminClient の作成に失敗し、上記エラーが発生するケースがあります。

# Confluent に対する認証情報をセット(誤った例)
bootstrap_servers = "pkc-56d.eastus.azure.confluent.cloud:9092"
sasl_username = "AAAA"
sasl_password = "AAAAA"

kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
}

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
)
df.display()

image.png

対応方法

Confluent から正しい認証情報(bootstrap_serversusername, password など)を取得して、改めて以下のように設定し直します。正しい認証情報を設定することで、エラーが解消され、Kafka からメッセージを正常に取得できます。

# Confluent に対する認証情報をセット(正しい例)
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "322PJOFHAYLZYGMQ"
sasl_password = "8kJNn1c4yuRu41a8/gdcuHGcYEFBkQTOlgTk1luCbTJUBBV5XGNwuUmDou/KE3jM"

kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
}

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
)
df.display()

image.png

2. kafka.sasl.jaas.config の末尾が ; で終わっていない場合

エラーの再現方法

sasl.jaas.config が正しく設定されていても、以下のように末尾にセミコロン ; を付け忘れると、KafkaAdminClient の作成に失敗してしまいます。

# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "322PJOFHAYLZYGMQ"
sasl_password = "8kJNn1c4yuRu41a8/gdcuHGcYEFBkQTOlgTk1luCbTJUBBV5XGNwuUmDou/KE3jM"

kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}"',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
}

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
)
df.display()

image.png

対応方法

セミコロン ; が抜けていることが原因です。次のように修正しましょう。

- "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}"',
+ "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',

修正後のサンプルコードは以下のとおりです。

kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
}

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
)
df.display()

image.png

3. kafka.sasl.jaas.config のオプションにて kafkashaded. が抜けている場合

エラーの再現方法

次のように、Kafka のライブラリによる影響を受けている Databricks 上では、クラス名に kafkashaded. が必要です。org.apache.kafka.common.security.plain.PlainLoginModule のままで設定すると、同様にエラーが発生します。

# Confluent に対する認証情報をセット(kafkashaded. が抜けた例)
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "322PJOFHAYLZYGMQ"
sasl_password = "8kJNn1c4yuRu41a8/gdcuHGcYEFBkQTOlgTk1luCbTJUBBV5XGNwuUmDou/KE3jM"

kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
}

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
)
df.display()

image.png

対応方法

次のように kafkashaded. 付きのクラス名に修正すると、エラーを解消できます。

- "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
+ "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',

修正後のサンプルコードは以下のとおりです。

kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
}

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
)
df.display()

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?