概要
前回の記事 → なるべく詳しく調べるgRPC入門👨💻
gRPCについて学びましたが、何かしら手を動かしてアウトプットしたい気持ちになってきました。
諸々調べると、gRPCのストリーミングな仕組みを活かしてチャットを作っている方が何人かいらっしゃったので、自分も作ることにしました👨💻
実装にあたり、下記の技術を選定しました。
フロントエンド - React(TypeScript)
バックエンド - Go
プロキシ - Envoy
ブラウザ上で動くReactとgRPCサーバ(Go)がやりとりすることでチャットを構成します。
ブラウザから直接gRPCを喋ることはできないので、gRPC-Webの仕組みに乗ってEnvoyプロキシ越しにバックエンドと通信します。
成果物
実装したチャットのgifはコチラです。
コードはこちらに置いています。
ディレクトリとしてはclient / server / protobufの三つに分かれています。
clientにフロントエンド、serverにgRPCサーバ、protobufにProtocol Buffersのprotoファイルを格納しています。
.
├── Dockerfile.client
├── Dockerfile.envoy
├── Dockerfile.firestore
├── README.md
├── client
├── docker-compose.yml
├── envoy.yaml
├── firebase.json
├── protobuf
└── server
コンテナで固めているのでdocker composeで立ち上げることができます。
❯ docker compose up
version: "3.8"
services:
# フロントエンドが通信するgRPC-Web対応Envoyプロキシを起動
envoy:
build:
context: .
dockerfile: ./Dockerfile.envoy
# Envoyの管理GUIにアクセスするための9901番とプロキシを動かす9090番を解放
ports:
- 9090:9090
- 9901:9901
# チャットメッセージを保存するためのDBとしてFirestoreのエミュレータを起動
db:
build:
context: .
dockerfile: ./Dockerfile.firestore
# Firebase管理GUIにアクセスするための8000番とFirestoreを立ち上げる4000番を解放
ports:
- 8000:8000
- 4000:4000
volumes:
- .:/app
# gRPCサーバ(Go)を起動
server:
build:
context: .
target: dev
dockerfile: ./server/Dockerfile
tty: true
expose:
- 8080
ports:
- 8080:8080
environment:
PROJECT_ID: foo
FIRESTORE_EMULATOR_HOST: host.docker.internal:8000
command: ["air"]
volumes:
- ./server:/go/src/app
# フロントエンド(React)を配信する開発サーバを起動
client:
build:
context: .
dockerfile: ./Dockerfile.client
expose:
- 3000
ports:
- 3000:3000
command: ["npm", "run", "start"]
docker compose後、ブラウザからlocalhost:3000
にアクセスするとチャットをスタートできます🙆♂️
開いたふたつのウィンドウでlocalhost:3000
にアクセスして、それぞれのウィンドウからメッセージを打ち合えばチャットっぽい動作を確認できるかと思います。
動作の大まかな流れは下記のようなイメージです。
1. フロントエンド: gRPCサーバのServer streamingを受け入れるように構成
2. フロントエンド: ブラウザからメッセージを送信
3. バックエンド : gRPCサーバにメッセージが届く
4. バックエンド : gRPCサーバがメッセージの新着を確認し、Server Streamingによってクライアント全員にメッセージを送信
5. フロントエンド: 新着メッセージを受け取り表示
チャットはインタラクティブなシステムなので双方向通信(Bidirectional streaming RPC)を使うのがスジに感じますが、gRPC-WebはBidirectional streaming RPCに対応していません。
今後の対応ロードマップに期待しつつ、今回はServer streaming RPCでサーバ起点の送信を実現しています。
以降、Protocol Buffersとフロントエンド、バックエンドの実装についてそれぞれ解説していきます。
Protocol Buffersの実装
定義ファイル(.proto)
チャットメッセージを送るためのCreateMessage
とチャットメッセージを購読するためのGetMessageStream
を定義します。
CreateMessage
は1リクエスト / 1レスポンス方式なのでUnary RPCで、GetMessageStream
は1つのリクエストに対してサーバ側から複数のレスポンスを返却する通信方式なのでServer streaming RPCで定義します。
Unary RPCもServer streaming RPCもほとんど同じ定義に見えるかも知れませんが、後者については戻り値の頭にstreamが付いています。
やりとりするメッセージの型はMessage
として定義しました。
定義した型は入れ子にできるので、各RPCのリクエストとレスポンス内にMessage
を埋め込んでいます。
// protoのバージョン
syntax = "proto3";
// Googleが定義してパッケージとして公開した便利型を利用
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
// コードが自動生成されるディレクトリ指定
option go_package = "./";
// 名前衝突を避けるためにパッケージ名を指定
package chat.v1;
// RPCをまとめて一括りにしたものをサービスとして定義
service ChatService {
rpc GetMessageStream (google.protobuf.Empty) returns (stream GetMessageStreamResponse);
rpc CreateMessage (CreateMessageRequest) returns (CreateMessageResponse);
}
message Message {
string from = 1; // 誰からのメッセージか
string message_content = 2; // メッセージ内容
google.protobuf.Timestamp created_at = 3; // 作成日時
}
message GetMessageStreamResponse {
Message message = 1;
}
message CreateMessageRequest {
Message message = 1;
}
message CreateMessageResponse {
string result = 1;
}
protoファイルの仕様は直感的なので、理解しやすいかと思います。
より詳細な情報は公式ドキュメントに記載があります。
Protocol Buffersの命名規則
Google Cloudが設計規則を公開しているので、これに従うのが良いでしょう。
今回もこちらの規則に従って命名しました。
実装する際、とりわけ参考にした部分をピックアップして引用します。
パッケージ名
API .proto ファイルで宣言されたパッケージ名は、プロダクト名やサービス名と一貫したものにすることが推奨されます。
サービス名
可能性は低いものの、インターフェース名が API 内の別の名前と競合する場合は、曖昧さを回避するために、接尾辞(Api や Service など)を使用することが推奨されます。
メソッド名
メソッド名は、大文字のキャメルケースで、VerbNoun の命名規則に従うことが推奨されます。
リクエスト / レスポンス名
RPC メソッドのリクエスト メッセージとレスポンス メッセージには、メソッド名にちなんだ名前を付けて、その後にそれぞれ接尾辞 Request や Response を続けることが推奨されます。
フィールド名
.proto ファイルのフィールド定義では、lower_case_underscore_separated_names を使用する必要があります。これらの名前は、プログラミング言語ごとに生成されたコードのネイティブ命名規則にマッピングされます。
Protocol Buffersのコンパイラは賢いので、protoファイルから各言語のソースコードを生成する際には、それぞれの命名規則に準拠したEffectiveなソースコードが出力されます。
例えばフィールド名をスネークケースにしても、生成したGoコードはキャメルケースになっています。
コードを生成する
protoファイルからコードを生成するためにはProtocol Buffersのコンパイラ(protoc)が必要です。
様々な方法(ex. brew)で導入できますが、ローカルに入れたくなかったのでコンテナを使います。
アメリカの企業(namely)がメンテしているイメージが有ったので利用しました。
README通りに利用すると指定したディレクトリに対象言語のソースコードが出力されます。
# protocが入ったイメージをpullする
❯ docker pull namely/protoc-all
# バックエンドのコードを生成(Go)
# -v: defs(definitions)ディレクトリにカレントディレクトリをマウント
# -f: 対象のprotoファイルを指定
# -o: アウトプットフォルダを指定
# -l: 生成する言語を指定
❯ docker run -v $PWD:/defs namely/protoc-all -f protobuf/chat.proto -o ./server/pb -l go
# フロントエンドのコードを生成(TypeScript)
❯ docker run -v $PWD:/defs namely/protoc-all -f protobuf/chat.proto -o ./client/src/pb -l web
フロントエンドの実装
概要
主に下記ふたつの記事を参考に実装を進めました。
環境構築
create-react-appにて環境構築を行いました。
❯ npx create-react-app client --template typescript
当初はViteで環境構築を行ったのですが、Protocol Buffersから吐き出したソースコードを読み込めない等、gRPC-Webと相性があまり良くないように感じました(Issueに記載の通りViteでも対応することは可能ですが、面倒くさがってしまいました👶)。
実装
Protocol Buffersの項で生成したソースコードをもとに実装を進めればOKなので、割とスラスラと進めることができました。
gRPC通信の関心事はCustom HooksであるuseMessages
に一任しました。
まずはメッセージの永続化を行うaddMessage
です。
const addMessage = useCallback(
(newMessage: Message) => {
const req = new CreateMessageRequest();
req.setMessage(newMessage);
client.client.createMessage(req, null, (res) => {
console.log(res);
});
},
[client]
);
次にServer streamingを監視しつつ最新メッセージを受け取るコードを実装します。
ストリームへのコールバック登録は一度だけ行いたいので、useDidUpdateEffect
で囲んでいます。
ストリーム越しに新たなメッセージが届き次第、messages
の状態が更新されるので画面が再描画されチャットが更新される仕組みです。
// 中略
useDidUpdateEffect(() => {
const stream = client.client.getMessageStream(new Empty());
stream.on("data", (m) => {
const newMessage = m.getMessage();
if (!newMessage) {
return;
}
setMessages((current) => {
return [...(current ?? []), newMessage];
});
});
}, [client]);
// 中略
const useDidUpdateEffect = (fn: EffectCallback, deps: DependencyList) => {
const didMountRef = useRef(false);
useEffect(() => {
if (!didMountRef.current) {
didMountRef.current = true;
} else {
fn();
}
}, deps);
};
後は通常通り利用するコンポーネントと紐付ければOKです。
const App = () => {
// 中略
const [messages, addMessage] = useMessages(client);
ブラウザは直接gRPCを話せないので、gRPCの接続ホストはEnvoyを指定します。
docker-compose.yml
にてEnvoyを9090番で立ち上げるので、http://localhost:9090
を指すように指定します。
.envで設定した環境変数にアクセスできるcreate-react-appの仕組みを利用しました。
PORT=3000 # npm run devで立ち上がるポートを指定
REACT_APP_DEV_ENVOY_HOST="http://localhost:9090" # Envoyのホストを指定
export const config = (): Config => {
if (process.env.NODE_ENV === "development") {
// 中略
return {
envoyHost: process.env.REACT_APP_DEV_ENVOY_HOST,
};
// 中略
};
const App = () => {
const [client] = useState<Client>(() => {
const conf = config();
return {
client: new ChatServiceClient(conf.envoyHost),
};
});
// 初期化したgRPCクライアントをCustom Hooksに注入
const [messages, addMessage] = useMessages(client);
バックエンドの実装
概要
フロントエンドと同様にProtocol Buffersが吐き出したソースコードをもとに実装していきます。
フロントエンドからの通信を受けるEnvoyと、そのEnvoyと通信するgRPCサーバ(Go)のふたつで構成します。
実装
まずはEnvoyの定義を書いていきます。
ほぼ公式のyamlファイルそのままを引用して作成しました(コンテナを用いるためホスト等を一部変更しただけです)。
実装したenvoy.yaml
をDockerfile.envoy
で読み込みコンテナで固めて立ち上げます。
# 管理GUI
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address: { address: 0.0.0.0, port_value: 9901 }
# Envoyプロキシ
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 9090 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
codec_type: auto
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route:
cluster: grpc_service
timeout: 0s
max_stream_duration:
grpc_timeout_header_max: 0s
cors:
allow_origin_string_match:
- prefix: "*"
allow_methods: GET, PUT, DELETE, POST, OPTIONS
allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
max_age: "1728000"
expose_headers: custom-header-1,grpc-status,grpc-message
http_filters:
- name: envoy.filters.http.grpc_web
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
- name: envoy.filters.http.cors
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
# プロキシのバックエンド(今回の場合であればGoのgRPCサーバ)
clusters:
- name: grpc_service
connect_timeout: 0.25s
type: logical_dns
http2_protocol_options: {}
lb_policy: round_robin
load_assignment:
cluster_name: cluster_0
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal
port_value: 8080
FROM envoyproxy/envoy:v1.21-latest
COPY ./envoy.yaml /etc/envoy/envoy.yaml
CMD /usr/local/bin/envoy -c /etc/envoy/envoy.yaml
EXPOSE 9090
次にGoによるgRPCサーバの実装です。
基本的な方針は下記の記事を参考にしています。
まずは起動部分ですが、gRPCサーバにregister()
で依存関係を注入してからサーバが立ち上がるように実装しました。
func main() {
ctx := context.Background()
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
s := grpc.NewServer()
register(ctx, s)
// サーバーリフレクション
// https://zenn.dev/hsaki/books/golang-grpc-starting/viewer/server#%5B%E3%82%B3%E3%83%A9%E3%83%A0%5D%E3%82%B5%E3%83%BC%E3%83%90%E3%83%BC%E3%83%AA%E3%83%95%E3%83%AC%E3%82%AF%E3%82%B7%E3%83%A7%E3%83%B3%E3%81%A8%E3%81%AF%EF%BC%9F
reflection.Register(s)
go func() {
log.Printf("start gRPC server, port: %d", port)
s.Serve(listener)
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("stopping gRPC server...")
s.GracefulStop()
}
func register(ctx context.Context, s *grpc.Server) {
// c := infra.NewFirestoreClient(ctx)
// repo := infra.NewMessageRepositoryImpl(c)
repo := infra.NewLocalMessageRepositoryImpl()
createMessageService := usecase.NewCreateMessageService(repo)
getMessageService := usecase.NewGetMessageStreamService(repo)
pb.RegisterChatServiceServer(s, NewServer(*createMessageService, *getMessageService))
}
Protocol Buffersの項でCreateMessage
とGetMessageStream
のふたつのメソッドを定義したので、それぞれに対応するハンドラーおよびユースケースを実装していきます。
まずはCreateMessage
に対応する部分です。
ハンドラーではgRPCのリクエストを受け取り、ユースケースを呼び出します。
ユースケースからリポジトリを呼び出し、メッセージの永続化を行います。
リポジトリはかなりナイーブな実装を行っており、メモリ上のスライスでメッセージを管理しています。
func (s *Server) CreateMessage(ctx context.Context, req *pb.CreateMessageRequest) (*pb.CreateMessageResponse, error) {
input := usecase.NewCreateMessageServiceInput(req.Message.From, req.Message.MessageContent, req.Message.CreatedAt.AsTime())
if err := s.CreateMessageService.Handle(ctx, input); err != nil {
return &pb.CreateMessageResponse{
Result: err.Error(),
}, err
}
return &pb.CreateMessageResponse{
Result: "ok",
}, nil
}
func (g *CreateMessageService) Handle(ctx context.Context, input *CreateMessageServiceInput) error {
log.Printf("create message: %#v\n", input.message)
if err := g.messageRepository.Add(ctx, input.message); err != nil {
return fmt.Errorf("failed to CreateMessageService.Handle: %w", err)
}
return nil
}
func (m *LocalMessageRepositoryImpl) Add(ctx context.Context, message *domain.Message) error {
message.CreatedAt = time.Now()
localMessages = append(localMessages, *message)
return nil
}
次にGetMessageStream
に対応する部分です。
ハンドラー内では無限ループのfor文で待機しつつ、新着メッセージをチャネル経由で受け取り次第server.Send()
でクライアントに配信できるようにしています。
リポジトリのListen()
内で、スライスのサイズを確認し、変更があり次第チャネルにメッセージを詰めます。
func (s *Server) GetMessageStream(req *emptypb.Empty, server pb.ChatService_GetMessageStreamServer) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream := make(chan domain.Message)
// 後続の処理を進めるためにもユースケースはgoroutineで起動
go func() {
if err := s.GetMessageStreamService.Handle(ctx, stream); err != nil {
log.Println(err)
}
}()
for {
// チャネルの更新を待つ
v := <-stream
createdAt := timestamppb.New(v.CreatedAt)
if err := server.Send(&pb.GetMessageStreamResponse{
Message: &pb.Message{
From: v.From,
MessageContent: v.MessageContent,
CreatedAt: createdAt,
},
}); err != nil {
return err
}
}
}
func (g *GetMessageStreamService) Handle(ctx context.Context, stream chan<- domain.Message) error {
defer close(stream)
eg, _ := errgroup.WithContext(ctx)
eg.Go(func() error {
if err := g.messageRepository.Listen(ctx, stream); err != nil {
return err
}
return nil
})
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to GetMessageStreamService.Handle: %s", err)
}
return nil
}
func (m *LocalMessageRepositoryImpl) Listen(ctx context.Context, stream chan<- domain.Message) error {
// チャットに途中参加した場合今まで全てのメッセージを受け取る
for _, message := range localMessages {
stream <- message
}
currentLocalMessagesLen := len(localMessages)
for {
select {
case <-ctx.Done():
return nil
default:
if currentLocalMessagesLen < len(localMessages) {
log.Printf("localMessages size: %d\n", len(localMessages))
// スライスの変更があり次第追加された最新のメッセージをチャネルに詰める
stream <- localMessages[len(localMessages)-1]
currentLocalMessagesLen = len(localMessages)
}
}
}
}
Firestoreエミュレータに永続化を行うリポジトリ(MessageRepositoryImpl
)も実装しましたが、より挙動が安定しているLocalMessageRepositoryImpl
(スライス上でメッセージを管理)を用いて説明を行いました。
手元で検証
作ったgRPCサーバの疎通確認ができると便利です。
gRPCurlを利用すればcurl感覚で動作確認を行うことができます。
# インストール
❯ brew install grpcurl
# gRPCサーバの立ち上げ
❯ docker compose up server
# サービスの一覧を確認
❯ grpcurl -plaintext localhost:8080 list
# サービスのメソッド一覧を確認
❯ grpcurl -plaintext localhost:8080 list chat.v1.ChatService
# GetMessageStreamを叩く
❯ grpcurl -plaintext localhost:8080 chat.v1.ChatService.GetMessageStream
# CreateMessageを叩く
❯ grpcurl -plaintext -d '{"message": {"name": "test", "message": "waiwai", "created_at": "2020-05-22T20:32:05Z"}}' localhost:8080 chat.v1.ChatService.CreateMessage
gRPCurl以外にもPostmanで試す方法もあるようです。