Posted at

Avro,SchemaRegistryことはじめ


はじめに

30才になったのを機にアウトプットを意識していこうと思います。

最初のテーマは地味ですが、ご興味あれば。

現在、自分は、Hadoop,Spark,Kafka,Fluentdなどを導入し、社内のログ基盤を整えようとしています。

Kafkaでシリアライズするには、Avro,SchemaRegistryを利用するのがメジャーであるが、

何も考えずにすぐ動くAvroのソースや一歩踏み込んだ説明がなかなか見つからず、なかなか理解できず苦しんだので、誰かの助けになればと思います。

SchemaRegistryはAvroについて理解できれば難しくないと思うので軽く触れる程度です。

また、コードはGitHubにあげてますので、参考までに。

なお、基本的に既に良質なものがあればリンクだけ張って、説明はそちらに譲るというスタンスでいきます。


Avroとは

1分で読めるので、Avro本家のOverviewを読みましょう。


JSONじゃだめなの?

ConfluentのドキュメントにAvroが解決するJSONのデメリットが書いてます。

JSONは言語に依存しないメリットがある一方、厳格なフォーマットがないことが欠点です。

具体的には、


  • データ作成側が自由にフィールドを追加、削除等の変更ができるので、利用側が解釈できなくなる可能性がある。

  • フィールドや型情報はファイル内では同一であるにもかかわらず、レコードごとに記載するので、冗長である。

Avroでは、スキーマ情報を1つのみファイルの冒頭に格納します。また、SchemaRegistryを使えば、スキーマの一元管理、互換性チェック等を行えます。


スキーマ定義

Avroのドキュメントを読むのが一番かと思いますが、簡単に説明します。

また、いくつかハマりどころ、勘違いしやすいところがあるので、説明します。

Payment2.avscを例に説明します。

{"namespace": "com.example.kafka",

"type": "record",
"name": "Payment2",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "region", "type": ["string", "null"], "default": "NOWHERE"}
]
}


namespace

そのまま、namespaceです。ドット区切りで書きます。

ここで指定したものが、自動生成されたスキーマのクラスのパッケージ名となるようです。

なので、クライアントアプリと異なるパッケージ名となる場合は、明示的にimportしないと参照できません。


type

プリミティブ型(null, boolean, int, long, float, double, bytes, and string)と複雑型(record, enum, array, map, union, and fixed)があります。

["string", "null"]についてですが、これはunion型であり、任意項目のstring型という意味です。このように指定しないと必須項目と判断され、regionを明示的に指定しないと、スキーマと適合していないことになり、エラーになります。


name

自動生成されるスキーマのクラス名となります。


default属性

非常に大事な属性なのですが、非常にわかりづらいように思えます。Avroの仕様ドキュメントをそのまま引用すると、


default: A default value for this field, used when reading instances that lack this field (optional). Permitted values depend on the field's schema type, according to the table below. Default values for union fields correspond to the first schema in the union. ...


文字通りの和訳、「default属性が指定されたフィールドが存在しないレコードを読むときに、デフォルト属性値が利用されます」をちゃんと覚える必要があります。値を指定しないでシリアライズしたときに採用される設定値ではないです。あくまで、デシリアライズするときに利用されるものとなります。フィールドが存在しないときのみに利用されるのであって、nullのときには利用されません

具体的には、GitHubにあげた表にあるように、



  • id,amountのみ指定してシリアライズすると、regionnullとして書き込まれます。


  • {"id": "DEF", "amount": 200.0, "region": null}のレコードをこのスキーマで読んでも、regionはnullのままです。


  • {"id": "DEF", "amount": 200.0}のレコードをこのスキーマで読むと、regionNOWHEREになります。

では、なぜ、3つ目のレコードであるid=GHIのレコードはシリアライズ時に指定してないのに、NOWHEREとなるのでしょうか。

ここで2,3時間悩んだのですが、ふとGettingStartedのページを見ると、以下の記載がありました。


As shown in this example, Avro objects can be created either by invoking a constructor directly or by using a builder. Unlike constructors, builders will automatically set any default values specified in the schema.


わかりましたね!builderパターンでスキーマクラスから生成した時には、指定しないフィールドにはデフォルト値が設定されます。

サンプルのソース箇所としてはこちらで、またavroファイルをみると、実際に「NOWHERE」が書き込まれていることがわかります。


プログラムにて実行

pom.xmlのこちらにてスキーマを指定しています。sourceDirectory*.avsc形式のファイルはデフォルトで読みこんでくれますが、こちらにあるように明示的に指定することも可能です。

コード生成する方法としない方法の2通りあり、生成する方法をSpecific方式、しない方法をGeneric方式とここでは呼びます。


Specific方式

mavenを利用していれば、自動でスキーマのjavaのコードを生成してくれます。pom.xmlに生成先を指定できます。生成後のソースファイルを見たいという方もいると思うので、こちらに配置してますが、おそらく普通はレポジトリ管理しない類のファイルかと思います。ソースを眺めると確かに、GettingStartedにあるように、スキーマのオブジェクトをconstructor方式、set方式、builder方式の3通りで生成できそうだなとわかるかと思います。

この方式では、コード生成が必要なので、スキーマ定義を変更する場合は、コンパイルが必要になります。


Generic方式

こちらのほうがむしろシンプルです。コード生成はせずに、実行時にスキーマ定義を参照します。

なので、動的にスキーマを変更したいときにはこちらを採用するのかと思います。

