19
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

NestJSで始める本気のgRPC

Last updated at Posted at 2020-12-30

gRPCが:fire:だがNestJSでは修羅の道感が否めない

CNCFのIncubating Projectにも選出されている次世代RPCのgRPCに手を出したいけど、まだやってないという方は多いのではないのでしょうか。

gRPC界隈でのJavaScriptのサンプルが世の中に少なすぎる。ググっても出てくるのはGoばかり。せめてもの救いはNestJSのドキュメントのgRPCのページ公式サンプルですが、どうにもこうにもシンプル過ぎる:sob:

これらを穴が空くほど見つめていましたが、

  • どうやって.protoからコード生成して使うの:question:
  • どうやってリクエスト送るの:question:
  • StreamingはObservableとStreamの2通りの書き方があるみたいだけど、実際どう書くの:question:
  • E2Eテストどうやって書くの:question:

というような疑問が生まれました。今回はこんな疑問をNestJSの公式サンプルを拡張しながら解消していきます。

今回書かないこと

今回はgRPCとNestJSが何者なのかは書きません。既にいい感じにまとめて下さっている方の記事を参考にして下さい。

gRPC

NestJS

それでは本気になる:fire:

完成したものはこちらに置いてあるので、適宜参照して下さい。
https://github.com/mizozobu/nestjs-grpc-extended

.protoからTypeScriptのInterfaceを生成する:fire:

NestJSのgRPCドキュメントNestJS公式のgRPCサンプルを眺めていると、.protoを読み込んでいるものの、.protoからInterfaceやClassを生成してません。これではgRPCの良さが半減してしまいます。

TypeScript用のProtocol BufferコンパイラはNestJSに対応しているts-protoを使ってTypeScriptのInterfaceを生成してみます。こちらの記事「NestJSでgRPC API作るならコード生成はts-protoで決まり」を参考にしています。

まず始めにprotocをインストールします。

brew install protobuf

次にts-protoをインストールします。

yarn add -D ts-proto 

準備ができたので、hero.protoからTypeScriptのInterfaceを生成します。

protoc --plugin=$(yarn bin)/protoc-gen-ts_proto \
  --ts_proto_out=src \
  --ts_proto_opt=nestJs=true \
  --ts_proto_opt=outputClientImple=false \
  --ts_proto_opt=addGrpcMetadata=true \
  -Isrc \
  src/hero/hero.proto
生成されたファイルの内容はこちら
hero.ts
/* eslint-disable */
import { Metadata } from 'grpc';
import { Observable } from 'rxjs';
import { GrpcMethod, GrpcStreamMethod } from '@nestjs/microservices';


export interface HeroById {
  id: number;
}

export interface Hero {
  id: number;
  name: string;
}

export interface HeroServiceClient {

  findOne(request: HeroById, metadata?: Metadata): Observable<Hero>;

  findMany(request: Observable<HeroById>, metadata?: Metadata): Observable<Hero>;

}

export interface HeroServiceController {

  findOne(request: HeroById, metadata?: Metadata): Promise<Hero> | Observable<Hero> | Hero;

  findMany(request: Observable<HeroById>, metadata?: Metadata): Observable<Hero>;

}

export function HeroServiceControllerMethods() {
  return function (constructor: Function) {
    const grpcMethods: string[] = ['findOne'];
    for (const method of grpcMethods) {
      const descriptor: any = Reflect.getOwnPropertyDescriptor(constructor.prototype, method);
      GrpcMethod('HeroService', method)(constructor.prototype[method], method, descriptor);
    }
    const grpcStreamMethods: string[] = ['findMany'];
    for (const method of grpcStreamMethods) {
      const descriptor: any = Reflect.getOwnPropertyDescriptor(constructor.prototype, method);
      GrpcStreamMethod('HeroService', method)(constructor.prototype[method], method, descriptor);
    }
  }
}

export const protobufPackage = 'hero'

export const HERO_PACKAGE_NAME = 'hero'
export const HERO_SERVICE_NAME = 'HeroService';

このように.protoからコードを生成することでAPIの仕様を共有する仕組みになっています。
生成されたInterfaceとDecoratorを使ってhero.controller.tsを書き直します。

hero.controller.ts
import { Controller } from '@nestjs/common';
import { Observable, Subject } from 'rxjs';
import { Hero, HeroById, HeroServiceController, HeroServiceControllerMethods } from './hero';

@HeroServiceControllerMethods()
@Controller('hero')
export class HeroController implements HeroServiceController {
  private readonly items: Hero[] = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  findOne(data: HeroById): Hero {
    return this.items.find(({ id }) => id === data.id);
  }

