LoginSignup
24
20

More than 1 year has passed since last update.

gRPCでチャットを実装する(Go / React)

Last updated at Posted at 2022-08-15

概要

前回の記事 → なるべく詳しく調べるgRPC入門👨‍💻

gRPCについて学びましたが、何かしら手を動かしてアウトプットしたい気持ちになってきました。
諸々調べると、gRPCのストリーミングな仕組みを活かしてチャットを作っている方が何人かいらっしゃったので、自分も作ることにしました👨‍💻

実装にあたり、下記の技術を選定しました。

フロントエンド - React(TypeScript)
バックエンド - Go
プロキシ - Envoy

ブラウザ上で動くReactとgRPCサーバ(Go)がやりとりすることでチャットを構成します。
ブラウザから直接gRPCを喋ることはできないので、gRPC-Webの仕組みに乗ってEnvoyプロキシ越しにバックエンドと通信します。

成果物

実装したチャットのgifはコチラです。

chat2x.gif

コードはこちらに置いています。

ディレクトリとしてはclient / server / protobufの三つに分かれています。
clientにフロントエンド、serverにgRPCサーバ、protobufにProtocol Buffersのprotoファイルを格納しています。

.
├── Dockerfile.client
├── Dockerfile.envoy
├── Dockerfile.firestore
├── README.md
├── client
├── docker-compose.yml
├── envoy.yaml
├── firebase.json
├── protobuf
└── server

コンテナで固めているのでdocker composeで立ち上げることができます。

❯ docker compose up
docker-compose.yml
version: "3.8"
services:
    # フロントエンドが通信するgRPC-Web対応Envoyプロキシを起動
    envoy:
      build:
        context: .
        dockerfile: ./Dockerfile.envoy
      # Envoyの管理GUIにアクセスするための9901番とプロキシを動かす9090番を解放
      ports:
        - 9090:9090
        - 9901:9901
    # チャットメッセージを保存するためのDBとしてFirestoreのエミュレータを起動
    db:
      build:
        context: .
        dockerfile: ./Dockerfile.firestore
      # Firebase管理GUIにアクセスするための8000番とFirestoreを立ち上げる4000番を解放
      ports:
        - 8000:8000
        - 4000:4000
      volumes:
        - .:/app
    # gRPCサーバ(Go)を起動
    server:
      build:
        context: .
        target: dev
        dockerfile: ./server/Dockerfile
      tty: true
      expose:
          - 8080
      ports:
        - 8080:8080
      environment:
        PROJECT_ID: foo
        FIRESTORE_EMULATOR_HOST: host.docker.internal:8000
      command: ["air"]
      volumes:
        - ./server:/go/src/app
    # フロントエンド(React)を配信する開発サーバを起動
    client:
      build:
        context: .
        dockerfile: ./Dockerfile.client
      expose:
          - 3000
      ports:
        - 3000:3000    
      command: ["npm", "run", "start"]

docker compose後、ブラウザからlocalhost:3000にアクセスするとチャットをスタートできます🙆‍♂️
開いたふたつのウィンドウでlocalhost:3000にアクセスして、それぞれのウィンドウからメッセージを打ち合えばチャットっぽい動作を確認できるかと思います。

動作の大まかな流れは下記のようなイメージです。

1. フロントエンド: gRPCサーバのServer streamingを受け入れるように構成
2. フロントエンド: ブラウザからメッセージを送信
3. バックエンド : gRPCサーバにメッセージが届く
4. バックエンド : gRPCサーバがメッセージの新着を確認し、Server Streamingによってクライアント全員にメッセージを送信
5. フロントエンド: 新着メッセージを受け取り表示

チャットはインタラクティブなシステムなので双方向通信(Bidirectional streaming RPC)を使うのがスジに感じますが、gRPC-WebはBidirectional streaming RPCに対応していません
今後の対応ロードマップに期待しつつ、今回はServer streaming RPCでサーバ起点の送信を実現しています。

以降、Protocol Buffersとフロントエンド、バックエンドの実装についてそれぞれ解説していきます。

Protocol Buffersの実装

定義ファイル(.proto)

