10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Akka gRPCを試してみた

Last updated at Posted at 2018-12-08

この記事は、 Scala Advent Calendar 2018 の8日目のエントリーです。

最近、何かと私の周辺で話題の CNCF(Cloud Native Computing Foundation)が、 gRPC-Web is going GA - Cloud Native Computing Foundation なる記事を、先日出しており、少し話題にあがってました。

gRPC-Webが正式リリース。WebブラウザからgRPCを直接呼び出し可能に - Publickey

gRPCを使ったバックエンドサーバーの技術的な事例も色々なところで出てきており、これからのWebアプリケーション開発において、フロントエンドでも、バックエンドでも、最注目の技術だと考えております。

今回、gRPCとはなんぞやってあたりの説明は割愛しまして、我らがAkkaが Akka gRPC なるライブラリを出していただいておりますので、まずはgRPC触ってみたいんだーってことで、さわってみました。

基本的に、 Walkthrough をなぞっているだけです。

なお、Akka gRPCは、

This library is in preview mode: basic functionality is in place, but APIs and build plugins are still expected to be improved.

ということで、Preview状態です。本番適用などは、そのあたり、考慮したうえでご検討ください :bow:

Server

まずは、Serverを作る。

ベースとなるプロジェクトを作る

Walkthroughを流すため、ゼロからプロジェクトを作っていきたいと思います。

$ sbt new sbt/scala-seed.g8

A minimal Scala project.

name [Scala Seed Project]: example-akka-grpc

みたいな感じで、 example-akka-grpc なる名前でプロジェクトを作成します。

project/plugins.sbt

plugins.sbt にプラグインを記載します。

$ cd example-akka-grpc
$ touch project/plugins.sbt
project/plugins.sbt
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.4.2")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")

Akka gRPCのプラグインと、JavaAgentなるプラグインですね。

build.sbt

プラグインを有効にする設定をbuild.sbtに書きます。

build.sbt
import Dependencies._

lazy val root = (project in file(".")).
  enablePlugins(AkkaGrpcPlugin).
  enablePlugins(JavaAgent).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.12.7",
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "example-akka-grpc",
    libraryDependencies += scalaTest % Test,
    javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
  )

こんな感じになります。

src/main/protobuf/helloworld.proto

Protocol Buffers の記法でファイルを作成します。

src/main/protobuf/helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service GreeterService {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
    rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
    rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}

ここでは、Request/Responseに必要なメッセージの定義と、そのメッセージを操作するサービスを定義しています。

option で、自動生成されたJavaのコードが出力されるパッケージとかもろもろを指定しています。

sbt compile

$ sbt compile

を実行します。

すると、

貼り付けた画像_2018_12_07_17_41.png

こんな感じでScalaのコードが自動生成されます。

sbt compile を実行すると、enablePluginsしていた AkkaGrpcPlugin あたりが、 thesamet/sbt-protoc なるプラグインを呼びだし、良い感じに src/main/protobuf/helloworld.proto を見つけてScalaファイルをgenerateしてくれます。

ちょっと見てみる

自動生成されたコードを少し見てみますと。

target/scala-2.12/src_managed/main/example/myapp/helloworld/grpc/GreeterService.scala
// Generated by Akka gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc

import scala.concurrent.Future

import akka.NotUsed
import akka.stream.scaladsl.Source

trait GreeterService {

  def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): Future[example.myapp.helloworld.grpc.HelloReply]

  def itKeepsTalking(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Future[example.myapp.helloworld.grpc.HelloReply]

  def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]

  def streamHellos(in: Source[example.myapp.helloworld.grpc.HelloRequest, NotUsed]): Source[example.myapp.helloworld.grpc.HelloReply, NotUsed]

}

object GreeterService {
  val name = "helloworld.GreeterService"

  object Serializers {
    import akka.grpc.scaladsl.ScalapbProtobufSerializer

    val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)

    val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)

  }
}

こんな感じで、必要なサービスのtraitが自動生成されている。
こういったサービスとか、Request/Responseに必要なメッセージのモデルとかってあたのボイラープレートなコードを自動生成してくれるんだな。
で、これを活用して、HTTPサーバー立てたら、そのHTTPサーバーにアクセスするクライアント側のコードも、同様の手法で生成したクライアント用の言語に合わせたサービスやモデル郡を使って通信できるんだな。
それが利点なんだな。

あと、ここでは、4つのパターンの呼び出しを作ってる。

Single Request Stream Request
Single Response sayHello itKeepsTalking
Stream Response itKeepsReplying streamHellos

こんな感じで、4パターン。

Serviceの実装クラスGreeterServiceImpl

で、自動生成されたGreeterServiceは、traitなので、実装クラスを作る。

src/main/scala/example/myapp/helloworld/GreeterServiceImpl.scala
package example.myapp.helloworld

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import example.myapp.helloworld.grpc.{GreeterService, HelloReply, HelloRequest}

