先日Protocol Buffersを触る機会があったので、これをKafkaアプリケーションに使ってみる。
Protocol Buffersとは
Protocol Buffersは、オブジェクトをシリアライズするためのライブラリである。シリアライズに使えるデータ形式は世の中に数多くあり、JSONやXMLもその一つだが、これらはテキストデータであるため容量が大きくなりやすい。Protocol Buffersを使うと、シリアライズによって生成されたバイト列の容量が軽く、ネットワークを介して通信する時のパフォーマンスの改善につながりうる。また、スキーマはプログラミング言語から独立しているので、異なる技術スタックを使ったマイクロサービス間でも通信ができる。
アプリケーションの全体像
Protocol Buffersを使うには、まずprotoファイル内でデータのスキーマを定義し、それをデータの送受信側それぞれで共有する必要がある。
protoファイルがあれば、それをもとにデータの生成やパースを行うソースコードを各言語向けに自動生成することができるので、スキーマが変更されても開発者はパーサーの保守をする必要がなく、ビジネスロジックに集中することができる。
Protocol BuffersをKafkaによるメッセージ送受信に用いる場合、アプリケーションは以下のようなフローになる。
アプリケーションのレポジトリは以下。
protoファイルを作成する
まずはprofoファイルを作成する。今回は movie_name
, location
, box_office
の3つのフィールドからなるデータをKafkaで送受信する。
syntax = "proto3";
package proto;
option java_multiple_files = true;
option java_package = "com.udomomo.springbootkafkapractice.proto";
message MyTopicEntry {
string movie_name = 1;
enum LOCATION {
UNKNOWN = 0;
THEATER = 1;
STREAMING = 2;
}
LOCATION location = 2;
int32 box_office = 3;
}
java_multiple_files
は、protoファイルから生成されるJavaソースコードを複数ファイルにするかどうかを設定できる。 java_package
は生成されるソースコードのパッケージ名である。
protoファイルの文法は以下を参照。
protoファイルはどこに配置すべきか?
protoファイルは複数のアプリケーションが共有するものなので、特定のアプリケーションの中に配置するべきではない。よくある方法として、以下のようなものがある。
- 複数アプリケーションから参照されるcommonパッケージの中にprotoファイルと生成コードを入れ、各アプリケーションの依存関係として追加する
- Schema Registryを利用する(例: Confluent Schema Registry)
今回はローカルで行う簡易プロジェクトなので、proto/
ディレクトリを作成してそこに配置しておくのみとする。
protoファイルからコードを生成する
アプリケーションのビルド時に、最新のprotoファイルからコードが生成されるようにする。今回はGradleを使う。
必要な依存関係は以下の3つ。
-
com.google.protobuf
: Gradle用のプラグイン。protoファイルからコードを生成するためのタスクを提供する。 -
com.google.protobuf:protobuf-java
: Java用のProtocol Buffersコアライブラリ。生成されたコードにアプリケーションからアクセスするためのメソッド等を提供する。Gradleではdependency
に追加すれば良い。 -
protoc
: protoファイルをコンパイルするために必要なC言語パッケージ。CLIも提供されているが、ベストプラクティスはプリコンパイルされたcom.google.protobuf:protoc
パッケージをタスク内で使うこと。(下記参照)
なお、 protobuf-java
と protoc
のバージョンは揃えておく必要がある。
以下のようにGradleタスクを定義する。
sourceSets {
main {
proto {
srcDir "$rootDir/proto"
}
java {
srcDirs "$buildDir/generated/source/proto/main/java"
}
}
}
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.17.3'
}
}
protobuf
タスク内で、protoc
を使ってprotoファイルがコンパイルされる。
また、sourceSets
タスクは、生成されたコードにアプリケーションからアクセスできるようにするために用意している。プロパティは以下の通り。
-
proto.srcDir
: protoファイルの場所を指定している。 -
java.srcDirs
: 生成されたコードが格納される場所を指定している。
これらを main
以下で定義することで、生成されたコードがアプリケーションの main
sourceSetの一部となり、アクセス可能になる。
タスクを定義したら、ビルド時にprotoファイルが本当にコンパイルされるかどうかをgradlew
の --dry-run
オプションで確認できる。以下は producer
アプリケーションのビルドで確認した例。
$ ./gradlew :producer:build --dry-run
Starting a Gradle Daemon, 1 incompatible Daemon could not be reused, use --status for details
:producer:extractIncludeProto SKIPPED
:producer:extractProto SKIPPED
:producer:generateProto SKIPPED
:producer:compileJava SKIPPED
:producer:processResources SKIPPED
:producer:classes SKIPPED
:producer:bootJar SKIPPED
:producer:jar SKIPPED
:producer:assemble SKIPPED
:producer:extractIncludeTestProto SKIPPED
:producer:extractTestProto SKIPPED
:producer:generateTestProto SKIPPED
:producer:compileTestJava SKIPPED
:producer:processTestResources SKIPPED
:producer:testClasses SKIPPED
:producer:test SKIPPED
:producer:check SKIPPED
:producer:build SKIPPED
BUILD SUCCESSFUL in 16s
compileJava
タスクの前に、Proto関連のタスクが実行されることがわかる。
Kafkaでメッセージを送受信する
Producer
protoファイルで定義したデータオブジェクトは、builderメソッドを使って作成することができる。以下の setMovieName()
などのメソッドは、protoファイルをもとに自動で生成されたものである。
MyTopicEntry myTopicEntry1 = MyTopicEntry.newBuilder()
.setMovieName("Titanic")
.setLocation(MyTopicEntry.LOCATION.STREAMING)
.setBoxOffice(340000)
.build();
このデータオブジェクトをproducerで送信するためのSerializerを作っておく。中身は単にバイト列に変換しているだけだが、データの型ごとにSerializerを分けておく方が、後で実装を追加するときの影響範囲を限定しやすい。
public class MyTopicEntrySerializer implements Serializer<MyTopicEntry> {
public MyTopicEntrySerializer() {
}
public byte[] serialize(String topic, MyTopicEntry data) {
return data.toByteArray();
}
}
作成したSerializerを使って、メッセージを送信することができる。
kafkaTemplate.send(kafkaSettings.getTopic(), myTopicEntry1);
Consumer
MyTopicEntryDeserializer
を作り、Consumerからメッセージを受け取る。受け取ったメッセージは、parseFrom()
メソッドを使うことで、Protoファイルで定義したデータオブジェクトにデシリアライズできる。
MyTopicEntry entry = MyTopicEntry.parseFrom(record.value());