こんにちは。
前回Confluent Platformの全体概要を見たので、次は各コンポーネントの特徴を見ていきます。
ただ、既に概要の時点でどんなものかは書かれているため、各トピックや各コンポーネントごとに特徴的な内容をまとめるにとどめます。
また、ある意味わかりやすいCamusやProactive Supportは省略します。
データのシリアライズ革命
- JSONフォーマットはよくつかわれているが、下記のように2つ欠点がある。
-
- JSON自体にフォーマットを仕込むと各フィールドに形式を付記する必要があるため冗長
-
- 構造定義なしでデータを投入するとデータが欠けたり変なのが加わったりするので危険
-
- データスキーマの問題を解消するため、最近はThrift、Protocol Buffer、Avroといったスキーマ定義言語がある
- その中では、スキーマが今後変わることを考慮した機構を有するAvroを我々は勧める。
Avro
- Avroのスキーマ定義はJSONで行われ、サンプルは下記。
{"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"}
]
}
- 面白い点としては、Avroはデータをシリアライズするときだけでなく、デシリアライズ時もこのスキーマを使用する。
- 結果シリアライズ化したデータからはフィールド名が除去され、データサイズ小さい
- 重要ポイントとして挙げたスキーマの進化として、データのスキーマが変わった場合、下流でシームレスに扱えるかが重要
- その際に見落としが発生しがちであるし、確認に時間もかかる。
- Avroでは以下の3パターンの互換性確保のパターンがある。
-
- 後方互換(Backward Compatibility)
-
- 前方互換(Forward Compatibility)
-
- 完全互換(Full Compatibility)
-
後方互換
- HDFSに投入されたデータをHive等のクエリで横断的に検索するケースを考える。
- その場合、古いスキーマでエンコードされたデータを新規スキーマでも読める必要がある。
- このようなケースに対応するためにAvroは「後方互換」のルールを持つ。
- 全データが「後方互換」のルールを守られれば、全データに対して統一的にクエリをかけることが可能となる。
- 例えば、先ほどのuserスキーマに
favorite_color
というフィールドを下記のように追加することを考える。
{"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string", "default": "green"}
]
}
-
favorite_color
がデフォルト値green
を持つことに注目- デフォルト値が古いスキーマでシリアライズされたデータを新しいスキーマで読み込む際に使用される。
- デフォルト値定義がない場合、新しいスキーマで古いデータを読み込む際に新規フィールドの値が不定となり、扱えなくなる。
前方互換
- アプリケーションロジックが特定スキーマバージョンに紐づけられているケースを考える。
- スキーマ更新時、直ちにアプリケーションを更新できない場合は多々ある。
- そのため、新しいスキーマのデータを過去のアプリケーションが理解できる形に落とし込む必要がある。
- そのため、古いスキーマで新しいスキーマのデータを読む手段を提供する。
- 例えば、上記の
favorite_color
が追加されたスキーマも前方互換性がある。 - 古いスキーマに新しいスキーマのデータを投影すると追加フィールドは単純に捨てられる。
- 上記の
favorite_color
を追加したスキーマからfavorite_color
を削除すると、そのスキーマは前方互換ではなくなる。
- 例えば、上記の
完全互換
-
前述の後方互換、前方互換を1スキーマが両方満たす場合、そのスキーマは完全互換といえる。
- 古いデータを新しいスキーマで読め、新しいデータを古いスキーマで読める。
-
ここまででわかるように、Avroのスキーマ管理においては下記が重要
- スキーマの管理
- スキーマをバージョンアップ時の互換性の確保
-
Confluent Platformではスキーマの管理手段と、スキーマバージョン間の整合性チェックの機能を提供する。
アプリケーション開発
- Confluent Platformでは管理しているスキーマを用いてKafkaにデータを送受信する手段を下記の通り用意
- JVMアプリケーション用:シリアライズ
- その他アプリケーション用:Rest Proxy
JVMアプリケーション:シリアライズ
- JVMアプリケーションにおいては依存性の追加とシリアライザ設定によって使用することが可能。
- 追加する依存性は下記の通り。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0-cp1</version>
<scope>provided</scope>
</dependency>
- シリアライザ設定は以下2項目を参照。
-
- Key-Valueの方をGenericsでObject型にする。Primitive型、Map、Recordが投入可能。
-
- 下記コードのようにシリアライザとSchema RegistryのURLを追加する。
-
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Set any other properties
KafkaProducer producer = new KafkaProducer(props);
- ただ、これらの設定はPropertyファイルに書いて読み込ませるのをお勧めする。
- 上記の設定により、Kafkaに対してシリアライズしたデータが送信されるようになると同時にバリデーションも実行される。
- それはデシリアライザも同じく
- この状態でデータを投入すると、下記のコードとやっていることは同じとなる。
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
Future<RecordAndMetadata> resultFuture = producer.send(user1);
その他アプリケーション:Rest Proxy
- Rest ProxyによってKafkaのAPIやAvroサポートがない言語からもConfluent Platformの機能を使用することが可能
- 下記のAPIが存在
- Topic一覧取得
- Topicメタデータ取得
- TopicのPartition一覧&メタデータ取得
- Topicへのメッセージ投入
- Topicからのメッセージ取得
- Consumerグループの作成
- ConsumerグループのOffsetの更新
- Consumerグループを介したメッセージの取得
- Broker一覧取得
- 例としては下記のようにリクエストを送ってメッセージを投入し、投入結果を取得可能。
Request.json
POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
{
"value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
"records": [
{
"value": 12
},
{
"value": 24,
"partition": 1
}
]
}
Response.json
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json
{
"key_schema_id": null,
"value_schema_id": 32,
"offsets": [
{
"partition": 2,
"offset": 103
},
{
"partition": 1,
"offset": 104
}
]
}
Schema Registry
概要設計
- Schema RegistryはAvroSchema用分散ストレージ層でKafkaをバックエンドとしている。
- 登録されたスキーマにはユニークなIDが割り振られる。単調増加するが1ずつ増えるわけではない。(採番にはZookeeperを使用)
- Kafkaは耐障害性のあるバックエンドと変化を記録するWALを保持
- Schema Registryは分散、Singleマスタ構成。(選出にはZookeeperを使用)
- Slave側はアクセスされた際にMasterにリクエストを転送する方式
API一覧
- 下記のようなAPIを保持
- Schema Subject一覧取得(-key、-value等)
- Schema Subjectのバージョン一覧取得
- Subject、バージョン指定してSchemaを取得
- Subject指定してのSchema取得
- Subjectを指定してのSchema更新
- Schemaの互換性確認
- 互換性レベル設定(デフォルト、Subject指定)
- 互換性レベル取得(デフォルト、Subject指定)
Rest Proxy
- Rest Proxyは元々のJava Consumer/Producer/CLIのうち下記の機能を有する。
- 実行可能なAPI一覧は前述。
- メタデータ
- クラスタからbroker、topic、partition、config情報が取得可能
- Producer
- Producerを公開するのではなく、Producerを介してKafkaにメッセージを投入するAPIを公開
- データの投入はRest Proxy内でプーリングされたProducerを使用
- 圧縮方式などの一部の設定はConfig API側で設定可能
- Producerを公開するのではなく、Producerを介してKafkaにメッセージを投入するAPIを公開
- Consumer
- Rest Proxy内ではConsumer Groupを実現するためにHigh Level Consumerを用いている。
- ConsumerはOffsetがあるためステートフル。
- OffsetのCommitは自動か手動か選択可能
- Data Formats
- Rest Proxy自体はJSONでデータをやり取りするが、実際にKafkaに保持されるのはBase64エンコードか、Avro.
- Avroを使用する場合、Schema Registryに登録されたスキーマによってバリデーション
- Rest Proxy自体はJSONでデータをやり取りするが、実際にKafkaに保持されるのはBase64エンコードか、Avro.
- Rest Proxyのクラスタ化とロードバランシング
- Rest Proxy自体は認識のためのidを別定義するのみで、各インスタンス間の連携は無い。
- そのため、クラスタ化は不要、ロードバランシングは別機構で実施する必要がある。
- Rest Proxy自体は認識のためのidを別定義するのみで、各インスタンス間の連携は無い。
- Simple Consumer
- 基本的にはHigh LevelのConsumerを使うことが望ましいが、Low LevelのConsumerも使用可能
- Offset指定でメッセージの取得を行う際などに用いられる。
- 基本的にはHigh LevelのConsumerを使うことが望ましいが、Low LevelのConsumerも使用可能
- 下記の機能は現状まだ提供されていない。
- 管理者用オペレーション
- 複数のTopic、Partitionに対する同時投入
- 複数スレッドのConsumer(現状シングルスレッドのため、スループットは出ない)
- マルチテナントにおけるユーザ間のバランシングなど特殊な機能
まとめ
Confulent Platformの重要機能であるスキーマ管理、Rest APIによるProxy、Avroスキーマによる互換性管理やシリアライズについての概要を確認しました。
とはいえ、現状はRest ProxyはProducer側しか性能は出ないなど、全てをRest APIで置き換えるのは遠いようです。
やはり、取得する側はJava / C++のConsumerを使わざるを得ないというのが現状のようですね。