import scala.concurrent.Future

class GreeterServiceImpl(materializer: Materializer) extends GreeterService {
  import materializer.executionContext
  private implicit val mat: Materializer = materializer

  override def sayHello(in: HelloRequest): Future[HelloReply] = {
    println(s"sayHello to ${in.name}")
    Future.successful(HelloReply(s"Hello, ${in.name}"))
  }

  override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
    println(s"sayHello to in stream...")
    in.runWith(Sink.seq)
      .map(elements => HelloReply(s"Hello, ${elements.mkString(", ")}"))
  }

  override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
    println(s"sayHello to ${in.name} with stream of chars...")
    Source(s"Hello, ${in.name}".toList)
      .map(character => HelloReply(character.toString))
  }

  override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
    println(s"sayHello to stream...")
    in.map(request => HelloReply(s"Hello, ${request.name}"))
  }
}

Main

で、あとは、サーバー側のAkka HTTPのメインとなるobjectを作って、Akka HTTPの起動処理を書く。
ここは、ほぼ、お作法みたいなもん。

src/main/scala/example/myapp/helloworld/GreeterServer.scala
package example.myapp.helloworld

import akka.actor.ActorSystem
import akka.http.scaladsl.UseHttp2.Always
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc.GreeterServiceHandler

import scala.concurrent.{ExecutionContext, Future}

object GreeterServer extends App {

  val conf = ConfigFactory
    .parseString("akka.http.server.preview.enable-http2 = on")
    .withFallback(ConfigFactory.defaultApplication())
  val system = ActorSystem("HelloWorld", conf)

  new GreeterServer(system).run()
}

class GreeterServer(system: ActorSystem) {

  def run(): Future[Http.ServerBinding] = {
    implicit val sys: ActorSystem = system
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler(new GreeterServiceImpl(mat))

    val bound = Http().bindAndHandleAsync(
      service,
      interface = "127.0.0.1",
      port = 8080,
      connectionContext = HttpConnectionContext(http2 = Always)
    )

    bound.foreach { binding =>
      println(s"gRPC server bound to: ${binding.localAddress}")
    }

    bound
  }
}

ふむ。良い感じ。

Running the Server

なお、gRPCをサーバーで公開するには、HTTP/2である必要があり、これを正しく実行するには、 Akka HTTPのマニュアル に記載されているようにJetty ALPNエージェントを構成する必要があるとのこと。

Javaコマンドで実行するときは、

$ java -javaagent:/path/to/jetty-alpn-agent-2.0.7.jar -jar app.jar

みたいに、javaagentオプションを指定してあげる必要があったり、sbtの場合だと、 sbt/sbt-javaagent プラグインを有効にしたうえで、

  .enablePlugins(JavaAgent)
  .settings(
    javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.7" % "runtime"
  )

とか、build.sbtのプロジェクト設定に追加してあげる必要がある。
ふむふむなるほどー。

とりあえず、現時点でサーバー立ち上がるか、sbtから実行して試してみる。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"
...
[info] gRPC server bound to: /127.0.0.1:8080

ふむ。立ち上がったのかな?

Client

つぎに、gRPC Clientを作る。

sbtでマルチプロジェクトにしても良いけど、なんか、まあ、今回は、いいやって感じで、同じプロジェクトに別のClient Mainを作ろう。

src/main/protobuf/helloworld.proto

これ、上で作ったやつを、そのまま使います。
実際、Clientをマルチプロジェクトで作るとしたら、きっと、Protocol Buffersを書いたプロジェクトを別に作っておいて、そのプロジェクトを、dependsOnさせたりするのかと。
それか、publishして共通化しておくとか。
もしくは、Gitで別リポジトリにしておいて参照できるようにするとか。
なんせ、Protocol Buffersで書いたprotoファイルをClient/Serverで共通にしてそこから生成されるボイラープレートコードを使ってって感じになるのかと。

src/main/scala/example/myapp/helloworld/GreeterClient.scala

ClientのMainを作る。