  findMany(data$: Observable<HeroById>): Observable<Hero> {
    const hero$ = new Subject<Hero>();

    const onNext = (heroById: HeroById) => {
      const item = this.items.find(({ id }) => id === heroById.id);
      hero$.next(item);
    };
    const onComplete = () => hero$.complete();
    data$.subscribe(onNext, null, onComplete);

    return hero$.asObservable();
  }
}

リクエストを送る:fire:

gRPCのcurlという位置づけのgrpcurlを使って、リクエストを送ってみましょう。

まず、grpcurlをインストールします。

brew install grpcurl

NestJSのアプリを起動します。

yarn start:dev

ではリクエストを送ります。

grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/FindOne <<EOM
{
  "id": 1
}
EOM

レスポンスが返ってくることが確認できます。

{
  "id": 1,
  "name": "John"
}

[備考]
PostmanのようなGUIが良い方はBloomRPCを使ってみて下さい。

Streamingを書く:fire:

Streamingとは?という方はこの記事「gRPC / MagicOnion 入門 (2) - 4 種類の通信方式」をおすすめします。
NestJSではStreamingの書き方は2通りあります。

gRPCの3種類のStreamingを各パターンで対応可能かまとめました。私はServer StreamingをCall Stream Handlerパターンで書こうと四苦八苦した結果、ギブアップしました。書き方が分かる方がいれば教えて欲しいです。

Subject Strategyパターン Call Stream Handlerパターン
Client Streaming
Server Streaming
Bidirectional Streaming

Call Stream Handlerパターンの方がシンプルだと思いますが、RxJSの汎用性とts-protoの恩恵を考えるとSubject Strategyパターンが優勢というところでしょうか。

では3つのStreamingを用意するために、hero.protoを編集します。ついでにFindOneの名前も変えておきます。

hero.proto
service HeroService {
- rpc FindOne (HeroById) returns (Hero);
+ rpc UnaryCall (HeroById) returns (Hero); // Unary Call
+ rpc ClientStreamExample (stream HeroById) returns (Hero); // Client Streaming
+ rpc ServerStreamExample (HeroById) returns (stream Hero); // Server Streaming
- rpc FindMany (stream HeroById) returns (stream Hero);
+ rpc BidirectionalStreamExample (stream HeroById) returns (stream Hero); // Bidirectional Streaming
}

そうしたら.protoからTypeScriptのInterfaceを生成するでやったようにhero.tsを生成しましょう。
それでは3つのStreamingを書いていきます。

Subject Strategyパターン

こちらに完成版があります。
https://github.com/mizozobu/nestjs-grpc-extended/blob/master/src/hero/hero.controller.ts

Client Streaming

hero.controller.ts
// Client Streaming (Observable)
clientStreamAsObservable(data$: Observable<HeroById>): Observable<Hero> {
  const hero$ = new Subject<Hero>();

  const onNext = (heroById: HeroById) => {
    console.log('HeroService.ClientStreamAsObservable received %o', heroById);
    const item = this.items.find(({ id }) => id === heroById.id);
    hero$.next(item);
  };
  const onComplete = () => {
    hero$.complete()
    console.log('HeroService.ClientStreamAsObservable completed');
  };
  data$.subscribe({
    next: onNext,
    error: null,
    complete: onComplete
  });

  return hero$.asObservable();
};
Client Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/ClientStreamAsObservable
{
  "id": 1
}
{
  "id": 2
}

Server Streaming

hero.controller.ts
// Server Streaming (Observable)
serverStreamAsObservable(data: HeroById): Observable<Hero> {
  const subject = new Subject<Hero>();
  console.log('HeroService.ServerStreamAsObservable received %o', data);

  const onNext = (item: Hero): void => {
    console.log('HeroService.ServerStreamAsObservable responses %o', item);
  };
  const onComplete = (): void => {
    console.log('HeroService.ServerStreamAsObservable completed');
  };
  subject.subscribe({
    next: onNext,
    error: null,
    complete: onComplete
  });

  let i = 0;
  setInterval(() => {
    if (i >= this.items.length) {
      subject.complete();
    }
    else {
      const item = this.items[i];
      subject.next(item);
      i += 1;
    }
  }, 1000);

  return subject.asObservable();
}
Server Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/ServerStreamAsObservable <<EOM
{
  "id": 1
}
EOM

// response
// 1秒後
{
  "id": 1,
  "name": "John"
}

// 2秒後
{
  "id": 2,
  "name": "Doe"
}

Bidirectional Streaming

