LoginSignup
1
0

More than 5 years have passed since last update.

AVRO Union Record とその周辺

Last updated at Posted at 2019-04-12

kafka に json を流すとき。AVRO json が最大公約数的な立ち位置か。メンバーに構造体を入れ子に保持するパターン、特に Union で型を切り替えられるようにしていると、とってもややこしい。

kafka のトピックは型を安定させたほうがよく、データセットが何であるかを基準にクラス設計するとよい(様々な意見がある)。この結果 Union が最善の選択肢になるケースもある。少し冗長な JSON 表現形式になるが、他のツールからは汎用 JSON として対応できるだろう。

AVRO json serialize

Union で record を扱う場合、key に record 名が含まれる。
https://avro.apache.org/docs/1.8.1/spec.html#json_encoding

{"bulk":null}
{"bulk":{"string":"test"}}
{"bulk":{"name.space.Name":{"field1":"test"}}}

faust.Record

__faust で型情報を埋め込む路線。https://faust.readthedocs.io/en/latest/userguide/models.html

Avro は汎用型との変換になるけれども、faust.Record では UUID, datetime といった、さらに特定の型までの変換ができる。.dump() ですぐに出力を眺められるのがよい。

ksql

double-quote で囲んでエスケープできる。メンバーアクセスは arrow。
https://docs.confluent.io/current/ksql/docs/developer-guide/query-with-structured-data.html

CREATE STREAM T (TYPE VARCHAR,
                DATA STRUCT<
                      timestamp VARCHAR,
                      "field-a" INT,
                      "field-b" VARCHAR,
                      "field-c" INT,
                      "field-d" VARCHAR>)
        WITH (KAFKA_TOPIC='raw-topic',
              VALUE_FORMAT='JSON');

SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' LIMIT 2;

avro-json-serializer

そこそこ動くのだけれど、union + record をシリアライズする方法の制御が難しい。avro-python3 に依存。
https://github.com/linkedin/python-avro-json-serializer

import avro.schema
from avro_json_serializer import *

schema_dict = {
  "fields": [
    {"name": "frec",
      "type": [
        "null",
        "string",
        {"fields": [{"name": "subfint", "type": "int"}],
                   "name": "Rec",
                  "type": "record"},
        {"fields": [{"name": "subfint2", "type": "int"}],
                  "name": "Rec2",
                  "type": "record"}
      ]
    }
  ],
  "name": "all_field",
  "namespace": "com.some.thing",
  "type": "record"
}

avro_schema = avro.schema.SchemaFromJSONData(schema_dict, avro.schema.Names())
serializer = AvroJsonSerializer(avro_schema)

serializer.to_json({
    "ffloat": 1.0,
    "funion_null": None,
    "flong": 1,
    "fdouble": 2.0,
    "fint": 1,
    "fstring": "hi there",
    "frec": {"subfint": 1}
})
# '{"frec":{"com.some.thing.Rec":{"subfint":1}}}'
serializer.to_json({
    "ffloat": 1.0,
    "funion_null": None,
    "flong": 1,
    "fdouble": 2.0,
    "fint": 1,
    "fstring": "hi there",
    "frec": {"subfint2": 2}
})
# '{"frec":{"com.some.thing.Rec2":{"subfint2":2}}}'
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0