0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

manabianAdvent Calendar 2024

Day 25

Databricks にて Confluent から Protobuf 形式のデータを手動で読み込むときの MALFORMED_PROTOBUF_MESSAGE エラーへの対応方法

Last updated at Posted at 2024-12-25

概要

Databricks で Confluent から Protobuf 形式のデータを読み込む際に発生する MALFORMED_PROTOBUF_MESSAGE エラーへの対処方法を共有します。本エラーはデータ型の不一致だけでなく、Confluent が生成するバイナリ形式の仕様が原因となる場合もあります。Confluent が生成するバイナリは、先頭に Magic Byte 、 Schema ID 、および、 message-indexes が格納されているため、手動でバイナリをデシリアライズする際にはそのバイト数を除去する必要があります。 message-indexes は desc ファイルによりバイト数が変わるようです。本記事では、その具体的な対応手順を解説します。

org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'. SQLSTATE: XX000

image.png

image.png

引用元:Formats, Serializers, and Deserializers for Schema Registry on Confluent Cloud | Confluent Documentation

Avro 形式のデータをデシアライズするサイトの除去するバイト数が異なることに注意してください。

image.png

引用元:Databricks にて Confluent から Avro 形式のデータを手動で読み込むときの MALFORMED_AVRO_MESSAGE エラーへの対応方法 #Kafka - Qiita

エラーの再現方法と対応方法

1. 事前準備

Confluent にて Avro 形式の Topic を準備

syntax = "proto3";
package ksql;

message users {
  int64 registertime = 1;
  string userid = 2;
  string regionid = 3;
  string gender = 4;
}

image.png

Databricks にて Confluent に対する認証情報をセット

# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"

image.png

Databricks ノートブックと同階層のディレクトリにuser.protoファイルを作成

syntax = "proto3";
package ksql;

message users {
  int64 registertime = 1;
  string userid = 2;
  string regionid = 3;
  string gender = 4;
}

image.png

Databricks にてuser.descファイルを生成

%sh sudo apt-get update -qq
%sh sudo apt-get install -y protobuf-compiler -qq
%sh protoc --include_imports \
--descriptor_set_out=user.desc \
user.proto

image.png

Databricks にて Topic をセット

topic_name = "protobu_user"

image.png

Confluent を参照したデータフレームを定義

# Confluetn から読み込み
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

image.png

エラーの再現方法

from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, from_json, to_char
from pyspark.sql.protobuf.functions import from_protobuf

df_stripped = df.withColumn(
    "value_stripped",
    F.expr(f"substring(value, 6, length(value) - 5)"),
)

df_stripped = df_stripped.withColumn(
    "decoded_value",
    from_protobuf(
        col("value_stripped"),
        messageName="ksql.users", 
        descFilePath="./user.desc"  # Descriptor File のパス
    ),
)

df_stripped.select("value", "decoded_value").display()
org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'. SQLSTATE: XX000

image.png

エラーの対応方法

デシリアライズする前にMagic Byte 、 Schema ID 、および、 message-indexes 分のバイト (6 バイト)を除去してから実行することを確認できます。

magic_bytes = 1
schema_id_bytes = 4
message_indexes_bytes = 1
offset_bytes = str(magic_bytes + schema_id_bytes + message_indexes_bytes)
start_bytes = str(int(offset_bytes) + 1)

from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, from_json, to_char
from pyspark.sql.protobuf.functions import from_protobuf

df_stripped = df.withColumn(
    "value_stripped",
    F.expr(f"substring(value, {start_bytes}, length(value) - {offset_bytes})"),
)

df_stripped = df_stripped.withColumn(
    "decoded_value",
    from_protobuf(
        col("value_stripped"),
        messageName="ksql.users", 
        descFilePath="./user.desc"  # Descriptor File のパス
    ),
)

df_stripped.select("value", "decoded_value").display()

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?