概要
Databricks 上で Confluent(Kafka)からデータを取得しようとすると、まれに以下のようなエラーが発生することがあります。本記事では、よくあるエラーパターンや、その再現方法・対応策を紹介します。
kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
エラーパターンと対応方法
1. 認証情報が適切でない場合
エラーの再現方法
以下のように、誤った Confluent の認証情報を設定して Spark のストリーミング読み込みを行うと、KafkaAdminClient の作成に失敗し、上記エラーが発生するケースがあります。
# Confluent に対する認証情報をセット(誤った例)
bootstrap_servers = "pkc-56d.eastus.azure.confluent.cloud:9092"
sasl_username = "AAAA"
sasl_password = "AAAAA"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": topic_name,
"startingOffsets": "earliest",
}
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.load()
)
df.display()
対応方法
Confluent から正しい認証情報(bootstrap_servers
や username
, password
など)を取得して、改めて以下のように設定し直します。正しい認証情報を設定することで、エラーが解消され、Kafka からメッセージを正常に取得できます。
# Confluent に対する認証情報をセット(正しい例)
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "322PJOFHAYLZYGMQ"
sasl_password = "8kJNn1c4yuRu41a8/gdcuHGcYEFBkQTOlgTk1luCbTJUBBV5XGNwuUmDou/KE3jM"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": topic_name,
"startingOffsets": "earliest",
}
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.load()
)
df.display()
2. kafka.sasl.jaas.config
の末尾が ;
で終わっていない場合
エラーの再現方法
sasl.jaas.config
が正しく設定されていても、以下のように末尾にセミコロン ;
を付け忘れると、KafkaAdminClient の作成に失敗してしまいます。
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "322PJOFHAYLZYGMQ"
sasl_password = "8kJNn1c4yuRu41a8/gdcuHGcYEFBkQTOlgTk1luCbTJUBBV5XGNwuUmDou/KE3jM"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}"',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": topic_name,
"startingOffsets": "earliest",
}
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.load()
)
df.display()
対応方法
セミコロン ;
が抜けていることが原因です。次のように修正しましょう。
- "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}"',
+ "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
修正後のサンプルコードは以下のとおりです。
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": topic_name,
"startingOffsets": "earliest",
}
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.load()
)
df.display()
3. kafka.sasl.jaas.config
のオプションにて kafkashaded.
が抜けている場合
エラーの再現方法
次のように、Kafka のライブラリによる影響を受けている Databricks 上では、クラス名に kafkashaded.
が必要です。org.apache.kafka.common.security.plain.PlainLoginModule
のままで設定すると、同様にエラーが発生します。
# Confluent に対する認証情報をセット(kafkashaded. が抜けた例)
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "322PJOFHAYLZYGMQ"
sasl_password = "8kJNn1c4yuRu41a8/gdcuHGcYEFBkQTOlgTk1luCbTJUBBV5XGNwuUmDou/KE3jM"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": topic_name,
"startingOffsets": "earliest",
}
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.load()
)
df.display()
対応方法
次のように kafkashaded.
付きのクラス名に修正すると、エラーを解消できます。
- "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
+ "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
修正後のサンプルコードは以下のとおりです。
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": topic_name,
"startingOffsets": "earliest",
}
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.load()
)
df.display()