Edited at
ScalaDay 8

Akka gRPCを試してみた

この記事は、 Scala Advent Calendar 2018 の8日目のエントリーです。

最近、何かと私の周辺で話題の CNCF(Cloud Native Computing Foundation)が、 gRPC-Web is going GA - Cloud Native Computing Foundation なる記事を、先日出しており、少し話題にあがってました。

gRPC-Webが正式リリース。WebブラウザからgRPCを直接呼び出し可能に - Publickey

gRPCを使ったバックエンドサーバーの技術的な事例も色々なところで出てきており、これからのWebアプリケーション開発において、フロントエンドでも、バックエンドでも、最注目の技術だと考えております。

今回、gRPCとはなんぞやってあたりの説明は割愛しまして、我らがAkkaが Akka gRPC なるライブラリを出していただいておりますので、まずはgRPC触ってみたいんだーってことで、さわってみました。

基本的に、 Walkthrough をなぞっているだけです。

なお、Akka gRPCは、


This library is in preview mode: basic functionality is in place, but APIs and build plugins are still expected to be improved.


ということで、Preview状態です。本番適用などは、そのあたり、考慮したうえでご検討ください :bow:


Server

まずは、Serverを作る。


ベースとなるプロジェクトを作る

Walkthroughを流すため、ゼロからプロジェクトを作っていきたいと思います。

$ sbt new sbt/scala-seed.g8

A minimal Scala project.

name [Scala Seed Project]: example-akka-grpc

みたいな感じで、 example-akka-grpc なる名前でプロジェクトを作成します。


project/plugins.sbt

plugins.sbt にプラグインを記載します。

$ cd example-akka-grpc

$ touch project/plugins.sbt


project/plugins.sbt

addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.4.2")

addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")

Akka gRPCのプラグインと、JavaAgentなるプラグインですね。


build.sbt

プラグインを有効にする設定をbuild.sbtに書きます。


build.sbt

import Dependencies._

lazy val root = (project in file(".")).
enablePlugins(AkkaGrpcPlugin).
enablePlugins(JavaAgent).
settings(
inThisBuild(List(
organization := "com.example",
scalaVersion := "2.12.7",
version := "0.1.0-SNAPSHOT"
)),
name := "example-akka-grpc",
libraryDependencies += scalaTest % Test,
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
)


こんな感じになります。


src/main/protobuf/helloworld.proto

Protocol Buffers の記法でファイルを作成します。


src/main/protobuf/helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service GreeterService {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}


ここでは、Request/Responseに必要なメッセージの定義と、そのメッセージを操作するサービスを定義しています。

option で、自動生成されたJavaのコードが出力されるパッケージとかもろもろを指定しています。


sbt compile

$ sbt compile

を実行します。

すると、

貼り付けた画像_2018_12_07_17_41.png

こんな感じでScalaのコードが自動生成されます。

sbt compile を実行すると、enablePluginsしていた AkkaGrpcPlugin あたりが、 thesamet/sbt-protoc なるプラグインを呼びだし、良い感じに src/main/protobuf/helloworld.proto を見つけてScalaファイルをgenerateしてくれます。


ちょっと見てみる

自動生成されたコードを少し見てみますと。


target/scala-2.12/src_managed/main/example/myapp/helloworld/grpc/GreeterService.scala

// Generated by Akka gRPC. DO NOT EDIT.

package example.myapp.helloworld.grpc

import scala.concurrent.Future

import akka.NotUsed
import akka.stream.scaladsl.Source

trait GreeterService {

def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): Future[example.myapp.helloworld.grpc.HelloReply]

def itKeepsTalking(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Future[example.myapp.helloworld.grpc.HelloReply]

def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]

def streamHellos(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]

}

object GreeterService {
val name = "helloworld.GreeterService"

object Serializers {
import akka.grpc.scaladsl.ScalapbProtobufSerializer

val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)

val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)

}
}


こんな感じで、必要なサービスのtraitが自動生成されている。

こういったサービスとか、Request/Responseに必要なメッセージのモデルとかってあたのボイラープレートなコードを自動生成してくれるんだな。

で、これを活用して、HTTPサーバー立てたら、そのHTTPサーバーにアクセスするクライアント側のコードも、同様の手法で生成したクライアント用の言語に合わせたサービスやモデル郡を使って通信できるんだな。

それが利点なんだな。

あと、ここでは、4つのパターンの呼び出しを作ってる。

Single Request
Stream Request

Single Response
sayHello
itKeepsTalking

Stream Response
itKeepsReplying
streamHellos

こんな感じで、4パターン。


Serviceの実装クラスGreeterServiceImpl

で、自動生成されたGreeterServiceは、traitなので、実装クラスを作る。


src/main/scala/example/myapp/helloworld/GreeterServiceImpl.scala

