LoginSignup
49

More than 1 year has passed since last update.

DartでgRPCを使う

Last updated at Posted at 2019-12-12

この記事について

Dart Advent Calendar 2019 の 13 日目の記事です。
その後も読みやすい記事になるようたびたび改善してます。

対象者

  • gRPC を知らない人
  • 少し知っているけれど使い方がわからない人
  • 使いどころがわからない人
  • Dart での使い方を知らない人

目標

  • Dart で REST 等の Web API の代わりに gRPC を使う基本的な方法を把握する
  • 他の使い方(双方向の通信が必要な用途など)もイメージできるようにする

GitHub リポジトリ

この記事のコードを GitHub に置いています(英語にして、他にも少しだけ変えています)。
kaboc/dart_grpc_examples

gRPC とは

Google によって開発されたオープンソースの RPC フレームワークで、今は Cloud Native Computing Foundation(CNCF)によって管理されています。
RPC は Remote Procedure Call のことで、まるでローカルの関数を使うようにリモートのマシンの機能を呼び出せる仕組みです。

用途

言語を問わない点や速さなどの利点があり、マイクロサービスでの利用に向いています。
gRPC に対応したクラウドサービスもあって便利になってきています。

他に、Web やモバイルのアプリでもサーバとクライアントを繋ぐ API として有用です。
Dart/Flutter 製のアプリでも採用例があるようです。

特徴

  • 速い
  • ペイロードが小さくて転送するデータ量が少ない
  • レイテンシーが小さい
    • 人が読みやすいようにできている JSON のような形式と違って gRPC はコンピュータが扱いやすいバイナリになっているので、性能の低いマシンにも優しいです。
  • 認証、ロードバランシングなどが簡単
  • 様々な言語に対応
  • やり取りする相手側の言語を気にする必要がない
    • IDL(Protocol Buffers)で仕様を記述して各言語で使うコードを出力できます。

gRPC 自体というよりも、gRPC で利用する Protocol Buffers や HTTP/2 の複合的な恩恵が大きいと思います。1

REST API に対するアドバンテージ

Protocol Buffers のドキュメント では下記のように XML と比較されています。

  • よりシンプル
  • 3 ~ 10 倍小さい
    • XML
      • ホワイトスペースを除くと 69 バイト
    • Protocol Buffers
      • バイナリにエンコードされると 28 バイト程
  • 20 ~ 100 倍速い
    • XML
      • パースに 5,000~10,000 ナノ秒かかる
    • Protocol Buffers
      • パースに 100~200 ナノ秒しかかからない
  • 曖昧さが少ない
  • 扱いやすいデータアクセスクラスを生成する

速度やサイズは API において非常に重要ですが、扱いやすさも使う上で大事です。
データアクセスクラスによってレスポンスデータのパース等を自分でしなくて済むのはとても楽です。

XML 以外の形式も含めた一般的な API との比較で、扱いやすさの面では次のようなアドバンテージもあると思います。

  • エンドポイントやHTTPリクエストメソッドに悩まなくて済む
  • エラーの返し方も悩む必要がない
  • クラスや構造体を書くような感覚で記述できるのでやり取りがイメージしやすい
  • 双方向のやり取りも簡単
  • バイナリデータも扱いやすい
  • proto ファイルが仕様書の代わりになる(中身を見ればぱっと仕様が掴める)

Stack Overflow に次々と上がってくる質問を見ていると、REST API でサーバから受け取った JSON 形式のレスポンスをパースするところで悩む人が 非常に多い です。
gRPC ではそのようなパースを自分で行う必要がなくて苦労を減らせます。

対応言語

GitHub の README によれば、次の言語が対応しています。

  • C++
  • C#
  • Dart
  • Go
  • Java
  • Kotlin
  • Node
  • Objective-C
  • PHP
  • Python
  • Ruby
  • WebJS

このうち C++ / C# / Ruby / Python / PHP / NodeJS / Objective-C は C のライブラリを利用しています。2

それ以外の言語(Dart / Go / Java など)は C に頼らないピュアな実装ということだと思います。
Dart、なかなかやりますね。

この他にも Swift などが非公式ながら対応しているようです。

種類

四種類あり、本記事では Unary と Bidirectional streaming を試します。
Bidiraectional(双方向)は「bidi streaming」(バイダイストリーミング)と略した呼び方もあります。

  • Unary RPCs
    • 1 リクエスト、1 レスポンス
  • Server streaming RPCs
    • 1 リクエスト、多レスポンス
  • Client streaming RPCs
    • 多リクエスト、1 レスポンス
  • Bidirectional streaming RPCs
    • 多リクエスト、多レスポンス

