Edited at

Akka gRPCのWalkthroughってみた


はじめに

Play 2.7.0 is here!だそうで、リンク先を見てみると・・・


gRPC support

gRPC is a transport mechanism for request/response and (non-persistent) streaming use cases. It is a schema-first RPC framework, where your protocol is declared in a protobuf service descriptor, and requests and responses will be streamed over an HTTP/2 connection. Play now offers play-grpc which is a module built on top of akka-grpc and gives you experimental support to declare your services in this format. See Akka gRPC's documentation on Why gRPC? for more information about when to use gRPC as your transport.


Akka gRPC使ってgRPCをサポートするようです。

元々Akka gRPCが気になってたことですし、ドキュメントにあるWalkthroughでもやってみようかと思います。

※gRPC自体の解説はしません。


構成


  • scala: 2.12.8

  • sbt: 1.2.8

  • Akka gRPC: 0.5.0


成果物

https://github.com/lightstaff/akka-grpc-example


SBT設定

まずはプラグインを追加します。


plugin.sbt

// akka grpcプラグイン

addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.5.0")

// JavaAgentプラグイン
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")


続いてビルド構成。


build.sbt

name := "akka-grpc-example"

version := "0.1"

scalaVersion := "2.12.8"

// akka grpcプラグインを適用
enablePlugins(AkkaGrpcPlugin)

// JavaAgentプラグインを適用
enablePlugins(JavaAgent)

// JavaAgentにALPNを追加
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"


Akka gRPCプラグインとともにJavaAgentプラグインを導入しています。これはgRPCが行うhttp2でのサーバー・クライアント間の通信に必要なためのようです(JVMバージョン9以降はALPNは不要らしい?)。


protoファイル

/src/main/protobufhelloworld.protoというファイル名でProtocol Buffersを定義します。


helloworld.proto

syntax = "proto3";

// 単一ファイル化を拒否
option java_multiple_files = true;
// コード上のパッケージ名
option java_package = "example.myapp.helloworld.grpc";
// 何のために出力されるかよく分からんクラス
option java_outer_classname = "HelloWoldProto";

// protobuf的パッケージ名
package helloworld;

// サービス規定
service GreeterService {

// 単発リクエスト → 単発リプライ
rpc SayHello(HelloRequest) returns (HelloReply) {}

// ストリームリクエスト → 単発リプライ
rpc ItKeepsTalking(stream HelloRequest) returns (HelloReply) {}

// 単発リプライ → ストリームリクエスト
rpc ItKeepsReplying(HelloRequest) returns (stream HelloReply) {}

// ストリームリクエスト → ストリームリプライ
rpc StreamHellos(stream HelloRequest) returns (stream HelloReply) {}
}

// リクエスト形式
message HelloRequest {
string name = 1;
}

// リプライ形式
message HelloReply {
string message = 1;
}


GreeterServiceSayHelloItKeepsTalkingItKeepsReplyingStreamHellosの4つのRPCメソッドを用意しています。

ここまでできたら一旦ビルドし、上記helloworld.protoから各クラス等を自動生成します。

$: sbt compile

内部的にはScalaPBが動作しているようで/targetフォルダ内に生成され名前空間でのインポートが可能となっているようです。


サービス実装

Protocol Buffersから自動生成されたGreeterServiceを継承し実装します。


GreeterServiceImpl.scala

package example.myapp.helloworld

import scala.concurrent.Future

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

import example.myapp.helloworld.grpc._

// GreeterServerの実装
class GreeterServiceImpl(materializer: Materializer) extends GreeterService {

import materializer.executionContext
private implicit val mat: Materializer = materializer

// 単発リクエスト → 単発リプライ
override def sayHello(in: HelloRequest): Future[HelloReply] = {
println(s"sayHello to ${in.name}")

// そのまんま
Future.successful(HelloReply(s"Hello, ${in.name}"))
}

// ストリームリクエスト → 単発リプライ
override def itKeepsTalking(
in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
println(s"sayHello to in stream...")

// SourceをSinkで受けて、リクエストのnameを結合
in.runWith(Sink.seq)
.map(elements =>
HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
}

// 単発リプライ → ストリームリクエスト
override def itKeepsReplying(
in: HelloRequest): Source[HelloReply, NotUsed] = {
println(s"sayHello to ${in.name} with stream of chars...")

// リクエストのnameをcharをSource化して流す
Source(s"Hello, ${in.name}".toList).map(character =>
HelloReply(character.toString))
}

// ストリームリクエスト → ストリームリプライ
override def streamHellos(
in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
println(s"sayHello to stream...")

// SourceからSource
in.map(request => HelloReply(s"Hello, s${request.name}"))
}
}


それぞれ単純な作りになっているのでメソッド自体の解説はコメントの通りです。

Akka-gRPCがProtcol BuffersのstreamをAkka-Streamに変換してくれているので、SourceSinkを使って処理ができるようです(Flowは今回登場しませんでした)。複雑化するようならGraphDSLも持ち込めると思います。


サーバー

上記で実装したサービスを提供するサーバーを定義します。


GreeterServer.scala

package example.myapp.helloworld

import scala.concurrent.{ExecutionContext, Future}

import akka.actor.ActorSystem
import akka.http.scaladsl.UseHttp2.Always
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc.GreeterServiceHandler

object GreeterServer {

def main(args: Array[String]): Unit = {

// http2をon 必須!!
val conf =
ConfigFactory
.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())

val system = ActorSystem("HelloWorld", conf)

new GreeterServer(system).run()
}
}

