gRPCの勉強がてら、簡単なチャットアプリのサーバ側とクライアント側を実装してみました。
Pushも試したかったので、サーバからクライアント側へのPushも実装してみました。
環境の準備
homebrewとnodeを導入済みであることが前提です。
ローカルで試せない場合も https://io2015codelabs.appspot.com/codelabs/gRPC#2 を参考にGCEにインスタンスを立ててしまえばそこで同じことが試せるはずです。
Proto3
Proto3がGAになるまでは https://github.com/grpc/homebrew-grpc のスクリプトを利用すると良いようです。
curl -fsSL https://goo.gl/getgrpc | bash -
gRPC(Node.js)
作業フォルダを作成して次を実行します。
npm install grpc
実装:定義の作成
インターフェースとしてRPCを定義します。書式とRPCを別のファイルに分けてみました。
モデル
今回のサンプルでは、チャットルームとしてRoom
、チャットルーム内の投稿としてMessage
を定義しました。
他にはidの値だけを送るリクエスト用にInt32Id
を定義しています。
syntax = "proto3";
package myapp;
message Empty {}
message Int32Id { int32 id = 1; }
message Room {
int32 id = 1;
string title = 2;
string createdBy = 3;
int64 createdAt = 4;
}
message RoomList { repeated Room rooms = 1; }
message Message {
int32 id = 1;
string text = 2;
string createdBy = 3;
int64 createdAt = 4;
int32 room = 5;
}
message MessageList { repeated Message messages = 1; }
RPC
RPCの定義は次の形が基本です。上に書いたモデルの定義ファイルをmessage.proto
としてインポートしています。
service サービス名 {
rpc RPC名 (リクエストの型) returns (レスポンスの型)
...
}
今回はチャットルームの作成と一覧、チャットルームへのメッセージの投稿と投稿一覧、指定したチャットルームへの投稿の監視(Push配信)を定義しています。
import public "message.proto";
syntax = "proto3";
package myapp;
service ChatService {
rpc InsertRoom (Room) returns (Room);
rpc ListRooms (Empty) returns (RoomList);
rpc InsertMessage (Message) returns (Message);
rpc ListMessages (Int32Id) returns (MessageList);
rpc WatchRoom (Int32Id) returns (stream Message);
}
非同期に結果を返す(今回はチャットなのでクライアント側へ投稿されたメッセージをPushする)レスポンスの型にはstream
をつけています。
実装:サーバ
ひな形
var grpc = require('grpc');
var myapp = grpc.load('service.proto').myapp;
var AppSever = grpc.buildServer([myapp.ChatService.service]);
var server = new AppSever({
"myapp.ChatService": {
"listRooms": listRooms/*未実装*/,
"insertRoom": insertRoom/*未実装*/,
"listMessages": listMessages/*未実装*/,
"insertMessage": insertMessage/*未実装*/,
"watchRoom": watchRoom/*未実装*/
},
});
/** 部屋 */
var rooms = [
{id:1, title:"部屋1", createdBy: "default", createdAt: new Date().getTime()},
{id:2, title:"部屋2", createdBy: "default", createdAt: new Date().getTime()}
];
/** 部屋ごとの投稿 */
var messages = {"1":[], "2":[]};
/** pushすべきクライアントの接続 */
var watchers = [];
server.bind('127.0.0.1:50051');
server.listen();
おおよそ次の手順です。
-
grpc.load()
でproto定義を読み込む -
grpc.buildServer()
でサービスクラスを作成する - サービスクラスをインスタンス化する。その際、protoで定義したrpcのインターフェースと実装をバインドしたオブジェクトを渡す。
- サービスクラスをインスタンス化したサーバを起動する
データベースを用意するのは面倒なので、今回はメモリ上にチャット関連のデータなどを変数として定義しています。
RPCの実装
function findRoom(room) {
for (var i=0; i<rooms.length; i++) {
if (rooms[i].id===room) return rooms[i];
}
return null;
}
function roomNotFound(room) {
return { code: grpc.status.NOT_FOUND, details: 'Room not found:'+room };
}
// これより下がRPCの実装
function listRooms(call, callback) { return callback(null, rooms); }
function insertRoom(call, callback) {
var newRoom = call.request;
newRoom.createdAt = new Date().getTime();
rooms.push(newRoom);
messages[newRoom.id] = [];
return callback(null, newRoom);
}
function listMessages(call, callback) {
var id = call.request.id;
if (!findRoom(id)) return callback(roomNotFound(id));
return callback(null, messages[id]);
}
function insertMessage(call, callback) {
var newMessage = call.request;
if (!findRoom(newMessage.room)) return callback(roomNotFound(newMessage.room));
newMessage.createdAt = new Date().getTime();
messages[newMessage.room].push(newMessage);
for (var i=0; i<watchers.length; i++) {
if (watchers[i].request.id===newMessage.room) {
watchers[i].write(newMessage);
}
}
return callback(null, newMessage);
}
function watchRoom(call, callback) {
watchers.push(call);
console.log('added new watcher', call);
}
各関数はふたつの引数call, callback
がバインドされて、call
がクライアントからのリクエスト情報などが格納されていて(call.request
から参照できる)、callback
はクライアント側で使用するコールバック関数です。callback
は第一引数にエラー情報、第二引数に返したい値を渡します。
クライアントに直接値を返したい場合はcall.write()
で返すこともできて、Push配信にはそれを利用しています。引数として渡されたcall
オブジェクトを保存しておいて、投稿を受けた時にその部屋の監視を意味するcall
オブジェクトへwrite()しています。
なお今回は実装していませんが、接続を終了するにはcall.end()
を実行すれば良いです(このあたりの後片付けはテキトーです)。
実装:クライアント側
実装をザクっと貼り付けてみます。
var grpc = require('grpc');
var myapp = grpc.load('service.proto').myapp;
var client = new myapp.ChatService('127.0.0.1:50051');
var printResponse = function(error, response) {
if (error) console.log('Error:', error); else console.log(response);
}
function listRooms() {
client.listRooms({}, printResponse);
}
function insertRoom(id, title, createdBy) {
client.insertRoom({ id: parseInt(id), title: title, createdBy: createdBy }, printResponse);
}
function listMessages(room) {
client.listMessages({id:parseInt(room)}, printResponse);
}
function insertMessage(room, text, createdBy) {
var message = {room:parseInt(room), text:text, createdBy:createdBy};
client.insertMessage(message, printResponse);
}
function watchRoom(id) {
var call = client.watchRoom({id:parseInt(id)});
call.on('data', function(message) { console.log('Data:', message); })
.on('end', function() { console.log('End:'); })
.on('status', function(status) { console.log('Status:', status); });
}
var processName = process.argv.shift();
var scriptName = process.argv.shift();
var command = process.argv.shift();
if (command==='listRooms') listRooms();
else if (command==='insertRoom') insertRoom(process.argv[0], process.argv[1], process.argv[2]);
else if (command==='listMessages') listMessages(process.argv[0]);
else if (command==='insertMessage') insertMessage(process.argv[0], process.argv[1], process.argv[2]);
else if (command==='watchRoom') watchRoom(process.argv[0]);
-
grpc.load()
でproto定義を読み込む - サービスクラスをインスタンス化する
- サービスクラスのインスタンスには定義したRPCが存在しているので、それを呼び出す。基本的にはコールバック関数でエラーやレスポンスを受け取ることができるが、RPCを実行した返り値に対して
on()
することもできる。
実行
node server.js
node client.js watchRoom 部屋id
node client.js listRooms
node client.js listMessages 部屋id
node client.js insertMessage 部屋id メッセージ 投稿者
公式サイトのリファレンスもまだcomming soonだったりする箇所も多いし、不十分なのでいろいろ調査しながら追記した新しい投稿をしたりするかもしれません。