Help us understand the problem. What is going on with this article?

gRPCでブロードキャストを実現

gRPC

https://grpc.io/

Googleが中心となって開発しているRPCのフレームワークです。
Streaming RPCという、接続を維持したままリアルタイムにメッセージをやりとりできるRPCが用意されているため、オンラインゲームのようなリアルタイム性の高いものにも使えそうだと期待しています。

ブロードキャスト

ブロードキャストとは、図のように、接続中のみんなにメッセージを飛ばすことです。
オンラインゲームや多人数のチャットを実装する際は必須となります。

image.png

gRPCにブロードキャスト専用の機能はある?

ないです。 gRPCは名前の通り、RPC(遠隔呼び出し)のためのフレームワークであり、
クライアント側が遠隔にあるサーバーに対して関数を呼び出す部分のみを提供しています。

ということで、専用の機能がないので、自前で仕組みを構築します。

Goで実装してみる

ということで、gRPCでブロードキャストの実装を実際にしてみます。
ソースコードは次のリポジトリにも公開しています。

https://github.com/castaneai/grpc-broadcast-example

次のようにチャットを送るBiDirectional Streaming RPCで定義します。

chat.proto
syntax = "proto3";

package hello;

service ChatRoom {
    rpc Chat (stream ChatRequest) returns (stream ChatResponse) {}
}

message ChatRequest {
    string message = 1;
}

message ChatResponse {
    string message = 1;
}

サーバー側から接続しているクライアント全員にメッセージを送るためには、接続中のクライアント一覧が必要です。

接続中のクライアント一覧をGoのmapを使って保持します。(sliceではなくmapにした理由は、1ユーザーに付き1エントリであり、検索と削除が激しいからです)

server.go
type server struct{
    // 接続中のクライアントリスト
    clients map[string]pb.ChatRoom_ChatServer
}

func (s *server) Chat(srv pb.ChatRoom_ChatServer) error {
    uid := uuid.Must(uuid.NewRandom()).String()
    log.Printf("new user: %s", uid)

    // 接続クライアントリストに登録
    s.clients[uid] = srv
    // 関数を抜けるときはリストから削除
    defer delete(s.clients, uid)

    for {
        resp, err := srv.Recv()
        if err != nil {
            log.Printf("recv err: %v", err)
            break
        }
        log.Printf("broadcast: %s", resp.Message)
        for _, ss := range s.clients {
            if err := ss.Send(&pb.ChatResponse{Message: resp.Message}); err != nil {
                log.Printf("broadcast err: %v", err)
            }
        }
    }
    return nil
}

このサーバーを起動して、クライアントを2つ起動して試してみます。

image.png

実行結果は次のようになります。

2019/10/01 13:04:52 new user: e0224bcc-be22-438f-a44b-5e3022d339a4
2019/10/01 13:04:52 broadcast: hello, I'm Bob
2019/10/01 13:04:53 broadcast: hello, I'm Bob
2019/10/01 13:04:54 broadcast: hello, I'm Bob
2019/10/01 13:04:59 new user: 36f4573d-b368-42d1-91e2-4137284352df
2019/10/01 13:04:59 broadcast: hello, I'm Alice
2019/10/01 13:04:59 broadcast: hello, I'm Bob
2019/10/01 13:05:00 broadcast: hello, I'm Alice
2019/10/01 13:05:00 broadcast: hello, I'm Bob

2人分のチャットがサーバーに届き、そこから全員に送られています。ブロードキャスト成功です! :smile:

並列にmapに触る危険性

しかし、高速に多くのRPC呼び出しをするとどうなるでしょうか?

// ループでわざとRPCを高速で呼び出しまくってみる
for {
    if _, err := c.Chat(ctx); err != nil {
        log.Fatal(err)
    }
}

これを実行してみると、サーバー側はpanic祭りになってしまいました。

goroutine 1365 [semacquire]:
sync.runtime_SemacquireMutex(0xc0000c8004, 0x5d92e200, 0x1)
        /usr/local/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0000c8000)
        /usr/local/go/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
        /usr/local/go/src/sync/mutex.go:81
log.(*Logger).Output(0xc0000c8000, 0x2, 0xc0009ec7e0, 0x2e, 0x0, 0x0)
        /usr/local/go/src/log/log.go:153 +0x420
log.Printf(0x1519355, 0xc, 0xc000bceae0, 0x1, 0x1)
        /usr/local/go/src/log/log.go:307 +0x80
main.(*server).Chat(0xc0000c2050, 0x15c49c0, 0xc000645720, 0x0, 0x0)
        /Users/castaneai/Documents/grpc-broadcast-example/server/server.go:19 +0x162

...

これは、Goのmapが同時に読み書きすることに対応していないからです。