package example.myapp.helloworld

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import example.myapp.helloworld.grpc.{GreeterService, HelloReply, HelloRequest}

import scala.concurrent.Future

class GreeterServiceImpl(materializer: Materializer) extends GreeterService {
import materializer.executionContext
private implicit val mat: Materializer = materializer

override def sayHello(in: HelloRequest): Future[HelloReply] = {
println(s"sayHello to ${in.name}")
Future.successful(HelloReply(s"Hello, ${in.name}"))
}

override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
println(s"sayHello to in stream...")
in.runWith(Sink.seq)
.map(elements => HelloReply(s"Hello, ${elements.mkString(", ")}"))
}

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
println(s"sayHello to ${in.name} with stream of chars...")
Source(s"Hello, ${in.name}".toList)
.map(character => HelloReply(character.toString))
}

override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
println(s"sayHello to stream...")
in.map(request => HelloReply(s"Hello, ${request.name}"))
}
}



Main

で、あとは、サーバー側のAkka HTTPのメインとなるobjectを作って、Akka HTTPの起動処理を書く。

ここは、ほぼ、お作法みたいなもん。


src/main/scala/example/myapp/helloworld/GreeterServer.scala

package example.myapp.helloworld

import akka.actor.ActorSystem
import akka.http.scaladsl.UseHttp2.Always
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc.GreeterServiceHandler

import scala.concurrent.{ExecutionContext, Future}

object GreeterServer extends App {

val conf = ConfigFactory
.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("HelloWorld", conf)

new GreeterServer(system).run()
}

class GreeterServer(system: ActorSystem) {

def run(): Future[Http.ServerBinding] = {
implicit val sys: ActorSystem = system
implicit val mat: Materializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

val service: HttpRequest => Future[HttpResponse] =
GreeterServiceHandler(new GreeterServiceImpl(mat))

val bound = Http().bindAndHandleAsync(
service,
interface = "127.0.0.1",
port = 8080,
connectionContext = HttpConnectionContext(http2 = Always)
)

bound.foreach { binding =>
println(s"gRPC server bound to: ${binding.localAddress}")
}

bound
}
}


ふむ。良い感じ。


Running the Server

なお、gRPCをサーバーで公開するには、HTTP/2である必要があり、これを正しく実行するには、 Akka HTTPのマニュアル に記載されているようにJetty ALPNエージェントを構成する必要があるとのこと。

Javaコマンドで実行するときは、

$ java -javaagent:/path/to/jetty-alpn-agent-2.0.7.jar -jar app.jar

みたいに、javaagentオプションを指定してあげる必要があったり、sbtの場合だと、 sbt/sbt-javaagent プラグインを有効にしたうえで、

  .enablePlugins(JavaAgent)

.settings(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.7" % "runtime"
)

とか、build.sbtのプロジェクト設定に追加してあげる必要がある。

ふむふむなるほどー。

とりあえず、現時点でサーバー立ち上がるか、sbtから実行して試してみる。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"

...
[info] gRPC server bound to: /127.0.0.1:8080

ふむ。立ち上がったのかな?


Client

つぎに、gRPC Clientを作る。

sbtでマルチプロジェクトにしても良いけど、なんか、まあ、今回は、いいやって感じで、同じプロジェクトに別のClient Mainを作ろう。


src/main/protobuf/helloworld.proto

これ、上で作ったやつを、そのまま使います。

実際、Clientをマルチプロジェクトで作るとしたら、きっと、Protocol Buffersを書いたプロジェクトを別に作っておいて、そのプロジェクトを、dependsOnさせたりするのかと。

それか、publishして共通化しておくとか。

もしくは、Gitで別リポジトリにしておいて参照できるようにするとか。

なんせ、Protocol Buffersで書いたprotoファイルをClient/Serverで共通にしてそこから生成されるボイラープレートコードを使ってって感じになるのかと。


src/main/scala/example/myapp/helloworld/GreeterClient.scala

ClientのMainを作る。


src/main/scala/example/myapp/helloworld/GreeterClient.scala