src/main/scala/example/myapp/helloworld/GreeterClient.scala
package example.myapp.helloworld

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import example.myapp.helloworld.grpc.{
  GreeterService,
  GreeterServiceClient,
  HelloReply,
  HelloRequest
}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object GreeterClient extends App {
  // Boot akka
  implicit val sys: ActorSystem = ActorSystem("HelloWorldClient")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = sys.dispatcher

  // Take details how to connect to the service from the config.
  val clientSettings = GrpcClientSettings.fromConfig(GreeterService.name)
  // Create a client-side stub for the service
  val client: GreeterService = GreeterServiceClient(clientSettings)

  // Run examples for each of the exposed service methods.
  runSingleRequestReplyExample()
  runStreamingRequestExample()
  runStreamingReplyExample()
  runStreamingRequestReplyExample()

  sys.scheduler.schedule(1.second, 1.second) {
    runSingleRequestReplyExample()
  }

  def runSingleRequestReplyExample(): Unit = {
    sys.log.info("Performing request")
    val reply = client.sayHello(HelloRequest("Alice"))
    reply.onComplete {
      case Success(msg) =>
        println(s"got single reply: $msg")
      case Failure(e) =>
        println(s"Error sayHello: $e")
    }
  }

  def runStreamingRequestExample(): Unit = {
    val requests = List("Alice", "Bob", "Peter").map(HelloRequest.apply)
    val reply = client.itKeepsTalking(Source(requests))
    reply.onComplete {
      case Success(msg) =>
        println(s"got single reply for streaming requests: $msg")
      case Failure(e) =>
        println(s"Error streamingRequest: $e")
    }
  }

  def runStreamingReplyExample(): Unit = {
    val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
    val done: Future[Done] =
      responseStream.runForeach(reply =>
        println(s"got streaming reply: ${reply.message}"))

    done.onComplete {
      case Success(_) =>
        println("streamingReply done")
      case Failure(e) =>
        println(s"Error streamingReply: $e")
    }
  }

  def runStreamingRequestReplyExample(): Unit = {
    val requestStream: Source[HelloRequest, NotUsed] =
      Source
        .tick(100.millis, 1.second, "tick")
        .zipWithIndex
        .map { case (_, i) => i }
        .map(i => HelloRequest(s"Alice-$i"))
        .take(10)
        .mapMaterializedValue(_ => NotUsed)

    val responseStream: Source[HelloReply, NotUsed] =
      client.streamHellos(requestStream)
    val done: Future[Done] =
      responseStream.runForeach(reply =>
        println(s"got streaming reply: ${reply.message}"))

    done.onComplete {
      case Success(_) =>
        println("streamingRequestReply done")
      case Failure(e) =>
        println(s"Error streamingRequestReply: $e")
    }
  }
}

ちょと長い。
けどやってることは、シンプルで、Akkaのお作法書いてから、 akka.grpc.GrpcClientSettings なるAkka gRPCが提供しているSettingsを作って、Protocol Buffersが自動生成してくれている example.myapp.helloworld.grpc.GreeterServiceClient を初期化している。

あとは、作った client インスタンスのメソッドを呼ぶメソッドを作ってるだけで、それを、起動時に呼びだして、上述で作ったServerにアクセスして、Request/Responseするって感じになってる。

なので、Clientのコードを書くときに、Server側のReq/ResのModelとか、サービスがSingleかStreamとかを、型の制約に従って実装していくだけで良いので、めちゃ簡単だし、仕様を自ら読み問いて実装する部分がかなり少なくて、ほんと従うだけって感じ。

resources/application.conf

で、上述のClientで、どのServerにアクセスすれば良いのってあたりのsettingsが必要なので、application.confに記載する。

application.conf
akka.grpc.client {
  "helloworld.GreeterService" {
    host = 127.0.0.1
    port = 8080
    override-authority = foo.test.google.fr
    use-tls = false
  }
}

Client実行!!

さーやっと実行しますよー。

まずは、Serverを起動しておきます。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterServer"
...
[info] gRPC server bound to: /127.0.0.1:8080

で、別セッションのTerminalとかから、Clientを起動してみます。

$ sbt -mem 2048 "runMain example.myapp.helloworld.GreeterClient"
...
[info] got streaming reply: Hello, Alice-0
[info] got streaming reply: H
[info] got streaming reply: e
[info] got streaming reply: l
[info] got streaming reply: l
[info] got streaming reply: o
[info] got streaming reply: ,
[info] got streaming reply:
[info] got streaming reply: A
[info] got single reply: HelloReply(Hello, Alice)
[info] got single reply for streaming requests: HelloReply(Hello, HelloRequest(Alice), HelloRequest(Bob), HelloRequest(Peter))
[info] got streaming reply: l
[info] got streaming reply: i
[info] got streaming reply: c
[info] got streaming reply: e
[info] streamingReply done
[info] [INFO] [12/08/2018 11:19:03.949] [HelloWorldClient-akka.actor.default-dispatcher-6] [akka.actor.ActorSystemImpl(HelloWorldClient)] Performing request
[info] got single reply: HelloReply(Hello, Alice)
...

って感じになります。
延々とStreamしていきますね。

あと、Server側のTerminalでも、

[info] sayHello to stream...
[info] sayHello to in stream...
[info] sayHello to Alice
[info] sayHello to Alice with stream of chars...
[info] sayHello to Alice
[info] sayHello to Alice
...

みたいな感じでログが出ますね。
成功したみたい!!わーい 🤗

まとめ

ざっと、Akka gRPCのWalkthroughをやってみました。
この調子で、このサーバーに、gRPC WebとかをJavaScriptで作って接続したりとか、色々試してみたいと思います。

GitHub にコードを上げてます。 https://github.com/yoshiyoshifujii/example-akka-grpc

以上です。

10
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?