Akka
gRPC
Akka-gRPC
ScalaDay 8

Akka gRPCを試してみた

この記事は、 Scala Advent Calendar 2018 の8日目のエントリーです。

最近、何かと私の周辺で話題の CNCF(Cloud Native Computing Foundation)が、 gRPC-Web is going GA - Cloud Native Computing Foundation なる記事を、先日出しており、少し話題にあがってました。

gRPC-Webが正式リリース。WebブラウザからgRPCを直接呼び出し可能に - Publickey

gRPCを使ったバックエンドサーバーの技術的な事例も色々なところで出てきており、これからのWebアプリケーション開発において、フロントエンドでも、バックエンドでも、最注目の技術だと考えております。

今回、gRPCとはなんぞやってあたりの説明は割愛しまして、我らがAkkaが Akka gRPC なるライブラリを出していただいておりますので、まずはgRPC触ってみたいんだーってことで、さわってみました。

基本的に、 Walkthrough をなぞっているだけです。

なお、Akka gRPCは、

This library is in preview mode: basic functionality is in place, but APIs and build plugins are still expected to be improved.

ということで、Preview状態です。本番適用などは、そのあたり、考慮したうえでご検討ください :bow:

Server

まずは、Serverを作る。

ベースとなるプロジェクトを作る

Walkthroughを流すため、ゼロからプロジェクトを作っていきたいと思います。

$ sbt new sbt/scala-seed.g8

A minimal Scala project.

name [Scala Seed Project]: example-akka-grpc

みたいな感じで、 example-akka-grpc なる名前でプロジェクトを作成します。

project/plugins.sbt

plugins.sbt にプラグインを記載します。

$ cd example-akka-grpc
$ touch project/plugins.sbt
project/plugins.sbt
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.4.2")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")

Akka gRPCのプラグインと、JavaAgentなるプラグインですね。

build.sbt

プラグインを有効にする設定をbuild.sbtに書きます。

build.sbt
import Dependencies._

lazy val root = (project in file(".")).
  enablePlugins(AkkaGrpcPlugin).
  enablePlugins(JavaAgent).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.12.7",
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "example-akka-grpc",
    libraryDependencies += scalaTest % Test,
    javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
  )

こんな感じになります。

src/main/protobuf/helloworld.proto

Protocol Buffers の記法でファイルを作成します。

src/main/protobuf/helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service GreeterService {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
    rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
    rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}

ここでは、Request/Responseに必要なメッセージの定義と、そのメッセージを操作するサービスを定義しています。

option で、自動生成されたJavaのコードが出力されるパッケージとかもろもろを指定しています。

sbt compile

$ sbt compile

を実行します。

すると、

貼り付けた画像_2018_12_07_17_41.png

こんな感じでScalaのコードが自動生成されます。

sbt compile を実行すると、enablePluginsしていた AkkaGrpcPlugin あたりが、 thesamet/sbt-protoc なるプラグインを呼びだし、良い感じに src/main/protobuf/helloworld.proto を見つけてScalaファイルをgenerateしてくれます。

ちょっと見てみる

自動生成されたコードを少し見てみますと。

target/scala-2.12/src_managed/main/example/myapp/helloworld/grpc/GreeterService.scala
// Generated by Akka gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc

import scala.concurrent.Future

import akka.NotUsed
import akka.stream.scaladsl.Source

trait GreeterService {

  def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): Future[example.myapp.helloworld.grpc.HelloReply]

  def itKeepsTalking(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Future[example.myapp.helloworld.grpc.HelloReply]

  def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]

  def streamHellos(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]

}

object GreeterService {
  val name = "helloworld.GreeterService"

  object Serializers {
    import akka.grpc.scaladsl.ScalapbProtobufSerializer

    val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)

    val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)

  }
}

こんな感じで、必要なサービスのtraitが自動生成されている。
こういったサービスとか、Request/Responseに必要なメッセージのモデルとかってあたのボイラープレートなコードを自動生成してくれるんだな。
で、これを活用して、HTTPサーバー立てたら、そのHTTPサーバーにアクセスするクライアント側のコードも、同様の手法で生成したクライアント用の言語に合わせたサービスやモデル郡を使って通信できるんだな。
それが利点なんだな。

あと、ここでは、4つのパターンの呼び出しを作ってる。

Single Request Stream Request
Single Response sayHello itKeepsTalking
Stream Response itKeepsReplying streamHellos

こんな感じで、4パターン。

Serviceの実装クラスGreeterServiceImpl

で、自動生成されたGreeterServiceは、traitなので、実装クラスを作る。

src/main/scala/example/myapp/helloworld/GreeterServiceImpl.scala
package example.myapp.helloworld

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import example.myapp.helloworld.grpc.{GreeterService, HelloReply, HelloRequest}

import scala.concurrent.Future