ストリーミングは大きなファイルのときに分割して送るのにも使えて便利です。

Protocol Buffers

gRPC では Protocol Buffers というものが用いられています。
それは、構造化データをIDL(インタフェース定義言語)で記述して通信等で使えるようにシリアライズできる仕組みです。
gRPC 専用ではありませんが、gRPC の性能や API の定義のしやすさ等に大きく貢献しているものです。

クライアントとサーバの間で行うやり取りの仕様を書いたプロトコル定義ファイル(.proto)を基にして、そのやり取り処理の実装に使いたい言語のコードを自動生成する流れになります。

インストール

まず、生成に必要となるツールをインストールしていきましょう。

protoc

Protocol Buffers のコンパイラです。
下記ページに従ってインストールします。

OS によってはパッケージマネージャを使ってインストールできます。
それ以外の環境ではバイナリをダウンロードして設定しましょう。

protoc の Dart 用プラグイン

下記ページに従ってインストールします。
dart pub global activate protoc_plugin の方法が楽で良いと思います。

※バージョンが変わったときなどにエラーが出るようになったら再実行してみてください。

補助用のツール

proto ファイルを書くときに間違いなどを指摘してくれる linter を入れておくと便利です。
いくつかあるようですが、protolint がお手軽でした。

VS Code についても調べたところ、プラグインはありましたが protolint の作者によるものではないようでした。

proto ファイル

インストールできましたか?

では、公式のサンプルのうち helloworld の proto ファイル(一部省略)を見てみましょう。
説明のコメントを入れました。

protos/helloworld.proto
syntax = "proto3";

package helloworld;

service Greeter {
  // 送受信に使うメソッドの定義
  // 一つ目の括弧がリクエストの型、二つ目の括弧がレスポンスの型
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// リクエストの型の定義
message HelloRequest {
  string name = 1;
}

// レスポンスの型の定義
message HelloReply {
  string message = 1;
}

= 1 のように指定している数字は、バイナリにエンコードしたときに識別するためのものです。
一つの message の中に複数ある場合には 1 からの連番にするなどユニークな値になるようにする必要があります(詳細は こちら)。

型は stringint32 等の他に repeated(配列)、enum などがあります。
この記事では map も使っています。

Dart 用のコードを出力

proto ファイルで定義したことを Dart のコードとして出力させます。

下記は helloworld/ がカレントディレクトリの場合です。
proto ファイルは helloworld/protos/ に入っています。
出力先である helloworld/lib/src/generated というディレクトリは先に作っておく必要があります。

protoc --dart_out=grpc:lib/src/generated -I protos helloworld.proto

protoc-gen-dart のパスを設定していなくてエラーが出る場合は設定するか、--plugin=protoc-gen-dart=パス のオプションで解決できると思います。

実行すると次の四つのファイルが生成されます。

  • helloworld.pb.dart
  • helloworld.pbenum.dart
  • helloworld.pbgrpc.dart
  • helloworld.pbjson.dart

proto ファイルの分割

import のキーワードを使用して複数のファイルに分けることもできます。

例)service と message を helloworld1.protohelloworld2.proto に分け、helloworld1.protohelloworld2.proto を読み込んで使う。

protos/helloworld1.proto
syntax = "proto3";

package helloworld;

import 'helloworld2.proto';

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
protos/helloworld2.proto
syntax = "proto3";

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

コードを出力するには、対象のすべてのファイルを指定します。

protoc --dart_out=grpc:lib/src/generated -I protos helloworld1.proto helloworld2.proto

機能ごとの service にしておいてファイルを分けるのも良いと思います。

ここまで大丈夫でしょうか?
helloworld はシンプルすぎるのでここまでにして次の例に進みます。
生成されたファイルの使い方はそこで説明しますので大丈夫です。

進む前にもっと細かく見ておきたい方は 公式の解説 をご覧ください。

Unary RPC の例

helloworld をアレンジして、リクエストとレスポンスの中身を少し増やしてみます。

  1. クライアント
    • 名字と名前をサーバに渡す
  2. サーバ
    • 受け取った名字と名前をマップに入れる
    • 現在時刻に応じて挨拶を決める(「Good Morning」か「Hi」)
    • それらと現在時刻の情報をまとめてクライアントに返す
  3. クライアント
    • 受け取ったデータから各情報を取り出し、「Hi, Foo Bar! It's 12:34:56 now.」のように出力する

これを sayHello() というメソッドとして定義します。

Dart で利用する準備

grpc パッケージを使う設定を pubspec.yaml ファイルに記述して dart pub get しておきましょう。
Flutter で利用するなら flutter pub get です。