class GreeterServer(system: ActorSystem) {

def run(): Future[Http.ServerBinding] = {
implicit val sys: ActorSystem = system
implicit val mat: Materializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher

// サービスを生成
val service: HttpRequest => Future[HttpResponse] = GreeterServiceHandler(
new GreeterServiceImpl(mat))

// akka-httpでバインド
val bound = Http().bindAndHandleAsync(
service,
interface = "127.0.0.1",
port = 18080,
connectionContext = HttpConnectionContext(http2 = Always))

bound.foreach { binding =>
println(s"gRPC server bound to: ${binding.localAddress}")
}

bound
}
}


サービスが提供するハンドラをAkka-Httpでバインドしているだけのシンプルなものです。

akka.http.server.preview.enable-http2 = onを忘れないよう注意。


クライアント

クライアントを実装してサーバーを介してgRPCサービスを呼び出してみます。


GreeterClient.scala

package example.myapp.helloworld

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import example.myapp.helloworld.grpc.{
GreeterService,
GreeterServiceClient,
HelloReply,
HelloRequest
}

object GreeterClient {

def main(args: Array[String]): Unit = {
implicit val sys: ActorSystem = ActorSystem("HelloWorldClient")
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher

// Walkthroughから生成方法を変更
// DEADLINE短いとストリームが終わらんので注意
val clientSettings = GrpcClientSettings
.connectToServiceAt("localhost", 18080)
.withDeadline(15.second)
.withTls(false)

// clientを生成
val client: GreeterService = GreeterServiceClient(clientSettings)

// 下記メソッドからサービスを呼び出し
runSingleRequestReplyExample()
runStreamingRequestExample()
runStreamingReplyExample()
runStreamingRequestReplyExample()

// 用も無く無限に呼び出してるのでコメントアウト
// sys.scheduler.schedule(1.second, 1.second) {
// runSingleRequestReplyExample()
// }

// 単発リクエスト → 単発リプライ
def runSingleRequestReplyExample(): Unit = {
sys.log.info("Performing request")

val reply = client.sayHello(HelloRequest("Alice"))

reply.onComplete {
case Success(msg) =>
println(s"got single reply: $msg")
case Failure(e) =>
println(s"Error sayHello: $e")
}
}

// ストリームリクエスト → 単発リプライ
def runStreamingRequestExample(): Unit = {
val requests = List("Alice", "Bob", "Peter").map(HelloRequest.apply)

// リクエストとストリームとして送信
val reply = client.itKeepsTalking(Source(requests))

reply.onComplete {
case Success(msg) =>
println(s"got single reply for streaming requests: $msg")
case Failure(e) =>
println(s"Error streamingRequest: $e")
}
}

// 単発リプライ → ストリームリクエスト
def runStreamingReplyExample(): Unit = {
// サービスから戻り値ストリーム
val responseStream = client.itKeepsReplying(HelloRequest("Alice"))

// サーバーからの送信が終わったらDone
val done: Future[Done] =
responseStream.runForeach(reply =>
println(s"got streaming reply: ${reply.message}"))

done.onComplete {
case Success(_) =>
println("streamingReply done")
case Failure(e) =>
println(s"Error streamingReply: $e")
}
}

// ストリームリクエスト → ストリームリプライ
def runStreamingRequestReplyExample(): Unit = {
// 一秒に一回×10リクエスト
val requestStream: Source[HelloRequest, NotUsed] = Source
.tick(100.millis, 1.second, "tick")
.zipWithIndex
.map { case (_, i) => i }
.map(i => HelloRequest(s"Alice-$i"))
.take(10)
.mapMaterializedValue(_ => NotUsed)

// 戻り値ストリーム
val responseStream: Source[HelloReply, NotUsed] =
client.streamHellos(requestStream)

// サーバーからの送信が終わったらDone
val done: Future[Done] = responseStream.runForeach(reply =>
println(s"got streaming reply: ${reply.message}"))

done.onComplete {
case Success(_) =>
println("streamingRequestReply done")
case Failure(e) =>
println(s"Error streamingRequestReply: $e")
}
}
}
}


二点ほど元のWalkthroughから変更しています。

一点はval clientSettings = GrpcClientSettings...の部分。これは元はapplication.confからロードされるようになってましたが、Walkthrough内にその設定方法が書いてなかった(別ページにはあります)ので直接的な定義に代えました。

もう一点はrunSingleRequestReplyExampleを無限に呼び出すようになっていてログが流れるのが邪魔くさくてその部分をコメントアウトしています。

基本的にサーバーがAkka-Http噛ましているのでFutureStreamなりで処理が返ってきます。


実行

それぞれをsbt runなりsbt runMainなりで動かします。

サーバー側

[Info] sayHello to Alice

[Info] sayHello to in stream...
[Info] sayHello to Alice with stream of chars...
[Inof] sayHello to stream...
...

クライアント側

[Info] got single reply: Hello, Alice

[Info] got single reply for streaming requests: Alice, Bob, Peter
[Info] got streaming reply: H
[Inof] got streaming reply: e
[Inof] got streaming reply: l
[Inof] got streaming reply: l
[Inof] got streaming reply: o
[Inof] got streaming reply: ,
[Inof] got streaming reply: A
[Inof] got streaming reply: l
[Inof] got streaming reply: i
[Inof] got streaming reply: c
[Inof] got streaming reply: e
[Inof] got streaming reply: Alice-0
[Inof] got streaming reply: Alice-1
...

などとそれぞれ出力されるはずです。


おわりに

基本がAkka-StreamなのでAサービスからBサービスにストリームで繋ぐことのも楽かも。