Help us understand the problem. What is going on with this article?

gRPCのサーバ/クライアントをScalaで実装する

More than 1 year has passed since last update.

この記事はなに?

ScalaでgRPCなアプリケーションを作るためのやりかた。

具体的には4つの通信方式、

  • unary
  • server straming
  • client streaming
  • bidirectional streaming

それぞれについてサーバ/クライアントをScalaで実装する。

参考

目次

  • sbtの設定、マルチプロジェクト対応
  • .protoファイルを作成
  • .protoから生成したクラスをScalaから使用する
    • unary
    • server straming
    • client streaming
    • bidirectional streaming

サンプルコードはpetitviolet/scala-grpc-pracに置いた。

sbtの設定、マルチプロジェクト対応

modules配下にmain, model, protocolという3つのプロジェクトを設定する。

gRPCのprotocol bufferで使う.protoファイルはmodules/protocol/protocol配下に置けるようにする。
treeだとこんな感じ。

./modules/
├── main
│   └── src
│       └── main
├── model
│   └── src
│       └── main
└── protocol
    └── protocol
        └── my_service.proto

build.sbtに設定を記述する

だいたい全部を載せておく

// 共通設定
def commonSettings(_name: String) = Seq(
  scalaVersion := "2.12.4",
  version := "1.0.0",
  libraryDependencies ++= commonDependencies,
  name := _name,
)

// gRPC用の設定
def grpcProtocolSettings = {
  import scalapb.compiler.Version

  Seq(
    PB.targets in Compile := Seq(
      scalapb.gen(singleLineToProtoString = true) -> (sourceManaged in Compile).value
    ),
    PB.protoSources in Compile +=
      (baseDirectory in ThisProject).value / "protocol",

    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "scalapb-runtime" % Version.scalapbVersion % "protobuf",
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % Version.scalapbVersion,
      "io.grpc" % "grpc-netty" % Version.grpcJavaVersion
    )
  )
}

// rootプロジェクト
lazy val grpcPrac = (project in file("."))
  .settings(commonSettings("grpcPrac"))
  .aggregate(main)

// gRPCの.protoファイルを配置するプロジェクト  
lazy val protocol = (project in file("modules/protocol"))
  .settings(commonSettings("protocol"))
  .settings(grpcProtocolSettings)  // gRPC設定を適用

// gRPC非依存なプロジェクト
lazy val model = (project in file("modules/model"))
  .settings(commonSettings("model"))

// gRPCなserverを実装するプロジェクト
lazy val main = (project in file("modules/main"))
  .settings(
    commonSettings("main"),
  )
  .dependsOn(model, protocol)

これでOK。

.protoファイルを作成

Protocol Bufferの文法とかはLanguage Guide (proto3)を見たほうが早い。
今回のサンプルで使うのは以下のprotoファイル。

modules/protocol/protocol/MyService.protoに置いてある。

syntax = "proto3";

// 出力されるScalaプログラムのpackageを指定
package net.petitviolet.prac.grpc.protocol;

service MyService {
    // unary
    rpc ShowOrganization (ShowOrganizationRequest) returns (Organization) {
    };
    // server stream
    rpc ShowEmployees (ShowEmployeeRequest) returns (stream Employee) {
    };
    // client stream
    rpc AddEmployee (stream Employee) returns (MessageResponse) {
    };
    // bidirectional stream
    rpc Lottery(stream FetchRandomRequest) returns (stream Employee) {};
}

message MessageResponse {
    string message = 1;
}

message ShowEmployeeRequest {
    int32 organizationId = 1;
}

message ShowOrganizationRequest {
    int32 organizationId = 1;
}

message FetchRandomRequest { 
}

message Employee {
    string name = 1;
    int32 age = 2;
    enum Post {
        NoTitle = 0;
        Manager = 1;
        Officer = 2;
    }
    int32 organizationId = 3;
}

message Organization {
    int32 id = 1;
    string name = 2;
    repeated Employee emproyees = 3;
}

これを置いた上でsbt compileするとmodules/protocol/target配下にScalaファイルが吐かれる。
gRPC使う側はimportするだけで使用できる。

packageを指定してMyService.protoというファイル名なので、実際に使う際のimport文は以下になる。

import net.petitviolet.prac.grpc.protocol.MyService._

.protoから生成したクラスをScalaから使用する

grpc / gRPC Concepts#RPC life cycleにあるようにRPCの方式は4つある。

  • unary
  • server streaming
  • client streaming
  • bidirectional streaming

それぞれについてserver/clientで実装してみる。

記事に書くにあたってサンプル実装とは一部変更してある。

共通

server

実装するべきI/Fはこのようになっている。

