この記事は、 Scala Advent Calendar 2018 の8日目のエントリーです。
最近、何かと私の周辺で話題の CNCF(Cloud Native Computing Foundation)が、 gRPC-Web is going GA - Cloud Native Computing Foundation なる記事を、先日出しており、少し話題にあがってました。
gRPC-Webが正式リリース。WebブラウザからgRPCを直接呼び出し可能に - Publickey
gRPCを使ったバックエンドサーバーの技術的な事例も色々なところで出てきており、これからのWebアプリケーション開発において、フロントエンドでも、バックエンドでも、最注目の技術だと考えております。
今回、gRPCとはなんぞやってあたりの説明は割愛しまして、我らがAkkaが Akka gRPC なるライブラリを出していただいておりますので、まずはgRPC触ってみたいんだーってことで、さわってみました。
基本的に、 Walkthrough をなぞっているだけです。
なお、Akka gRPCは、
This library is in preview mode: basic functionality is in place, but APIs and build plugins are still expected to be improved.
ということで、Preview状態です。本番適用などは、そのあたり、考慮したうえでご検討ください
Server
まずは、Serverを作る。
ベースとなるプロジェクトを作る
Walkthroughを流すため、ゼロからプロジェクトを作っていきたいと思います。
$ sbt new sbt/scala-seed.g8
A minimal Scala project.
name [Scala Seed Project]: example-akka-grpc
みたいな感じで、 example-akka-grpc
なる名前でプロジェクトを作成します。
project/plugins.sbt
plugins.sbt
にプラグインを記載します。
$ cd example-akka-grpc
$ touch project/plugins.sbt
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.4.2")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")
Akka gRPCのプラグインと、JavaAgentなるプラグインですね。
build.sbt
プラグインを有効にする設定をbuild.sbtに書きます。
import Dependencies._
lazy val root = (project in file(".")).
enablePlugins(AkkaGrpcPlugin).
enablePlugins(JavaAgent).
settings(
inThisBuild(List(
organization := "com.example",
scalaVersion := "2.12.7",
version := "0.1.0-SNAPSHOT"
)),
name := "example-akka-grpc",
libraryDependencies += scalaTest % Test,
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
)
こんな感じになります。
src/main/protobuf/helloworld.proto
Protocol Buffers の記法でファイルを作成します。
syntax = "proto3";
option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service GreeterService {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
ここでは、Request/Responseに必要なメッセージの定義と、そのメッセージを操作するサービスを定義しています。
option
で、自動生成されたJavaのコードが出力されるパッケージとかもろもろを指定しています。
sbt compile
$ sbt compile
を実行します。
すると、
こんな感じでScalaのコードが自動生成されます。
sbt compile
を実行すると、enablePluginsしていた AkkaGrpcPlugin
あたりが、 thesamet/sbt-protoc なるプラグインを呼びだし、良い感じに src/main/protobuf/helloworld.proto
を見つけてScalaファイルをgenerateしてくれます。
ちょっと見てみる
自動生成されたコードを少し見てみますと。
// Generated by Akka gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.scaladsl.Source
trait GreeterService {
def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): Future[example.myapp.helloworld.grpc.HelloReply]
def itKeepsTalking(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Future[example.myapp.helloworld.grpc.HelloReply]
def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]
def streamHellos(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]
}
object GreeterService {
val name = "helloworld.GreeterService"
object Serializers {
import akka.grpc.scaladsl.ScalapbProtobufSerializer
val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)
val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)
}
}
こんな感じで、必要なサービスのtraitが自動生成されている。
こういったサービスとか、Request/Responseに必要なメッセージのモデルとかってあたのボイラープレートなコードを自動生成してくれるんだな。
で、これを活用して、HTTPサーバー立てたら、そのHTTPサーバーにアクセスするクライアント側のコードも、同様の手法で生成したクライアント用の言語に合わせたサービスやモデル郡を使って通信できるんだな。
それが利点なんだな。
あと、ここでは、4つのパターンの呼び出しを作ってる。
Single Request | Stream Request | |
---|---|---|
Single Response | sayHello | itKeepsTalking |
Stream Response | itKeepsReplying | streamHellos |
こんな感じで、4パターン。
Serviceの実装クラスGreeterServiceImpl
で、自動生成されたGreeterServiceは、traitなので、実装クラスを作る。
package example.myapp.helloworld
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import example.myapp.helloworld.grpc.{GreeterService, HelloReply, HelloRequest}
import scala.concurrent.Future
class GreeterServiceImpl(materializer: Materializer) extends GreeterService {
import materializer.executionContext
private implicit val mat: Materializer = materializer
override def sayHello(in: HelloRequest): Future[HelloReply] = {
println(s"sayHello to ${in.name}")
Future.successful(HelloReply(s"Hello, ${in.name}"))
}
override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
println(s"sayHello to in stream...")
in.runWith(Sink.seq)
.map(elements => HelloReply(s"Hello, ${elements.mkString(", ")}"))
}
override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
println(s"sayHello to ${in.name} with stream of chars...")
Source(s"Hello, ${in.name}".toList)
.map(character => HelloReply(character.toString))
}
override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
println(s"sayHello to stream...")
in.map(request => HelloReply(s"Hello, ${request.name}"))
}
}
Main
で、あとは、サーバー側のAkka HTTPのメインとなるobjectを作って、Akka HTTPの起動処理を書く。
ここは、ほぼ、お作法みたいなもん。
package example.myapp.helloworld
import akka.actor.ActorSystem
import akka.http.scaladsl.UseHttp2.Always
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc.GreeterServiceHandler
import scala.concurrent.{ExecutionContext, Future}
object GreeterServer extends App {
val conf = ConfigFactory
.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("HelloWorld", conf)
new GreeterServer(system).run()
}
class GreeterServer(system: ActorSystem) {
def run(): Future[Http.ServerBinding] = {
implicit val sys: ActorSystem = system
implicit val mat: Materializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher
val service: HttpRequest => Future[HttpResponse] =
GreeterServiceHandler(new GreeterServiceImpl(mat))
val bound = Http().bindAndHandleAsync(
service,
interface = "127.0.0.1",
port = 8080,
connectionContext = HttpConnectionContext(http2 = Always)
)
bound.foreach { binding =>
println(s"gRPC server bound to: ${binding.localAddress}")
}
bound
}
}
ふむ。良い感じ。
Running the Server
なお、gRPCをサーバーで公開するには、HTTP/2である必要があり、これを正しく実行するには、 Akka HTTPのマニュアル に記載されているようにJetty ALPNエージェントを構成する必要があるとのこと。
Javaコマンドで実行するときは、
$ java -javaagent:/path/to/jetty-alpn-agent-2.0.7.jar -jar app.jar
みたいに、javaagentオプションを指定してあげる必要があったり、sbtの場合だと、 sbt/sbt-javaagent プラグインを有効にしたうえで、
.enablePlugins(JavaAgent)
.settings(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.7" % "runtime"
)
とか、build.sbtのプロジェクト設定に追加してあげる必要がある。
ふむふむなるほどー。
とりあえず、現時点でサーバー立ち上がるか、sbtから実行して試してみる。
$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"
...
[info] gRPC server bound to: /127.0.0.1:8080
ふむ。立ち上がったのかな?
Client
つぎに、gRPC Clientを作る。
sbtでマルチプロジェクトにしても良いけど、なんか、まあ、今回は、いいやって感じで、同じプロジェクトに別のClient Mainを作ろう。
src/main/protobuf/helloworld.proto
これ、上で作ったやつを、そのまま使います。
実際、Clientをマルチプロジェクトで作るとしたら、きっと、Protocol Buffersを書いたプロジェクトを別に作っておいて、そのプロジェクトを、dependsOnさせたりするのかと。
それか、publishして共通化しておくとか。
もしくは、Gitで別リポジトリにしておいて参照できるようにするとか。
なんせ、Protocol Buffersで書いたprotoファイルをClient/Serverで共通にしてそこから生成されるボイラープレートコードを使ってって感じになるのかと。
src/main/scala/example/myapp/helloworld/GreeterClient.scala
ClientのMainを作る。
package example.myapp.helloworld
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import example.myapp.helloworld.grpc.{
GreeterService,
GreeterServiceClient,
HelloReply,
HelloRequest
}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object GreeterClient extends App {
// Boot akka
implicit val sys: ActorSystem = ActorSystem("HelloWorldClient")
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher
// Take details how to connect to the service from the config.
val clientSettings = GrpcClientSettings.fromConfig(GreeterService.name)
// Create a client-side stub for the service
val client: GreeterService = GreeterServiceClient(clientSettings)
// Run examples for each of the exposed service methods.
runSingleRequestReplyExample()
runStreamingRequestExample()
runStreamingReplyExample()
runStreamingRequestReplyExample()
sys.scheduler.schedule(1.second, 1.second) {
runSingleRequestReplyExample()
}
def runSingleRequestReplyExample(): Unit = {
sys.log.info("Performing request")
val reply = client.sayHello(HelloRequest("Alice"))
reply.onComplete {
case Success(msg) =>
println(s"got single reply: $msg")
case Failure(e) =>
println(s"Error sayHello: $e")
}
}
def runStreamingRequestExample(): Unit = {
val requests = List("Alice", "Bob", "Peter").map(HelloRequest.apply)
val reply = client.itKeepsTalking(Source(requests))
reply.onComplete {
case Success(msg) =>
println(s"got single reply for streaming requests: $msg")
case Failure(e) =>
println(s"Error streamingRequest: $e")
}
}
def runStreamingReplyExample(): Unit = {
val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
val done: Future[Done] =
responseStream.runForeach(reply =>
println(s"got streaming reply: ${reply.message}"))
done.onComplete {
case Success(_) =>
println("streamingReply done")
case Failure(e) =>
println(s"Error streamingReply: $e")
}
}
def runStreamingRequestReplyExample(): Unit = {
val requestStream: Source[HelloRequest, NotUsed] =
Source
.tick(100.millis, 1.second, "tick")
.zipWithIndex
.map { case (_, i) => i }
.map(i => HelloRequest(s"Alice-$i"))
.take(10)
.mapMaterializedValue(_ => NotUsed)
val responseStream: Source[HelloReply, NotUsed] =
client.streamHellos(requestStream)
val done: Future[Done] =
responseStream.runForeach(reply =>
println(s"got streaming reply: ${reply.message}"))
done.onComplete {
case Success(_) =>
println("streamingRequestReply done")
case Failure(e) =>
println(s"Error streamingRequestReply: $e")
}
}
}
ちょと長い。
けどやってることは、シンプルで、Akkaのお作法書いてから、 akka.grpc.GrpcClientSettings
なるAkka gRPCが提供しているSettingsを作って、Protocol Buffersが自動生成してくれている example.myapp.helloworld.grpc.GreeterServiceClient
を初期化している。
あとは、作った client
インスタンスのメソッドを呼ぶメソッドを作ってるだけで、それを、起動時に呼びだして、上述で作ったServerにアクセスして、Request/Responseするって感じになってる。
なので、Clientのコードを書くときに、Server側のReq/ResのModelとか、サービスがSingleかStreamとかを、型の制約に従って実装していくだけで良いので、めちゃ簡単だし、仕様を自ら読み問いて実装する部分がかなり少なくて、ほんと従うだけって感じ。
resources/application.conf
で、上述のClientで、どのServerにアクセスすれば良いのってあたりのsettingsが必要なので、application.confに記載する。
akka.grpc.client {
"helloworld.GreeterService" {
host = 127.0.0.1
port = 8080
override-authority = foo.test.google.fr
use-tls = false
}
}
Client実行!!
さーやっと実行しますよー。
まずは、Serverを起動しておきます。
$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"
...
[info] gRPC server bound to: /127.0.0.1:8080
で、別セッションのTerminalとかから、Clientを起動してみます。
$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterClient"
...
[info] got streaming reply: Hello, Alice-0
[info] got streaming reply: H
[info] got streaming reply: e
[info] got streaming reply: l
[info] got streaming reply: l
[info] got streaming reply: o
[info] got streaming reply: ,
[info] got streaming reply:
[info] got streaming reply: A
[info] got single reply: HelloReply(Hello, Alice)
[info] got single reply for streaming requests: HelloReply(Hello, HelloRequest(Alice), HelloRequest(Bob), HelloRequest(Peter))
[info] got streaming reply: l
[info] got streaming reply: i
[info] got streaming reply: c
[info] got streaming reply: e
[info] streamingReply done
[info] [INFO] [12/08/2018 11:19:03.949] [HelloWorldClient-akka.actor.default-dispatcher-6] [akka.actor.ActorSystemImpl(HelloWorldClient)] Performing request
[info] got single reply: HelloReply(Hello, Alice)
...
って感じになります。
延々とStreamしていきますね。
あと、Server側のTerminalでも、
[info] sayHello to stream...
[info] sayHello to in stream...
[info] sayHello to Alice
[info] sayHello to Alice with stream of chars...
[info] sayHello to Alice
[info] sayHello to Alice
...
みたいな感じでログが出ますね。
成功したみたい!!わーい 🤗
まとめ
ざっと、Akka gRPCのWalkthroughをやってみました。
この調子で、このサーバーに、gRPC WebとかをJavaScriptで作って接続したりとか、色々試してみたいと思います。
GitHub にコードを上げてます。 https://github.com/yoshiyoshifujii/example-akka-grpc
以上です。