6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spring Boot + Protocol Buffers + Kafkaでメッセージを送受信する

Last updated at Posted at 2021-09-19

先日Protocol Buffersを触る機会があったので、これをKafkaアプリケーションに使ってみる。

Protocol Buffersとは

Protocol Buffersは、オブジェクトをシリアライズするためのライブラリである。シリアライズに使えるデータ形式は世の中に数多くあり、JSONやXMLもその一つだが、これらはテキストデータであるため容量が大きくなりやすい。Protocol Buffersを使うと、シリアライズによって生成されたバイト列の容量が軽く、ネットワークを介して通信する時のパフォーマンスの改善につながりうる。また、スキーマはプログラミング言語から独立しているので、異なる技術スタックを使ったマイクロサービス間でも通信ができる。

アプリケーションの全体像

Protocol Buffersを使うには、まずprotoファイル内でデータのスキーマを定義し、それをデータの送受信側それぞれで共有する必要がある。
protoファイルがあれば、それをもとにデータの生成やパースを行うソースコードを各言語向けに自動生成することができるので、スキーマが変更されても開発者はパーサーの保守をする必要がなく、ビジネスロジックに集中することができる。
Protocol BuffersをKafkaによるメッセージ送受信に用いる場合、アプリケーションは以下のようなフローになる。

protobuf-kafka.png

アプリケーションのレポジトリは以下。

protoファイルを作成する

まずはprofoファイルを作成する。今回は movie_name , location, box_office の3つのフィールドからなるデータをKafkaで送受信する。

syntax = "proto3";

package proto;

option java_multiple_files = true;
option java_package = "com.udomomo.springbootkafkapractice.proto";

message MyTopicEntry {
  string movie_name = 1;
  enum LOCATION {
    UNKNOWN = 0;
    THEATER = 1;
    STREAMING = 2;
  }
  LOCATION location = 2;
  int32 box_office = 3;
}

java_multiple_files は、protoファイルから生成されるJavaソースコードを複数ファイルにするかどうかを設定できる。 java_package は生成されるソースコードのパッケージ名である。
protoファイルの文法は以下を参照。

protoファイルはどこに配置すべきか?

protoファイルは複数のアプリケーションが共有するものなので、特定のアプリケーションの中に配置するべきではない。よくある方法として、以下のようなものがある。

  • 複数アプリケーションから参照されるcommonパッケージの中にprotoファイルと生成コードを入れ、各アプリケーションの依存関係として追加する
  • Schema Registryを利用する(例: Confluent Schema Registry

今回はローカルで行う簡易プロジェクトなので、proto/ディレクトリを作成してそこに配置しておくのみとする。

protoファイルからコードを生成する

アプリケーションのビルド時に、最新のprotoファイルからコードが生成されるようにする。今回はGradleを使う。

必要な依存関係は以下の3つ。

  • com.google.protobuf : Gradle用のプラグイン。protoファイルからコードを生成するためのタスクを提供する。
  • com.google.protobuf:protobuf-java : Java用のProtocol Buffersコアライブラリ。生成されたコードにアプリケーションからアクセスするためのメソッド等を提供する。Gradleでは dependency に追加すれば良い。
  • protoc : protoファイルをコンパイルするために必要なC言語パッケージ。CLIも提供されているが、ベストプラクティスはプリコンパイルされた com.google.protobuf:protoc パッケージをタスク内で使うこと。(下記参照)

なお、 protobuf-javaprotoc のバージョンは揃えておく必要がある。

以下のようにGradleタスクを定義する。

sourceSets {
    main {
        proto {
            srcDir "$rootDir/proto"
        }
        java {
            srcDirs "$buildDir/generated/source/proto/main/java"
        }
    }
}

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.17.3'
    }
}

protobuf タスク内で、protoc を使ってprotoファイルがコンパイルされる。
また、sourceSets タスクは、生成されたコードにアプリケーションからアクセスできるようにするために用意している。プロパティは以下の通り。

  • proto.srcDir: protoファイルの場所を指定している。
  • java.srcDirs: 生成されたコードが格納される場所を指定している。

これらを main 以下で定義することで、生成されたコードがアプリケーションの main sourceSetの一部となり、アクセス可能になる。

タスクを定義したら、ビルド時にprotoファイルが本当にコンパイルされるかどうかをgradlew--dry-run オプションで確認できる。以下は producer アプリケーションのビルドで確認した例。

$ ./gradlew :producer:build --dry-run
Starting a Gradle Daemon, 1 incompatible Daemon could not be reused, use --status for details
:producer:extractIncludeProto SKIPPED
:producer:extractProto SKIPPED
:producer:generateProto SKIPPED
:producer:compileJava SKIPPED
:producer:processResources SKIPPED
:producer:classes SKIPPED
:producer:bootJar SKIPPED
:producer:jar SKIPPED
:producer:assemble SKIPPED
:producer:extractIncludeTestProto SKIPPED
:producer:extractTestProto SKIPPED
:producer:generateTestProto SKIPPED
:producer:compileTestJava SKIPPED
:producer:processTestResources SKIPPED
:producer:testClasses SKIPPED
:producer:test SKIPPED
:producer:check SKIPPED
:producer:build SKIPPED

BUILD SUCCESSFUL in 16s

compileJavaタスクの前に、Proto関連のタスクが実行されることがわかる。

Kafkaでメッセージを送受信する

Producer

protoファイルで定義したデータオブジェクトは、builderメソッドを使って作成することができる。以下の setMovieName() などのメソッドは、protoファイルをもとに自動で生成されたものである。

MyTopicEntry myTopicEntry1 = MyTopicEntry.newBuilder()
            .setMovieName("Titanic")
            .setLocation(MyTopicEntry.LOCATION.STREAMING)
            .setBoxOffice(340000)
            .build();

このデータオブジェクトをproducerで送信するためのSerializerを作っておく。中身は単にバイト列に変換しているだけだが、データの型ごとにSerializerを分けておく方が、後で実装を追加するときの影響範囲を限定しやすい。

public class MyTopicEntrySerializer implements Serializer<MyTopicEntry> {
  public MyTopicEntrySerializer() {
  }

  public byte[] serialize(String topic, MyTopicEntry data) {
    return data.toByteArray();
  }
}

作成したSerializerを使って、メッセージを送信することができる。

kafkaTemplate.send(kafkaSettings.getTopic(), myTopicEntry1);

Consumer

MyTopicEntryDeserializer を作り、Consumerからメッセージを受け取る。受け取ったメッセージは、parseFrom() メソッドを使うことで、Protoファイルで定義したデータオブジェクトにデシリアライズできる。

MyTopicEntry entry = MyTopicEntry.parseFrom(record.value());
6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?