チャットメッセージを送るためのCreateMessageとチャットメッセージを購読するためのGetMessageStreamを定義します。
CreateMessageは1リクエスト / 1レスポンス方式なのでUnary RPCで、GetMessageStreamは1つのリクエストに対してサーバ側から複数のレスポンスを返却する通信方式なのでServer streaming RPCで定義します。
Unary RPCもServer streaming RPCもほとんど同じ定義に見えるかも知れませんが、後者については戻り値の頭にstreamが付いています。
やりとりするメッセージの型はMessageとして定義しました。
定義した型は入れ子にできるので、各RPCのリクエストとレスポンス内にMessageを埋め込んでいます。

protobuf/chat.proto
// protoのバージョン
syntax = "proto3";

// Googleが定義してパッケージとして公開した便利型を利用
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

// コードが自動生成されるディレクトリ指定
option go_package = "./";

// 名前衝突を避けるためにパッケージ名を指定
package chat.v1;

// RPCをまとめて一括りにしたものをサービスとして定義
service ChatService {
    rpc GetMessageStream (google.protobuf.Empty) returns (stream GetMessageStreamResponse);
    rpc CreateMessage (CreateMessageRequest) returns (CreateMessageResponse);
}

message Message {
    string from = 1; // 誰からのメッセージか
    string message_content = 2; // メッセージ内容
    google.protobuf.Timestamp created_at = 3; // 作成日時
}

message GetMessageStreamResponse {
    Message message = 1;
}

message CreateMessageRequest {
   Message message = 1;
}

message CreateMessageResponse {
    string result = 1;
}

protoファイルの仕様は直感的なので、理解しやすいかと思います。
より詳細な情報は公式ドキュメントに記載があります。

Protocol Buffersの命名規則

Google Cloudが設計規則を公開しているので、これに従うのが良いでしょう。

今回もこちらの規則に従って命名しました。
実装する際、とりわけ参考にした部分をピックアップして引用します。

パッケージ名

API .proto ファイルで宣言されたパッケージ名は、プロダクト名やサービス名と一貫したものにすることが推奨されます。

サービス名

可能性は低いものの、インターフェース名が API 内の別の名前と競合する場合は、曖昧さを回避するために、接尾辞(Api や Service など)を使用することが推奨されます。

メソッド名

メソッド名は、大文字のキャメルケースで、VerbNoun の命名規則に従うことが推奨されます。

リクエスト / レスポンス名

RPC メソッドのリクエスト メッセージとレスポンス メッセージには、メソッド名にちなんだ名前を付けて、その後にそれぞれ接尾辞 Request や Response を続けることが推奨されます。

フィールド名

.proto ファイルのフィールド定義では、lower_case_underscore_separated_names を使用する必要があります。これらの名前は、プログラミング言語ごとに生成されたコードのネイティブ命名規則にマッピングされます。

Protocol Buffersのコンパイラは賢いので、protoファイルから各言語のソースコードを生成する際には、それぞれの命名規則に準拠したEffectiveなソースコードが出力されます。
例えばフィールド名をスネークケースにしても、生成したGoコードはキャメルケースになっています。

コードを生成する

protoファイルからコードを生成するためにはProtocol Buffersのコンパイラ(protoc)が必要です。
様々な方法(ex. brew)で導入できますが、ローカルに入れたくなかったのでコンテナを使います。
アメリカの企業(namely)がメンテしているイメージが有ったので利用しました。

README通りに利用すると指定したディレクトリに対象言語のソースコードが出力されます。

# protocが入ったイメージをpullする
❯ docker pull namely/protoc-all
# バックエンドのコードを生成(Go)
# -v: defs(definitions)ディレクトリにカレントディレクトリをマウント
# -f: 対象のprotoファイルを指定
# -o: アウトプットフォルダを指定
# -l: 生成する言語を指定
❯ docker run -v $PWD:/defs namely/protoc-all -f protobuf/chat.proto -o ./server/pb -l go
# フロントエンドのコードを生成(TypeScript)
❯ docker run -v $PWD:/defs namely/protoc-all -f protobuf/chat.proto -o ./client/src/pb -l web

フロントエンドの実装

概要

主に下記ふたつの記事を参考に実装を進めました。