hero.controller.ts
// Bidirectional Streaming (Observable)
bidirectionalStreamAsObservable(data$: Observable<HeroById>): Observable<Hero> {
  const hero$ = new Subject<Hero>();

  const onNext = (heroById: HeroById) => {
    console.log('HeroService.BidirectionalStreamAsObservable received %o', heroById);
    const item = this.items.find(({ id }) => id === heroById.id);
    console.log('HeroService.BidirectionalStreamAsObservable responses %o', item);
    hero$.next(item);
  };
  const onComplete = (): void => {
    console.log('HeroService.BidirectionalStreamAsObservable completed');
  };
  data$.subscribe({
    next: onNext,
    error: null,
    complete: onComplete
  });

  return hero$.asObservable();
}
Bidirectional Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/BidirectionalStreamAsObservable
{
  "id": 1
}

// response
{
  "id": 1,
  "name": "John"
}
{
  "id": 2
}

// response
{
  "id": 2,
  "name": "Doe"
}

Call Stream Handlerパターン

こちらのIssueで議論されていましたが、ts-protoはStreamパターンをサポートしていないようです。 @HeroServiceControllerMethods()の恩恵を受けることができないため、@GrpcMethod()@GrpcStreamMethod()@GrpcStreamCall()を各メソッドに付与する必要があることに注意して下さい。

こちらに完成版があります。
https://github.com/mizozobu/nestjs-grpc-extended/blob/master/src/villan/villan.controller.ts

Client Streaming

hero.controller.ts
// Client Streaming (Stream)
@GrpcStreamCall('VillanService')
clientStreamAsStream(
  stream: ServerReadableStream<VillanById>,
  callback: (err: unknown, res: Villan) => void,
): void {
  stream.on('data', (villanById: VillanById) => {
    console.log('VillanService.ClientStreamAsStream received %o', villanById);
  });

  stream.on('end', () => {
    console.log('VillanService.ClientStreamAsStream completed');
    callback(null, this.items[this.items.length - 1]);
  });
};
Client Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/villan/villan.proto \
-import-path ./src/villan \
-d @ localhost:5000 villan.VillanService/ClientStreamAsStream
{
  "id": 1
}
{
  "id": 2
}

Server Streaming

hero.controller.ts
// Server Streaming (Stream)
// なし

Bidirectional Streaming

hero.controller.ts
// Bidirectional Streaming (Stream)
@GrpcStreamCall('VillanService')
bidirectionalStreamAsStream(
  stream: ServerDuplexStream<VillanById, Villan>,
): void {
  stream.on('data', (villanById: VillanById) => {
    console.log('VillanService.BidirectionalStreamAsStream received %o', villanById);
    const item = this.items.find(({ id }) => id === villanById.id);
    console.log('VillanService.BidirectionalStreamAsStream responses %o', item);
    stream.write(item);
  });

  stream.on('end', () => console.log('VillanService.BidirectionalStreamAsStream ended'));
}
Bidirectional Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/villan/villan.proto \
-import-path ./src/villan \
-d @ localhost:5000 villan.VillanService/BidirectionalStreamAsStream
{
  "id": 1
}

// response
{
  "id": 1,
  "name": "John"
}
{
  "id": 2
}

// response
{
  "id": 2,
  "name": "Doe"
}

ここまで来ると簡単なチャットアプリくらいなら余裕で作れそうな気がしてきました。

E2Eテストを書く:fire:

UTはJestさえ分かっていればRESTのときと同じようmockを使って書けるはずなので、ここでは省略します。

E2Eテストですが、RESTのときはsupertestを使っていましたがgRPCでは使えません。代わりに@grpc/grpc-jsを使ってgRPCクライアントを作ります。Server Streamingの例が無かったりと少し物足りないですが、実はNestJSのリポジトリにもE2Eテストの例が隠されています。

こちらを参考にしつつ書きました。

セットアップ

hero.e2e-spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import * as ProtoLoader from '@grpc/proto-loader';
import { loadPackageDefinition, credentials } from '@grpc/grpc-js';
import {
  ServiceClient,
  ServiceClientConstructor,
} from '@grpc/grpc-js/build/src/make-client';
import { resolve } from 'path';
import { HeroModule } from '../src/hero/hero.module';
import { Hero, HeroById } from '../src/hero/hero';

