gRPC
Googleが中心となって開発しているRPCのフレームワークです。
Streaming RPCという、接続を維持したままリアルタイムにメッセージをやりとりできるRPCが用意されているため、オンラインゲームのようなリアルタイム性の高いものにも使えそうだと期待しています。
ブロードキャスト
ブロードキャストとは、図のように、接続中のみんなにメッセージを飛ばすことです。
オンラインゲームや多人数のチャットを実装する際は必須となります。
gRPCにブロードキャスト専用の機能はある?
ないです。 gRPCは名前の通り、RPC(遠隔呼び出し)のためのフレームワークであり、
クライアント側が遠隔にあるサーバーに対して関数を呼び出す部分のみを提供しています。
ということで、専用の機能がないので、自前で仕組みを構築します。
Goで実装してみる
ということで、gRPCでブロードキャストの実装を実際にしてみます。
ソースコードは次のリポジトリにも公開しています。
次のようにチャットを送るBiDirectional Streaming RPCで定義します。
syntax = "proto3";
package hello;
service ChatRoom {
rpc Chat (stream ChatRequest) returns (stream ChatResponse) {}
}
message ChatRequest {
string message = 1;
}
message ChatResponse {
string message = 1;
}
サーバー側から接続しているクライアント全員にメッセージを送るためには、接続中のクライアント一覧が必要です。
接続中のクライアント一覧をGoのmapを使って保持します。(sliceではなくmapにした理由は、1ユーザーに付き1エントリであり、検索と削除が激しいからです)
type server struct{
// 接続中のクライアントリスト
clients map[string]pb.ChatRoom_ChatServer
}
func (s *server) Chat(srv pb.ChatRoom_ChatServer) error {
uid := uuid.Must(uuid.NewRandom()).String()
log.Printf("new user: %s", uid)
// 接続クライアントリストに登録
s.clients[uid] = srv
// 関数を抜けるときはリストから削除
defer delete(s.clients, uid)
for {
resp, err := srv.Recv()
if err != nil {
log.Printf("recv err: %v", err)
break
}
log.Printf("broadcast: %s", resp.Message)
for _, ss := range s.clients {
if err := ss.Send(&pb.ChatResponse{Message: resp.Message}); err != nil {
log.Printf("broadcast err: %v", err)
}
}
}
return nil
}
このサーバーを起動して、クライアントを2つ起動して試してみます。
実行結果は次のようになります。
2019/10/01 13:04:52 new user: e0224bcc-be22-438f-a44b-5e3022d339a4
2019/10/01 13:04:52 broadcast: hello, I'm Bob
2019/10/01 13:04:53 broadcast: hello, I'm Bob
2019/10/01 13:04:54 broadcast: hello, I'm Bob
2019/10/01 13:04:59 new user: 36f4573d-b368-42d1-91e2-4137284352df
2019/10/01 13:04:59 broadcast: hello, I'm Alice
2019/10/01 13:04:59 broadcast: hello, I'm Bob
2019/10/01 13:05:00 broadcast: hello, I'm Alice
2019/10/01 13:05:00 broadcast: hello, I'm Bob
2人分のチャットがサーバーに届き、そこから全員に送られています。ブロードキャスト成功です!
並列にmapに触る危険性
しかし、高速に多くのRPC呼び出しをするとどうなるでしょうか?
// ループでわざとRPCを高速で呼び出しまくってみる
for {
if _, err := c.Chat(ctx); err != nil {
log.Fatal(err)
}
}
これを実行してみると、サーバー側はpanic祭りになってしまいました。
goroutine 1365 [semacquire]:
sync.runtime_SemacquireMutex(0xc0000c8004, 0x5d92e200, 0x1)
/usr/local/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0000c8000)
/usr/local/go/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
/usr/local/go/src/sync/mutex.go:81
log.(*Logger).Output(0xc0000c8000, 0x2, 0xc0009ec7e0, 0x2e, 0x0, 0x0)
/usr/local/go/src/log/log.go:153 +0x420
log.Printf(0x1519355, 0xc, 0xc000bceae0, 0x1, 0x1)
/usr/local/go/src/log/log.go:307 +0x80
main.(*server).Chat(0xc0000c2050, 0x15c49c0, 0xc000645720, 0x0, 0x0)
/Users/castaneai/Documents/grpc-broadcast-example/server/server.go:19 +0x162
...
これは、Goのmapが同時に読み書きすることに対応していないからです。
gRPCのサーバー側の各RPCは独立したgoroutineで動作し、複数のRPCがgoroutineで並列に動作するため、並列でひとつのmapにアクセスすると危険です。
Each RPC handler attached to a registered server will be invoked in its own goroutine. For example, SayHello will be invoked in its own goroutine. The same is true for service handlers for streaming RPCs
Go 1.6で追加されたsync.Mapを使う手もありますが、sync.Mapを使うと型があいまいになるので、今回はsync.RWMutexで解決します。
単純に読み取る前にRLock()
を取り、書き込む前にLock()
を取るという方式にしました。
type server struct{
// 接続中のクライアントリスト
clients map[string]pb.ChatRoom_ChatServer
mu sync.RWMutex
}
func (s *server) addClient(uid string, srv pb.ChatRoom_ChatServer) {
s.mu.Lock()
defer s.mu.Unlock()
s.clients[uid] = srv
}
func (s *server) removeClient(uid string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.clients, uid)
}
func (s *server) getClients() []pb.ChatRoom_ChatServer {
var cs []pb.ChatRoom_ChatServer
s.mu.RLock()
defer s.mu.RUnlock()
for _, c := range s.clients {
cs = append(cs, c)
}
return cs
}
func (s *server) Chat(srv pb.ChatRoom_ChatServer) error {
uid := uuid.Must(uuid.NewRandom()).String()
log.Printf("new user: %s", uid)
// 接続クライアントリストに登録
s.addClient(uid, srv)
// 関数を抜けるときはリストから削除
defer s.removeClient(uid)
for {
resp, err := srv.Recv()
if err != nil {
log.Printf("recv err: %v", err)
break
}
log.Printf("broadcast: %s", resp.Message)
for _, ss := range s.getClients() {
if err := ss.Send(&pb.ChatResponse{Message: resp.Message}); err != nil {
log.Printf("broadcast err: %v", err)
}
}
}
return nil
}
これで、高速で大量のクライアントが接続されてもpanicが発生せずに実行できました!
2019/10/01 14:33:54 new user: 860f5df2-0c8f-4d51-9c2c-14eb73c890f3
2019/10/01 14:33:54 new user: a8c2b774-1e8a-4eed-a9f9-02c2eb515691
2019/10/01 14:33:54 new user: cc394bb7-2355-4492-a537-cc237d52ce3c
2019/10/01 14:33:54 new user: 908c9031-d037-444d-bcd4-7f2699dee19a
2019/10/01 14:33:54 new user: 2c3ea4a4-f040-4ad2-9382-a328e789ab2b
2019/10/01 14:33:54 new user: c2dee101-5992-48b6-8e06-4814632aca67
2019/10/01 14:33:54 new user: 8f063d95-f271-440a-939e-16e3bfcb788c
2019/10/01 14:33:54 new user: 6ac4a494-a734-4537-a8dd-5e9c23a9419f
2019/10/01 14:33:54 new user: bf5604c2-4988-4887-845a-f586ff433afc
2019/10/01 14:33:54 new user: 8aa1f781-53ca-46ee-b915-f5beccb0187e
...
他のメッセージも送りたい場合
Chat以外のメッセージも送りたい場合、ちょっと面倒です。
gRPCの仕組み上、新たなBiDirectional Streaming RPCを追加することになりますが、そうなると今までmapに pb.ChatRoom_ChatServer
を保持していましたが、他のRPCも増えるとなると、複数RPCのServerStreamを束ねた構造体などに差し替える必要があります。
type User struct {
Chat pb.ChatRoom_ChatServer
Move pb.ChatRoom_MoveServer
...
}
type server struct{
// 接続中のクライアントリスト
clients map[string]*User
...
}
そして、broadcastを送るときは、明示的にどこのRPC Streamに送るかどうかを指定する必要がでてきます。
for _, ss := range s.getClients() {
if err := ss.Chat.Send(&pb.ChatResponse{Message: resp.Message}); err != nil {
log.Printf("broadcast err: %v", err)
}
}
また、別々のRPCに分かれてしまうのでHTTP/2のストリーム数が増えてしまうのも心配です(HTTP/2の仕組み上、TCP Connectionは1本で済みそうですが、実際どうなのか検証はしていません)。
まとめ
- gRPC本体にはブロードキャストのための機能はない
- サーバー側で「接続中のクライアント一覧」を保持しておいて、素直にループで回して全員に配信する
- 「接続中のクライアント一覧」は並列でアクセスされる可能性があるため、排他制御をしっかりする
やりたいことは、次の図のように「みんなにメッセージを配信する」だけなのですが、意外と考慮することが多いです。もっと楽にできる方法があれば知りたいですね