環境構築

create-react-appにて環境構築を行いました。

❯ npx create-react-app client --template typescript

当初はViteで環境構築を行ったのですが、Protocol Buffersから吐き出したソースコードを読み込めない等、gRPC-Webと相性があまり良くないように感じました(Issueに記載の通りViteでも対応することは可能ですが、面倒くさがってしまいました👶)。

実装

Protocol Buffersの項で生成したソースコードをもとに実装を進めればOKなので、割とスラスラと進めることができました。

gRPC通信の関心事はCustom HooksであるuseMessagesに一任しました。
まずはメッセージの永続化を行うaddMessageです。

client/src/hooks/useMessages.ts
const addMessage = useCallback(
    (newMessage: Message) => {
      const req = new CreateMessageRequest();
      req.setMessage(newMessage);
      client.client.createMessage(req, null, (res) => {
        console.log(res);
      });
    },
    [client]
  );

次にServer streamingを監視しつつ最新メッセージを受け取るコードを実装します。
ストリームへのコールバック登録は一度だけ行いたいので、useDidUpdateEffectで囲んでいます。

ストリーム越しに新たなメッセージが届き次第、messagesの状態が更新されるので画面が再描画されチャットが更新される仕組みです。

client/src/hooks/useMessages.ts
// 中略

useDidUpdateEffect(() => {
    const stream = client.client.getMessageStream(new Empty());
    stream.on("data", (m) => {
      const newMessage = m.getMessage();
      if (!newMessage) {
        return;
      }
      setMessages((current) => {
        return [...(current ?? []), newMessage];
      });
    });
  }, [client]);

// 中略

const useDidUpdateEffect = (fn: EffectCallback, deps: DependencyList) => {
  const didMountRef = useRef(false);
  useEffect(() => {
    if (!didMountRef.current) {
      didMountRef.current = true;
    } else {
      fn();
    }
  }, deps);
};

後は通常通り利用するコンポーネントと紐付ければOKです。

