grpc-kotlin とは
grpc-kotlinは、gRPCのServiceをKotlinのCoroutineで扱えるようにするためのprotocプラグインです。
このprotocプラグインを使うことにより、Kotlin Coroutinesを使うgRPCのコードを生成できます。
// 従来
override fun unary(
request: MessageRequest,
responseObserver: StreamObserver<MessageResponse>
) {
val response = MessageResponse.newBuilder()
.setMessage(request.message.toUpperCase())
.build()
responseObserver.onNext(response)
}
//grpc-kotlin
override suspend fun unary(
request: MessageRequest
): MessageResponse {
val response = MessageResponse.newBuilder()
.setMessage(request.message.toUpperCase())
.build()
return response
}
サンプル
サーバーとクライアントの簡単なサンプルを用意しました。
build.gradle
今回使用したライブラリのバージョンは以下の通りです。
grpc_kotlin_version=0.1.4
protoc_version=3.11.0
coroutines_version=1.3.5
grpc_version=1.29.0
pluginsブロックにprotobufを追加します。
plugins {
id 'org.jetbrains.kotlin.jvm' version '1.3.72'
id 'com.google.protobuf' version '0.8.12'
id 'application'
}
dependenciesに依存を追加していきます。
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
implementation "io.grpc:grpc-netty-shaded:$grpc_version"
implementation "io.grpc:grpc-protobuf:$grpc_version"
implementation "io.grpc:grpc-stub:$grpc_version"
}
compileKotlinにオプションを追加します。
compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs += ["-Xopt-in=kotlin.RequiresOptIn"]
}
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
protobufタスクを定義していきます。
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protoc_version"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
}
grpckotlin {
artifact = "io.rouz:grpc-kotlin-gen:$grpc_kotlin_version"
}
}
generateProtoTasks {
all().each { task ->
task.plugins {
grpc {}
grpckotlin {}
}
}
}
}
最後に、自動生成されたファイルを読み込むため、sourceSetsで自動生成されたファイルが入るディレクトリを指定しています。
sourceSets {
main {
java {
srcDir("$buildDir/generated/source/proto/main/java")
srcDir("$buildDir/generated/source/proto/main/grpc")
srcDir("$buildDir/generated/source/proto/main/grpckotlin")
}
}
}
protoファイルの定義
1:1の通信、クライアントサイドストリーミング、サーバーサイドストリーミング、双方向ストリーミングの4つをrpcとして定義しました。
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.yt8492.grpcsample.protobuf";
option java_outer_classname = "GrpcSample";
package api;
message MessageRequest {
string message = 1;
}
message MessageResponse {
string message = 1;
}
service MessageService {
rpc Unary (MessageRequest) returns (MessageResponse);
rpc ClientStream(stream MessageRequest) returns (MessageResponse);
rpc ServerStream(MessageRequest) returns (stream MessageResponse);
rpc BidirectionalStream (stream MessageRequest) returns (stream MessageResponse);
}
コードの自動生成
generateProto
タスクを実行しましょう。
./gradlew generateProto
サーバーの実装
grpc-kotlinでは、自動生成された ${service名}ImplBase
(今回の場合は MessageServiceImplBase
)を継承しServiceの実装を書いていきます。
@OptIn(ExperimentalCoroutinesApi::class)
class MessageServiceImpl : MessageServiceImplBase() {
override suspend fun unary(
request: MessageRequest
): MessageResponse {
val response = MessageResponse.newBuilder()
.setMessage(request.message.toUpperCase())
.build()
return response
}
override suspend fun clientStream(
requests: ReceiveChannel<MessageRequest>
): MessageResponse {
val requestList = requests.toList()
val response = MessageResponse.newBuilder()
.setMessage(
requestList.joinToString("\n") {
it.message.toUpperCase()
}
)
.build()
return response
}
override fun serverStream(
request: MessageRequest
): ReceiveChannel<MessageResponse> {
val response = MessageResponse.newBuilder()
.setMessage(request.message.toUpperCase())
.build()
return produce {
repeat(2) {
send(response)
}
}
}
override fun bidirectionalStream(
requests: ReceiveChannel<MessageRequest>
): ReceiveChannel<MessageResponse> {
return produce {
requests.consumeEach { request ->
val response = MessageResponse.newBuilder()
.setMessage(request.message.toUpperCase())
.build()
send(response)
}
}
}
}
今回はサンプルなので複雑なことはせず、リクエストで来た文字列をUpperCaseにして返しているだけの雑な実装です。
grpc-kotlinの特徴として、ImplBaseのメソッドのシグネチャが直感的というのが挙げられると思います。メソッドの引数に受け取るMessageを、メソッドの返り値に返すMessageを定義しています。Streamの場合にはそれがCoroutineのChannelになるだけです。
サーバーを立ち上げるコードです。
fun main() {
val port = 6565
val server = ServerBuilder.forPort(port)
.addService(MessageServiceImpl())
.build()
.start()
Runtime.getRuntime().addShutdownHook(Thread() {
server.shutdown()
})
server.awaitTermination()
}
build.gradleでmain classを指定し、 ./gradlew run
を実行するとサーバーが起動します。
application {
mainClassName = "MainKt"
}
クライアントの実装
サーバーを実装したので、クライアントを実装してみましょう。
grpc-kotlinは、生成されたJavaのStubのclassの拡張関数を生成します。
そのため、Stub自体はJavaのものを利用します。
@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build()
val client = MessageServiceGrpc.newStub(channel)
runBlocking {
println("--- Unary Call start ---")
val request = MessageRequest.newBuilder()
.setMessage("hoge")
.build()
val response = client.unary(request)
println(response.message)
println("--- Unary Call finish ---")
}
runBlocking {
println("--- Client Stream start ---")
val call = client.clientStream()
listOf("hoge", "fuga", "piyo").forEach {
val request = MessageRequest.newBuilder()
.setMessage(it)
.build()
call.send(request)
}
call.close()
val response = call.await()
println(response.message)
println("--- Client Stream finish ---")
}
runBlocking {
println("--- Server Stream start ---")
val request = MessageRequest.newBuilder()
.setMessage("hoge")
.build()
val call = client.serverStream(request)
call.consumeEach {
println(it.message)
}
println("--- Server Stream finish ---")
}
runBlocking {
println("--- Bidirectional Stream start ---")
val call = client.bidirectionalStream()
listOf("hoge", "fuga", "piyo").forEach {
val request = MessageRequest.newBuilder()
.setMessage(it)
.build()
call.send(request)
}
call.close()
call.consumeEach {
println(it.message)
}
println("--- Bidirectional Stream finish ---")
}
}
Unary Callは、単純にリクエストのMessageを渡しレスポンスのMessageを受け取るsuspend関数です。
Client Streamは、引数なしで関数を呼び出し、ManyToOneCallという型の値を返します。リクエストのMessageはManyToOneCallのメソッドであるsendで渡し、レスポンスはManyToOneCallのawaitで受け取ります。
Server Streamは、引数にリクエストのMessageを渡し、返り値にレスポンスのMessageのChannelを受け取ります。
BidirectionalStreamは、引数なしで関数を呼び出し、ManyToManyCallという型の値を返します。リクエストのMessageはManyToManyCallのメソッドであるsendで渡します。ManyToManyCall型自体がReceiveChannel型を継承しているため、consumeEachなどでレスポンスを扱うことができます。
サーバー側と同じように、build.gradleでmain classを指定し、 ./gradlew run
で実行します。
実行結果
--- Unary Call start ---
HOGE
--- Unary Call finish ---
--- Client Stream start ---
HOGE
FUGA
PIYO
--- Client Stream finish ---
--- Server Stream start ---
HOGE
HOGE
--- Server Stream finish ---
--- Bidirectional Stream start ---
HOGE
FUGA
PIYO
--- Bidirectional Stream finish ---
まとめ
前回の記事ではkroto-plusを試しましたが、今回はgrpc-kotlinというKotlin向けのprotocプラグインを試してみました。Messageの生成などはDSLスタイルで書くことができるkroto-plusのほうが個人的に好みですが、Kotlin Coroutinesを利用するという点ではこちらも機能的に十分だと思います。
kroto-plusとgrpc-kotlin、どちらも便利なのでKotlinでgRPCを扱う際の候補としてぜひ検討してみてください。