近頃は仕事でgRPC Serverを開発しています。
ちょっと前まではGoで開発してましたがScalaでもやっていくことになり、ちょうどGAになったところだったAkka gRPCを採用しました。それとDDDやってる都合でDIの機構が欲しかったので、Airframeを使うことにしました。備忘録がてらこれらの技術スタックを組み合わせてgRPC Serverを構築する手順を解説していきます。
準備
ライブラリ依存にAkka gRPCとAirframeの依存を追加します。1Akka gRPCは他のAkkaライブラリに依存するので対象となるものを全部追加します。logbackはロガーのバックエンドとして必要なだけなのですが無いとアプリケーション起動時に警告が出て煩わしいので依存に追加しておきますが特に重要ではないです。
object Dependencies {
object airframe {
val airframeVersion = "20.6.2"
val airframe = "org.wvlet.airframe" %% "airframe" % airframeVersion
}
object akka {
val akkaHttpVersion = "10.1.12"
val akkaVersion = "2.6.8"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
val akkaDiscovery = "com.typesafe.akka" %% "akka-discovery" % akkaVersion
val akkaProtobuf = "com.typesafe.akka" %% "akka-protobuf" % akkaVersion
}
object logback {
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3"
}
}
build.sbt
は以下のようになります。Protocol BuffersからScalaのコードを自動生成するため、AkkaGrpcPlugin
を有効にします。
lazy val root = (project in file("."))
.settings(
name := "akka-grpc-playground",
libraryDependencies ++= Seq(
airframe.airframe,
akka.akkaHttp,
akka.akkaDiscovery,
akka.akkaStream,
akka.akkaProtobuf,
logback.logbackClassic
),
// metadata参照のために必要 → https://doc.akka.io/docs/akka-grpc/current/buildtools/sbt.html
akkaGrpcCodeGeneratorSettings += "server_power_apis",
// .protoファイルの配置先
inConfig(Compile)(
Seq(
PB.protoSources += resourceDirectory.value / "protos"
)
)
)
.enablePlugins(AkkaGrpcPlugin)
Protocol Buffersを定義
上のbuild.sbt
にある通りsrc/main/resources/protos
に.proto
ファイルを配置します。
syntax = "proto3";
option java_multiple_files = true;
option java_package = "myapp.proto.user";
option java_outer_classname = "UserProto";
package user;
service UserService {
rpc GetUserList (GetUserListRequest) returns (GetUserListResponse) {}
}
message GetUserListRequest {
}
message GetUserListResponse {
repeated User users = 1;
}
message User {
int64 id = 1;
string email = 2;
string name = 3;
UserRole role = 4;
enum UserRole {
MEMBER = 0;
ADMIN = 1;
MANAGER = 2;
}
}
ここでコンパイルを通すと.proto
の定義を元にScalaコードが自動生成されます。
$ tree ./target/scala-2.13/akka-grpc/main/myapp/proto/user/
./target/scala-2.13/akka-grpc/main/myapp/proto/user/
├── GetUserListRequest.scala
├── GetUserListResponse.scala
├── User.scala
├── UserProto.scala
├── UserService.scala
├── UserServiceClient.scala
├── UserServiceHandler.scala
├── UserServicePowerApi.scala
└── UserServicePowerApiHandler.scala
gRPC Serverの実装
gRPCのエンドポイント実装
自動生成された定義をもとにgRPC Serverのエンドポイントを実装していきます。まずは、自動生成したUserServicePowerApi
2を継承したクラスを実装します。これがProtcol Buffersのエンドポイントに対応した実装になります。ここではExecutionContext
をAirframeを通してDIしています。
trait UserServiceImpl extends UserServicePowerApi {
implicit val ec: ExecutionContext = bind[ExecutionContext]
override def getUserList(in: GetUserListRequest, metadata: Metadata): Future[GetUserListResponse] =
Future.successful(
GetUserListResponse(
Seq(
myapp.proto.user.User(
1,
"user1@texample.com",
"user1",
myapp.proto.user.User.UserRole.MEMBER
),
myapp.proto.user.User(
2,
"user2@texample.com",
"user2",
myapp.proto.user.User.UserRole.ADMIN
),
myapp.proto.user.User(
3,
"user3@texample.com",
"user3",
myapp.proto.user.User.UserRole.MANAGER
)
)
)
)
}
gRPC Serverの起動モジュール
gRPC Serverの起動モジュールを実装します。
akka.http.scaladsl.Http.bindAndHandleAsync(...)
が叩かれるとgRPC Serverが非同期で起動します。この時にUserServicePowerApiHandler.partial(userServiceImpl)
3を追加することで先程実装したエンドポイントがgRPC Serverに追加されます。EvansなどのCLIツールを使ってgRPC Serverに問い合わせるにはServerReflection.partial(List(UserService))
を追加してリフレクションを有効にする必要があります。
ここではExecutionContext
に加え、ActorSystem
、先程実装したUserServiceImpl
、AirframeのSession
をDIします。ここでSession
を参照しているのはシャットダウンするときに閉じるためです。
trait GRPCServer {
private val session = bind[Session]
private val userServiceImpl = bind[UserServiceImpl]
implicit val system: ActorSystem = bind[ActorSystem]
implicit val ec: ExecutionContext = bind[ExecutionContext]
def run(): Future[Http.ServerBinding] = {
val service: PartialFunction[HttpRequest, Future[HttpResponse]] =
UserServicePowerApiHandler.partial(userServiceImpl)
val reflection: PartialFunction[HttpRequest, Future[HttpResponse]] =
ServerReflection.partial(List(UserService))
val bound = Http().bindAndHandleAsync(
ServiceHandler.concatOrNotFound(service, reflection),
interface = "127.0.0.1",
port = 8080,
settings = ServerSettings(system)
)
bound.onComplete {
case Success(binding) =>
system.log.info(
s"gRPC Server online at http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/"
)
case Failure(ex) =>
system.log.error(ex, "occurred error")
}
sys.addShutdownHook {
bound
.flatMap(_.unbind())
.onComplete { _ =>
system.terminate()
// シャットダウンするときにSessionも閉じる
session.shutdown
}
}
bound
}
}
gRPC Serverの実装をコンポーネントにまとめる
ここまでの実装をAirframeのdesignに追加し、1コンポーネントとして定義します。
object GRPCComponent {
val design = newDesign
.bind[UserServiceImpl]
.toSingleton
.bind[GRPCServer]
.toSingleton
}
メインモジュール実装
http2を有効にする
http2を有効にするためにapplication.conf
に下記を追加します。
akka.http.server.preview.enable-http2 = on
メインモジュール実装
アプリケーションのメインモジュールを実装します。ActorSystem
とExecutionContext
を作り、先程のgRPC Serverのコンポーネントと合わせてAirframeのdesignを作ります。最後にSession
を立ち上げてgRPC Serverを起動します。
object Main extends App {
val conf = ConfigFactory.load()
val system = ActorSystem("GRPCServer", conf)
val ec: ExecutionContext = system.dispatcher
val design = newDesign
.bind[ActorSystem]
.toInstance(system)
.bind[ExecutionContext]
.toInstance(ec)
.add(GRPCComponent.design)
design.newSession.build[GRPCServer].run()
}
動作確認
アプリケーションを立ち上げて、Evansで問い合わせてみます。
$ sbt run
・・・
[INFO] [MM/dd/yyyy 00:00:00.000] [GRPCServer-akka.actor.default-dispatcher-1] [akka.actor.ActorSystemImpl(GRPCServer)] gRPC Server online at http://localhost:8080/

お疲れさまでした。
参考までにGithubのリンクをこちらに貼っておきます。
所感
gRPCを使うようになってからはProtocol BuffersにAPI仕様が表現されているので別途APIドキュメントが必要になるということがなくなりました。.protoを見れば仕様がすべてわかるし自動生成されたコードを使ってる限り不備があればコンパイルエラーになってくるので個人的にはポジティブな印象を受けてます。まだ、streamingを扱ったことないので今後チャレンジしてみたいです。
Airframeについてははdesignに突っ込んでおけばどこからでもDIできてしまうところが気に入ってます。designも自分好みの単位で分割して作れるので管理しやすいところもいいですね。
おわり
おわり。