pubspec.yamlの一部
dependencies:
  grpc: ^3.0.2

.proto ファイル

リクエストに Name、レスポンスに Greeting という独自の型を使うことにしました。

  • Name
    • 名前(文字列)
    • 名字(文字列)
  • Greeting
    • 挨拶メッセージ(文字列)
    • 名前・名字(first と last をキーとする Map)
    • 現在時刻のエポックミリ秒(int64、Dart では package:fixnum の Int64)
protos/greet.proto
syntax = "proto3";

package greet;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message Name {
  string first_name = 1;
  string last_name = 2;
}

message Greeting {
  string message = 1;
  map<string, string> names = 2;
  int64 time = 3;
}

message HelloRequest {
  Name name = 1;
}

message HelloReply {
  Greeting greeting = 1;
}

出力先はフォルダ名を変えて generated ではなく pb にしました(これはご自由に)。

protoc --dart_out=grpc:lib/src/pb -I protos greet.proto

サーバ

出力されたファイルの一つを見ると次のようになっています。

lib/src/pb/greet.pbgrpc.dart
class GreeterClient extends $grpc.Client {
  // 省略
}

abstract class GreeterServiceBase extends $grpc.Service {
  $core.String get $name => 'greet.Greeter';

  GreeterServiceBase() { /* 省略 */ }

  $async.Future<$0.HelloReply> sayHello_Pre(
      $grpc.ServiceCall call, $async.Future<$0.HelloRequest> request) async { /* 省略 */ }

  $async.Future<$0.HelloReply> sayHello(
      $grpc.ServiceCall call, $0.HelloRequest request);  // このメソッドだけ実装がない
}

クライアント用の GreeterClient とサーバ用の GreeterServiceBase というクラスがあります。
この名前は proto ファイルで設定した「Greeter」という service の名称に基づいています。

サーバ用は抽象クラスで、sayHello() はシグネチャの定義のみになっていて実装がありません。
これを自分で実装することになります。
上記の GreeterServiceBase を継承した GreeterService クラスを作ります。

Service クラス

sayHello() を実装します。

lib/server.dart
import 'package:fixnum/fixnum.dart';
import 'package:grpc/grpc.dart';

class GreeterService extends GreeterServiceBase {
  @override
  Future<HelloReply> sayHello(ServiceCall call, HelloRequest request) async {
    print('${request.name.lastName}さんからリクエストがありました。');

    final now = DateTime.now();

    final greeting = Greeting(
      message: now.hour < 12 ? 'Good Morning' : 'Hi',
      names: {
        'first': request.name.firstName,
        'last': request.name.lastName,
      },
      time: Int64(now.millisecondsSinceEpoch),
    );

    return HelloReply(greeting: greeting);
  }
}

引数や戻り値の型は greet.pbgrpc.dart を見るとわかりますが、IDE の助けを借りるのが楽です。
メソッドが未実装なら記述を助けてくれます。3
自動挿入されたコードに async が付いていない場合は自分で付けましょう。

このコードで主にやっているのは

  • 受け取った値(= クライアントから届いた値)を request から取り出す
  • Greeting()message 等に値(クライアントに返したい情報)を設定する
  • それを HelloReply()greeting に入れて return する(= レスポンスを返す)

ということだけです。

このようにメソッドの引数としてリクエストを受け取り、戻り値でレスポンスを返すことができます。
とても楽ですね。

書き方の変更

追記(2021-9-26)

以前は渡したい値をセットするためにはセッターを使う必要がありましたが、引数で指定できるようになりました。4
元の書き方も引き続き可能ですが、本記事では変更後の書き方に更新済みです。

変更前

final greeting = Greeting()..message = 'Hi';
return HelloReply()..greeting = greeting;

変更後

final greeting = Greeting(message: 'Hi');
return HelloReply(greeting: greeting);

main()

ここまでのまとめ:

  • proto ファイルの定義に基づいてファイルを出力
  • 出力ファイル内の抽象クラスに sayHello() の定義が含まれている
  • そのクラスを継承した GreeterService を作り、sayHello() を自分で実装する

それでは、いよいよサーバを動かせる状態にするところになります。
main() はここでは GreeterService と同じファイル内にしておきます(慣れたらアレンジしてください)。

lib/server.dart
Future<void> main() async {
  final server = Server([GreeterService()]);
  await server.serve(port: 50051);
  print('Server listening on port ${server.port}...');
}

GreeterServiceServer() に渡して serve() を実行することでサーバが出来上がります。
Service クラスが複数あればその複数を渡しますが、この例では GreeterService のみです。
ポートはここでは gRPC で通常使われる 50051 にしました。