package example.myapp.helloworld

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import example.myapp.helloworld.grpc.{
GreeterService,
GreeterServiceClient,
HelloReply,
HelloRequest
}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object GreeterClient extends App {
// Boot akka
implicit val sys: ActorSystem = ActorSystem("HelloWorldClient")
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher

// Take details how to connect to the service from the config.
val clientSettings = GrpcClientSettings.fromConfig(GreeterService.name)
// Create a client-side stub for the service
val client: GreeterService = GreeterServiceClient(clientSettings)

// Run examples for each of the exposed service methods.
runSingleRequestReplyExample()
runStreamingRequestExample()
runStreamingReplyExample()
runStreamingRequestReplyExample()

sys.scheduler.schedule(1.second, 1.second) {
runSingleRequestReplyExample()
}

def runSingleRequestReplyExample(): Unit = {
sys.log.info("Performing request")
val reply = client.sayHello(HelloRequest("Alice"))
reply.onComplete {
case Success(msg) =>
println(s"got single reply: $msg")
case Failure(e) =>
println(s"Error sayHello: $e")
}
}

def runStreamingRequestExample(): Unit = {
val requests = List("Alice", "Bob", "Peter").map(HelloRequest.apply)
val reply = client.itKeepsTalking(Source(requests))
reply.onComplete {
case Success(msg) =>
println(s"got single reply for streaming requests: $msg")
case Failure(e) =>
println(s"Error streamingRequest: $e")
}
}

def runStreamingReplyExample(): Unit = {
val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
val done: Future[Done] =
responseStream.runForeach(reply =>
println(s"got streaming reply: ${reply.message}"))

done.onComplete {
case Success(_) =>
println("streamingReply done")
case Failure(e) =>
println(s"Error streamingReply: $e")
}
}

def runStreamingRequestReplyExample(): Unit = {
val requestStream: Source[HelloRequest, NotUsed] =
Source
.tick(100.millis, 1.second, "tick")
.zipWithIndex
.map { case (_, i) => i }
.map(i => HelloRequest(s"Alice-$i"))
.take(10)
.mapMaterializedValue(_ => NotUsed)

val responseStream: Source[HelloReply, NotUsed] =
client.streamHellos(requestStream)
val done: Future[Done] =
responseStream.runForeach(reply =>
println(s"got streaming reply: ${reply.message}"))

done.onComplete {
case Success(_) =>
println("streamingRequestReply done")
case Failure(e) =>
println(s"Error streamingRequestReply: $e")
}
}
}


ちょと長い。

けどやってることは、シンプルで、Akkaのお作法書いてから、 akka.grpc.GrpcClientSettings なるAkka gRPCが提供しているSettingsを作って、Protocol Buffersが自動生成してくれている example.myapp.helloworld.grpc.GreeterServiceClient を初期化している。

あとは、作った client インスタンスのメソッドを呼ぶメソッドを作ってるだけで、それを、起動時に呼びだして、上述で作ったServerにアクセスして、Request/Responseするって感じになってる。

なので、Clientのコードを書くときに、Server側のReq/ResのModelとか、サービスがSingleかStreamとかを、型の制約に従って実装していくだけで良いので、めちゃ簡単だし、仕様を自ら読み問いて実装する部分がかなり少なくて、ほんと従うだけって感じ。


resources/application.conf

で、上述のClientで、どのServerにアクセスすれば良いのってあたりのsettingsが必要なので、application.confに記載する。


application.conf

akka.grpc.client {

"helloworld.GreeterService" {
host = 127.0.0.1
port = 8080
override-authority = foo.test.google.fr
use-tls = false
}
}


Client実行!!

さーやっと実行しますよー。

まずは、Serverを起動しておきます。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"

...
[info] gRPC server bound to: /127.0.0.1:8080

で、別セッションのTerminalとかから、Clientを起動してみます。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterClient"

...
[info] got streaming reply: Hello, Alice-0
[info] got streaming reply: H
[info] got streaming reply: e
[info] got streaming reply: l
[info] got streaming reply: l
[info] got streaming reply: o
[info] got streaming reply: ,
[info] got streaming reply:
[info] got streaming reply: A
[info] got single reply: HelloReply(Hello, Alice)
[info] got single reply for streaming requests: HelloReply(Hello, HelloRequest(Alice), HelloRequest(Bob), HelloRequest(Peter))
[info] got streaming reply: l
[info] got streaming reply: i
[info] got streaming reply: c
[info] got streaming reply: e
[info] streamingReply done
[info] [INFO] [12/08/2018 11:19:03.949] [HelloWorldClient-akka.actor.default-dispatcher-6] [akka.actor.ActorSystemImpl(HelloWorldClient)] Performing request
[info] got single reply: HelloReply(Hello, Alice)
...

って感じになります。

延々とStreamしていきますね。

あと、Server側のTerminalでも、

[info] sayHello to stream...

[info] sayHello to in stream...
[info] sayHello to Alice
[info] sayHello to Alice with stream of chars...
[info] sayHello to Alice
[info] sayHello to Alice
...

みたいな感じでログが出ますね。

成功したみたい!!わーい 🤗


まとめ

ざっと、Akka gRPCのWalkthroughをやってみました。

この調子で、このサーバーに、gRPC WebとかをJavaScriptで作って接続したりとか、色々試してみたいと思います。

GitHub にコードを上げてます。 https://github.com/yoshiyoshifujii/example-akka-grpc

以上です。