3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Go / Protobuf / Node.js】gRPC の双方向ストリーミングを利用してチャットを実装する

Last updated at Posted at 2024-04-07

※ この記事は 2022年10月 に作成したものを一部改稿したものです。

gRPC は RPC (Remote Procedure Call) の一種で、Google が自社サービス向けに開発したものをオープンソース化した技術です。
RPC はあるコンピュータ上で動作するプログラムから別の場所にあるプログラムの処理を実行する手法で、1970年代から存在する考え方です。

gRPC では、HTTP/2 の「ストリーム」を利用してサーバ - クライアント間で接続を確立することで、HTTP/1.1 のように1つのリクエストに対して1つのレスポンスを返すだけでなく、複数のリクエスト・レスポンスを並行して処理することができます。

そこで本記事では、gRPC の双方向ストリーミングを利用して CLI 上で動作する簡易的なチャットアプリケーションを実装してみようと思います。

gRPC の特徴

実装に入る前に、APIの構築に最もよく使われる REST との対比を交えて gRPC の特徴について見ていきます。

まず通信方式については、REST は先述の通り HTTP/1.1 が主流で単一のリクエスト・レスポンスが対になる方式のみであるのに対し、gRPC は HTTP/2 を使用しており、以下のように単一とストリーミングを組み合わせて4種類の双方向通信が可能です。

クライアント サーバ
Unary RPC 単一 単一
Server streaming RPC 単一 ストリーミング
Client streaming RPC ストリーミング 単一
Bidirectional streaming RPC ストリーミング ストリーミング

双方向通信と聞くと WebSocket を思い浮かべる方も多いと思いますが、WebSocket は専用のサーバを別途用意する必要があり少し手間がかかります。

通信でやり取りされるメッセージの形式については、REST では JSON や XML など人間が読み書きしやすいテキスト形式が一般的ですが、gRPCでは Protocol Buffers (Protobuf) という Google が開発したインターフェース定義言語 (IDL) を用いてバイナリ形式でやり取りするため、省リソースかつ高速です。

コード生成については、REST では Swagger などのサードパーティ製ツールを使用して自動でAPIドキュメントやコードを作成する方法がありますが、gRPC では protoc という Protobuf からコードを自動生成するためのコンパイラが公式で提供されています。

APIを実装するための所要時間については、gRPC は REST に比べて時間がかかる傾向があります。
REST はAPIアーキテクチャの主流であり、簡単に実装するためのフレームワークが多数存在するため、短い時間で実装が可能です。

以下の表に REST と gRPC の比較をまとめています。

REST gRPC
通信プロトコル HTTP/1.1 HTTP/2
通信方式 単一 単一 / ストリーミング
メッセージ形式 JSON, XML など Protocol Buffers (Protobuf)
コード生成 サードパーティ製ツール protoc コンパイラ
実装時間 短い 長い

プロトコル定義ファイルの作成

ここからはアプリケーションの実装に入っていきます。
ディレクトリ構成は以下のようにします。

.
├── go
│   ├── client
│   └── server
├── node
│   └── client
└── protobuf

まずは Protobuf でプロトコル定義ファイルを作成します。
作成したファイル (protobuf/command_chat.proto) は以下のようになります。

command_chat.proto
syntax = "proto3";

option go_package = "github.com/CRaLFa/comchat";

package comchat;

message ChatMessage {
    string author = 1;
    string body = 2;
}

service CommandChat {
    rpc Chat(stream ChatMessage) returns (stream ChatMessage) {}
}

まず、1行目の syntax で使用する Protobuf のバージョンを指定します。
Google の公式ドキュメントで最新バージョンである proto3 の使用が推奨されているため、それに従っています。

go_packagepackage は Go のパッケージ名に関わるオプションです。
パッケージ名は comchat としています。(某ラジオ番組とは関係ありません)

message でデータのやり取りに用いるメッセージの型を定義します。
文字列型の authorbody の2つのフィールドを持つ ChatMessage 型を定義しています。
各フィールドには一意な番号を割り当てる必要があり、正の整数を順番に割り当てています。

service で RPC のサービスのインターフェースを定義します。
今回は双方向ストリーミングを利用するので、ChatMessage 型のストリームを受け取り、ChatMessage 型のストリームを返却する Chat メソッドを定義しています。

Go によるサーバおよびクライアントの実装

続いてサーバとクライアントの実装に入っていきますが、今回は Google が開発した言語である Go を使用します。

まずは前項で作成したプロトコル定義ファイルを元にサーバとクライアントで使用するコードを自動生成します。
以下のコマンドで protoc コンパイラと Go 用のプラグインをインストールします。