class GreeterServiceImpl(materializer: Materializer) extends GreeterService {
  import materializer.executionContext
  private implicit val mat: Materializer = materializer

  override def sayHello(in: HelloRequest): Future[HelloReply] = {
    println(s"sayHello to ${in.name}")
    Future.successful(HelloReply(s"Hello, ${in.name}"))
  }

  override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
    println(s"sayHello to in stream...")
    in.runWith(Sink.seq)
      .map(elements => HelloReply(s"Hello, ${elements.mkString(", ")}"))
  }

  override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
    println(s"sayHello to ${in.name} with stream of chars...")
    Source(s"Hello, ${in.name}".toList)
      .map(character => HelloReply(character.toString))
  }

  override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
    println(s"sayHello to stream...")
    in.map(request => HelloReply(s"Hello, ${request.name}"))
  }
}

Main

で、あとは、サーバー側のAkka HTTPのメインとなるobjectを作って、Akka HTTPの起動処理を書く。
ここは、ほぼ、お作法みたいなもん。

src/main/scala/example/myapp/helloworld/GreeterServer.scala
package example.myapp.helloworld

import akka.actor.ActorSystem
import akka.http.scaladsl.UseHttp2.Always
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc.GreeterServiceHandler

import scala.concurrent.{ExecutionContext, Future}

object GreeterServer extends App {

  val conf = ConfigFactory
    .parseString("akka.http.server.preview.enable-http2 = on")
    .withFallback(ConfigFactory.defaultApplication())
  val system = ActorSystem("HelloWorld", conf)

  new GreeterServer(system).run()
}

class GreeterServer(system: ActorSystem) {

  def run(): Future[Http.ServerBinding] = {
    implicit val sys: ActorSystem = system
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler(new GreeterServiceImpl(mat))

    val bound = Http().bindAndHandleAsync(
      service,
      interface = "127.0.0.1",
      port = 8080,
      connectionContext = HttpConnectionContext(http2 = Always)
    )

    bound.foreach { binding =>
      println(s"gRPC server bound to: ${binding.localAddress}")
    }

    bound
  }
}

ふむ。良い感じ。

Running the Server

なお、gRPCをサーバーで公開するには、HTTP/2である必要があり、これを正しく実行するには、 Akka HTTPのマニュアル に記載されているようにJetty ALPNエージェントを構成する必要があるとのこと。

Javaコマンドで実行するときは、

$ java -javaagent:/path/to/jetty-alpn-agent-2.0.7.jar -jar app.jar

みたいに、javaagentオプションを指定してあげる必要があったり、sbtの場合だと、 sbt/sbt-javaagent プラグインを有効にしたうえで、

  .enablePlugins(JavaAgent)
  .settings(
    javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.7" % "runtime"
  )

とか、build.sbtのプロジェクト設定に追加してあげる必要がある。
ふむふむなるほどー。

とりあえず、現時点でサーバー立ち上がるか、sbtから実行して試してみる。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"
...
[info] gRPC server bound to: /127.0.0.1:8080

ふむ。立ち上がったのかな?

Client

つぎに、gRPC Clientを作る。

sbtでマルチプロジェクトにしても良いけど、なんか、まあ、今回は、いいやって感じで、同じプロジェクトに別のClient Mainを作ろう。

src/main/protobuf/helloworld.proto

これ、上で作ったやつを、そのまま使います。
実際、Clientをマルチプロジェクトで作るとしたら、きっと、Protocol Buffersを書いたプロジェクトを別に作っておいて、そのプロジェクトを、dependsOnさせたりするのかと。
それか、publishして共通化しておくとか。
もしくは、Gitで別リポジトリにしておいて参照できるようにするとか。
なんせ、Protocol Buffersで書いたprotoファイルをClient/Serverで共通にしてそこから生成されるボイラープレートコードを使ってって感じになるのかと。

src/main/scala/example/myapp/helloworld/GreeterClient.scala

ClientのMainを作る。