describe('HeroController (E2E)', () => {
  let module: TestingModule;
  let app: INestMicroservice;
  let client: ServiceClient;

  beforeAll(async () => {
    module = await Test.createTestingModule({
      imports: [HeroModule],
    }).compile();

    const url = 'localhost:5000';
    app = module.createNestMicroservice<MicroserviceOptions>({
      transport: Transport.GRPC,
      options: {
        url,
        package: ['hero'],
        protoPath: [
          resolve(__dirname, '../src/hero/hero.proto'),
        ],
      },
    });
    await app.listenAsync()

    const proto = await ProtoLoader.load(resolve(__dirname, '../src/hero/hero.proto'));
    const protoGrpc = loadPackageDefinition(proto) as {
      hero: {
        HeroService: ServiceClientConstructor;
      };
    };
    client = new protoGrpc.hero.HeroService(
      url,
      credentials.createInsecure(),
    );
  });

  afterAll(async () => {
    await app.close();
    client.close();
  });
  
  ...
});

Unary Call

hero.e2e-spec.ts
it('unaryCall', async () => {
  await new Promise<void>((resolve) => {
    const payload: HeroById = { id: 1 };

    client.unaryCall(payload, (err: Error, response: Hero) => {
      expect(err).toBeNull();
      expect(response).toEqual({ id: 1, name: 'John' });
      resolve();
    });
  });
});

Client Streaming

hero.e2e-spec.ts
it('clientStreamAsObservable', async () => {
  const callHandler = client.clientStreamAsObservable((err: Error, res: Hero) => {
    if (err && String(err).toLowerCase().indexOf('cancelled') === -1) {
      fail('gRPC Stream error happened, error: ' + err);
    }
    expect(res).toEqual({ id: 2, name: 'Doe' });
  });

  return new Promise<void>((resolve, reject) => {
    callHandler.write({ id: 1 });
    callHandler.write({ id: 2 });
    callHandler.end();
    setTimeout(() => resolve(), 1000);
  });
});

Server Streaming

hero.e2e-spec.ts
it('serverStreamAsObservable', async () => {
  const callHandler = client.serverStreamAsObservable({ id: 1 });

  let n = 0;
  callHandler.on('data', (msg: Hero) => {
    if(n === 0) expect(msg).toEqual({ id: 1, name: 'John' });
    else if(n === 1) expect(msg).toEqual({ id: 2, name: 'Doe' });
    else fail(`received unexpected message: ${msg}`);
    n++;
  });

  callHandler.on('error', (err: Error) => {
    if (String(err).toLowerCase().indexOf('cancelled') === -1) {
      fail('gRPC Stream error happened, error: ' + err);
    }
  });

  await new Promise<void>((resolve, reject) => {
    setTimeout(() => resolve(), 3000);
  });
});

Bidirectional Streaming

hero.e2e-spec.ts
it('bidirectionalStreamAsObservable', async () => {
  const callHandler = client.bidirectionalStreamAsObservable();
  const payloads = [
    { id: 1 },
    { id: 2 },
  ];
  const responses = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  let n = 0;
  callHandler.on('data', (msg: Hero) => {
    if(n === 0) expect(msg).toEqual({ id: 1, name: 'John' });
    else if(n === 1) expect(msg).toEqual({ id: 2, name: 'Doe' });
    else fail(`received unexpected message: ${msg}`);
    n++;

    if(n === responses.length) callHandler.cancel();
  });

  callHandler.on('error', (err: Error) => {
    if (String(err).toLowerCase().indexOf('cancelled') === -1) {
      fail('gRPC Stream error happened, error: ' + err);
    }
  });

  await new Promise<void>((resolve, reject) => {
    payloads.forEach(payload => callHandler.write(payload));
    setTimeout(() => resolve(), 1000);
  });
});

さらなる高みへ:sunglasses:

.protoからTypeScriptのInterfaceを生成したように、SwiftやKotlinなどでもコードの生成できます。クライアント側でも.protoから生成したコードを使うことで、クライアントとサーバーでAPIの仕様を統一できます(というかそういう思想でgRPCは作られています)。

ReactVueでもgRPCを使いたいところですが、残念ながらまだWebはgRPCに対応していません。どうしてもWebからgRPCのAPIを呼び出したい場合はNestJS公式のサンプルのようにRESTとgRPCのHybrid Appにするか、EnvoyなどでProxyを噛ませる必要があります。こちらの記事「gRPC WebがGAになったのでそろそろ理解してみる」が参考になると思います。

まとめ

ここまで来ればNestJS + gRPCでアプリが書けるような気がしてきました。「まだRESTなの?時代はgRPCじゃないの:sunglasses:」とマウント取れそうですね。次のプロジェクトにgRPCはいかがでしょうか?

2021年はQiita Advent CalendarにgRPCが欲しい(誰が作って)。

19
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?