$ sudo apt install protobuf-compiler # Ubuntu の場合
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

インストールが完了したら、protobuf ディレクトリに移動し以下のコマンドを実行します。

$ protoc --go_out='../go/comchat' --go_opt='paths=source_relative' \
  --go-grpc_out='../go/comchat' --go-grpc_opt='paths=source_relative' \
  command_chat.proto

成功すると、go/comchat ディレクトリに Go ファイルが2つ生成されます。

次に、go ディレクトリに移動し Go Mudules とよばれるパッケージ管理システムを利用して必要なパッケージをインストールします。

$ go mod init CRaLFa/comchat
$ go mod edit -replace github.com/CRaLFa=../go
$ go mod tidy

プロトコル定義ファイルで、自動生成する Go ファイルのパッケージ名を github.com/CRaLFa/comchat としましたが、実際はローカル (go/comchat ディレクトリ) のファイルを参照するため replace の設定を行っています。

サーバの実装

これで準備が完了したので、まずはサーバ (server/server.go) を実装していきます。
自動生成されたコードの中にサーバのインターフェースがあるので、構造体に埋め込んで実装します。

type commandChatServer struct {
	pb.UnimplementedCommandChatServer

	mu       sync.Mutex
	clients  map[string]pb.CommandChat_ChatServer
	msgQueue []*pb.ChatMessage
}

func (s *commandChatServer) Chat(stream pb.CommandChat_ChatServer) error {
	errCh := make(chan error)

	go s.receive(stream, errCh)
	go s.send(stream, errCh)

	return <-errCh
}

commandChatServer 構造体に紐付けた Chat メソッドが、クライアントから RPC で呼び出される処理です。
Chat メソッドの中では、クライアントから送信されたチャットメッセージを受け取る receive メソッドとクライアントにチャットメッセージを配信する send メソッドを goroutine (並行処理のための軽量なスレッド) を用いて呼び出しています。
また、各メソッド内で発生したエラーを受信するためにチャネルを使用しています。

receive メソッドは以下のようになっています。

func (s *commandChatServer) receive(stream pb.CommandChat_ChatServer, errCh chan error) {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			continue
		}
		if err != nil {
			log.Printf("Failed to receive message: %v", err)
			errCh <- err
			continue
		}
		if msg.Body == "LOGGED_IN" || msg.Body == "LOGGED_OUT" {
			name := msg.Author
			msg.Author = "SYSTEM"

			var format string
			s.mu.Lock()
			if msg.Body == "LOGGED_IN" {
				format = "%s has entered."
				s.clients[name] = stream
			} else {
				format = "%s has exited."
				delete(s.clients, name)
			}
			s.mu.Unlock()
			msg.Body = fmt.Sprintf(format, name)
		} else {
			log.Printf("Received message: {%v}", msg)
		}
		s.mu.Lock()
		s.msgQueue = append(s.msgQueue, msg)
		s.mu.Unlock()
	}
}

無限ループで stream.Recv() を呼び出し、メッセージをキューに追加しています。
gRPC にはブロードキャストの仕組みはないため、メッセージを全クライアントに配信するために各クライアントのストリームをマップに格納しています。

キュー (スライス) やマップが複数のスレッドから同時に変更されるのを避けるため、アクセス時に sync.Mutex を使用してロックをかけています。

send メソッドは以下のようになっています。

func (s *commandChatServer) send(stream pb.CommandChat_ChatServer, errCh chan error) {
	for {
		time.Sleep(100 * time.Millisecond)

		s.mu.Lock()
		msgLen := len(s.msgQueue)
		if msgLen == 0 {
			s.mu.Unlock()
			continue
		}
		mq := make([]*pb.ChatMessage, msgLen)
		copy(mq, s.msgQueue)
		s.mu.Unlock()

		for _, msg := range mq {
			for _, cs := range s.clients {
				if err := cs.Send(msg); err != nil {
					log.Printf("Failed to send message: %v", err)
					errCh <- err
					continue
				}
			}
			log.Printf("Sent message: {%v}", msg)
		}

		s.mu.Lock()
		s.msgQueue = []*pb.ChatMessage{}
		s.mu.Unlock()
	}
}

こちらも無限ループを使用しており、キューに未配信のメッセージがあれば全て各クライアントに送信しキューを空にします。

後は以下のように main 関数でサーバを起動してやれば完成です。

func newServer() *commandChatServer {
	return &commandChatServer{
		clients:  make(map[string]pb.CommandChat_ChatServer),
		msgQueue: []*pb.ChatMessage{},
	}
}

