Help us understand the problem. What is going on with this article?

Kotlin向けgRPC/protobufライブラリKroto+を使ってみよう

はじめに

この記事は Kotlin Advent Calendar 2019 6日目の記事です。5日目は @kuluna さんのKotlinで@Deprecatedと@Experimentalを使って無言で意思疎通をとる、7日目は @toliner さんのGradle Kotlin DSL入門です。

Kroto+(kroto-plus) とは

タイトルにもある通り、Kotlin向けのgRPC/protobufライブラリです。
https://github.com/marcoferrer/kroto-plus
従来では、KotlinでgRPCやprotobufを扱う場合、Java向けのライブラリやGradleプラグインを使い、Javaのコードを生成してそれをKotlin側から使っていましたが、Kroto+を使うことによって、KotlinのCoroutineを使ったコードの生成や、messageのインスタンス生成やserviceの呼び出しをDSLスタイルで書くことが可能になります。

messageのインスタンス生成の例
// 従来
val message1 = MessageRequest.newBuilder()
    .setMessage("hoge")
    .build()

// Kroto+
val message2 = MessageRequest { 
    message = "hoge"
}

サンプル

公式のGetting Started With Gradleexample-projectを参考に、簡単なサンプルを用意しました。
https://github.com/yt8492/KrotoPlusSample/tree/qiita_article_version

build.gradle

基本的に公式のサンプルを参考にしましたが、そのままではうまくいかない部分があったので一部自分で手を加えています。より良い方法などあればコメントでご指摘ください。

今回使用したライブラリのバージョンは以下の通りです。

gradle.properties
krotoplus_version=0.5.0
protobuf_version=3.10.0
coroutines_version=1.3.2
grpc_version=1.25.0

pluginsブロックにprotobufを追加します。

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.3.61'
    id 'com.google.protobuf' version '0.8.10' // 追加
    id 'application'
}

dependenciesに依存を追加していきます。

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"

    implementation "com.github.marcoferrer.krotoplus:kroto-plus-coroutines:$krotoplus_version"
    implementation "com.github.marcoferrer.krotoplus:kroto-plus-message:$krotoplus_version"

    implementation "com.google.protobuf:protobuf-java:$protobuf_version"

    implementation "io.grpc:grpc-protobuf:$grpc_version"
    implementation "io.grpc:grpc-stub:$grpc_version"
    implementation "io.grpc:grpc-netty:$grpc_version"
}

compileKotlinにオプションを追加します。

compileKotlin {
    kotlinOptions {
        jvmTarget = "1.8"
        freeCompilerArgs += [
                "-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi"
        ]
    }
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

protobufタスクを定義していきます。

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:$protobuf_version"
    }

    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
        }
        coroutines {
            artifact = "com.github.marcoferrer.krotoplus:protoc-gen-grpc-coroutines:$krotoplus_version:jvm8@jar"
        }
        kroto {
            artifact = "com.github.marcoferrer.krotoplus:protoc-gen-kroto-plus:$krotoplus_version:jvm8@jar"
        }
    }

    generateProtoTasks {
        def krotoConfig = file("krotoPlusConfig.asciipb")

        all().each { task ->

            task.inputs.files krotoConfig

            task.plugins {
                grpc {}
                coroutines {}
                kroto {
                    outputSubDir = "java"
                    option "ConfigPath=$krotoConfig"
                }
            }
        }
    }
}

generateProtoTaskskrotoConfig に読み込んだ krotoPlusConfig.asciipb には、自動生成するStubの設定を書くことができます。今回は、protobufのmessageをDSLスタイルで使えるようにしたいので、以下のようにしました。

krotoPlusConfig.asciipb
proto_builders {
    filter { exclude_path: "google/*" }
    unwrap_builders: true
    use_dsl_markers: true
}

最後に、自動生成されたファイルを読み込むため、sourceSetsで自動生成されたファイルが入るディレクトリを指定しています。

sourceSets {
    main {
        java {
            srcDir("$buildDir/generated/source/proto/main/java")
            srcDir("$buildDir/generated/source/proto/main/grpc")
            srcDir("$buildDir/generated/source/proto/main/coroutines")
        }
    }
}

protoファイルの定義

