この記事はなに?
ScalaでgRPCなアプリケーションを作るためのやりかた。
具体的には4つの通信方式、
- unary
- server straming
- client streaming
- bidirectional streaming
それぞれについてサーバ/クライアントをScalaで実装する。
参考
目次
- sbtの設定、マルチプロジェクト対応
- .protoファイルを作成
- .protoから生成したクラスをScalaから使用する
- unary
- server straming
- client streaming
- bidirectional streaming
サンプルコードはpetitviolet/scala-grpc-pracに置いた。
sbtの設定、マルチプロジェクト対応
modules
配下にmain
, model
, protocol
という3つのプロジェクトを設定する。
gRPCのprotocol bufferで使う.protoファイルはmodules/protocol/protocol
配下に置けるようにする。
treeだとこんな感じ。
./modules/
├── main
│ └── src
│ └── main
├── model
│ └── src
│ └── main
└── protocol
└── protocol
└── my_service.proto
build.sbtに設定を記述する
だいたい全部を載せておく
// 共通設定
def commonSettings(_name: String) = Seq(
scalaVersion := "2.12.4",
version := "1.0.0",
libraryDependencies ++= commonDependencies,
name := _name,
)
// gRPC用の設定
def grpcProtocolSettings = {
import scalapb.compiler.Version
Seq(
PB.targets in Compile := Seq(
scalapb.gen(singleLineToProtoString = true) -> (sourceManaged in Compile).value
),
PB.protoSources in Compile +=
(baseDirectory in ThisProject).value / "protocol",
libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % Version.scalapbVersion % "protobuf",
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % Version.scalapbVersion,
"io.grpc" % "grpc-netty" % Version.grpcJavaVersion
)
)
}
// rootプロジェクト
lazy val grpcPrac = (project in file("."))
.settings(commonSettings("grpcPrac"))
.aggregate(main)
// gRPCの.protoファイルを配置するプロジェクト
lazy val protocol = (project in file("modules/protocol"))
.settings(commonSettings("protocol"))
.settings(grpcProtocolSettings) // gRPC設定を適用
// gRPC非依存なプロジェクト
lazy val model = (project in file("modules/model"))
.settings(commonSettings("model"))
// gRPCなserverを実装するプロジェクト
lazy val main = (project in file("modules/main"))
.settings(
commonSettings("main"),
)
.dependsOn(model, protocol)
これでOK。
.protoファイルを作成
Protocol Bufferの文法とかはLanguage Guide (proto3)を見たほうが早い。
今回のサンプルで使うのは以下のprotoファイル。
modules/protocol/protocol/MyService.proto
に置いてある。
syntax = "proto3";
// 出力されるScalaプログラムのpackageを指定
package net.petitviolet.prac.grpc.protocol;
service MyService {
// unary
rpc ShowOrganization (ShowOrganizationRequest) returns (Organization) {
};
// server stream
rpc ShowEmployees (ShowEmployeeRequest) returns (stream Employee) {
};
// client stream
rpc AddEmployee (stream Employee) returns (MessageResponse) {
};
// bidirectional stream
rpc Lottery(stream FetchRandomRequest) returns (stream Employee) {};
}
message MessageResponse {
string message = 1;
}
message ShowEmployeeRequest {
int32 organizationId = 1;
}
message ShowOrganizationRequest {
int32 organizationId = 1;
}
message FetchRandomRequest {
}
message Employee {
string name = 1;
int32 age = 2;
enum Post {
NoTitle = 0;
Manager = 1;
Officer = 2;
}
int32 organizationId = 3;
}
message Organization {
int32 id = 1;
string name = 2;
repeated Employee emproyees = 3;
}
これを置いた上でsbt compile
するとmodules/protocol/target
配下にScalaファイルが吐かれる。
gRPC使う側はimportするだけで使用できる。
package
を指定してMyService.protoというファイル名なので、実際に使う際のimport文は以下になる。
import net.petitviolet.prac.grpc.protocol.MyService._
.protoから生成したクラスをScalaから使用する
grpc / gRPC Concepts#RPC life cycleにあるようにRPCの方式は4つある。
- unary
- server streaming
- client streaming
- bidirectional streaming
それぞれについてserver/clientで実装してみる。
記事に書くにあたってサンプル実装とは一部変更してある。
共通
server
実装するべきI/Fはこのようになっている。
class MyServiceImpl extends MyServiceGrpc.MyService {
// unary
def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
// server streaming
def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: StreamObserver[MyService.Employee]): Unit = ???
// client streaming
def addEmployee(responseObserver: StreamObserver[MyService.MessageResponse]): StreamObserver[MyService.Employee] = ???
// bidirectional streaming
def lottery(responseObserver: StreamObserver[MyService.Employee]): StreamObserver[MyService.FetchRandomRequest] = ???
}
これを実装してServiceBuilder#addService
に渡せば良い。
サンプル実装はこのあたり
client
こちらは既に実装されているので、利用するにあたってはI/Fに合わせて利用するだけ。
MyServiceBlockingClient
とMyServiceStub
の2クラスが生成されている。
前者は名前の通りblockingするクライアントになっていて、unary, server streamingの2パターンしかサポートしていない。
後者のMyServiceStub
は全てサポートしていて以下のようなI/Fになっている。
class MyServiceStub(channel: io.grpc.Channel, options: io.grpc.CallOptions = io.grpc.CallOptions.DEFAULT) extends io.grpc.stub.AbstractStub[MyServiceStub](channel, options) with MyService {
// unary
override def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
// server streaming
override def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: _root_.io.grpc.stub.StreamObserver[MyService.Employee]): Unit = ???
// client streaming
override def addEmployee(responseObserver: io.grpc.stub.StreamObserver[MyService.MessageResponse]): io.grpc.stub.StreamObserver[MyService.Employee] = ???
// bidirectional streaming
override def lottery(responseObserver: io.grpc.stub.StreamObserver[MyService.Employee]): io.grpc.stub.StreamObserver[MyService.FetchRandomRequest] = ???
}
上記のMyServiceStub
クラスのコンストラクタにあるio.grpc.Channel
インスタンスからクライアントはnewできる。
val blockingClient: MyServiceGrpc.MyServiceBlockingClient = MyServiceGrpc.blockingStub(channel)
val asyncClient: MyServiceGrpc.MyServiceStub = MyServiceGrpc.stub(channel)
unary
1リクエストに対して1レスポンスのもっともシンプルな通信。
server
普通に実装すればよく、特に言及することはない。
override def showOrganization(request: ShowOrganizationRequest): Future[Organization] = {
organizationRepository.findById(request.organizationId) // Future[Option[Organization]]
.map {
case Some(organization) => organization
case None =>
throw new RuntimeException(s"invalid request organization id = ${ request.organizationId }")
}
}
client
blocking/asyncどちらにも実装されているメソッドを呼び出すだけ。
val org: Organization = blockingClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
val orgF: Future[Organization] = asyncClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
client streaming
クライアントからのstreamなリクエストに対してサーバから1レスポンスを返す方式。
server
引数と返り値が共にStreamObserver
となっていて一瞬混乱するかも。
注意点としては、サーバからは1レスポンスな通信方式なので、返り値のStreamObserver.onNext
を複数回呼ばないようにすること。
override def addEmployee(responseObserver: StreamObserver[MessageResponse]): StreamObserver[Employee] = {
new StreamObserver[Employee] {
override def onError(t: Throwable): Unit = {
logger.error("addEmployee onError", t)
responseObserver.onError(t)
}
override def onCompleted(): Unit = {
// ここで一度だけonNextを実行し、続けてonCompleteを実行する
responseObserver.onNext(MessageResponse(s"addEmployee succeeded"))
responseObserver.onCompleted()
}
// ここがclientからのリクエストによって複数実行される可能性がある
override def onNext(employee: Employee): Unit = {
employeeRepository.store(employee)
.onComplete { // ここではresponseObserver.onNextは呼ばない
case Success(_) => logger.info(s"addEmployee onNext: ${ employee.name }")
case Failure(t) => onError(t)
}
}
}
}
client
asyncなクライアントしか使用することは出来ない。
注意点としては、asyncClient.addEmployee
の返り値のStreamObserver
のonCompleted
を呼ぶこと。
def addEmployee(name: String) = rpc {
val responseObserver = new StreamObserver[MessageResponse] {
override def onError(t: Throwable): Unit = logger.error("add failed to add employee", t)
override def onCompleted(): Unit = logger.info("add completed to add employee")
override def onNext(value: MessageResponse): Unit = logger.info("add onNext. message = ${value.message}")
}
val requestObserver: StreamObserver[Employee] = asyncClient.addEmployee(responseObserver)
(1 to 3).foreach { i =>
val employee = Employee(s"${ name }-$i", i * 10, i)
requestObserver.onNext(employee)
}
responseObserver.onCompleted()
requestObserver.onCompleted() // これを呼ぶ
}
requestObserver.onCompleted()
を実行しないとクライアントからのstreamingが終わったことを通知する必要があるらしい。
そうしないと、クライアント側のManagedChannel.shutdown()
を実行した時にサーバ側のonError
が呼び出されてしまう。
上の実装だとlogger.error
のあとにresponseObserver.onError
も実行しているが、既に接続が切れているのでクライアント側には通知されない。
ちなみにonError
時のstacktraceを抜粋するとこんな感じ。
io.grpc.StatusException: CANCELLED
at io.grpc.Status.asException(Status.java:534)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:272)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:282)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$3.runInContext(ServerImpl.java:618)
server streaming
クライアントからの1リクエストに対してサーバからstreamで複数レスポンスを返す方式。
server
クライアントからのリクエストに対して、responseObserver.onNext
を使って結果を複数返却する。
override def showEmployees(request: ShowEmployeeRequest, responseObserver: StreamObserver[Employee]): Unit = {
Option(request.organizationId)
.filterNot { _ == 0 } // デフォルト値を除外
.map { orgId =>
employeeRepository.findByOrganizationId(orgId) // Future[Seq[Employee]]
}.getOrElse {
employeeRepository.findAll() // Future[Seq[Employee]]
}.map { employees: Seq[Employee] =>
employees.foreach { employee: Employee =>
responseObserver.onNext(employee) // onNextを複数回呼んで良い
}
}.onComplete {
case Success(_) => responseObserver.onCompleted() // 全て終了したらonComplete
case Failure(t) => responseObserver.onError(t) // 何か失敗したらonError
}
}
onNext
を全て呼び終わったら、最後に一度だけonComplete
を実行する。
client
こちらはリクエストを1回送り、複数レスポンスをStreamObserver
で受け付ける。
def showEmployees(): Future[Unit] = rpc {
val promise = Promise[Unit]()
val responseObserver = new StreamObserver[Employee] {
override def onError(t: Throwable): Unit = {
logger.error(s"showEmployee onError", t)
promise.failure(t)
}
override def onCompleted(): Unit = {
logger.info(s"showEmployee onComplete")
promise.success(())
}
override def onNext(value: Employee): Unit = {
logger.info(s"showEmployee onNext: $value")
}
}
asyncClient.showEmployees(new ShowEmployeeRequest(), responseObserver)
promise.future
}
onComplete
が呼ばれるタイミングはサーバ側に依存するがPromise
を使って表現することが出来る。
bidirectional streaming
クライアントとサーバ間で双方向にリクエストしあう方式。
server
実装としてはclient streamingの時の実装とよく似ている。
override def lottery(responseObserver: StreamObserver[Employee]): StreamObserver[FetchRandomRequest] = {
new StreamObserver[FetchRandomRequest] {
override def onError(t: Throwable): Unit = logger.error(s"lottery onError", t)
override def onCompleted(): Unit = {
logger.info(s"lottery onCompleted")
responseObserver.onCompleted()
}
override def onNext(value: FetchRandomRequest): Unit = {
employeeRepository.findAll() foreach { _.headOption { em =>
responseObserver.onNext(Employee(em.name, em.age, em.organization.id))
}
}
}
}
この実装だとしていないが、サーバ側でresponseObserver.onNext
はどんなタイミングで呼んでもよいものになっている。
サーバ側のonCompleted
でresponseObserver.onCompleted
を呼ぶように実装しておく。
client
こちらはserver streamingの時の実装に似たものになる。
def lottery(): Future[Unit] = rpc {
val promise = Promise[Unit]()
val responseObserver = new StreamObserver[Employee] {
override def onError(t: Throwable): Unit = {
logger.error(s"lottery onError", t)
promise.failure(t)
}
override def onCompleted(): Unit = {
logger.info(s"lottery onComplete")
promise.success(())
}
override def onNext(value: Employee): Unit = logger.info(s"lottery onNext: $value")
}
val requestObserver: StreamObserver[FetchRandomRequest] = asyncClient.lottery(responseObserver)
(1 to 3).foreach { i =>
requestObserver.onNext(FetchRandomRequest())
}
requestObserver.onCompleted()
promise.future
}
server streamingの時と同様にPromise
を使って結果を受け取れるような実装にしてある。
クライアント側からrequestObserver.onCompleted
を呼ぶことで、サーバからresponseObserver.onCompleted
が実行されるように実装してあるのでこれで両方正常に終了出来る。
所感
gRPC、Protocol BufferでIDLが提供されているのでサーバ・クライアント間のインタフェースが型安全に表現できて非常に便利。
ScalaPBのおかげでScalaからも特に違和感なく使うことが出来る。
ただ、4つの通信方式それぞれで知っておいたほうがいいこともあるようなので、当たり前だけど慎重に実装する必要はある。