func main() {
	address := fmt.Sprintf("localhost:%d", *port)
	listener, err := net.Listen("tcp", address)
	if err != nil {
		log.Printf("failed to listen: %v", err)
		return
	}
	defer listener.Close()
	log.Printf("Server running at %s", address)

	grpcServer := grpc.NewServer()
	pb.RegisterCommandChatServer(grpcServer, newServer())
	grpcServer.Serve(listener)
}

クライアントの実装

続いてクライアント (client/client.go) を実装します。
大まかな構造はサーバとあまり変わりません。
Chat メソッドを RPC で呼び出してストリームを取得し、receive メソッドと send メソッドを goroutine で呼び出します。

type commandChatClient struct {
	pb.CommandChatClient

	user string
}

func (c *commandChatClient) runChat() {
	stream, err := c.Chat(context.Background())
	if err != nil {
		log.Printf("Failed to chat: %v", err)
		return
	}
	defer stream.CloseSend()

	waitCh := make(chan bool)

	go c.receive(stream, waitCh)
	go c.send(stream, waitCh)
	c.sendMessage(stream, "LOGGED_IN")

	<-waitCh
}

各メソッドの内容は以下のようになっています。

func (c *commandChatClient) receive(stream pb.CommandChat_ChatClient, ch chan bool) {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Printf("Failed to receive message: %v", err)
			continue
		}
		log.Printf("[%s] : %s\n", msg.Author, msg.Body)
	}
	ch <- true
}

func (c *commandChatClient) send(stream pb.CommandChat_ChatClient, ch chan bool) {
	reader := bufio.NewReader(os.Stdin)
	for {
		text, err := reader.ReadString('\n')
		if err != nil {
			log.Printf("Failed to read: %v", err)
			continue
		}
		trimmed := strings.TrimSpace(text)
		if len(trimmed) == 0 {
			continue
		}
		if trimmed == "exit" {
			c.sendMessage(stream, "LOGGED_OUT")
			break
		}
		c.sendMessage(stream, trimmed)
	}
	ch <- true
}

func (c *commandChatClient) sendMessage(stream pb.CommandChat_ChatClient, body string) {
	msg := &pb.ChatMessage{
		Author: c.user,
		Body:   body,
	}
	if err := stream.Send(msg); err != nil {
		log.Printf("Failed to send message: %v", err)
	}
}

receive メソッドでは受信したメッセージを標準出力に出力し、チャット画面に表示します。
send メソッドでは bufio.Reader を使用して標準入力に入力された文字列を読み込みサーバに送信します。

後は main 関数でクライアントを起動すれば完成です。
起動前にユーザに名前を入力してもらいクライアントに設定しています。

func getUserName() string {
	reader := bufio.NewReader(os.Stdin)
	fmt.Printf("Enter your name: ")
	name, err := reader.ReadString('\n')
	if err != nil {
		log.Printf("Failed to read: %v", err)
		return "(anonymous)"
	}
	trimmed := strings.TrimSpace(name)
	if len(trimmed) == 0 {
		return "(anonymous)"
	}
	return trimmed
}

func newClient(cc grpc.ClientConnInterface, userName string) *commandChatClient {
	return &commandChatClient{
		CommandChatClient: pb.NewCommandChatClient(cc),
		user:              userName,
	}
}

func main() {
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	conn, err := grpc.Dial(*serverAddr, opts...)
	if err != nil {
		log.Printf("Failed to dial: %v", err)
		return
	}
	defer conn.Close()

	client := newClient(conn, getUserName())
	client.runChat()
}

以上でサーバとクライアントの実装が完了したので、起動してみます。
go run server/server.go でサーバを起動し、別ウィンドウで go run client/client.go でクライアントを起動します。
以下はクライアントを2つ起動してチャットをする様子です。(画面左側がサーバ、右側がクライアントです。)

comchat_go.gif

Node.js によるクライアントの実装

gRPC では Go 以外にも以下の言語が公式にサポートされています。

  • C# / .NET
  • C++
  • Dart
  • Java
  • Kotlin
  • Node
  • Objective-C
  • PHP
  • Python
  • Ruby

Go しか触れないのはもったいないので、Node.js でもクライアントを実装してみます。

Node.js で Protobuf を利用するには、プロトコル定義ファイルを動的に読み込んで利用する方法と、Go と同様に JavaScript ファイルを自動生成して利用する方法がありますが、今回は補完の利く後者の方法を採用します。
node ディレクトリに移動し、npm で必要なパッケージをインストールします。

$ npm init
$ npm i @grpc/grpc-js
$ npm i -D grpc-tools ts-protoc-gen typescript