※TLS 認証など行う場合はもう少しコードが増えます(こちら)。

メタデータを受け取る

この記事のサンプルでは使っていませんが、クライアントからメタデータを受け取ることもできます。
サーバ側での取り出し方は、Service クラスの call という引数を使うだけです。

print(call.clientMetadata);

クライアント側でメタデータを指定しなかった場合は次のようになっています。

{:method: POST, :scheme: http, :path: /greet.Greeter/SayHello, :authority: localhost:50051, grpc-timeout: 10000m, content-type: application/grpc, te: trailers, grpc-accept-encoding: identity, user-agent: dart-grpc/2.0.0}

クライアントからの渡し方は後ほど書きます。

逆にサーバからクライアントにデータを送ることもできます(トレーラーと呼ばれるものです)。

gzip 圧縮

grpc パッケージの v2.9.0 にてサポートされたので追記します(2021/7/10)。

helloworld のサンプルの bin/client.dart と bin/server.dart を見ると使い方がわかりますが、変更履歴 もわかりやすくて、次の方法で設定できると書かれています。

  • サーバ側
    • ServercodecRegistry という引数で設定
  • クライアント側
    • CallOptionsmetadata という引数(後述)に grpc-accept-encoding を追加
    • ChannelOptions(後述)の codecRegistry という引数で設定
    • 外向きの RPC の圧縮は CallOptions(後述)の compression という引数で設定

helloworld のサーバ側でこの設定を行っている 箇所 は下記のようになっています。

final server = Server(
  [GreeterService()],
  const <Interceptor>[],
  CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
);

IdentityCodec というのは ドキュメント では次のとおり説明されています。

The "identity", or "none" codec.

This codec is special in that it can be used to explicitly disable Call compression on a Channel that by default compresses.

デフォルトで有効になっている圧縮を明示的に無効化するのに使えるという説明です。
有効にする GzipCodec と無効にする IdentityCodec の両方を指定することでどちらにも対応したサーバになるということだろうと思われます。

クライアント

greet.pbgrpc.dart にあるクライアント用のクラス(GreeterClient)を使うだけです。
文章にするとわかりにくいのでコードにコメントを入れました。

lib/client.dart
Future<void> main() async {
  // エンドポイントへのチャンネルを用意
  final channel = ClientChannel(
    'localhost',  // サーバの IP アドレス(またはホスト名)を指定すること
    port: 50051,
    options: const ChannelOptions(
      // TLS を無効にする
      credentials: ChannelCredentials.insecure(),
      // ユーザエージェントはここでしか設定できない(動的には不可)
      // 省略すると「dart-grpc/2.0.0」
      userAgent: 'ユーザエージェントの文字列',
    ),
  );

  // クライアントの用意
  // (クライアントスタブと呼ばれる)
  final client = GreeterClient(channel);

  // リクエストデータの用意
  final request = HelloRequest();
  request.name = Name(
    firstName: 'Foo',
    lastName: 'Bar',
  );

  // サーバが起動していないときなどの例外を捕捉
  try {
    // リクエストを sayHello() に渡してサーバに送ると
    // レスポンスが戻り値として返ってくる
    final response = await client.sayHello(request);

    // レスポンスから情報を取り出す
    final greet = response.greeting;

    // 取り出した情報を成形して出力
    final time = DateTime.fromMillisecondsSinceEpoch(greet.time.toInt());
    print(
      '${greet.message}, ${greet.names['first']} ${greet.names['last']}! '
      "It's ${DateFormat.Hms().format(time)} now.",
    );
  } catch (e) {
    print('Caught error: $e');
  }

  // 終わったらチャンネルをシャットダウン
  await channel.shutdown();
}

HelloRequest()SayHello() は proto ファイルで定義したものです。
サーバよりはコード量が多いですが、難しくはないと思います。

さて、このコードでは gRPC の大きなポイントを一つ感じられますが、おわかりでしょうか?

  ↓

response.greetingmessagenamestime は proto ファイルで定義した型になっています。

  • message
    • String
  • names
    • Map
  • time
    • Int64
      • Dart の普通の int に変換するには toInt() するだけ

JSON を扱うときのような型の辛みがありません!

例外処理

gRPC に関連する例外は on GrpcError catch で捕捉できます(上記コードでは省略)。

受け取った情報が e に入っているとすると、エラーコードは e.code で取り出せます。
例えば接続関連エラーは 14、タイムアウトは 4 で、コードに対応する StatusCode.unavailableStatusCode.deadlineExceeded といった定数と比較することで原因を判断できます。