今回は、1:1の通信、クライアントサイドストリーミング、サーバーサイドストリーミング、双方向ストリーミングの4つをrpcとして定義しました。

api.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.yt8492.krotosample.protobuf";
option java_outer_classname = "KrotoSample";

package api;

message MessageRequest {
    string message = 1;
}

message MessageResponse {
    string message = 1;
}

service MessageService {
    rpc Unary (MessageRequest) returns (MessageResponse);
    rpc ClientStream (stream MessageRequest) returns (MessageResponse);
    rpc ServerStream (MessageRequest) returns (stream MessageResponse);
    rpc BidirectionalStream (stream MessageRequest) returns (stream MessageResponse);
}

コードの自動生成

generateProto タスクを実行しましょう。

 ./gradlew generateProto

以下のようなファイルが自動生成されると思います。
スクリーンショット 2019-12-04 17.55.35.png

Serviceの実装

従来は自動生成されたJavaの HogeGrpc.FugaImplBase (今回の場合は MessageServiceGrpc.MessageServiceImplBase )を継承したクラスに実装を書いていくのですが、Kroto+ではKotlinの HogeCoroutineGrpc.FugaImplBase (今回の場合は MessageServiceCoroutineGrpc.MessageServiceImplBase )を継承し実装していきます。

MessageServiceImpl.kt
@ExperimentalCoroutinesApi
class MessageServiceImpl : MessageServiceCoroutineGrpc.MessageServiceImplBase() {
    override val initialContext: CoroutineContext
        get() = Dispatchers.Default + SupervisorJob()

    override suspend fun unary(
        request: MessageRequest
    ): MessageResponse {
        return MessageResponse {
            message = request.message.toUpperCase()
        }
    }

    override suspend fun clientStream(
        requestChannel: ReceiveChannel<MessageRequest>
    ): MessageResponse {
        val requestList = requestChannel.toList()
        return MessageResponse {
            message = requestList.joinToString("\n") {
                it.message.toUpperCase()
            }
        }
    }

    override suspend fun serverStream(
        request: MessageRequest,
        responseChannel: SendChannel<MessageResponse>
    ) {
        val response = MessageResponse {
            message = request.message.toUpperCase()
        }
        repeat(2) {
            responseChannel.send(response)
        }
        responseChannel.close()
    }

    override suspend fun bidirectionalStream(
        requestChannel: ReceiveChannel<MessageRequest>,
        responseChannel: SendChannel<MessageResponse>
    ) {
        requestChannel.consumeEach { request ->
            val response = MessageResponse {
                message = request.message.toUpperCase()
            }
            responseChannel.send(response)
        }
    }
}

今回はサンプルなので複雑なことはせず、リクエストで来た文字列をUpperCaseにして返しているだけの雑な実装です。
ストリームはCoroutineのChannelを使って処理を書くことができるので、従来のコールバックスタイルより楽です。

サーバーの起動

./gradlew run でサーバーを起動、Ctrl+Cでシャットダウンできるようにします。

Main.kt
@ExperimentalCoroutinesApi
fun main() {
    val port = 6565
    val server = ServerBuilder.forPort(port)
        .addService(MessageServiceImpl())
        .build()
        .start()
    Runtime.getRuntime().addShutdownHook(Thread() {
        server.shutdown()
    })
    server.awaitTermination()
}

build.gradleapplication プラグインを設定していない場合は以下を追記してください。

application {
    mainClassName = "MainKt"
}

実際に起動して、クライアントからリクエストを投げてみます。以下の例ではevansを使いました。
スクリーンショット 2019-12-04 18.35.16.png
うまくいってそうですね!

クライアントでもKroto+を使ってみる

Kroto+はクライアントで利用するStubもCoroutineを利用するものを生成することができます。
先程の main 関数を書き換えてみましょう。

