LoginSignup
1
1

More than 3 years have passed since last update.

Akka gRPC + Airframe で作る gRPC Server

Last updated at Posted at 2020-08-27

近頃は仕事でgRPC Serverを開発しています。

ちょっと前まではGoで開発してましたがScalaでもやっていくことになり、ちょうどGAになったところだったAkka gRPCを採用しました。それとDDDやってる都合でDIの機構が欲しかったので、Airframeを使うことにしました。備忘録がてらこれらの技術スタックを組み合わせてgRPC Serverを構築する手順を解説していきます。

準備

ライブラリ依存にAkka gRPCとAirframeの依存を追加します。1Akka gRPCは他のAkkaライブラリに依存するので対象となるものを全部追加します。logbackはロガーのバックエンドとして必要なだけなのですが無いとアプリケーション起動時に警告が出て煩わしいので依存に追加しておきますが特に重要ではないです。

Dependencies.scala
object Dependencies {

  object airframe {
    val airframeVersion = "20.6.2"
    val airframe        = "org.wvlet.airframe" %% "airframe" % airframeVersion
  }

  object akka {
    val akkaHttpVersion     = "10.1.12"
    val akkaVersion         = "2.6.8"
    val akkaHttp            = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
    val akkaStream          = "com.typesafe.akka" %% "akka-stream" % akkaVersion
    val akkaDiscovery       = "com.typesafe.akka" %% "akka-discovery" % akkaVersion
    val akkaProtobuf        = "com.typesafe.akka" %% "akka-protobuf" % akkaVersion
  }

  object logback {
    val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3"
  }
}

build.sbtは以下のようになります。Protocol BuffersからScalaのコードを自動生成するため、AkkaGrpcPluginを有効にします。

build.sbt

lazy val root = (project in file("."))
  .settings(
    name := "akka-grpc-playground",
    libraryDependencies ++= Seq(
        airframe.airframe,
        akka.akkaHttp,
        akka.akkaDiscovery,
        akka.akkaStream,
        akka.akkaProtobuf,
        logback.logbackClassic
      ),
    // metadata参照のために必要 → https://doc.akka.io/docs/akka-grpc/current/buildtools/sbt.html
    akkaGrpcCodeGeneratorSettings += "server_power_apis",
    // .protoファイルの配置先
    inConfig(Compile)(
      Seq(
        PB.protoSources += resourceDirectory.value / "protos"
      )
    )
  )
  .enablePlugins(AkkaGrpcPlugin)

Protocol Buffersを定義

上のbuild.sbtにある通りsrc/main/resources/protos.protoファイルを配置します。

user.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "myapp.proto.user";
option java_outer_classname = "UserProto";

package user;

service UserService {
    rpc GetUserList (GetUserListRequest) returns (GetUserListResponse) {}
}

message GetUserListRequest {
}

message GetUserListResponse {
    repeated User users = 1;
}

message User {
    int64 id = 1;
    string email = 2;
    string name = 3;
    UserRole role = 4;
    enum UserRole {
        MEMBER = 0;
        ADMIN = 1;
        MANAGER = 2;
    }
}

ここでコンパイルを通すと.protoの定義を元にScalaコードが自動生成されます。

$ tree ./target/scala-2.13/akka-grpc/main/myapp/proto/user/
./target/scala-2.13/akka-grpc/main/myapp/proto/user/
├── GetUserListRequest.scala
├── GetUserListResponse.scala
├── User.scala
├── UserProto.scala
├── UserService.scala
├── UserServiceClient.scala
├── UserServiceHandler.scala
├── UserServicePowerApi.scala
└── UserServicePowerApiHandler.scala

gRPC Serverの実装

gRPCのエンドポイント実装

自動生成された定義をもとにgRPC Serverのエンドポイントを実装していきます。まずは、自動生成したUserServicePowerApi2を継承したクラスを実装します。これがProtcol Buffersのエンドポイントに対応した実装になります。ここではExecutionContextをAirframeを通してDIしています。

UserServiceImpl.scala
trait UserServiceImpl extends UserServicePowerApi {

  implicit val ec: ExecutionContext = bind[ExecutionContext]

  override def getUserList(in: GetUserListRequest, metadata: Metadata): Future[GetUserListResponse] =
    Future.successful(
      GetUserListResponse(
        Seq(
          myapp.proto.user.User(
            1,
            "user1@texample.com",
            "user1",
            myapp.proto.user.User.UserRole.MEMBER
          ),
          myapp.proto.user.User(
            2,
            "user2@texample.com",
            "user2",
            myapp.proto.user.User.UserRole.ADMIN
          ),
          myapp.proto.user.User(
            3,
            "user3@texample.com",
            "user3",
            myapp.proto.user.User.UserRole.MANAGER
          )
        )
      )
    )

}

