概要
Databricks で Confluent から Protobuf 形式のデータを読み込む際に発生する MALFORMED_PROTOBUF_MESSAGE エラーへの対処方法を共有します。本エラーはデータ型の不一致だけでなく、Confluent が生成するバイナリ形式の仕様が原因となる場合もあります。Confluent が生成するバイナリは、先頭に Magic Byte 、 Schema ID 、および、 message-indexes が格納されているため、手動でバイナリをデシリアライズする際にはそのバイト数を除去する必要があります。 message-indexes は desc ファイルによりバイト数が変わるようです。本記事では、その具体的な対応手順を解説します。
org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'. SQLSTATE: XX000
Avro 形式のデータをデシアライズするサイトの除去するバイト数が異なることに注意してください。
引用元:Databricks にて Confluent から Avro 形式のデータを手動で読み込むときの MALFORMED_AVRO_MESSAGE エラーへの対応方法 #Kafka - Qiita
エラーの再現方法と対応方法
1. 事前準備
Confluent にて Avro 形式の Topic を準備
syntax = "proto3";
package ksql;
message users {
int64 registertime = 1;
string userid = 2;
string regionid = 3;
string gender = 4;
}
Databricks にて Confluent に対する認証情報をセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"
Databricks ノートブックと同階層のディレクトリにuser.proto
ファイルを作成
syntax = "proto3";
package ksql;
message users {
int64 registertime = 1;
string userid = 2;
string regionid = 3;
string gender = 4;
}
Databricks にてuser.desc
ファイルを生成
%sh sudo apt-get update -qq
%sh sudo apt-get install -y protobuf-compiler -qq
%sh protoc --include_imports \
--descriptor_set_out=user.desc \
user.proto
Databricks にて Topic をセット
topic_name = "protobu_user"
Confluent を参照したデータフレームを定義
# Confluetn から読み込み
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("subscribe", topic_name)
.option("startingOffsets", "earliest")
.load()
)
エラーの再現方法
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, from_json, to_char
from pyspark.sql.protobuf.functions import from_protobuf
df_stripped = df.withColumn(
"value_stripped",
F.expr(f"substring(value, 6, length(value) - 5)"),
)
df_stripped = df_stripped.withColumn(
"decoded_value",
from_protobuf(
col("value_stripped"),
messageName="ksql.users",
descFilePath="./user.desc" # Descriptor File のパス
),
)
df_stripped.select("value", "decoded_value").display()
org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'. SQLSTATE: XX000
エラーの対応方法
デシリアライズする前にMagic Byte 、 Schema ID 、および、 message-indexes 分のバイト (6 バイト)を除去してから実行することを確認できます。
magic_bytes = 1
schema_id_bytes = 4
message_indexes_bytes = 1
offset_bytes = str(magic_bytes + schema_id_bytes + message_indexes_bytes)
start_bytes = str(int(offset_bytes) + 1)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, from_json, to_char
from pyspark.sql.protobuf.functions import from_protobuf
df_stripped = df.withColumn(
"value_stripped",
F.expr(f"substring(value, {start_bytes}, length(value) - {offset_bytes})"),
)
df_stripped = df_stripped.withColumn(
"decoded_value",
from_protobuf(
col("value_stripped"),
messageName="ksql.users",
descFilePath="./user.desc" # Descriptor File のパス
),
)
df_stripped.select("value", "decoded_value").display()