はじめに
この記事はユニークビジョン株式会社 Advent Calendar 2018の12日目の記事です。
11日目から続けてgRPCで超簡素なチャットツールを実装してみました。
今回はgRPCのServer-side streaming RPCとClient-side streaming RPCを活用してリアルタイムなテキストのやりとりを実装します。
streamとは
クライアントもしくはサーバが断続的なメッセージの末尾(EOF等)に到達するまで受信もしくは送信できる方式で、今回はリアルタイムなチャットメッセージの送受信に使用します。
準備
基本的に前回のプロジェクトを踏襲しています。
ディレクトリ構成
.
├── client
│ └── main.go
├── hellogrpc
│ └── hellogrpc.proto
└── server
└── main.go
.protoの定義
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.tokikokoko.hellogrpc.server";
option java_outer_classname = "HelloGrpc";
package HelloGrpc;
// The greeting service definition.
service HelloGrpc {
// Sends a greeting
rpc GreetServer (GreetRequest) returns (GreetMessage) {}
// チャットルーム追加
rpc AddRoom (RoomRequest) returns (RoomInfo) {}
// チャットルーム情報取得
rpc GetRoomInfo (RoomRequest) returns (RoomInfo) {}
// チャットルーム一覧取得
rpc GetRooms (Null) returns (RoomList) {}
// Client-side streamingを用いてメッセージを交換する
rpc SendMessage (stream SendRequest) returns (SendResult) {}
// Server-side streamingを用いてメッセージを交換する
rpc GetMessages (MessagesRequest) returns (stream Message) {}
}
// 空のmessage
message Null {
}
message GreetRequest {
string name = 1;
}
message GreetMessage {
string msg = 1;
}
message RoomRequest {
string id = 1;
}
message RoomInfo {
string id = 1;
int32 messageCount = 2;
}
message RoomList {
repeated RoomInfo rooms = 1;
}
message SendRequest {
string id = 1;
string name = 2;
string content = 3;
}
message SendResult {
bool result = 1;
}
message MessagesRequest {
string id = 1;
}
message Message {
string id = 1;
string name = 2;
string content = 3;
}
今回新たに幾つかのメソッドを定義しましたが、クライアントのメッセージ送信のためにClient-side streaming(サーバ側がstreamを受信する)と、クライアントのメッセージ受信のためにServer-side streaming(クライアント側がstreamを受信する)を利用します。streamを使用するためには、
// Client-side streamingを用いてメッセージを交換する
rpc SendMessage (stream SendRequest) returns (SendResult) {}
// Server-side streamingを用いてメッセージを交換する
rpc GetMessages (MessagesRequest) returns (stream Message) {}
引数もしくは返り値のmessage型の前にstream
を付加するだけでstreamを使用できるようになります。
またmessageの定義にrepeated
という初出の定義がありますが、
message RoomList {
repeated RoomInfo rooms = 1;
}
gRPCで配列やリストのように扱うことのできる型です。goでは、
type RoomList struct {
Rooms []*RoomInfo `protobuf:"bytes,1,rep,name=rooms,proto3" json:"rooms,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
というようにポインタ型のスライスが出力されます。
サーバ
Client-side streaming
// チャットルームへstreamを使いメッセージを送信する
func (s *server) SendMessage(stream pb.HelloGrpc_SendMessageServer) error{
// 無限ループ
for {
// クライアントからメッセージ受信
m, err := stream.Recv()
log.Printf("Receive message>> [%s] %s", m.Name, m.Content)
// EOF、エラーなら終了
if err == io.EOF {
// EOFなら接続終了処理
return stream.SendAndClose(&pb.SendResult{
Result: true,
})
}
if err != nil {
return err
}
// 終了コマンド
if m.Content == "/exit" {
return stream.SendAndClose(&pb.SendResult{
Result: true,
})
}
// チャットルームの探索
index, err := searchRooms(s.rooms, m.Id)
if err != nil {
return err
}
// メッセージの追加
s.rooms[index].contents = append(
s.rooms[index].contents,
message{
author: m.Name,
content: m.Content,
},
)
}
}
サーバ側はクライアントからのメッセージをループで受け続け(stream.Recv()
)、そのメッセージに対して処理を行います。メソッドの引数が少し特殊な型を取るので注意です。
接続終了時にはstream.SendAndClose()
を実行します。
Server-side streaming
// チャットルームの新着メッセージをstreamを使い配信する
func (s *server) GetMessages(p *pb.MessagesRequest, stream pb.HelloGrpc_GetMessagesServer) error{
// チャットルームの探索
index, err := searchRooms(s.rooms, p.Id)
if err != nil {
return err
}
// 対象チャットルーム
targetRoom := s.rooms[index]
// 差を使って新着メッセージを検知する
previousCount := len(targetRoom.contents)
currentCount := 0
// 無限ループ
for {
targetRoom = s.rooms[index]
currentCount = len(targetRoom.contents)
// 現在のmessageCountが前回より多ければ新着メッセージあり
if previousCount < currentCount {
msg, _ := latestMessage(targetRoom.contents)
// クライアントへメッセージ送信
if err := stream.Send(&pb.Message{Id: targetRoom.id, Name: msg.author, Content: msg.content}); err != nil {
return err
}
}
previousCount = currentCount
}
}
ここでは逆にforループで新着メッセージがあれば、stream.Send()
し続けるという実装です。
全体
package main
import (
"context"
"errors"
"fmt"
"io"
"net"
"google.golang.org/grpc"
"log"
pb "github.com/tokikokoko/hello_grpc/hellogrpc"
)
// gRPC struct
type server struct {
rooms []room
}
// チャットルーム
type room struct {
id string
contents []message
}
// チャットメッセージ
type message struct {
author string
content string
}
// Greet
func (s *server) GreetServer(ctx context.Context, p *pb.GreetRequest) (*pb.GreetMessage, error) {
log.Printf("Request from: %s", p.Name)
return &pb.GreetMessage{Msg: fmt.Sprintf("Hello, %s. ", p.Name)}, nil
}
// チャットルームの追加
func (s *server) AddRoom(ctx context.Context, p *pb.RoomRequest) (*pb.RoomInfo, error) {
log.Printf("Add Room Request")
// チャットルームをスライスに追加
s.rooms = append(s.rooms, room{id: p.Id, contents: []message{}})
// チャットルームの探索
index, err := searchRooms(s.rooms, p.Id)
if err != nil {
return nil, err
}
room := s.rooms[index]
return &pb.RoomInfo{Id: room.id, MessageCount: int32(len(room.contents))}, nil
}
// チャットルームの情報取得
func (s *server) GetRoomInfo(ctx context.Context, p *pb.RoomRequest) (*pb.RoomInfo, error) {
log.Printf("Get Room Request")
// チャットルームの探索
index, err := searchRooms(s.rooms, p.Id)
if err != nil {
return nil, err
}
room := s.rooms[index]
return &pb.RoomInfo{Id: room.id, MessageCount: int32(len(room.contents))}, nil
}
// チャットルームの一覧を取得
func (s *server) GetRooms(ctx context.Context, p *pb.Null) (*pb.RoomList, error) {
log.Printf("Get Rooms Request")
return &pb.RoomList{Rooms: buildRoomInfo(s.rooms)}, nil
}
// チャットルームへstreamを使いメッセージを送信する
func (s *server) SendMessage(stream pb.HelloGrpc_SendMessageServer) error{
// 無限ループ
for {
// クライアントからメッセージ受信
m, err := stream.Recv()
log.Printf("Receive message>> [%s] %s", m.Name, m.Content)
// EOF、エラーなら終了
if err == io.EOF {
// EOFなら接続終了処理
return stream.SendAndClose(&pb.SendResult{
Result: true,
})
}
if err != nil {
return err
}
// 終了コマンド
if m.Content == "/exit" {
return stream.SendAndClose(&pb.SendResult{
Result: true,
})
}
// チャットルームの探索
index, err := searchRooms(s.rooms, m.Id)
if err != nil {
return err
}
// メッセージの追加
s.rooms[index].contents = append(
s.rooms[index].contents,
message{
author: m.Name,
content: m.Content,
},
)
}
}
// チャットルームの新着メッセージをstreamを使い配信する
func (s *server) GetMessages(p *pb.MessagesRequest, stream pb.HelloGrpc_GetMessagesServer) error{
// チャットルームの探索
index, err := searchRooms(s.rooms, p.Id)
if err != nil {
return err
}
// 対象チャットルーム
targetRoom := s.rooms[index]
// 差を使って新着メッセージを検知する
previousCount := len(targetRoom.contents)
currentCount := 0
// 無限ループ
for {
targetRoom = s.rooms[index]
currentCount = len(targetRoom.contents)
// 現在のmessageCountが前回より多ければ新着メッセージあり
if previousCount < currentCount {
msg, _ := latestMessage(targetRoom.contents)
// クライアントへメッセージ送信
if err := stream.Send(&pb.Message{Id: targetRoom.id, Name: msg.author, Content: msg.content}); err != nil {
return err
}
}
previousCount = currentCount
}
}
func latestMessage(messages []message) (message, error) {
length := len(messages)
if length == 0 {
return message{}, errors.New("Not found")
}
return messages[length - 1], nil
}
func buildRoomInfo(rooms []room) ([]*pb.RoomInfo) {
r := make([]*pb.RoomInfo, 0)
for _, v := range(rooms) {
r = append(r, &pb.RoomInfo{Id: v.id, MessageCount: int32(len(v.contents))})
}
return r
}
func searchRooms(r []room, id string) (int, error) {
for i, v := range(r) {
if v.id == id {
return i, nil
}
}
return -1, errors.New("Not found")
}
func main() {
// gRPC
port := 10000
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
log.Fatalf("lfailed to listen: %v", err)
}
log.Printf("Run server port: %d", port)
grpcServer := grpc.NewServer()
pb.RegisterHelloGrpcServer(grpcServer, &server{rooms: []room{}})
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
クライアント
Client-side streaming
// streamを使いサーバへ連続してチャットメッセージを送信する
func sendMessage(c pb.HelloGrpcClient, id string, name string) error {
// 標準入力を使ってメッセージを入力
stdin := bufio.NewScanner(os.Stdin)
// サーバへstreamを渡す
stream, err := c.SendMessage(context.Background())
if err != nil {
return err
}
// 無限ループを使ってで連続してメッセージ送信
for {
// 入力待ち
stdin.Scan()
text := stdin.Text()
// サーバへSendRequest型のメッセージを送信
if err := stream.Send(&pb.SendRequest{Id: id, Name: name, Content: text}); err != nil {
log.Fatalf("Send failed: %v", err)
}
// /exitを入力すると終了
if text == "/exit" {
break
}
}
// 接続終了処理
_, err = stream.CloseAndRecv()
if err != nil {
return err
}
return nil
}
クライアント側はサーバサイドとは反転した(受信と送信が逆)実装です。ループを使って、標準入力をstream.Send()
します。
接続終了時にはstream.CloseAndRecv()
を忘れずに。
Server-side streaming
// streamを使いサーバから連続してメッセージを受信する
func getMessage(c pb.HelloGrpcClient, id string) error {
req := &pb.MessagesRequest{Id: id}
// サーバからstreamを受け取る
stream, err := c.GetMessages(context.Background(), req)
if err != nil {
return err
}
// 無限ループ
for {
// サーバからのメッセージを受信
msg, err := stream.Recv()
if err == io.EOF {
break
}
log.Printf("[%s] %s", msg.Name, msg.Content)
if err != nil {
return err
}
}
return nil
}
サーバからのメッセージをループでstream.Recv()
します。
全体
go:client/main.go
package main
import (
"bufio"
"context"
"io"
"log"
"os"
"time"
"google.golang.org/grpc"
pb "github.com/tokikokoko/hello_grpc/hellogrpc"
)
const (
address = "localhost:10000"
defaultName = "hoge"
)
func greetServer(ctx context.Context, c pb.HelloGrpcClient, name string) error {
r, err := c.GreetServer(ctx, &pb.GreetRequest{Name: name})
if err != nil {
return err
}
log.Printf("%s", r.Msg)
return nil
}
// チャットルームを作成するためのサーバのメソッドを呼び出す
func createRoom(ctx context.Context, c pb.HelloGrpcClient, id string) error {
r, err := c.AddRoom(ctx, &pb.RoomRequest{Id: id})
if err != nil {
return err
}
log.Printf("Created room. >> %s", r.Id)
return nil
}
// チャットルームの情報を取得するためのサーバのメソッドを呼び出す
func getRoom(ctx context.Context, c pb.HelloGrpcClient, id string) error {
r, err := c.GetRoomInfo(ctx, &pb.RoomRequest{Id: id})
if err != nil {
return err
}
log.Printf("Room information. >> id: %s, messageCount: %d", r.Id, r.MessageCount)
return nil
}
// チャットルームの一覧を取得するためのサーバのメソッドを呼び出す
func getRooms(ctx context.Context, c pb.HelloGrpcClient, name string) error {
r, err := c.GetRooms(ctx, &pb.Null{})
if err != nil {
return err
}
for _, v := range(r.Rooms) {
log.Printf("Name: %s, MessageCount: %d", v.Id, v.MessageCount)
}
return nil
}
// streamを使いサーバへ連続してチャットメッセージを送信する
func sendMessage(c pb.HelloGrpcClient, id string, name string) error {
// 標準入力を使ってメッセージを入力
stdin := bufio.NewScanner(os.Stdin)
// サーバへstreamを渡す
stream, err := c.SendMessage(context.Background())
if err != nil {
return err
}
// 無限ループを使ってで連続してメッセージ送信
for {
// 入力待ち
stdin.Scan()
text := stdin.Text()
// サーバへSendRequest型のメッセージを送信
if err := stream.Send(&pb.SendRequest{Id: id, Name: name, Content: text}); err != nil {
log.Fatalf("Send failed: %v", err)
}
// /exitを入力すると終了
if text == "/exit" {
break
}
}
// 接続終了処理
_, err = stream.CloseAndRecv()
if err != nil {
return err
}
return nil
}
// streamを使いサーバから連続してメッセージを受信する
func getMessage(c pb.HelloGrpcClient, id string) error {
req := &pb.MessagesRequest{Id: id}
// サーバからstreamを受け取る
stream, err := c.GetMessages(context.Background(), req)
if err != nil {
return err
}
// 無限ループ
for {
// サーバからのメッセージを受信
msg, err := stream.Recv()
if err == io.EOF {
break
}
log.Printf("[%s] %s", msg.Name, msg.Content)
if err != nil {
return err
}
}
return nil
}
func main() {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewHelloGrpcClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// case文で分岐
if len(os.Args) > 2 {
switch os.Args[1] {
case "greet":
err := greetServer(ctx, c, os.Args[2])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
case "add":
err := createRoom(ctx, c, os.Args[2])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
case "get":
err := getRoom(ctx, c, os.Args[2])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
case "list":
err := getRooms(ctx, c, os.Args[2])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
case "send":
err := sendMessage(c, os.Args[2], os.Args[3])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
case "stream":
err := getMessage(c, os.Args[2])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
case "chat":
// goroutine(非同期処理)を使ってメッセージ受信・表示
go getMessage(c, os.Args[2])
// メッセージ送信はmainで実行
err := sendMessage(c, os.Args[2], os.Args[3])
if err != nil {
log.Fatalf("Couldn't execute: %v", err)
}
default:
log.Fatalf("Unknown command.")
}
} else {
log.Fatalf("Need arguments.")
}
}
## 動作確認
* サーバは`go run server/main.go`
* クライアント
* `go run client/main.go add {追加するチャットルーム名}`
* `go run client/main.go chat {接続するチャットルーム名} {表示名}`
![ezgif-1-489e8a177e3f.gif](https://qiita-image-store.s3.amazonaws.com/0/314367/d45e916d-994f-ab70-b0d2-b3668e3e20b8.gif)
## まとめ
streamを使って簡易なチャットのようなことができるツールを書いてみました。かなりざっくりとした実装で、
* ある程度メッセージ、若しくは接続が確立するとサーバが落ちる(おそらくデータの格納にスライスを利用しているため、スライスの容量を超過し、コピーが発生し、メモリアドレスを見失ってしまう)
* 本来RedisとかDBとかにデータを格納したほうが楽でしょうね...
* チャット入力プロンプトがしょっぱい
* server-side streaming使用時の正しい接続のとじ方がわからなかった
等様々な問題を抱えています。gRPCには`bidirectional streaming RPC`という双方向のstream通信方式もあり、こちらを使えばもっときれいな実装になると思います。
次回は@KazuyaTomitaさんです!