なお残念なことに、試した限りでは例外の発生の仕方やメッセージの文言がプラットフォームによって異なるようでした。
各プラットフォームで確認しながら開発するようにしましょう。

2021/7/10 追記

grpc パッケージの v2.7.0 にて「Support 'details' from the RPC Error Model」という PR の変更が反映されました。

GrpcErrordetails という List 型のプロパティが追加され、それを見ることでエラーの詳細がわかるようになったようです。
List の中身は GeneratedMessage という型で、それを継承した型(BadRequest など)が入ります。

しかし試してみたところ、details 自体が null だったり、null でない場合も List が空だったりして、実際に値が入っている場合にどのような情報が得られるのかを確認できませんでした。
おそらく error_details.proto というファイルで定義されているものが details から取り出せる情報だと思います。

GrpcError には codeName というゲッターも追加されていて(v2.7.0 時点ではプロパティで、その後ゲッターに変更)、そちらは UNIMPLEMENTEDUNAVAILABLE というテキストが得られることを確認しました。

TLS 認証

gRPC では通常は TLS 認証が有効ですが、ローカル環境で試すときなどに無効にできます。
チャンネルを用意する部分の options: ChannelOptions()credentials: ChannelCredentials.insecure() のように指定しているのがそれです。5

無効化しない場合の書き方は こちら に説明があります。

チャンネルの終了

shutdown()terminate() があります。
RPC が残っている場合、shutdown() では終わるまで待ち、terminate() では待たないという違いのようです。

メタデータを送る

トークンなどのデータを付加して送信したい場合にはメタデータを使うと便利です。
クライアントスタブを用意するところとリクエストを送るところの二箇所で CallOptions を使って設定できます。

クライアントスタブのほうで設定する例を見てみましょう。
メタデータは下記のように Map で渡します。

GreeterClient(
  channel,
  options: CallOptions(
    metadata: {'キー1': '値1', 'キー2': '値2'},
    timeout: Duration(seconds: 10),
  ),
)

クライアントに設定したオプションは、そのクライアントによる毎リクエストに適用されます。
特定のリクエストのときだけ渡したい場合にはリクエスト時にオプション設定しましょう。

なお、両方で設定すると両方のメタデータが送られ、同じキーの項目は送信時のほうの値で上書きされるようです。

ついでですが、上のコードにはタイムアウトの設定も含めました。
指定した Duration の間にレスポンスがなければ例外が発生します。
タイムアウトの詳細は後ろのほうの「タイムアウトを設定しにくい」をご覧ください。

トレーラーを受け取る

ついでに、クライアント側でトレーラーを受け取る方法も見ておきましょう。
(サーバ側から送る方法は本記事では省略しています。)

トレーラーは Map<String, String> 型になっています。
例えば、サーバ側で token というキーで何らかのトークンを設定していた場合、クライアント側では次のようにして取り出すことができます。

final call = sayHello(request);
final response = await call;
final trailers = await call.trailers;
print(trailers['token']);

gzip 圧縮

先ほどサーバの説明のところにも書いたクライアント側の設定方法を再掲します。

  • CallOptionsmetadatagrpc-accept-encoding を追加
  • ChannelOptionscodecRegistry という引数で設定
  • 外向きの RPC の圧縮は CallOptionscompression という引数で設定

メタデータによる設定では Map に String 型で記述するときに間違えれば機能しないので、他の二つの方法のほうが安全だと思います。

helloworld のサンプルのクライアントで ChannelOptions によって設定している 箇所 は下記のようになっています。

final channel = ClientChannel(
  'localhost',
  port: 50051,
  options: ChannelOptions(
    credentials: ChannelCredentials.insecure(),
    codecRegistry: CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
  ),
);

これはサーバ側で Server の第三引数を使って設定する方法に似ていますね。

次に RPC のメソッドで設定している 箇所 です。

final response = await stub.sayHello(
  HelloRequest(name: name),
  options: CallOptions(compression: const GzipCodec()),
);

先ほどの ClientChennel での設定で圧縮/非圧縮の両方を指定していたのは、おそらくレスポンスを受けるときの設定としてサーバ側と同様に両方に対応させておくためだと思いますが、RPC 時は送信するだけなので圧縮設定のみです。

ClientChannel でだけ設定して RPC 時に省略しても勝手に圧縮して送信してくれるのかどうかは未検証です。
サンプルで両方の設定が行われているので、両方とも必要なのかもしれません。

実行

unary_greet.gif

あまり面白い例ではなかったかと思いますが、これを理解すればアプリとバックエンドのやり取り等に使えるはずです。

