はじめに
この記事は Kotlin Advent Calendar 2019 6日目の記事です。5日目は @kuluna さんのKotlinで@Deprecatedと@Experimentalを使って無言で意思疎通をとる、7日目は @toliner さんのGradle Kotlin DSL入門です。
Kroto+(kroto-plus) とは
タイトルにもある通り、Kotlin向けのgRPC/protobufライブラリです。
https://github.com/marcoferrer/kroto-plus
従来では、KotlinでgRPCやprotobufを扱う場合、Java向けのライブラリやGradleプラグインを使い、Javaのコードを生成してそれをKotlin側から使っていましたが、Kroto+を使うことによって、KotlinのCoroutineを使ったコードの生成や、messageのインスタンス生成やserviceの呼び出しをDSLスタイルで書くことが可能になります。
// 従来
val message1 = MessageRequest.newBuilder()
.setMessage("hoge")
.build()
// Kroto+
val message2 = MessageRequest {
message = "hoge"
}
サンプル
公式のGetting Started With Gradleとexample-projectを参考に、簡単なサンプルを用意しました。
https://github.com/yt8492/KrotoPlusSample/tree/qiita_article_version
build.gradle
基本的に公式のサンプルを参考にしましたが、そのままではうまくいかない部分があったので一部自分で手を加えています。より良い方法などあればコメントでご指摘ください。
今回使用したライブラリのバージョンは以下の通りです。
krotoplus_version=0.5.0
protobuf_version=3.10.0
coroutines_version=1.3.2
grpc_version=1.25.0
pluginsブロックにprotobufを追加します。
plugins {
id 'org.jetbrains.kotlin.jvm' version '1.3.61'
id 'com.google.protobuf' version '0.8.10' // 追加
id 'application'
}
dependenciesに依存を追加していきます。
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
implementation "com.github.marcoferrer.krotoplus:kroto-plus-coroutines:$krotoplus_version"
implementation "com.github.marcoferrer.krotoplus:kroto-plus-message:$krotoplus_version"
implementation "com.google.protobuf:protobuf-java:$protobuf_version"
implementation "io.grpc:grpc-protobuf:$grpc_version"
implementation "io.grpc:grpc-stub:$grpc_version"
implementation "io.grpc:grpc-netty:$grpc_version"
}
compileKotlinにオプションを追加します。
compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs += [
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi"
]
}
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
protobufタスクを定義していきます。
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protobuf_version"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
}
coroutines {
artifact = "com.github.marcoferrer.krotoplus:protoc-gen-grpc-coroutines:$krotoplus_version:jvm8@jar"
}
kroto {
artifact = "com.github.marcoferrer.krotoplus:protoc-gen-kroto-plus:$krotoplus_version:jvm8@jar"
}
}
generateProtoTasks {
def krotoConfig = file("krotoPlusConfig.asciipb")
all().each { task ->
task.inputs.files krotoConfig
task.plugins {
grpc {}
coroutines {}
kroto {
outputSubDir = "java"
option "ConfigPath=$krotoConfig"
}
}
}
}
}
generateProtoTasks
の krotoConfig
に読み込んだ krotoPlusConfig.asciipb
には、自動生成するStubの設定を書くことができます。今回は、protobufのmessageをDSLスタイルで使えるようにしたいので、以下のようにしました。
proto_builders {
filter { exclude_path: "google/*" }
unwrap_builders: true
use_dsl_markers: true
}
最後に、自動生成されたファイルを読み込むため、sourceSetsで自動生成されたファイルが入るディレクトリを指定しています。
sourceSets {
main {
java {
srcDir("$buildDir/generated/source/proto/main/java")
srcDir("$buildDir/generated/source/proto/main/grpc")
srcDir("$buildDir/generated/source/proto/main/coroutines")
}
}
}
protoファイルの定義
今回は、1:1の通信、クライアントサイドストリーミング、サーバーサイドストリーミング、双方向ストリーミングの4つをrpcとして定義しました。
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.yt8492.krotosample.protobuf";
option java_outer_classname = "KrotoSample";
package api;
message MessageRequest {
string message = 1;
}
message MessageResponse {
string message = 1;
}
service MessageService {
rpc Unary (MessageRequest) returns (MessageResponse);
rpc ClientStream (stream MessageRequest) returns (MessageResponse);
rpc ServerStream (MessageRequest) returns (stream MessageResponse);
rpc BidirectionalStream (stream MessageRequest) returns (stream MessageResponse);
}
コードの自動生成
generateProto
タスクを実行しましょう。
./gradlew generateProto
Serviceの実装
従来は自動生成されたJavaの HogeGrpc.FugaImplBase
(今回の場合は MessageServiceGrpc.MessageServiceImplBase
)を継承したクラスに実装を書いていくのですが、Kroto+ではKotlinの HogeCoroutineGrpc.FugaImplBase
(今回の場合は MessageServiceCoroutineGrpc.MessageServiceImplBase
)を継承し実装していきます。
@ExperimentalCoroutinesApi
class MessageServiceImpl : MessageServiceCoroutineGrpc.MessageServiceImplBase() {
override val initialContext: CoroutineContext
get() = Dispatchers.Default + SupervisorJob()
override suspend fun unary(
request: MessageRequest
): MessageResponse {
return MessageResponse {
message = request.message.toUpperCase()
}
}
override suspend fun clientStream(
requestChannel: ReceiveChannel<MessageRequest>
): MessageResponse {
val requestList = requestChannel.toList()
return MessageResponse {
message = requestList.joinToString("\n") {
it.message.toUpperCase()
}
}
}
override suspend fun serverStream(
request: MessageRequest,
responseChannel: SendChannel<MessageResponse>
) {
val response = MessageResponse {
message = request.message.toUpperCase()
}
repeat(2) {
responseChannel.send(response)
}
responseChannel.close()
}
override suspend fun bidirectionalStream(
requestChannel: ReceiveChannel<MessageRequest>,
responseChannel: SendChannel<MessageResponse>
) {
requestChannel.consumeEach { request ->
val response = MessageResponse {
message = request.message.toUpperCase()
}
responseChannel.send(response)
}
}
}
今回はサンプルなので複雑なことはせず、リクエストで来た文字列をUpperCaseにして返しているだけの雑な実装です。
ストリームはCoroutineのChannelを使って処理を書くことができるので、従来のコールバックスタイルより楽です。
サーバーの起動
./gradlew run
でサーバーを起動、Ctrl+Cでシャットダウンできるようにします。
@ExperimentalCoroutinesApi
fun main() {
val port = 6565
val server = ServerBuilder.forPort(port)
.addService(MessageServiceImpl())
.build()
.start()
Runtime.getRuntime().addShutdownHook(Thread() {
server.shutdown()
})
server.awaitTermination()
}
build.gradle
で application
プラグインを設定していない場合は以下を追記してください。
application {
mainClassName = "MainKt"
}
実際に起動して、クライアントからリクエストを投げてみます。以下の例ではevansを使いました。
うまくいってそうですね!
クライアントでもKroto+を使ってみる
Kroto+はクライアントで利用するStubもCoroutineを利用するものを生成することができます。
先程の main
関数を書き換えてみましょう。
@ExperimentalCoroutinesApi
fun main() {
val port = 6565
val server = ServerBuilder.forPort(port)
.addService(MessageServiceImpl())
.build()
.start()
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build()
val client = MessageServiceCoroutineGrpc
.MessageServiceCoroutineStub.newStub(channel)
println("--- Bidirectional Stream start ---")
runBlocking {
val (requestChannel, responseChannel) = client.bidirectionalStream()
listOf("hoge", "fuga", "piyo").forEach {
requestChannel.send {
message = it
}
}
requestChannel.close()
responseChannel.consumeEach {
println(it.message)
}
}
println("--- Bidirectional Stream finish ---")
println("--- Client Stream start ---")
runBlocking {
val (requestChannel, response) = client.clientStream()
listOf("hgoe", "fuga", "piyo").forEach {
requestChannel.send {
message = it
}
}
requestChannel.close()
println(response.await().message)
}
println("--- Client Stream finish ---")
println("--- Server Stream start ---")
runBlocking {
val request = MessageRequest {
message = "hoge"
}
client.serverStream(request).consumeEach {
println(it.message)
}
}
println("--- Server Stream finish ---")
server.shutdown()
}
Kroto+で生成されるStubのメソッドはsuspend funなので、コルーチン内で呼び出しています。
サーバー側と同様、ストリームはChannelを使って処理を書いてます。最近登場したFlowに変換しても使い勝手がいいと思います。
以下が実行結果です。
--- Bidirectional Stream start ---
HOGE
FUGA
PIYO
--- Bidirectional Stream finish ---
--- Client Stream start ---
HGOE
FUGA
PIYO
--- Client Stream finish ---
--- Server Stream start ---
HOGE
HOGE
--- Server Stream finish ---
ちゃんとできてそうです!
おまけ: grpc-spring-boot-starterで使う場合
2日目の @n_takehata さんの記事(サーバーサイドKotlin×gRPCコトハジメ 〜⑤プロジェクト作成から基本的なAPI実装までまとめ〜)でも紹介されてたgrpc-spring-boot-starterを使う場合でも、Kroto+のCoroutineServiceを使った実装を @GRpcService
アノテーションをつけて問題なく利用できます。
以下のサンプルは今回の記事とは関係なく前に作ってたものを流用した雑な例なのでご了承ください…
@GRpcService
class GRpcService : ChatServiceGrpc.ChatServiceImplBase() {
private val observers = ConcurrentHashMap.newKeySet<StreamObserver<MessageResponse>?>()
override fun execStream(
responseObserver: StreamObserver<MessageResponse>?
): StreamObserver<MessageRequest> {
observers.add(responseObserver)
return object : StreamObserver<MessageRequest> {
override fun onNext(value: MessageRequest?) {
println(value?.message)
val res = MessageResponse.newBuilder()
.setMessage(value?.message)
.build()
observers.forEach {
it?.onNext(res)
}
}
override fun onError(t: Throwable?) {
t?.printStackTrace()
observers.remove(responseObserver)
responseObserver?.onCompleted()
}
override fun onCompleted() {
println("onCompleted")
}
}
}
override fun healthCheck(request: Empty?, responseObserver: StreamObserver<Empty>?) {
responseObserver?.onNext(request)
}
}
@ExperimentalCoroutinesApi
@GRpcService
class ChatServiceImpl : ChatServiceCoroutineGrpc.ChatServiceImplBase() {
private val channels = ConcurrentHashMap.newKeySet<SendChannel<MessageResponse>>()
override val initialContext: CoroutineContext
get() = Dispatchers.Default + SupervisorJob()
override suspend fun execStream(
requestChannel: ReceiveChannel<MessageRequest>,
responseChannel: SendChannel<MessageResponse>
) {
channels.add(responseChannel)
requestChannel.consumeEach {
val res = MessageResponse {
message = it.message
}
channels.forEach {
try {
it.send(res)
} catch (e: StatusRuntimeException) {
channels.remove(responseChannel)
}
}
}
}
override suspend fun healthCheck(request: Empty): Empty {
return request
}
}
少なくとも自分の環境では後者も問題なくSpring Bootで動くことを確認しています。
まとめ
もともとgRPCが好きだったこともあり、今回Kroto+を試してみましたが、普通に使い勝手は良さそうでした。KotlinのCoroutineとgRPCを組み合わせて使いたい場合には試してみる価値がありそうです。これからに期待ですね。
ありがとうございました。