client/src/App.tsx
const App = () => {
// 中略
  const [messages, addMessage] = useMessages(client);

ブラウザは直接gRPCを話せないので、gRPCの接続ホストはEnvoyを指定します。
docker-compose.ymlにてEnvoyを9090番で立ち上げるので、http://localhost:9090を指すように指定します。
.envで設定した環境変数にアクセスできるcreate-react-appの仕組みを利用しました。

client/.env
PORT=3000 # npm run devで立ち上がるポートを指定

REACT_APP_DEV_ENVOY_HOST="http://localhost:9090" # Envoyのホストを指定
client/src/config.ts
export const config = (): Config => {
  if (process.env.NODE_ENV === "development") {
    // 中略
    return {
      envoyHost: process.env.REACT_APP_DEV_ENVOY_HOST,
    };
  // 中略
};
client/src/App.tsx
const App = () => {
  const [client] = useState<Client>(() => {
    const conf = config();
    return {
      client: new ChatServiceClient(conf.envoyHost),
    };
  });
  // 初期化したgRPCクライアントをCustom Hooksに注入
  const [messages, addMessage] = useMessages(client);

パッケージ多すぎ問題

JavaScript(TypeScrpt)に関するgRPCを調べると、ブラウザとnode、ふたつの情報が出てくるため頭が混乱してきます。
基本的には下記方針に従うのが分かりやすいかなと思います。

grpc-webやりたいならgrpc/grpc-web
そうでなければgrpc-tools+grpc_tools_node_protoc_tsが一番無難そう
両対応したいならimprobable-eng/ts-protoc-genも選択肢に上がってきそう

TypeScriptでgRPCしたいがいろいろあってよくわからない

バックエンドの実装

概要

フロントエンドと同様にProtocol Buffersが吐き出したソースコードをもとに実装していきます。
フロントエンドからの通信を受けるEnvoyと、そのEnvoyと通信するgRPCサーバ(Go)のふたつで構成します。

実装

まずはEnvoyの定義を書いていきます。
ほぼ公式のyamlファイルそのままを引用して作成しました(コンテナを用いるためホスト等を一部変更しただけです)。
実装したenvoy.yamlDockerfile.envoyで読み込みコンテナで固めて立ち上げます。

envoy.yaml
# 管理GUI
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

# Envoyプロキシ
static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address: { address: 0.0.0.0, port_value: 9090 }
      filter_chains:
        - filters:
          - name: envoy.filters.network.http_connection_manager
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
              codec_type: auto
              stat_prefix: ingress_http
              route_config:
                name: local_route
                virtual_hosts:
                  - name: local_service
                    domains: ["*"]
                    routes:
                      - match: { prefix: "/" }
                        route:
                          cluster: grpc_service
                          timeout: 0s
                          max_stream_duration:
                            grpc_timeout_header_max: 0s
                    cors:
                      allow_origin_string_match:
                        - prefix: "*"
                      allow_methods: GET, PUT, DELETE, POST, OPTIONS
                      allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                      max_age: "1728000"
                      expose_headers: custom-header-1,grpc-status,grpc-message
              http_filters:
                - name: envoy.filters.http.grpc_web
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
                - name: envoy.filters.http.cors
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
                - name: envoy.filters.http.router
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

# プロキシのバックエンド(今回の場合であればGoのgRPCサーバ)
clusters:
- name: grpc_service
  connect_timeout: 0.25s
  type: logical_dns
  http2_protocol_options: {}
  lb_policy: round_robin
  load_assignment:
    cluster_name: cluster_0
    endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: host.docker.internal
                port_value: 8080
Dockerfile.envoy
FROM envoyproxy/envoy:v1.21-latest
COPY ./envoy.yaml /etc/envoy/envoy.yaml
CMD /usr/local/bin/envoy -c /etc/envoy/envoy.yaml
EXPOSE 9090

次にGoによるgRPCサーバの実装です。
基本的な方針は下記の記事を参考にしています。

まずは起動部分ですが、gRPCサーバにregister()で依存関係を注入してからサーバが立ち上がるように実装しました。

server/main.go
func main() {
	ctx := context.Background()

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		panic(err)
	}

	s := grpc.NewServer()

	register(ctx, s)

    // サーバーリフレクション
	// https://zenn.dev/hsaki/books/golang-grpc-starting/viewer/server#%5B%E3%82%B3%E3%83%A9%E3%83%A0%5D%E3%82%B5%E3%83%BC%E3%83%90%E3%83%BC%E3%83%AA%E3%83%95%E3%83%AC%E3%82%AF%E3%82%B7%E3%83%A7%E3%83%B3%E3%81%A8%E3%81%AF%EF%BC%9F
	reflection.Register(s)

	go func() {
		log.Printf("start gRPC server, port: %d", port)
		s.Serve(listener)
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("stopping gRPC server...")
	s.GracefulStop()
}

func register(ctx context.Context, s *grpc.Server) {
	// c := infra.NewFirestoreClient(ctx)
	// repo := infra.NewMessageRepositoryImpl(c)
	repo := infra.NewLocalMessageRepositoryImpl()
	createMessageService := usecase.NewCreateMessageService(repo)
	getMessageService := usecase.NewGetMessageStreamService(repo)
	pb.RegisterChatServiceServer(s, NewServer(*createMessageService, *getMessageService))
}

Protocol Buffersの項でCreateMessageGetMessageStreamのふたつのメソッドを定義したので、それぞれに対応するハンドラーおよびユースケースを実装していきます。

まずはCreateMessageに対応する部分です。
ハンドラーではgRPCのリクエストを受け取り、ユースケースを呼び出します。
ユースケースからリポジトリを呼び出し、メッセージの永続化を行います。
リポジトリはかなりナイーブな実装を行っており、メモリ上のスライスでメッセージを管理しています。

server/server.go
func (s *Server) CreateMessage(ctx context.Context, req *pb.CreateMessageRequest) (*pb.CreateMessageResponse, error) {
	input := usecase.NewCreateMessageServiceInput(req.Message.From, req.Message.MessageContent, req.Message.CreatedAt.AsTime())
	if err := s.CreateMessageService.Handle(ctx, input); err != nil {
		return &pb.CreateMessageResponse{
			Result: err.Error(),
		}, err
	}
	return &pb.CreateMessageResponse{
		Result: "ok",
	}, nil
}
server/usecase/create_message_service.go
func (g *CreateMessageService) Handle(ctx context.Context, input *CreateMessageServiceInput) error {
	log.Printf("create message: %#v\n", input.message)
	if err := g.messageRepository.Add(ctx, input.message); err != nil {
		return fmt.Errorf("failed to CreateMessageService.Handle: %w", err)
	}
	return nil
}
server/infra/local_repository_impl.go
func (m *LocalMessageRepositoryImpl) Add(ctx context.Context, message *domain.Message) error {
	message.CreatedAt = time.Now()
	localMessages = append(localMessages, *message)
	return nil
}

次にGetMessageStreamに対応する部分です。
ハンドラー内では無限ループのfor文で待機しつつ、新着メッセージをチャネル経由で受け取り次第server.Send()でクライアントに配信できるようにしています。
リポジトリのListen()内で、スライスのサイズを確認し、変更があり次第チャネルにメッセージを詰めます。

server/server.go
func (s *Server) GetMessageStream(req *emptypb.Empty, server pb.ChatService_GetMessageStreamServer) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream := make(chan domain.Message)

    // 後続の処理を進めるためにもユースケースはgoroutineで起動
	go func() {
		if err := s.GetMessageStreamService.Handle(ctx, stream); err != nil {
			log.Println(err)
		}
	}()

	for {
        // チャネルの更新を待つ
		v := <-stream
		createdAt := timestamppb.New(v.CreatedAt)
		if err := server.Send(&pb.GetMessageStreamResponse{
			Message: &pb.Message{
				From:           v.From,
				MessageContent: v.MessageContent,
				CreatedAt:      createdAt,
			},
		}); err != nil {
			return err
		}
	}
}
server/usecase/get_message_stream_service.go
func (g *GetMessageStreamService) Handle(ctx context.Context, stream chan<- domain.Message) error {
	defer close(stream)
	eg, _ := errgroup.WithContext(ctx)
	eg.Go(func() error {
		if err := g.messageRepository.Listen(ctx, stream); err != nil {
			return err
		}
		return nil
	})
	if err := eg.Wait(); err != nil {
		return fmt.Errorf("failed to GetMessageStreamService.Handle: %s", err)
	}
	return nil
}
server/infra/local_repository_impl.go
func (m *LocalMessageRepositoryImpl) Listen(ctx context.Context, stream chan<- domain.Message) error {
    // チャットに途中参加した場合今まで全てのメッセージを受け取る
	for _, message := range localMessages {
		stream <- message
	}

	currentLocalMessagesLen := len(localMessages)

	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			if currentLocalMessagesLen < len(localMessages) {
				log.Printf("localMessages size: %d\n", len(localMessages))
                // スライスの変更があり次第追加された最新のメッセージをチャネルに詰める
				stream <- localMessages[len(localMessages)-1]
				currentLocalMessagesLen = len(localMessages)
			}
		}
	}
}

Firestoreエミュレータに永続化を行うリポジトリ(MessageRepositoryImpl)も実装しましたが、より挙動が安定しているLocalMessageRepositoryImpl(スライス上でメッセージを管理)を用いて説明を行いました。

手元で検証

作ったgRPCサーバの疎通確認ができると便利です。
gRPCurlを利用すればcurl感覚で動作確認を行うことができます。

# インストール
❯ brew install grpcurl
# gRPCサーバの立ち上げ
❯ docker compose up server
# サービスの一覧を確認
❯ grpcurl -plaintext localhost:8080 list
# サービスのメソッド一覧を確認
❯ grpcurl -plaintext localhost:8080 list chat.v1.ChatService
# GetMessageStreamを叩く
❯ grpcurl -plaintext localhost:8080 chat.v1.ChatService.GetMessageStream
# CreateMessageを叩く
❯ grpcurl -plaintext -d '{"message": {"name": "test", "message": "waiwai", "created_at": "2020-05-22T20:32:05Z"}}' localhost:8080 chat.v1.ChatService.CreateMessage

gRPCurl以外にもPostmanで試す方法もあるようです。

https://blog.postman.com/postman-now-supports-grpc/

24
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
20