gRPC Serverの起動モジュール

gRPC Serverの起動モジュールを実装します。

akka.http.scaladsl.Http.bindAndHandleAsync(...)が叩かれるとgRPC Serverが非同期で起動します。この時にUserServicePowerApiHandler.partial(userServiceImpl)3を追加することで先程実装したエンドポイントがgRPC Serverに追加されます。EvansなどのCLIツールを使ってgRPC Serverに問い合わせるにはServerReflection.partial(List(UserService))を追加してリフレクションを有効にする必要があります。

ここではExecutionContextに加え、ActorSystem、先程実装したUserServiceImpl、AirframeのSessionをDIします。ここでSessionを参照しているのはシャットダウンするときに閉じるためです。

GRPCServer.scala
trait GRPCServer {

  private val session               = bind[Session]
  private val userServiceImpl       = bind[UserServiceImpl]
  implicit val system: ActorSystem  = bind[ActorSystem]
  implicit val ec: ExecutionContext = bind[ExecutionContext]

  def run(): Future[Http.ServerBinding] = {

    val service: PartialFunction[HttpRequest, Future[HttpResponse]] =
      UserServicePowerApiHandler.partial(userServiceImpl) 

    val reflection: PartialFunction[HttpRequest, Future[HttpResponse]] =
      ServerReflection.partial(List(UserService))

    val bound = Http().bindAndHandleAsync(
      ServiceHandler.concatOrNotFound(service, reflection),
      interface = "127.0.0.1",
      port = 8080,
      settings = ServerSettings(system)
    )

    bound.onComplete {
      case Success(binding) =>
        system.log.info(
          s"gRPC Server online at http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/"
        )
      case Failure(ex) =>
        system.log.error(ex, "occurred error")
    }

    sys.addShutdownHook {
      bound
        .flatMap(_.unbind())
        .onComplete { _ =>
          system.terminate()
          // シャットダウンするときにSessionも閉じる
          session.shutdown
        }
    }

    bound
  }
}

gRPC Serverの実装をコンポーネントにまとめる

ここまでの実装をAirframeのdesignに追加し、1コンポーネントとして定義します。

GRPCComponent.scala
object GRPCComponent {

  val design = newDesign
    .bind[UserServiceImpl]
    .toSingleton
    .bind[GRPCServer]
    .toSingleton
}

メインモジュール実装

http2を有効にする

http2を有効にするためにapplication.confに下記を追加します。

application.conf
akka.http.server.preview.enable-http2 = on

メインモジュール実装

アプリケーションのメインモジュールを実装します。ActorSystemExecutionContextを作り、先程のgRPC Serverのコンポーネントと合わせてAirframeのdesignを作ります。最後にSessionを立ち上げてgRPC Serverを起動します。

Main.scala
object Main extends App {

  val conf                 = ConfigFactory.load()
  val system               = ActorSystem("GRPCServer", conf)
  val ec: ExecutionContext = system.dispatcher

  val design = newDesign
    .bind[ActorSystem]
    .toInstance(system)
    .bind[ExecutionContext]
    .toInstance(ec)
    .add(GRPCComponent.design)

  design.newSession.build[GRPCServer].run()
}

動作確認

アプリケーションを立ち上げて、Evansで問い合わせてみます。

$ sbt run
・・・
[INFO] [MM/dd/yyyy 00:00:00.000] [GRPCServer-akka.actor.default-dispatcher-1] [akka.actor.ActorSystemImpl(GRPCServer)] gRPC Server online at http://localhost:8080/

スクリーンショット 2020-09-11 15.42.06.png (1.1 MB)

お疲れさまでした。
参考までにGithubのリンクをこちらに貼っておきます。

所感

gRPCを使うようになってからはProtocol BuffersにAPI仕様が表現されているので別途APIドキュメントが必要になるということがなくなりました。.protoを見れば仕様がすべてわかるし自動生成されたコードを使ってる限り不備があればコンパイルエラーになってくるので個人的にはポジティブな印象を受けてます。まだ、streamingを扱ったことないので今後チャレンジしてみたいです。

Airframeについてははdesignに突っ込んでおけばどこからでもDIできてしまうところが気に入ってます。designも自分好みの単位で分割して作れるので管理しやすいところもいいですね。

おわり

おわり。


  1. バージョンは2020/08/28時点での最新です。 

  2. metadataを扱う必要がなければUserServiceを継承して実装します。 

  3. 3: UserServiceImplの実装でUserServiceを親クラスとしていた場合はUserServiceHandlerに追加します。 

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1