gRPCのサーバー側の各RPCは独立したgoroutineで動作し、複数のRPCがgoroutineで並列に動作するため、並列でひとつのmapにアクセスすると危険です。

Each RPC handler attached to a registered server will be invoked in its own goroutine. For example, SayHello will be invoked in its own goroutine. The same is true for service handlers for streaming RPCs

https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md#servers

Go 1.6で追加されたsync.Mapを使う手もありますが、sync.Mapを使うと型があいまいになるので、今回はsync.RWMutexで解決します。

単純に読み取る前にRLock()を取り、書き込む前にLock()を取るという方式にしました。

type server struct{
    // 接続中のクライアントリスト
    clients map[string]pb.ChatRoom_ChatServer

    mu sync.RWMutex
}


func (s *server) addClient(uid string, srv pb.ChatRoom_ChatServer) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.clients[uid] = srv
}

func (s *server) removeClient(uid string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.clients, uid)
}

func (s *server) getClients() []pb.ChatRoom_ChatServer {
    var cs []pb.ChatRoom_ChatServer

    s.mu.RLock()
    defer s.mu.RUnlock()
    for _, c := range s.clients {
        cs = append(cs, c)
    }
    return cs
}

func (s *server) Chat(srv pb.ChatRoom_ChatServer) error {
    uid := uuid.Must(uuid.NewRandom()).String()
    log.Printf("new user: %s", uid)

    // 接続クライアントリストに登録
    s.addClient(uid, srv)
    // 関数を抜けるときはリストから削除
    defer s.removeClient(uid)

    for {
        resp, err := srv.Recv()
        if err != nil {
            log.Printf("recv err: %v", err)
            break
        }
        log.Printf("broadcast: %s", resp.Message)
        for _, ss := range s.getClients() {
            if err := ss.Send(&pb.ChatResponse{Message: resp.Message}); err != nil {
                log.Printf("broadcast err: %v", err)
            }
        }
    }
    return nil
}

これで、高速で大量のクライアントが接続されてもpanicが発生せずに実行できました!

2019/10/01 14:33:54 new user: 860f5df2-0c8f-4d51-9c2c-14eb73c890f3
2019/10/01 14:33:54 new user: a8c2b774-1e8a-4eed-a9f9-02c2eb515691
2019/10/01 14:33:54 new user: cc394bb7-2355-4492-a537-cc237d52ce3c
2019/10/01 14:33:54 new user: 908c9031-d037-444d-bcd4-7f2699dee19a
2019/10/01 14:33:54 new user: 2c3ea4a4-f040-4ad2-9382-a328e789ab2b
2019/10/01 14:33:54 new user: c2dee101-5992-48b6-8e06-4814632aca67
2019/10/01 14:33:54 new user: 8f063d95-f271-440a-939e-16e3bfcb788c
2019/10/01 14:33:54 new user: 6ac4a494-a734-4537-a8dd-5e9c23a9419f
2019/10/01 14:33:54 new user: bf5604c2-4988-4887-845a-f586ff433afc
2019/10/01 14:33:54 new user: 8aa1f781-53ca-46ee-b915-f5beccb0187e

...

他のメッセージも送りたい場合

Chat以外のメッセージも送りたい場合、ちょっと面倒です。
gRPCの仕組み上、新たなBiDirectional Streaming RPCを追加することになりますが、そうなると今までmapに pb.ChatRoom_ChatServer を保持していましたが、他のRPCも増えるとなると、複数RPCのServerStreamを束ねた構造体などに差し替える必要があります。

type User struct {
    Chat pb.ChatRoom_ChatServer
    Move pb.ChatRoom_MoveServer
    ...
}

type server struct{
    // 接続中のクライアントリスト
    clients map[string]*User
    ...
}

そして、broadcastを送るときは、明示的にどこのRPC Streamに送るかどうかを指定する必要がでてきます。

for _, ss := range s.getClients() {
    if err := ss.Chat.Send(&pb.ChatResponse{Message: resp.Message}); err != nil {
        log.Printf("broadcast err: %v", err)
    }
}

また、別々のRPCに分かれてしまうのでHTTP/2のストリーム数が増えてしまうのも心配です(HTTP/2の仕組み上、TCP Connectionは1本で済みそうですが、実際どうなのか検証はしていません)。

まとめ

  • gRPC本体にはブロードキャストのための機能はない
  • サーバー側で「接続中のクライアント一覧」を保持しておいて、素直にループで回して全員に配信する
  • 「接続中のクライアント一覧」は並列でアクセスされる可能性があるため、排他制御をしっかりする

やりたいことは、次の図のように「みんなにメッセージを配信する」だけなのですが、意外と考慮することが多いです。もっと楽にできる方法があれば知りたいですね :worried:

image.png

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした