gRPCがだがNestJSでは修羅の道感が否めない
CNCFのIncubating Projectにも選出されている次世代RPCのgRPCに手を出したいけど、まだやってないという方は多いのではないのでしょうか。
gRPC界隈でのJavaScriptのサンプルが世の中に少なすぎる。ググっても出てくるのはGoばかり。せめてもの救いはNestJSのドキュメントのgRPCのページと公式サンプルですが、どうにもこうにもシンプル過ぎる
これらを穴が空くほど見つめていましたが、
- どうやって
.proto
からコード生成して使うの - どうやってリクエスト送るの
- StreamingはObservableとStreamの2通りの書き方があるみたいだけど、実際どう書くの
- E2Eテストどうやって書くの
というような疑問が生まれました。今回はこんな疑問をNestJSの公式サンプルを拡張しながら解消していきます。
今回書かないこと
今回はgRPCとNestJSが何者なのかは書きません。既にいい感じにまとめて下さっている方の記事を参考にして下さい。
gRPC
NestJS
それでは本気になる
完成したものはこちらに置いてあるので、適宜参照して下さい。
https://github.com/mizozobu/nestjs-grpc-extended
.protoからTypeScriptのInterfaceを生成する
NestJSのgRPCドキュメントやNestJS公式のgRPCサンプルを眺めていると、.proto
を読み込んでいるものの、.proto
からInterfaceやClassを生成してません。これではgRPCの良さが半減してしまいます。
TypeScript用のProtocol BufferコンパイラはNestJSに対応しているts-protoを使ってTypeScriptのInterfaceを生成してみます。こちらの記事「NestJSでgRPC API作るならコード生成はts-protoで決まり」を参考にしています。
まず始めにprotocをインストールします。
brew install protobuf
次にts-protoをインストールします。
yarn add -D ts-proto
準備ができたので、hero.proto
からTypeScriptのInterfaceを生成します。
protoc --plugin=$(yarn bin)/protoc-gen-ts_proto \
--ts_proto_out=src \
--ts_proto_opt=nestJs=true \
--ts_proto_opt=outputClientImple=false \
--ts_proto_opt=addGrpcMetadata=true \
-Isrc \
src/hero/hero.proto
生成されたファイルの内容はこちら
/* eslint-disable */
import { Metadata } from 'grpc';
import { Observable } from 'rxjs';
import { GrpcMethod, GrpcStreamMethod } from '@nestjs/microservices';
export interface HeroById {
id: number;
}
export interface Hero {
id: number;
name: string;
}
export interface HeroServiceClient {
findOne(request: HeroById, metadata?: Metadata): Observable<Hero>;
findMany(request: Observable<HeroById>, metadata?: Metadata): Observable<Hero>;
}
export interface HeroServiceController {
findOne(request: HeroById, metadata?: Metadata): Promise<Hero> | Observable<Hero> | Hero;
findMany(request: Observable<HeroById>, metadata?: Metadata): Observable<Hero>;
}
export function HeroServiceControllerMethods() {
return function (constructor: Function) {
const grpcMethods: string[] = ['findOne'];
for (const method of grpcMethods) {
const descriptor: any = Reflect.getOwnPropertyDescriptor(constructor.prototype, method);
GrpcMethod('HeroService', method)(constructor.prototype[method], method, descriptor);
}
const grpcStreamMethods: string[] = ['findMany'];
for (const method of grpcStreamMethods) {
const descriptor: any = Reflect.getOwnPropertyDescriptor(constructor.prototype, method);
GrpcStreamMethod('HeroService', method)(constructor.prototype[method], method, descriptor);
}
}
}
export const protobufPackage = 'hero'
export const HERO_PACKAGE_NAME = 'hero'
export const HERO_SERVICE_NAME = 'HeroService';
このように.proto
からコードを生成することでAPIの仕様を共有する仕組みになっています。
生成されたInterfaceとDecoratorを使ってhero.controller.ts
を書き直します。
import { Controller } from '@nestjs/common';
import { Observable, Subject } from 'rxjs';
import { Hero, HeroById, HeroServiceController, HeroServiceControllerMethods } from './hero';
@HeroServiceControllerMethods()
@Controller('hero')
export class HeroController implements HeroServiceController {
private readonly items: Hero[] = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
findOne(data: HeroById): Hero {
return this.items.find(({ id }) => id === data.id);
}
findMany(data$: Observable<HeroById>): Observable<Hero> {
const hero$ = new Subject<Hero>();
const onNext = (heroById: HeroById) => {
const item = this.items.find(({ id }) => id === heroById.id);
hero$.next(item);
};
const onComplete = () => hero$.complete();
data$.subscribe(onNext, null, onComplete);
return hero$.asObservable();
}
}
リクエストを送る
gRPCのcurlという位置づけのgrpcurlを使って、リクエストを送ってみましょう。
まず、grpcurlをインストールします。
brew install grpcurl
NestJSのアプリを起動します。
yarn start:dev
ではリクエストを送ります。
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/FindOne <<EOM
{
"id": 1
}
EOM
レスポンスが返ってくることが確認できます。
{
"id": 1,
"name": "John"
}
[備考]
PostmanのようなGUIが良い方はBloomRPCを使ってみて下さい。
Streamingを書く
Streamingとは?という方はこの記事「gRPC / MagicOnion 入門 (2) - 4 種類の通信方式」をおすすめします。
NestJSではStreamingの書き方は2通りあります。
- Subject Strategyパターン: RxJSのObservableとSubjectを使う
- Call Stream Handlerパターン: grpc-nodeの用意しているStreamクラスを使う
gRPCの3種類のStreamingを各パターンで対応可能かまとめました。私はServer StreamingをCall Stream Handlerパターンで書こうと四苦八苦した結果、ギブアップしました。書き方が分かる方がいれば教えて欲しいです。
Subject Strategyパターン | Call Stream Handlerパターン | |
---|---|---|
Client Streaming | ○ | ○ |
Server Streaming | ○ | ✗ |
Bidirectional Streaming | ○ | ○ |
Call Stream Handlerパターンの方がシンプルだと思いますが、RxJSの汎用性とts-protoの恩恵を考えるとSubject Strategyパターンが優勢というところでしょうか。
では3つのStreamingを用意するために、hero.proto
を編集します。ついでにFindOne
の名前も変えておきます。
service HeroService {
- rpc FindOne (HeroById) returns (Hero);
+ rpc UnaryCall (HeroById) returns (Hero); // Unary Call
+ rpc ClientStreamExample (stream HeroById) returns (Hero); // Client Streaming
+ rpc ServerStreamExample (HeroById) returns (stream Hero); // Server Streaming
- rpc FindMany (stream HeroById) returns (stream Hero);
+ rpc BidirectionalStreamExample (stream HeroById) returns (stream Hero); // Bidirectional Streaming
}
そうしたら.protoからTypeScriptのInterfaceを生成するでやったようにhero.ts
を生成しましょう。
それでは3つのStreamingを書いていきます。
Subject Strategyパターン
こちらに完成版があります。
https://github.com/mizozobu/nestjs-grpc-extended/blob/master/src/hero/hero.controller.ts
Client Streaming
// Client Streaming (Observable)
clientStreamAsObservable(data$: Observable<HeroById>): Observable<Hero> {
const hero$ = new Subject<Hero>();
const onNext = (heroById: HeroById) => {
console.log('HeroService.ClientStreamAsObservable received %o', heroById);
const item = this.items.find(({ id }) => id === heroById.id);
hero$.next(item);
};
const onComplete = () => {
hero$.complete()
console.log('HeroService.ClientStreamAsObservable completed');
};
data$.subscribe({
next: onNext,
error: null,
complete: onComplete
});
return hero$.asObservable();
};
Client Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/ClientStreamAsObservable
{
"id": 1
}
{
"id": 2
}
Server Streaming
// Server Streaming (Observable)
serverStreamAsObservable(data: HeroById): Observable<Hero> {
const subject = new Subject<Hero>();
console.log('HeroService.ServerStreamAsObservable received %o', data);
const onNext = (item: Hero): void => {
console.log('HeroService.ServerStreamAsObservable responses %o', item);
};
const onComplete = (): void => {
console.log('HeroService.ServerStreamAsObservable completed');
};
subject.subscribe({
next: onNext,
error: null,
complete: onComplete
});
let i = 0;
setInterval(() => {
if (i >= this.items.length) {
subject.complete();
}
else {
const item = this.items[i];
subject.next(item);
i += 1;
}
}, 1000);
return subject.asObservable();
}
Server Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/ServerStreamAsObservable <<EOM
{
"id": 1
}
EOM
// response
// 1秒後
{
"id": 1,
"name": "John"
}
// 2秒後
{
"id": 2,
"name": "Doe"
}
Bidirectional Streaming
// Bidirectional Streaming (Observable)
bidirectionalStreamAsObservable(data$: Observable<HeroById>): Observable<Hero> {
const hero$ = new Subject<Hero>();
const onNext = (heroById: HeroById) => {
console.log('HeroService.BidirectionalStreamAsObservable received %o', heroById);
const item = this.items.find(({ id }) => id === heroById.id);
console.log('HeroService.BidirectionalStreamAsObservable responses %o', item);
hero$.next(item);
};
const onComplete = (): void => {
console.log('HeroService.BidirectionalStreamAsObservable completed');
};
data$.subscribe({
next: onNext,
error: null,
complete: onComplete
});
return hero$.asObservable();
}
Bidirectional Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/hero/hero.proto \
-import-path ./src/hero \
-d @ localhost:5000 hero.HeroService/BidirectionalStreamAsObservable
{
"id": 1
}
// response
{
"id": 1,
"name": "John"
}
{
"id": 2
}
// response
{
"id": 2,
"name": "Doe"
}
Call Stream Handlerパターン
こちらのIssueで議論されていましたが、ts-protoはStreamパターンをサポートしていないようです。 @HeroServiceControllerMethods()
の恩恵を受けることができないため、@GrpcMethod()
、@GrpcStreamMethod()
、@GrpcStreamCall()
を各メソッドに付与する必要があることに注意して下さい。
こちらに完成版があります。
https://github.com/mizozobu/nestjs-grpc-extended/blob/master/src/villan/villan.controller.ts
Client Streaming
// Client Streaming (Stream)
@GrpcStreamCall('VillanService')
clientStreamAsStream(
stream: ServerReadableStream<VillanById>,
callback: (err: unknown, res: Villan) => void,
): void {
stream.on('data', (villanById: VillanById) => {
console.log('VillanService.ClientStreamAsStream received %o', villanById);
});
stream.on('end', () => {
console.log('VillanService.ClientStreamAsStream completed');
callback(null, this.items[this.items.length - 1]);
});
};
Client Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/villan/villan.proto \
-import-path ./src/villan \
-d @ localhost:5000 villan.VillanService/ClientStreamAsStream
{
"id": 1
}
{
"id": 2
}
Server Streaming
// Server Streaming (Stream)
// なし
Bidirectional Streaming
// Bidirectional Streaming (Stream)
@GrpcStreamCall('VillanService')
bidirectionalStreamAsStream(
stream: ServerDuplexStream<VillanById, Villan>,
): void {
stream.on('data', (villanById: VillanById) => {
console.log('VillanService.BidirectionalStreamAsStream received %o', villanById);
const item = this.items.find(({ id }) => id === villanById.id);
console.log('VillanService.BidirectionalStreamAsStream responses %o', item);
stream.write(item);
});
stream.on('end', () => console.log('VillanService.BidirectionalStreamAsStream ended'));
}
Bidirectional Streamingリクエスト例
grpcurl -plaintext \
-proto ./src/villan/villan.proto \
-import-path ./src/villan \
-d @ localhost:5000 villan.VillanService/BidirectionalStreamAsStream
{
"id": 1
}
// response
{
"id": 1,
"name": "John"
}
{
"id": 2
}
// response
{
"id": 2,
"name": "Doe"
}
ここまで来ると簡単なチャットアプリくらいなら余裕で作れそうな気がしてきました。
E2Eテストを書く
UTはJestさえ分かっていればRESTのときと同じようmockを使って書けるはずなので、ここでは省略します。
E2Eテストですが、RESTのときはsupertestを使っていましたがgRPCでは使えません。代わりに@grpc/grpc-jsを使ってgRPCクライアントを作ります。Server Streamingの例が無かったりと少し物足りないですが、実はNestJSのリポジトリにもE2Eテストの例が隠されています。
- https://github.com/nestjs/nest/blob/master/integration/microservices/e2e/sum-grpc.spec.ts
- https://github.com/nestjs/nest/blob/master/integration/microservices/e2e/orders-grpc.spec.ts
こちらを参考にしつつ書きました。
セットアップ
import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import * as ProtoLoader from '@grpc/proto-loader';
import { loadPackageDefinition, credentials } from '@grpc/grpc-js';
import {
ServiceClient,
ServiceClientConstructor,
} from '@grpc/grpc-js/build/src/make-client';
import { resolve } from 'path';
import { HeroModule } from '../src/hero/hero.module';
import { Hero, HeroById } from '../src/hero/hero';
describe('HeroController (E2E)', () => {
let module: TestingModule;
let app: INestMicroservice;
let client: ServiceClient;
beforeAll(async () => {
module = await Test.createTestingModule({
imports: [HeroModule],
}).compile();
const url = 'localhost:5000';
app = module.createNestMicroservice<MicroserviceOptions>({
transport: Transport.GRPC,
options: {
url,
package: ['hero'],
protoPath: [
resolve(__dirname, '../src/hero/hero.proto'),
],
},
});
await app.listenAsync()
const proto = await ProtoLoader.load(resolve(__dirname, '../src/hero/hero.proto'));
const protoGrpc = loadPackageDefinition(proto) as {
hero: {
HeroService: ServiceClientConstructor;
};
};
client = new protoGrpc.hero.HeroService(
url,
credentials.createInsecure(),
);
});
afterAll(async () => {
await app.close();
client.close();
});
...
});
Unary Call
it('unaryCall', async () => {
await new Promise<void>((resolve) => {
const payload: HeroById = { id: 1 };
client.unaryCall(payload, (err: Error, response: Hero) => {
expect(err).toBeNull();
expect(response).toEqual({ id: 1, name: 'John' });
resolve();
});
});
});
Client Streaming
it('clientStreamAsObservable', async () => {
const callHandler = client.clientStreamAsObservable((err: Error, res: Hero) => {
if (err && String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
expect(res).toEqual({ id: 2, name: 'Doe' });
});
return new Promise<void>((resolve, reject) => {
callHandler.write({ id: 1 });
callHandler.write({ id: 2 });
callHandler.end();
setTimeout(() => resolve(), 1000);
});
});
Server Streaming
it('serverStreamAsObservable', async () => {
const callHandler = client.serverStreamAsObservable({ id: 1 });
let n = 0;
callHandler.on('data', (msg: Hero) => {
if(n === 0) expect(msg).toEqual({ id: 1, name: 'John' });
else if(n === 1) expect(msg).toEqual({ id: 2, name: 'Doe' });
else fail(`received unexpected message: ${msg}`);
n++;
});
callHandler.on('error', (err: Error) => {
if (String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
});
await new Promise<void>((resolve, reject) => {
setTimeout(() => resolve(), 3000);
});
});
Bidirectional Streaming
it('bidirectionalStreamAsObservable', async () => {
const callHandler = client.bidirectionalStreamAsObservable();
const payloads = [
{ id: 1 },
{ id: 2 },
];
const responses = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
let n = 0;
callHandler.on('data', (msg: Hero) => {
if(n === 0) expect(msg).toEqual({ id: 1, name: 'John' });
else if(n === 1) expect(msg).toEqual({ id: 2, name: 'Doe' });
else fail(`received unexpected message: ${msg}`);
n++;
if(n === responses.length) callHandler.cancel();
});
callHandler.on('error', (err: Error) => {
if (String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
});
await new Promise<void>((resolve, reject) => {
payloads.forEach(payload => callHandler.write(payload));
setTimeout(() => resolve(), 1000);
});
});
さらなる高みへ
.proto
からTypeScriptのInterfaceを生成したように、SwiftやKotlinなどでもコードの生成できます。クライアント側でも.proto
から生成したコードを使うことで、クライアントとサーバーでAPIの仕様を統一できます(というかそういう思想でgRPCは作られています)。
ReactやVueでもgRPCを使いたいところですが、残念ながらまだWebはgRPCに対応していません。どうしてもWebからgRPCのAPIを呼び出したい場合はNestJS公式のサンプルのようにRESTとgRPCのHybrid Appにするか、EnvoyなどでProxyを噛ませる必要があります。こちらの記事「gRPC WebがGAになったのでそろそろ理解してみる」が参考になると思います。
まとめ
ここまで来ればNestJS + gRPCでアプリが書けるような気がしてきました。「まだRESTなの?時代はgRPCじゃないの」とマウント取れそうですね。次のプロジェクトにgRPCはいかがでしょうか?
2021年はQiita Advent CalendarにgRPCが欲しい(誰が作って)。