class MyServiceImpl extends MyServiceGrpc.MyService {
  // unary
  def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
  // server streaming
  def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: StreamObserver[MyService.Employee]): Unit = ???
  // client streaming
  def addEmployee(responseObserver: StreamObserver[MyService.MessageResponse]): StreamObserver[MyService.Employee] = ???
  // bidirectional streaming
  def lottery(responseObserver: StreamObserver[MyService.Employee]): StreamObserver[MyService.FetchRandomRequest] = ???
}

これを実装してServiceBuilder#addServiceに渡せば良い。

サンプル実装はこのあたり

client

こちらは既に実装されているので、利用するにあたってはI/Fに合わせて利用するだけ。
MyServiceBlockingClientMyServiceStubの2クラスが生成されている。
前者は名前の通りblockingするクライアントになっていて、unary, server streamingの2パターンしかサポートしていない。
後者のMyServiceStubは全てサポートしていて以下のようなI/Fになっている。

class MyServiceStub(channel: io.grpc.Channel, options: io.grpc.CallOptions = io.grpc.CallOptions.DEFAULT) extends io.grpc.stub.AbstractStub[MyServiceStub](channel, options) with MyService {
  // unary
  override def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
  // server streaming
  override def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: _root_.io.grpc.stub.StreamObserver[MyService.Employee]): Unit = ???
  // client streaming
  override def addEmployee(responseObserver: io.grpc.stub.StreamObserver[MyService.MessageResponse]): io.grpc.stub.StreamObserver[MyService.Employee] = ???
  // bidirectional streaming
  override def lottery(responseObserver: io.grpc.stub.StreamObserver[MyService.Employee]): io.grpc.stub.StreamObserver[MyService.FetchRandomRequest] = ???
}

上記のMyServiceStubクラスのコンストラクタにあるio.grpc.Channelインスタンスからクライアントはnewできる。

val blockingClient: MyServiceGrpc.MyServiceBlockingClient = MyServiceGrpc.blockingStub(channel)
val asyncClient: MyServiceGrpc.MyServiceStub = MyServiceGrpc.stub(channel)

unary

1リクエストに対して1レスポンスのもっともシンプルな通信。

server

普通に実装すればよく、特に言及することはない。

override def showOrganization(request: ShowOrganizationRequest): Future[Organization] = {
  organizationRepository.findById(request.organizationId) // Future[Option[Organization]]
    .map {
      case Some(organization) => organization
      case None =>
        throw new RuntimeException(s"invalid request organization id = ${ request.organizationId }")
    }
  }

client

blocking/asyncどちらにも実装されているメソッドを呼び出すだけ。

val org: Organization = blockingClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
val orgF: Future[Organization] = asyncClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))

client streaming

クライアントからのstreamなリクエストに対してサーバから1レスポンスを返す方式。

server

引数と返り値が共にStreamObserverとなっていて一瞬混乱するかも。
注意点としては、サーバからは1レスポンスな通信方式なので、返り値のStreamObserver.onNextを複数回呼ばないようにすること。

override def addEmployee(responseObserver: StreamObserver[MessageResponse]): StreamObserver[Employee] = {
  new StreamObserver[Employee] {
    override def onError(t: Throwable): Unit =  {
      logger.error("addEmployee onError", t)
      responseObserver.onError(t)
    }

    override def onCompleted(): Unit = {
      // ここで一度だけonNextを実行し、続けてonCompleteを実行する
      responseObserver.onNext(MessageResponse(s"addEmployee succeeded"))
      responseObserver.onCompleted()
    }

    // ここがclientからのリクエストによって複数実行される可能性がある
    override def onNext(employee: Employee): Unit = {
      employeeRepository.store(employee)
        .onComplete { // ここではresponseObserver.onNextは呼ばない
          case Success(_) => logger.info(s"addEmployee onNext: ${ employee.name }")
          case Failure(t) => onError(t)
        }
    }
  }
}

client

asyncなクライアントしか使用することは出来ない。
注意点としては、asyncClient.addEmployeeの返り値のStreamObserveronCompletedを呼ぶこと。

def addEmployee(name: String) = rpc {
  val responseObserver = new StreamObserver[MessageResponse] {
    override def onError(t: Throwable): Unit = logger.error("add failed to add employee", t)

    override def onCompleted(): Unit = logger.info("add completed to add employee")

    override def onNext(value: MessageResponse): Unit = logger.info("add onNext. message = ${value.message}")
  }

  val requestObserver: StreamObserver[Employee] = asyncClient.addEmployee(responseObserver)
  (1 to 3).foreach { i =>
    val employee = Employee(s"${ name }-$i", i * 10, i)
    requestObserver.onNext(employee)
  }

  responseObserver.onCompleted()
  requestObserver.onCompleted() // これを呼ぶ
}

requestObserver.onCompleted()を実行しないとクライアントからのstreamingが終わったことを通知する必要があるらしい。
そうしないと、クライアント側のManagedChannel.shutdown()を実行した時にサーバ側のonErrorが呼び出されてしまう。

