kafka に json を流すとき。AVRO json が最大公約数的な立ち位置か。メンバーに構造体を入れ子に保持するパターン、特に Union で型を切り替えられるようにしていると、とってもややこしい。
kafka のトピックは型を安定させたほうがよく、データセットが何であるかを基準にクラス設計するとよい(様々な意見がある)。この結果 Union が最善の選択肢になるケースもある。少し冗長な JSON 表現形式になるが、他のツールからは汎用 JSON として対応できるだろう。
AVRO json serialize
Union で record を扱う場合、key に record 名が含まれる。
https://avro.apache.org/docs/1.8.1/spec.html#json_encoding
{"bulk":null}
{"bulk":{"string":"test"}}
{"bulk":{"name.space.Name":{"field1":"test"}}}
faust.Record
__faust
で型情報を埋め込む路線。https://faust.readthedocs.io/en/latest/userguide/models.html
Avro は汎用型との変換になるけれども、faust.Record
では UUID, datetime といった、さらに特定の型までの変換ができる。.dump()
ですぐに出力を眺められるのがよい。
ksql
double-quote で囲んでエスケープできる。メンバーアクセスは arrow。
https://docs.confluent.io/current/ksql/docs/developer-guide/query-with-structured-data.html
CREATE STREAM T (TYPE VARCHAR,
DATA STRUCT<
timestamp VARCHAR,
"field-a" INT,
"field-b" VARCHAR,
"field-c" INT,
"field-d" VARCHAR>)
WITH (KAFKA_TOPIC='raw-topic',
VALUE_FORMAT='JSON');
SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' LIMIT 2;
avro-json-serializer
そこそこ動くのだけれど、union + record をシリアライズする方法の制御が難しい。avro-python3
に依存。
https://github.com/linkedin/python-avro-json-serializer
import avro.schema
from avro_json_serializer import *
schema_dict = {
"fields": [
{"name": "frec",
"type": [
"null",
"string",
{"fields": [{"name": "subfint", "type": "int"}],
"name": "Rec",
"type": "record"},
{"fields": [{"name": "subfint2", "type": "int"}],
"name": "Rec2",
"type": "record"}
]
}
],
"name": "all_field",
"namespace": "com.some.thing",
"type": "record"
}
avro_schema = avro.schema.SchemaFromJSONData(schema_dict, avro.schema.Names())
serializer = AvroJsonSerializer(avro_schema)
serializer.to_json({
"ffloat": 1.0,
"funion_null": None,
"flong": 1,
"fdouble": 2.0,
"fint": 1,
"fstring": "hi there",
"frec": {"subfint": 1}
})
# '{"frec":{"com.some.thing.Rec":{"subfint":1}}}'
serializer.to_json({
"ffloat": 1.0,
"funion_null": None,
"flong": 1,
"fdouble": 2.0,
"fint": 1,
"fstring": "hi there",
"frec": {"subfint2": 2}
})
# '{"frec":{"com.some.thing.Rec2":{"subfint2":2}}}'