※ファイアウォールが邪魔することがあるかもしれませんので、繋がらないときは設定を確認してみてください。

Bidirectional streaming RPC の例

双方向のストリーミングも使ってみましょう。

ストリーミングとは、例えばサーバ側なら一つのリクエストに対して複数のレスポンスを続けて返せる機能のことで、それをクライアントとサーバの両方で行えるのが Bidirectional streaming です。

単方向のストリーミングは本記事では割愛しますが、Unary と Bidirectional streaming を理解すれば単方向も同時に理解したも同然だと思います。

  1. クライアント
    • 数値を入力
  2. サーバ
    • 受け取った数値の個数・合計・平均の情報を 100 ms 間隔で返す
  3. クライアント
    • 受け取った情報をそのまま出力する
    • 1 に戻る

proto ファイル

先ほどよりシンプルにしました。

protos/calc.proto
syntax = "proto3";

package calc;

service Calc {
  rpc Calculate (stream CalcRequest) returns (stream CalcResponse);
}

message CalcRequest {
  int64 number = 1;
}

message CalcResponse {
  string message = 1;
}

リクエストとレスポンスのそれぞれの型のところに stream を付けているのが注目ポイントです。
これによってストリーミングになります。
リクエストのほうにだけ付ければ Client streaming RPCs です。

サーバ

lib/server.dart
class CalcService extends CalcServiceBase {
  // async* が必要です
  @override
  Stream<CalcResponse> calculate(ServiceCall call, Stream<CalcRequest> request) async* {
    final numbers = <int>[];

    await for (final req in request) {
      print('Received: ${req.number}');

      numbers.add(req.number.toInt());
      final len = numbers.length;
      final sum = numbers.reduce((a, b) => a + b);
      final avg = sum / len;

      yield CalcResponse(message: '$numbers');
      await _wait();
      yield CalcResponse(message: '個数: $len');
      await _wait();
      yield CalcResponse(message: '合計: $sum');
      await _wait();
      yield CalcResponse(message: '平均: $avg');
    }
  }

  Future<void> _wait() async {
    return await Future.delayed(const Duration(milliseconds: 100));
  }
}

双方向なので第二引数(リクエスト)と戻り値(レスポンス)が Stream 型になっています。
Protocol Buffers の stream という言葉が Dart の用語と一致していて違和感がないですね。

この Service では受け取った値を List に入れ、これまで受け取った値の個数等の情報を出力しています。
一回のリクエストにつき四回のレスポンスをしていることになります。6

クライアント

lib/client.dart
Future<void> main() async {
  final channel = ClientChannel(/* 省略 */);
  final client = CalcClient(channel);

  // サーバには Stream を渡す
  final responses = client.calculate(requestStream());

  try {
    // レスポンスの Stream を await for で待ち受ける
    await for (final res in responses) {
      print(res.message);
    }
  } catch (e) {
    print(e);
    await channel.shutdown();
    exit(0);
  }
}

// async* と yield を使ってリクエストの Stream を作る
Stream<CalcRequest> requestStream() async* {
  while (true) {
    // 標準入力を行ごとに処理
    final lines = stdin.transform(utf8.decoder).transform(const LineSplitter());

    await for (final line in lines) {
      final num = int.tryParse(line);
      if (num == null) {
        print('数値を入力してください。');
      } else {
        yield CalcRequest(number: Int64(num));
      }
    }
  }
}

クライアントも Streaming です。
サーバ側のみ streaming にしてもいいですが、クライアントも streaming にすることで、Stream に値を add するだけで次々とリクエストできるわけです。
毎回 client.calculate() を実行しなくて済むということです。

やはり Dart には Stream があるので親和性が高いですね。

実行

grpc_bidirectional.gif

Web での使用

Web 向けには gRPC-Web が使われるため、少しだけ異なります。

gRPC-Web は JavaScript のライブラリですが、自分で JavaScript を混ぜて書くわけではありません。
grpc-dart のパッケージがうまくやってくれます。

サーバ

unary_greet と同じ proto ファイルを使ったサンプルにしましたので、その server.dart をそのまま使います。

ただし、構成が変わります。
ちょっと面倒ですが、サーバの手前に gRPC に対応したプロキシが必要です。

gRPC-Web の README によれば三種類のプロキシがあります。

クライアント

まず import が異なります。

// Web でない場合
import 'package:grpc/grpc.dart';

// Web の場合
import 'package:grpc/grpc_web.dart';

もう一つは Channel を用意するところです。

// Web でない場合
final channel = ClientChannel('localhost', port: 50051, options: ...);

// Web の場合
final channel = GrpcWebClientChannel.xhr(Uri.parse('http://localhost:50051'));