src/main/scala/example/myapp/helloworld/GreeterClient.scala
package example.myapp.helloworld

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import example.myapp.helloworld.grpc.{
  GreeterService,
  GreeterServiceClient,
  HelloReply,
  HelloRequest
}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object GreeterClient extends App {
  // Boot akka
  implicit val sys: ActorSystem = ActorSystem("HelloWorldClient")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = sys.dispatcher

  // Take details how to connect to the service from the config.
  val clientSettings = GrpcClientSettings.fromConfig(GreeterService.name)
  // Create a client-side stub for the service
  val client: GreeterService = GreeterServiceClient(clientSettings)

  // Run examples for each of the exposed service methods.
  runSingleRequestReplyExample()
  runStreamingRequestExample()
  runStreamingReplyExample()
  runStreamingRequestReplyExample()

  sys.scheduler.schedule(1.second, 1.second) {
    runSingleRequestReplyExample()
  }

  def runSingleRequestReplyExample(): Unit = {
    sys.log.info("Performing request")
    val reply = client.sayHello(HelloRequest("Alice"))
    reply.onComplete {
      case Success(msg) =>
        println(s"got single reply: $msg")
      case Failure(e) =>
        println(s"Error sayHello: $e")
    }
  }

  def runStreamingRequestExample(): Unit = {
    val requests = List("Alice", "Bob", "Peter").map(HelloRequest.apply)
    val reply = client.itKeepsTalking(Source(requests))
    reply.onComplete {
      case Success(msg) =>
        println(s"got single reply for streaming requests: $msg")
      case Failure(e) =>
        println(s"Error streamingRequest: $e")
    }
  }

  def runStreamingReplyExample(): Unit = {
    val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
    val done: Future[Done] =
      responseStream.runForeach(reply =>
        println(s"got streaming reply: ${reply.message}"))

    done.onComplete {
      case Success(_) =>
        println("streamingReply done")
      case Failure(e) =>
        println(s"Error streamingReply: $e")
    }
  }

  def runStreamingRequestReplyExample(): Unit = {
    val requestStream: Source[HelloRequest, NotUsed] =
      Source
        .tick(100.millis, 1.second, "tick")
        .zipWithIndex
        .map { case (_, i) => i }
        .map(i => HelloRequest(s"Alice-$i"))
        .take(10)
        .mapMaterializedValue(_ => NotUsed)

    val responseStream: Source[HelloReply, NotUsed] =
      client.streamHellos(requestStream)
    val done: Future[Done] =
      responseStream.runForeach(reply =>
        println(s"got streaming reply: ${reply.message}"))

    done.onComplete {
      case Success(_) =>
        println("streamingRequestReply done")
      case Failure(e) =>
        println(s"Error streamingRequestReply: $e")
    }
  }
}

ちょと長い。
けどやってることは、シンプルで、Akkaのお作法書いてから、 akka.grpc.GrpcClientSettings なるAkka gRPCが提供しているSettingsを作って、Protocol Buffersが自動生成してくれている example.myapp.helloworld.grpc.GreeterServiceClient を初期化している。

あとは、作った client インスタンスのメソッドを呼ぶメソッドを作ってるだけで、それを、起動時に呼びだして、上述で作ったServerにアクセスして、Request/Responseするって感じになってる。

なので、Clientのコードを書くときに、Server側のReq/ResのModelとか、サービスがSingleかStreamとかを、型の制約に従って実装していくだけで良いので、めちゃ簡単だし、仕様を自ら読み問いて実装する部分がかなり少なくて、ほんと従うだけって感じ。

resources/application.conf

で、上述のClientで、どのServerにアクセスすれば良いのってあたりのsettingsが必要なので、application.confに記載する。

application.conf
akka.grpc.client {
  "helloworld.GreeterService" {
    host = 127.0.0.1
    port = 8080
    override-authority = foo.test.google.fr
    use-tls = false
  }
}

Client実行!!

さーやっと実行しますよー。

まずは、Serverを起動しておきます。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"
...
[info] gRPC server bound to: /127.0.0.1:8080

で、別セッションのTerminalとかから、Clientを起動してみます。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterClient"
...
[info] got streaming reply: Hello, Alice-0
[info] got streaming reply: H
[info] got streaming reply: e
[info] got streaming reply: l
[info] got streaming reply: l
[info] got streaming reply: o
[info] got streaming reply: ,
[info] got streaming reply:
[info] got streaming reply: A
[info] got single reply: HelloReply(Hello, Alice)
[info] got single reply for streaming requests: HelloReply(Hello, HelloRequest(Alice), HelloRequest(Bob), HelloRequest(Peter))
[info] got streaming reply: l
[info] got streaming reply: i
[info] got streaming reply: c
[info] got streaming reply: e
[info] streamingReply done
[info] [INFO] [12/08/2018 11:19:03.949] [HelloWorldClient-akka.actor.default-dispatcher-6] [akka.actor.ActorSystemImpl(HelloWorldClient)] Performing request
[info] got single reply: HelloReply(Hello, Alice)
...

って感じになります。
延々とStreamしていきますね。

あと、Server側のTerminalでも、

[info] sayHello to stream...
[info] sayHello to in stream...
[info] sayHello to Alice
[info] sayHello to Alice with stream of chars...
[info] sayHello to Alice
[info] sayHello to Alice
...

みたいな感じでログが出ますね。
成功したみたい!!わーい 🤗

まとめ

ざっと、Akka gRPCのWalkthroughをやってみました。
この調子で、このサーバーに、gRPC WebとかをJavaScriptで作って接続したりとか、色々試してみたいと思います。

GitHub にコードを上げてます。 https://github.com/yoshiyoshifujii/example-akka-grpc

以上です。