また、デシリアライズの際に、シリアライズした際と同じスキーマを指定する場合は、スキーマを指定する必要がないです。avroファイル自体にスキーマ定義が存在するためです。スキーマ指定は必須と勘違いしている人が多いようですが、おそらくこれはGettingStartedのサンプルコードが指定してあるためかと思います(自分も勘違いしてましたが、実際に動作確認済みです)。


Avroファイル

こちらにファイルの中身を見ることができます。なんとなくどのようにシリアライズされているかわかります。

詳しくは、このあたりに記載があります。

たしかにObjの3文字から始まっていることやavro.schemaにスキーマ定義が一つだけ存在し、データがその後に続いている様子がわかります。


Schema Resolution

書き込みと読み込みのスキーマが異なる場合にどのように解決されるかです。

「常に同じスキーマにすればいいじゃん」と思う方がいるかもしれませんが、

Kafkaなどのストリーム処理でProducerとConsumerアプリを同時に更新してもおそらく古いスキ―マ定義のトピックデータを読むことがありますし、Producer、Consumerが別サーバ、別チーム、別会社管理などになるとより完全に同時に変更するのは困難になります。

具体的にどのように解決されるかは詳しくはこちらを参照ですが、大事なのは以下の2点と理解しました。



  • if the reader's record schema has a field that contains a default value, and writer's schema does not have a field with the same name, then the reader should use the default value from its field.

  • if the reader's record schema has a field with no default value, and writer's schema does not have a field with the same name, an error is signalled.


また、サンプルコードのSchema Resolutionの結果をGitHubに張ってますので、参考までに。


SchemaRegistry

SchemaRegistryについは、Avroをちゃんと理解できれば、難しくないです。

また、Confluentのドキュメントは非常によくできていて、GitHubにサンプルコードがあり、ドキュメントとリンクしているので、あえて自分が書くことも少ないのですが、少しだけまとめます。

ちなみに自分は以下の二つを呼んで、なんとなく理解しました。


機能

Kafka上で、シリアライズにAvroを利用する上で、以下の2点の機能を提供します。


  • スキーマを一元管理できます。RESTAPI経由で、Kafkaのproducer,consumerを介さずに、スキーマ参照、作成等できます。

  • スキーマの変更において、互換性チェックができます。


スキーマ管理

スキーマ情報はSchemaRegistry上で管理され、schema idと言われるidを付与し、Kafkaのメッセージとしては、スキーマを全て持たずに、schema idのみを保持します。動作としては、以下となります。


  1. producerは新しいスキーマがあると、SchemaRegistryにそれを送り、返却されたschema idを得ます

  2. producerはスキーマとschema idのマッピングをキャッシュに保持します

  3. consumerは新しいschema idがあると、SchemaRegistryにそれを送り、返却されたスキーマを得ます

  4. consumerはschema idとスキーマのマッピングをキャッシュに保持します

producer,consumer共に、キャッシュに存在する場合は、SchemaRegistryとの通信は発生しません。

デフォルトだと、producerは新しいスキーマがあると自動登録しますが、ドキュメントでは、本番環境では、自動登録させずに、RESTAPI経由で行うようにと記載があります。

また、自分が勘違いしていた点としては、consumer側でSpecific形式でメッセージをconsumeする際、新しいスキーマを指定しても正常にでデシリアライズしてくれたので、SchemaRegistry上に登録してくれていると思いきや、されていませんでした。SchemaRegistryに登録できるのはproducerとRESTAPIのみのようです。


互換性チェック

詳しくはドキュメントに譲りますが、デフォルトの設定は、


BACKWARD: (default) consumers using the new schema can read data written by producers using the latest registered schema


とあるように、最新のバージョンに対する後方互換のみを保証しますので、注意してください。

例えば、スキーマ1 -> スキーマ2 -> スキーマ3 とバージョンが進化した際、スキーマ1とスキーマ2、スキーマ2とスキーマ3の間に後方互換性があり、スキーマ1とスキーマ3の間に後方互換性がないときに以下があり得ます。(動作確認済み)


  • スキーマ3でcosumerがconsumeするときに、kafkaトピック上にスキーマ1が残っており、エラーになる

  • スキーマ2の後にスキーマ1に戻し、その後にスキーマ3にエラーなく、あげることが可能(最新のバージョンはあくまでスキーマ2なので)。この場合、consumerはスキーマ3にあげると、スキーマ1をconsumeすることになり、エラーになります。


サンプルコード

Specific方式はこちらで動かしてみるとよいでしょう。Tutorialで利用されているサンプルコードです。

Generic方式はこちらで動かしてみるとよいでしょう。ConfluentのKafkaStreamsのサンプルコードの一部ですが、KafkaStreamsの知識は不要なのでご安心ください。


おまけ

既存のAvroファイルにデータを追加することが可能です。

具体的には、dataFileWriter.create(p1.getSchema(), file)でファイル新規作成している箇所をdataFileWriter.appendTo(file)に変更すればよいです。

既存のAvroファイルにスキーマの異なるレコードを追加することができるのか気になったのでやってみました。

当然ですが、普通はやりません!


型も含めてスキーマが異なるオブジェクトを追記した場合

org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union ["int","null"]: 900.0

コンパイルは通りますが、書き込み時に「型が違うよ!」とエラーになります。


型などを含めたフォーマットは全て同じで、プロパティ名だけ異なるオブジェクトを追記した場合

普通にとおります。

そして、レコードをスキーマ無しで読んでみるとプロパティ名が追記元のフィールド名で読み込まれました。


最後に

簡単そうですが、意外と難しいというのが率直な感想です。

導入するかは迷いどころですね。。

Avro, Schema Registryともに勉強してまだ2週間ほどの新参者ですので、誤りがあれば優しくご指摘ください。