インストールが完了したら、以下のコマンドを実行します。

$ npx grpc_tools_node_protoc \
  --plugin="protoc-gen-ts=$(npm bin)/protoc-gen-ts" \
  --js_out='import_style=commonjs,binary:./generated' \
  --ts_out='service=grpc-node,mode=grpc-js:./generated' \
  --grpc_out='grpc_js:./generated' \
  -I ../protobuf/ \
  ../protobuf/command_chat.proto

成功すると、node/generated ディレクトリに .js ファイルが2つ、.d.ts ファイルが2つ生成されます。

これで準備が完了したので、クライアント (client/client.ts) を実装していきます。
自動生成されたファイルから CommandChatClient, ChatMessage をインポートし、CommandChatClient を継承したクラスを作成します。

class CustomCommandChatClient extends CommandChatClient {
    user: string;

    constructor(userName: string) {
        super(SERVER_ADDR, credentials.createInsecure());
        this.user = userName;
    }

    async runChat() {
        const stream = this.chat();
        this.receive(stream);
        await this.sendMessage(stream, 'LOGGED_IN');
        await this.send(stream);
    }

    receive(stream: ClientDuplexStream<ChatMessage, ChatMessage>) {
        stream.on('data', (msg: ChatMessage) => {
            console.log('%s [%s] : %s', getDateTime(), msg.getAuthor(), msg.getBody());
        });
    }

    async send(stream: ClientDuplexStream<ChatMessage, ChatMessage>) {
        const rl = readline.createInterface({ input, output });

        for await (const line of rl) {
            const trimmed = line.trim();
            if (!trimmed) {
                continue;
            }
            if (trimmed === 'exit') {
                await this.sendMessage(stream, 'LOGGED_OUT');
                break;
            }
            await this.sendMessage(stream, trimmed);
        }

        rl.close();
    }

    async sendMessage(stream: ClientDuplexStream<ChatMessage, ChatMessage>, body: string) {
        await new Promise<void>((resolve, reject) => {
            const msg = new ChatMessage();
            msg.setAuthor(this.user);
            msg.setBody(body);

            stream.write(msg, (err: any) => {
                if (err) {
                    reject(err);
                } else {
                    resolve();
                }
            });
        });
    }
}

Go のクライアントと同様に chat メソッドを RPC で呼び出してストリームを取得し、receive メソッドと send メソッドを呼び出します。
send メソッドでも同様に繰り返し標準入力を読み込みますが、readline.Interface は AsyncIterator を実装しているので、for await...of 文でループさせることができます。
sendMessage メソッドのストリームへの書き込みは実行結果がコールバックに渡されるインターフェースなので、Promise を使用して完了を待つようにしています。

後は以下のように main 関数でユーザ名を取得しクライアントを起動すれば完成です。

const getUserName = async () => {
    const rl = readline.createInterface({ input, output });
    const name = await rl.question('Enter your name: ');
    rl.close();

    const trimmed = name.trim();
    return trimmed || '(anonymous)';
};

const main = async () => {
    const client = new CustomCommandChatClient(await getUserName());
    await client.runChat();

    client.close();
    process.exit(0);
};

main();

JavaScript にトランスパイルしチャットクライアントを起動します。

$ npx tsc
$ node dist/client.js

以下は Go のクライアントと Node.js のクライアントでチャットをする様子です。
(画面左下がサーバ、左上が Node.js のクライアント、右側2つが Go のクライアントです。)

comchat_go_node.gif

作成したソースコードは以下で公開しています。

終わりに

本記事では、gRPC の双方向ストリーミングを利用して CLI 上で動作する簡易的なチャットアプリケーションを実装しました。
ストリームを利用した効率的でリアルタイム性の高い通信はもちろん、APIのスキーマをコードで定義することができ、サーバもクライアントも同じ定義ファイルを元に実装用のコードを自動生成できるというのは大きな魅力だと感じました。

CLI でチャットを実現できたので次は Web アプリケーションとして実装して多くの人に使ってもらえるようにしたいところですが、ブラウザが HTTP/2, gRPC の仕様に十分に対応していないため、Envoy プロキシを使用するなどもう一手間必要です。
現状は、外部のクライアントからのリクエストを受け付けるような用途よりも、Google の当初の開発目的でもあるシステム内部のマイクロサービス間の通信等の用途により適していると言えそうです。

Go については本格的に書いたのは今回が初めてで、最初は慣れない文法に少し戸惑いもありましたが、言語仕様がシンプルで並行処理も簡単に記述することができ、今後も使っていきたいと感じました。

参考文献

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?