これだけです。
内部で勝手に切り替えてくれればいいのにと思うくらい少ない違いです。

GrpcWebClientChannel には名前の付かないデフォルト引数は無いようです。
引数が一つしかないのでオプションを指定できません。
これも違いですね。

実行

プロキシにはここでは Envoy を使います。
サンプルでは proxy_example というフォルダに設定ファイルを置いています。

設定ファイルのフォルダで下記コマンドを実行してプロキシを起動します。
ホストマシンの 50051 番ポートへのアクセスが greet_proxy という Docker コンテナの 8080 番ポートに渡され、そのプロキシによって 9090 番ポート(サーバが listen するポート)へ転送されます。

$ docker build -t greet/envoy -f ./envoy.Dockerfile .
$ docker run -d -p 50051:8080 --name greet_proxy greet/envoy

unary_greet のフォルダに移動し、サーバを起動して 9090 番ポートで listen させます。

$ dart lib/server.dart -p 9090

これでサーバ側の用意ができたので、次にクライアント側です。
web サンプルのルートフォルダで次のコマンドでクライアント側の開発用サーバを起動します。

$ webdev serve

起動したらブラウザで http://localhost:8080/ にアクセスします。
問題がなければ GIF アニメのような動作になります。

web.gif

なお、下のコマンドを使えばデプロイ用の一式(変換された JavaScript コードなど)が build/ に生成されます。

$ webdev build

制限

gRPC-Web では今のところ Client streaming と Bidirectional streaming ができません。
Bidirectional streaming を行おうとすると、サーバからの Stream は問題ありませんが、クライアントからの二回目以降の送信ではブラウザのコンソールにエラーが出力されます。
Server streaming 相当の動作にしかならないということだと思われます。

四種類のうち半分しか使えないのは残念ですが、シンプルな API には Unary で十分です。
Server streaming も何か面白い使い方ができそうです。

なお、ロードマップ には含まれていて、今後の用途の広がりが期待できます。

不便なところ

タイムアウトを設定しにくい

タイムアウトが思ったようになりませんでした。
機能が不十分なのか、特定のバージョンの現象なのか、書き方が悪いのかわかりません。
本記事を書いている時点で使った grpc-dart のバージョンは v2.1.3 です。

タイムアウト関連の設定を行うのは次の箇所です。
他に気づいていない箇所があるかもしれません。

  • ClientChannel() に渡す ChannelOptions
  • PianoClient() に渡す CallOptions
  • 自分で作った RPC のメソッドに渡す CallOptions

ChannelOptions には関連するオプションとして次の三つがあります。

  • connectionTimeout
    • ドキュメントには「The maximum time a single connection will be used for new requests.」と書かれています。
    • デフォルトは 50 分で、設定すると値は変わりましたがタイムアウトしませんでした。
    • Google の gRPC のエンドポイントは 1 時間で閉じてしまうのでタイムアウトを 50 分にすることで RPC の失敗を防いでいる7 ようです。自分でいじらないほうが良いものなのかもしれません。
  • idleTimeout
    • デフォルトは 5 分で、設定すると値は変わりましたがタイムアウトしませんでした。
    • 文字通り Idle 状態が続いたときのものと思われますが、短い秒数を設定して何も操作しないで待っても変化がありませんでした。
  • backoffStrategy
    • backoff なので(タイムアウト時の?)リトライ方法を定義するものと思われます。
    • defaultBackoffStrategy() のコードと、同じファイルにURLが書かれている 説明ページ を見るとどういうものかわかると思います。
    • これが働いていることで connectionTimeout などが機能していないように見えるのでしょうか…(不明)。

CallOptions のほうの関連設定は一つだけです。

  • timeout
    • これは効きますが、やり取りの最中であっても指定した Duration が経過すると切れてしまいます。
    • 接続しようとする際のタイムアウト(接続を試みてから接続完了するまでの待機時間)の設定には使えません。

やり取りの途中で切れてしまうなら Unary RPCs 以外では設定しにくいなと思いました。
逆に言えば、クライアントから一回ずつリクエストする単純な API の用途では不都合はありません。

接続状態の確認が難しい

接続しようとする IP アドレスによっては、サーバが起動していなくてもすぐにエラーにならずにしばらく待たされることがあります。
その間、ユーザにとっては既に接続済みなのか待ちなのかがわかりません。
そこで接続状態に合わせた表示をしたいと思ったのですが、状態を取得するのが難しそうでした。

このあたりのファイルでは接続状態に応じた処理が行われているようです。
しかし、ユーザが触れない場所にあるコードが使われています。
簡単に確認できるようになるといいですが、それを求めている人が少なければ改善されないかもしれません。