Main.kt
@ExperimentalCoroutinesApi
fun main() {
    val port = 6565
    val server = ServerBuilder.forPort(port)
        .addService(MessageServiceImpl())
        .build()
        .start()

    val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
        .usePlaintext()
        .build()
    val client = MessageServiceCoroutineGrpc
        .MessageServiceCoroutineStub.newStub(channel)
    println("--- Bidirectional Stream start ---")
    runBlocking {
        val (requestChannel, responseChannel) = client.bidirectionalStream()
        listOf("hoge", "fuga", "piyo").forEach {
            requestChannel.send {
                message = it
            }
        }
        requestChannel.close()
        responseChannel.consumeEach {
            println(it.message)
        }
    }
    println("--- Bidirectional Stream finish ---")
    println("--- Client Stream start ---")
    runBlocking {
        val (requestChannel, response) = client.clientStream()
        listOf("hgoe", "fuga", "piyo").forEach {
            requestChannel.send {
                message = it
            }
        }
        requestChannel.close()
        println(response.await().message)
    }
    println("--- Client Stream finish ---")
    println("--- Server Stream start ---")
    runBlocking {
        val request = MessageRequest {
            message = "hoge"
        }
        client.serverStream(request).consumeEach {
            println(it.message)
        }
    }
    println("--- Server Stream finish ---")
    server.shutdown()
}

Kroto+で生成されるStubのメソッドはsuspend funなので、コルーチン内で呼び出しています。
サーバー側と同様、ストリームはChannelを使って処理を書いてます。最近登場したFlowに変換しても使い勝手がいいと思います。

以下が実行結果です。

--- Bidirectional Stream start ---
HOGE
FUGA
PIYO
--- Bidirectional Stream finish ---
--- Client Stream start ---
HGOE
FUGA
PIYO
--- Client Stream finish ---
--- Server Stream start ---
HOGE
HOGE
--- Server Stream finish ---

ちゃんとできてそうです!

おまけ: grpc-spring-boot-starterで使う場合

2日目の @n_takehata さんの記事(サーバーサイドKotlin×gRPCコトハジメ 〜⑤プロジェクト作成から基本的なAPI実装までまとめ〜)でも紹介されてたgrpc-spring-boot-starterを使う場合でも、Kroto+のCoroutineServiceを使った実装を @GRpcService アノテーションをつけて問題なく利用できます。
以下のサンプルは今回の記事とは関係なく前に作ってたものを流用した雑な例なのでご了承ください…

従来のService
@GRpcService
class GRpcService : ChatServiceGrpc.ChatServiceImplBase() {

    private val observers = ConcurrentHashMap.newKeySet<StreamObserver<MessageResponse>?>()

    override fun execStream(
            responseObserver: StreamObserver<MessageResponse>?
    ): StreamObserver<MessageRequest> {
        observers.add(responseObserver)
        return object : StreamObserver<MessageRequest> {
            override fun onNext(value: MessageRequest?) {
                println(value?.message)
                val res = MessageResponse.newBuilder()
                        .setMessage(value?.message)
                        .build()
                observers.forEach {
                    it?.onNext(res)
                }
            }

            override fun onError(t: Throwable?) {
                t?.printStackTrace()
                observers.remove(responseObserver)
                responseObserver?.onCompleted()
            }

            override fun onCompleted() {
                println("onCompleted")
            }
        }
    }

    override fun healthCheck(request: Empty?, responseObserver: StreamObserver<Empty>?) {
        responseObserver?.onNext(request)
    }
}
Kroto+のService
@ExperimentalCoroutinesApi
@GRpcService
class ChatServiceImpl : ChatServiceCoroutineGrpc.ChatServiceImplBase() {

    private val channels = ConcurrentHashMap.newKeySet<SendChannel<MessageResponse>>()

    override val initialContext: CoroutineContext
        get() = Dispatchers.Default + SupervisorJob()

    override suspend fun execStream(
            requestChannel: ReceiveChannel<MessageRequest>,
            responseChannel: SendChannel<MessageResponse>
    ) {
        channels.add(responseChannel)
        requestChannel.consumeEach {
            val res = MessageResponse {
                message = it.message
            }
            channels.forEach {
                try {
                    it.send(res)
                } catch (e: StatusRuntimeException) {
                    channels.remove(responseChannel)
                }
            }
        }
    }

    override suspend fun healthCheck(request: Empty): Empty {
        return request
    }
}

少なくとも自分の環境では後者も問題なくSpring Bootで動くことを確認しています。

まとめ

もともとgRPCが好きだったこともあり、今回Kroto+を試してみましたが、普通に使い勝手は良さそうでした。KotlinのCoroutineとgRPCを組み合わせて使いたい場合には試してみる価値がありそうです。これからに期待ですね。
ありがとうございました。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした