Motivation
前回の記事では、.proto
ファイル定義から Node.js 用スタブを自動生成する方法の構築と、それを利用した通信方法を例に動きを確認した。
gPRC の特徴として、通信のストリーミングに対応しているということが挙げられる。そこで本記事では、クライアントサイド、サーバーサイドのストリーミングという2パターンについて、これらの実装を通して gRPC によるより現実的なアプリケーションの構築のイメージを掴むことを目的とする。
TL;DR
以下のストリーミング通信を gRPC で実装した:
- サーバーサイド RPC
- クライアントサイド RPC
ストリーミング中のサーバーサイド側およびクライアントサイド側の状態のログを出力し、ストリーミングが実際に行われている様子を確認した。
Pre-condition
参照情報など
gRPC についての前回の記事をもとに、ストリーミング部分を追加する形で話を進めたい。
また、ソースコードは Github で公開しているので必要に応じて参照していただければ幸いである。
開発環境
- 開発環境: Mac/MacOS 14.4
- npm: 9.8.1
- node: v18.18.0
Experiment
アーキテクチャ
アーキテクチャは前回とほぼ同じで、クライアントサーバー間の通信がストリーミングで行われるところのみ変更となる。
何をストリーミングするか1
サーバーサイド・ストリーミング
「複数ユーザーの情報を取得するサービス」を実装する。
サーバーがリクエストを受け取ると、ユーザー情報を逐次ストリームデータとして gRPC クライアント側にレスポンスするような仕様とする。
クライアントサイド・ストリーミング
「複数ユーザーの情報を更新するサービス」を実装する。
更新したいユーザーの情報を gRPC クライアント側が逐次ストリームデータとしてサーバー側にリクエストするような仕様とする。
.proto
のアップデート
RPC 通信をストリーミング対応させるためには、まずは .proto
ファイル内の該当箇所を修正する必要がある。
service User {
...
// 複数ユーザー取得サービスをサーバーサイド・ストリーミングで実装
rpc ListStreamUsers(ListUsersRequest) returns (stream GetUserResponse) {}
// 複数ユーザー更新サービスをクライアントサイド・ストリーミングで実装
rpc UpdateStreamUsers(stream UpdateUserRequest) returns (UpdateUsersResponse) {}
}
...
上記のように、サーバーサイド・ストリーミングではサービスの戻り値の型に stream
を、クライアントサイド・ストリーミングでは、サービスの引数の型に stream
を指定する。
この状態で以下のシェルスクリプトを実行すると、ストリーミングに対応したサービスおよびメッセージのスタブコードのファイルが生成される。
PROTOC_GEN_TS_PATH="./node_modules/.bin/protoc-gen-ts"
OUT_DIR="./generated"
NODE_PROTOC="./node_modules/.bin/grpc_tools_node_protoc"
$NODE_PROTOC \
--plugin="protoc-gen-ts=${PROTOC_GEN_TS_PATH}" \
--js_out="import_style=commonjs,binary:${OUT_DIR}" \
--ts_out="service=grpc-node,mode=grpc-js:${OUT_DIR}" \
--grpc_out="grpc_js:${OUT_DIR}" \
user.proto
実行する。
sh/ts-gen-service-node.sh
次のコードは生成されるスタブの一部だが、ストリームが有効になっているのがわかる。
...
User.ListStreamUsers = {
methodName: "ListStreamUsers",
service: User,
requestStream: false,
responseStream: true,// <-- ListStreamUsers はサーバーサイド・ストリーミングなので、
// 戻り値(response)に `stream` が付与されている
requestType: user_pb.ListUsersRequest,
responseType: user_pb.GetUserResponse
};
準備は整ったので、以下ではサービスの実装を行なっていきたい。
サービスの実装〜動作確認
前回同様、スタブを使用したサービス本体の実装を gRPC クライアント側と gRPC サーバー側でそれぞれ行う。
複数ユーザーの情報を取得する(サーバーサイド・ストリーミング)
クライアント側実装
const listStreamUsers = (limit?: number, offset?: number): Promise<ListUsersResponse> => {
const request = new ListUsersRequest();
if (!!limit) request.setLimit(limit);
if (!!offset) request.setOffset(offset);
return new Promise((resolve) => {
const call = client.listStreamUsers(request);
const res = new ListUsersResponse();
call.on('data', (response) => {
// user ごとのデータが stream の一単位として送られてくる
const userInfo = new UserInfo();
const user = response.array[0];
userInfo.setId(user[0]);
const userDetail = new UserDetail();
userDetail.setEmail(user[1][0]);
userDetail.setFullName(user[1][1]);
userDetail.setCreatedAt(user[1][2]);
userDetail.setUpdatedAt(user[1][3]);
userInfo.setDetail(userDetail);
res.addUsers(userInfo, res.getTotal());
res.setTotal(res.getTotal() + 1);
console.log('client:', userInfo);
});
call.on('end', () => {
console.log('number of users:', res.getTotal());
resolve(res);
});
call.on('error', (err) => {
console.log(err);
});
call.on('status', (status) => {
console.log('status:', status);
});
});
};
call.on('data', callback)
内で、サーバーから逐次送られてくるユーザー情報を ListUsersResponse
にセットし、call.on('end', callback)
で resolve する。その後はこのメソッドの呼び出し元である Express のルーターへと引き渡す。
ここで setId
や setEmail
は UserInfo
および UserDetail
クラスのメソッドであり、これらの実装こそが先の手順で生成されたスタブコードである。
また、user
は GetUserResponse
型でその中には 中身は下のようになっており、これは .proto
ファイルで定義したものと一致する。すなわち、データとしては int32 型の id
と UserInfo
型の detail
があり、detail
はさらに string
型の email
, full_name
と int64 型の created_at
, updated_at
を含む。
// 一部抜粋
service User {
rpc ListStreamUsers(ListUsersRequest) returns (stream GetUserResponse) {}
}
message GetUserResponse {
UserInfo user = 1;
}
message UserInfo {
int32 id = 1;
UserDetail detail = 2;
}
message UserDetail {
string email = 1;
string full_name = 2;
int64 created_at = 3;
int64 updated_at = 4;
}
実行時に console.log(user) した時のログがこちら
[
1,// UserInfo.id
[
'hernandezwalsh@makingway.com',// UserInfo.detail.email
'Glover Goodman',// UserInfo.detail.full_name
1673797076,// UserInfo.detail.created_at
1688737953 // UserInfo.detail.updated_at
]
]
サーバー側実装
dummyUsers は DB を模した json 定義のユーザー情報一覧である。
そこから一つずつデータを取り出し、call.write
によりクライアント側にレスポンスしている。
また、少しリアルシチュエーションに近づけるため、レスポンスは0~1秒のランダム時間かけて行われるようにしている。
const listStreamUsers = async (call) => {
const limit = call.request.hasLimit() ? call.request.getLimit() : 100;
const offset = call.request.hasOffset() ? call.request.getOffset() : 0;
// response with stream for every 1 second
let p = Promise.resolve();
const users = dummyUsers.slice(offset).slice(0, limit);
const f = (v) =>
new Promise<void>((resolve) =>
setTimeout(() => {
const userDetail = new UserDetail();
userDetail.setEmail(v.email);
userDetail.setFullName(v.fullName);
userDetail.setCreatedAt(v.createdAt);
userDetail.setUpdatedAt(v.updatedAt);
const userInfo = new UserInfo();
userInfo.setId(v.id);
userInfo.setDetail(userDetail);
const reply = new GetUserResponse();
reply.setUser(userInfo);
call.write(reply);
console.log('server:', reply);
resolve();
}, Math.random() * 1000)
);
users.forEach((v) => (p = p.then(() => f(v))));
await p;
call.end();
};
実行
ターミナルを3タブ開き、それぞれで
# gRPC サーバー起動
npm run dev:server
# > dev:server
# > ts-node -r tsconfig-paths/register src/server.ts
# server start listing on port 50051
# gRPC クライアント起動
npm run dev:client
# > dev:client
# > ts-node -r tsconfig-paths/register src/client.ts
# Start on port 3003.
そしてエンドポイントに対してリクエストを投げる。
これは、Postman や Curl など、Http クライアントであればなんでも良い(多分
curl localhost:3003/users-stream
gRPC サーバー(左)およびクライアント(右)のログ出力の様子がこちら。
サーバー側からクライアント側にユーザー情報が一つ分ずつ送信されている。
複数ユーザーの情報を更新する(クライアントサイド・ストリーミング)
クライアント側実装
リクエストとして送られてくる users
から UpdateUserRequest
を逐次作成し、サーバー側に送信する。サーバー側ですべてのリクエストが処理し終わると resolve される。その後はこのメソッドの呼び出し元である Express のルーターへと引き渡す。
const updateStreamUsers = async (users: User[]): Promise<UpdateUsersResponse> => {
return new Promise(async (resolve) => {
const apiRequestStream = client.updateStreamUsers((err, value) => {
if (err) console.error('error on server:', err);
if (value === undefined) return;
console.log(`completed: ${value.getUsersList().length} user(s) have been updated.`);
resolve(value);
});
let p = Promise.resolve();
const f = (v) =>
new Promise<void>((resolve) =>
setTimeout(() => {
const userDetail = new UserDetail();
userDetail.setEmail(v.email);
userDetail.setFullName(v.fullName);
userDetail.setCreatedAt(v.createdAt);
userDetail.setUpdatedAt(v.updatedAt);
const userInfo = new UserInfo();
userInfo.setId(v.id);
userInfo.setDetail(userDetail);
console.log('sent from client:', userInfo.toObject());
const request = new UpdateUserRequest();
request.setUser(userInfo);
apiRequestStream.write(request);
resolve();
}, Math.random() * 1000)
);
users.forEach((v) => (p = p.then(() => f(v))));
await p;
apiRequestStream.end();
});
};
サーバー側実装
call.on('data', callback)
内で、クライアントから逐次送られてくるユーザー情報を UserInfo
にセットし、UpdateUsersResponse
に追加していく。
call.on('end', callback)
で更新された結果をクライアント側に送信する。
なお、本プロジェクトには DB がないため、更新処理は行わず、行われた前提で更新された結果を返すようにしている。
const updateStreamUsers = (call, callback) => {
const res = new UpdateUsersResponse();
call.on('data', (user) => {
const userId = user.getUser().getId();
const email = user.getUser().getDetail().getEmail();
const fullName = user.getUser().getDetail().getFullName();
const createdAt = user.getUser().getDetail().getCreatedAt();
const updatedAt = user.getUser().getDetail().getUpdatedAt();
const existingUser = dummyUsers.filter((u) => u.id === userId).shift();
// キー(userId)が存在しないのでスキップ
if (!existingUser) return;
const userDetail = new UserDetail();
// リクエストに含まれる項目のみ更新、それ以外は既存のものを返す
userDetail.setEmail(email || existingUser.email);
userDetail.setFullName(fullName || existingUser.fullName);
userDetail.setCreatedAt(createdAt || existingUser.createdAt);
userDetail.setUpdatedAt(updatedAt || existingUser.updatedAt);
const userInfo = new UserInfo();
userInfo.setId(userId);
userInfo.setDetail(userDetail);
// console.log(userInfo);
console.log('receive from client:', userInfo.toObject());
const cnt = res.getUsersList().length;
res.addUsers(userInfo, cnt);
});
call.on('end', () => {
// 本来ならデータベースの更新などが入る
// updateDB(res);
callback(null, res);
});
call.on('error', (e) => {
console.log('on error', e);
});
};
実行
gRPC サーバーとクライアントの起動は先ほどと同様で、エンドポイントに対するリクエストは、今回は以下のような形とする。
curl -X PATCH -H "Content-Type: application/json" -d \
$' \
{ \
"users": [ \
{ \
"id": "1", \
"email": "hernandezwalsh@gmail.com", \
"updatedAt": "1688747953" \
}, \
{ \
"id": "2", \
"fullName": "Ken Shimura", \
"updatedAt": "1695005544" \
}, \
{ \
"id": "5", \
"email": "char@drifters.com", \
"fullName": "Char Kato" \
} \
] \
}' \
localhost:3003/users
gRPC サーバー(左)およびクライアント(右)のログ出力の様子がこちら。
クライアント側からサーバー側にユーザー情報が一つ分ずつ送信されている。
所感
- gRPC による streaming 通信はなかなかシンプルに行えた。
- 深い実装部分はスタブコードで生成されているので、あとはそれを呼び出すだけ。
- gRPC は
.proto
での設計が重要で、ここ起点でサービスを構築していくイメージ- OpenAPI による generation と似ている?
- 本プロジェクトはモノリポ(gRPC のクライアントとサーバーが同じプロジェクトに属している)であるが、
.proto
ファイルからのスタブコード生成、共有・参照などのワークフローがきちんと整備されれば、リポジトリの分割にも対応できるのではと思う。
今後の展望
- gRPC 通信のパフォーマンス評価
- 例えば同じ通信内容に対して Rest API と比較したときの結果を考察するなど
- bidirectional streaming RPC を試してみる(サーバー・クライアント両サイドが streaming)
参考
-
今回のユースケースはあまり現実に則していると思えないが、ストリーミングの動きを確かめるシンプルな例ということで勘弁いただきたい ↩