ブロードキャストできない(しにくい)

Bidirectional streaming RPCs の例としてチャットを最初に考えていた のですが、Dart ではうまくできませんでした

Go ではサーバ側のメソッド(先ほどの例での calculate() に相当するもの)で受け取ったデータ(オブジェクトのようなもの)が送信用のメソッドを持っているので、そのオブジェクトを複数クライアント分保管しておけば一斉に送信できます。

それに対し、Dart では受け取ったものが Stream であって送信できません。
RxDart を駆使してどうにかできないかと思いましたが、私にはできませんでした。
無理ではないのかもしれません。

このように言語によって作りが大きく異なるのが残念です。

追記(2019/12/20)

実現できたので、リポジトリに チャットのサンプル を追加しました。
意外とシンプルなコードになりましたが、Go でやるより難しいです。8

grpc_dart_chat.gif

リフレクションが非対応

リフレクションを利用できれば gRPC のクライアントツールを使えてサーバの確認などがしやすいのですが、対応しているのは Java / Go / C++ / C# / Python のみでした。9

Dart & gRPC の情報が少なめ

チュートリアルは他の言語と大差ありませんが、API ドキュメントには説明が不足しています。
例えば、先ほど書いた idleTimeout などは現時点で説明が書かれていません。
ユーザが触れるようになっているのに説明がないのはちょっと不親切だし、使う上で困ります。

また、利用者による情報が Stack Overflow 等にほとんどありませんでした。

不明なところ

テストの仕方

HTTP 通信用のモックのライブラリがあるように grpc-dart 用のものがないとテストしにくいです。
どのようなライブラリがあるのか不明です。
ちょっと調べた感じではまだ無さそうでした…。

おわりに

不便な面も紹介しましたが、ややこしい使い方をしなければ使えるなという印象でした。
Dart の人気が上がってきているので、改善が今後加速するかもしれません。

簡単に使えますので、気になる方は試してみてください。


追記

2019/12/13
gRPC を使ってアプリを作った話も書きました。
遠隔演奏できるピアノアプリをFlutterとgRPCで作った

2020/1/8
定義をアップデートするときの注意点についてまとめました。
.protoファイルのアップデートにおける注意点(protobuf) - のんびり精進

2021/7/10
grpc パッケージはこの記事を書いたときと比べてかなり進化してきています。
gzip 圧縮の機能が追加されたことやエラー情報を取得できるようになったことを追記しましたが、他にクライアント側のインターセプタや UNIX ドメインソケットへの対応などの改善が加えられて便利になっています。
UNIX ドメインソケットが利用できるならサーバサイドでの Dart の活用が捗りそうだなと思ったりします。

  1. HTTP/2 には HTTP ヘッダ圧縮、ストリームによる並行したやり取りなどの利点があります(参考: 普及が進む「HTTP/2」の仕組みとメリットとは | さくらのナレッジ)。

  2. https://github.com/grpc/grpc/tree/master/src/core - "This directory contains source code for C library (a.k.a the gRPC C core) that provides all gRPC's core functionality through a low level API. Libraries in other languages in this repository (C++, C#, Ruby, Python, PHP, NodeJS, Objective-C) are layered on top of this library."

  3. Android Studio の場合、クラス名のところに警告が出るのでそこにカーソルを合わせてから Windows では Alt + Enter を押すと「Create 1 missing override」の選択肢が出てきます。

  4. CHANGELOG を見てもどのバージョンで変わったのかわかりませんでした。grpc パッケージではなく protoc_pluginprotobuf の変更ではないかと思います。protobufgrpcprotoc_plugin の両方が依存しているパッケージです。

  5. 省略すると ChannelCredentials.secure() です。

  6. await Future.delayed() を使っているのは、複数のレスポンスが返ることが視覚的にわかりやすいようにするためのものです。実際のサーバではその間ブロックしてしまうので良くないと思います。

  7. https://github.com/grpc/grpc-dart/blob/ae17e712e4f5431eb6664f0589899c77462e5475/lib/src/client/options.dart - "It seems like Google's gRPC endpoints will forcefully close the connection after precisely 1 hour. So we proactively refresh our connection after 50 minutes. This will avoid one failed RPC call."

  8. クライアントごとに StreamController を用意して Map に入れておき、リクエストを受け取ったときにそのリクエスト(Stream)を各クライアントの StreamController に sink.add() して、そのデータを受け取った側でそのクライアント分のレスポンスを返すというやり方です。

  9. grpc/server-reflection.md at master · grpc/grpc · GitHub

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
49