上の実装だとlogger.errorのあとにresponseObserver.onErrorも実行しているが、既に接続が切れているのでクライアント側には通知されない。
ちなみにonError時のstacktraceを抜粋するとこんな感じ。

io.grpc.StatusException: CANCELLED
        at io.grpc.Status.asException(Status.java:534)
        at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:272)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:282)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$3.runInContext(ServerImpl.java:618)

server streaming

クライアントからの1リクエストに対してサーバからstreamで複数レスポンスを返す方式。

server

クライアントからのリクエストに対して、responseObserver.onNextを使って結果を複数返却する。

override def showEmployees(request: ShowEmployeeRequest, responseObserver: StreamObserver[Employee]): Unit = {
  Option(request.organizationId)
    .filterNot { _ == 0 } // デフォルト値を除外
    .map { orgId =>
      employeeRepository.findByOrganizationId(orgId) // Future[Seq[Employee]]
    }.getOrElse { 
      employeeRepository.findAll()  // Future[Seq[Employee]]
    }.map { employees: Seq[Employee] =>
      employees.foreach { employee: Employee =>
        responseObserver.onNext(employee) // onNextを複数回呼んで良い
      }
    }.onComplete {
      case Success(_) => responseObserver.onCompleted() // 全て終了したらonComplete
      case Failure(t) => responseObserver.onError(t) // 何か失敗したらonError
    }
}

onNextを全て呼び終わったら、最後に一度だけonCompleteを実行する。

client

こちらはリクエストを1回送り、複数レスポンスをStreamObserverで受け付ける。

def showEmployees(): Future[Unit] = rpc {
  val promise = Promise[Unit]()
  val responseObserver = new StreamObserver[Employee] {
    override def onError(t: Throwable): Unit = {
      logger.error(s"showEmployee onError", t)
      promise.failure(t)
    }
    override def onCompleted(): Unit = {
      logger.info(s"showEmployee onComplete")
      promise.success(())
    }

    override def onNext(value: Employee): Unit = {
      logger.info(s"showEmployee onNext: $value")
    }
  }
  asyncClient.showEmployees(new ShowEmployeeRequest(), responseObserver)
  promise.future
}

onCompleteが呼ばれるタイミングはサーバ側に依存するがPromiseを使って表現することが出来る。

bidirectional streaming

クライアントとサーバ間で双方向にリクエストしあう方式。

server

実装としてはclient streamingの時の実装とよく似ている。

override def lottery(responseObserver: StreamObserver[Employee]): StreamObserver[FetchRandomRequest] = {
  new StreamObserver[FetchRandomRequest] {
    override def onError(t: Throwable): Unit = logger.error(s"lottery onError", t)

    override def onCompleted(): Unit = {
      logger.info(s"lottery onCompleted")
      responseObserver.onCompleted()
    }

    override def onNext(value: FetchRandomRequest): Unit = {
      employeeRepository.findAll() foreach { _.headOption { em => 
        responseObserver.onNext(Employee(em.name, em.age, em.organization.id))
      }
    }
  }
}

この実装だとしていないが、サーバ側でresponseObserver.onNextはどんなタイミングで呼んでもよいものになっている。

サーバ側のonCompletedresponseObserver.onCompletedを呼ぶように実装しておく。

client

こちらはserver streamingの時の実装に似たものになる。

def lottery(): Future[Unit] = rpc {
  val promise = Promise[Unit]()
  val responseObserver = new StreamObserver[Employee] {
    override def onError(t: Throwable): Unit = {
      logger.error(s"lottery onError", t)
      promise.failure(t)
    }
    override def onCompleted(): Unit = {
      logger.info(s"lottery onComplete")
      promise.success(())
    }
    override def onNext(value: Employee): Unit = logger.info(s"lottery onNext: $value")
  }

  val requestObserver: StreamObserver[FetchRandomRequest] = asyncClient.lottery(responseObserver)

  (1 to 3).foreach { i =>
    requestObserver.onNext(FetchRandomRequest())
  }
  requestObserver.onCompleted()
  promise.future
}

server streamingの時と同様にPromiseを使って結果を受け取れるような実装にしてある。

クライアント側からrequestObserver.onCompletedを呼ぶことで、サーバからresponseObserver.onCompletedが実行されるように実装してあるのでこれで両方正常に終了出来る。

所感

gRPC、Protocol BufferでIDLが提供されているのでサーバ・クライアント間のインタフェースが型安全に表現できて非常に便利。

ScalaPBのおかげでScalaからも特に違和感なく使うことが出来る。

ただ、4つの通信方式それぞれで知っておいたほうがいいこともあるようなので、当たり前だけど慎重に実装する必要はある。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away