39
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ユニークビジョン株式会社Advent Calendar 2018

Day 12

gRPC & Goで超簡素なチャットツールを実装する

Last updated at Posted at 2018-12-12

はじめに

この記事はユニークビジョン株式会社 Advent Calendar 2018の12日目の記事です。

11日目から続けてgRPCで超簡素なチャットツールを実装してみました。

今回はgRPCのServer-side streaming RPCClient-side streaming RPCを活用してリアルタイムなテキストのやりとりを実装します。

streamとは

クライアントもしくはサーバが断続的なメッセージの末尾(EOF等)に到達するまで受信もしくは送信できる方式で、今回はリアルタイムなチャットメッセージの送受信に使用します。

準備

基本的に前回のプロジェクトを踏襲しています。

ディレクトリ構成

.
├── client
│   └── main.go
├── hellogrpc
│   └── hellogrpc.proto
└── server
    └── main.go

.protoの定義

hellogrpc/hellogrpc.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.tokikokoko.hellogrpc.server";
option java_outer_classname = "HelloGrpc";

package HelloGrpc;

// The greeting service definition.
service HelloGrpc {
	// Sends a greeting
	rpc GreetServer (GreetRequest) returns (GreetMessage) {}
	// チャットルーム追加
	rpc AddRoom (RoomRequest) returns (RoomInfo) {}
	// チャットルーム情報取得
	rpc GetRoomInfo (RoomRequest) returns (RoomInfo) {}
	// チャットルーム一覧取得
	rpc GetRooms (Null) returns (RoomList) {}
	// Client-side streamingを用いてメッセージを交換する
	rpc SendMessage (stream SendRequest) returns (SendResult) {}
	// Server-side streamingを用いてメッセージを交換する
	rpc GetMessages (MessagesRequest) returns (stream Message) {}
}

// 空のmessage
message Null {
}

message GreetRequest {
	string name = 1;
}

message GreetMessage {
	string msg = 1;
}

message RoomRequest {
	string id = 1;
}

message RoomInfo {
	string id = 1;
	int32 messageCount = 2;
}

message RoomList {
	repeated RoomInfo rooms = 1;
}

message SendRequest {
	string id = 1;
	string name = 2;
	string content = 3;
}

message SendResult {
	bool result = 1;
}

message MessagesRequest {
	string id = 1;
}

message Message {
	string id = 1;
	string name = 2;
	string content = 3;
}

今回新たに幾つかのメソッドを定義しましたが、クライアントのメッセージ送信のためにClient-side streaming(サーバ側がstreamを受信する)と、クライアントのメッセージ受信のためにServer-side streaming(クライアント側がstreamを受信する)を利用します。streamを使用するためには、

// Client-side streamingを用いてメッセージを交換する
rpc SendMessage (stream SendRequest) returns (SendResult) {}
// Server-side streamingを用いてメッセージを交換する
rpc GetMessages (MessagesRequest) returns (stream Message) {}

引数もしくは返り値のmessage型の前にstreamを付加するだけでstreamを使用できるようになります。

またmessageの定義にrepeatedという初出の定義がありますが、

message RoomList {
	repeated RoomInfo rooms = 1;
}

gRPCで配列やリストのように扱うことのできる型です。goでは、

type RoomList struct {
	Rooms                []*RoomInfo `protobuf:"bytes,1,rep,name=rooms,proto3" json:"rooms,omitempty"`
	XXX_NoUnkeyedLiteral struct{}    `json:"-"`
	XXX_unrecognized     []byte      `json:"-"`
	XXX_sizecache        int32       `json:"-"`
}

というようにポインタ型のスライスが出力されます。

サーバ

Client-side streaming

// チャットルームへstreamを使いメッセージを送信する
func (s *server) SendMessage(stream pb.HelloGrpc_SendMessageServer) error{
    // 無限ループ
    for {
        // クライアントからメッセージ受信
        m, err := stream.Recv()
        log.Printf("Receive message>> [%s] %s", m.Name, m.Content)
        // EOF、エラーなら終了
        if err == io.EOF {
            // EOFなら接続終了処理
			return stream.SendAndClose(&pb.SendResult{
                Result: true,
			})
		}
        if err != nil {
            return err
        }
        // 終了コマンド
        if m.Content == "/exit" {
			return stream.SendAndClose(&pb.SendResult{
                Result: true,
			})
        }
        // チャットルームの探索
        index, err := searchRooms(s.rooms, m.Id)
        if err != nil {
            return err
        }
        // メッセージの追加
        s.rooms[index].contents = append(
            s.rooms[index].contents,
            message{
                author: m.Name,
                content: m.Content,
            },
        )
    }
}

サーバ側はクライアントからのメッセージをループで受け続け(stream.Recv())、そのメッセージに対して処理を行います。メソッドの引数が少し特殊な型を取るので注意です。

接続終了時にはstream.SendAndClose()を実行します。

Server-side streaming

// チャットルームの新着メッセージをstreamを使い配信する
func (s *server) GetMessages(p *pb.MessagesRequest, stream pb.HelloGrpc_GetMessagesServer) error{
    // チャットルームの探索
    index, err := searchRooms(s.rooms, p.Id)
    if err != nil {
        return err
    }
    // 対象チャットルーム
    targetRoom := s.rooms[index]
    // 差を使って新着メッセージを検知する
    previousCount := len(targetRoom.contents)
    currentCount := 0
    // 無限ループ
    for {
        targetRoom = s.rooms[index]
        currentCount = len(targetRoom.contents)
        // 現在のmessageCountが前回より多ければ新着メッセージあり
        if previousCount < currentCount {
            msg, _ := latestMessage(targetRoom.contents)
            // クライアントへメッセージ送信
            if err := stream.Send(&pb.Message{Id: targetRoom.id, Name: msg.author, Content: msg.content}); err != nil {
				return err
			}
        }
        previousCount = currentCount
    }
}

ここでは逆にforループで新着メッセージがあれば、stream.Send()し続けるという実装です。

全体

server/main.go
package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "net"

	"google.golang.org/grpc"
    "log"

    pb "github.com/tokikokoko/hello_grpc/hellogrpc"
)

// gRPC struct
type server struct {
    rooms []room
}

// チャットルーム
type room struct {
    id string
    contents []message
}

// チャットメッセージ
type message struct {
    author string
    content string
}

// Greet
func (s *server) GreetServer(ctx context.Context, p *pb.GreetRequest) (*pb.GreetMessage, error) {
    log.Printf("Request from: %s", p.Name)
    return &pb.GreetMessage{Msg: fmt.Sprintf("Hello, %s. ", p.Name)}, nil
}

// チャットルームの追加
func (s *server) AddRoom(ctx context.Context, p *pb.RoomRequest) (*pb.RoomInfo, error) {
    log.Printf("Add Room Request")
    // チャットルームをスライスに追加
    s.rooms = append(s.rooms, room{id: p.Id, contents: []message{}})
    // チャットルームの探索
    index, err := searchRooms(s.rooms, p.Id)
    if err != nil {
        return nil, err
    }
    room := s.rooms[index]
    return &pb.RoomInfo{Id: room.id, MessageCount: int32(len(room.contents))}, nil
}

// チャットルームの情報取得
func (s *server) GetRoomInfo(ctx context.Context, p *pb.RoomRequest) (*pb.RoomInfo, error) {
    log.Printf("Get Room Request")
    // チャットルームの探索
    index, err := searchRooms(s.rooms, p.Id)
    if err != nil {
        return nil, err
    }
    room := s.rooms[index]
    return &pb.RoomInfo{Id: room.id, MessageCount: int32(len(room.contents))}, nil
}

// チャットルームの一覧を取得
func (s *server) GetRooms(ctx context.Context, p *pb.Null) (*pb.RoomList, error) {
    log.Printf("Get Rooms Request")
    return &pb.RoomList{Rooms: buildRoomInfo(s.rooms)}, nil
}

// チャットルームへstreamを使いメッセージを送信する
func (s *server) SendMessage(stream pb.HelloGrpc_SendMessageServer) error{
    // 無限ループ
    for {
        // クライアントからメッセージ受信
        m, err := stream.Recv()
        log.Printf("Receive message>> [%s] %s", m.Name, m.Content)
        // EOF、エラーなら終了
        if err == io.EOF {
            // EOFなら接続終了処理
			return stream.SendAndClose(&pb.SendResult{
                Result: true,
			})
		}
        if err != nil {
            return err
        }
        // 終了コマンド
        if m.Content == "/exit" {
			return stream.SendAndClose(&pb.SendResult{
                Result: true,
			})
        }
        // チャットルームの探索
        index, err := searchRooms(s.rooms, m.Id)
        if err != nil {
            return err
        }
        // メッセージの追加
        s.rooms[index].contents = append(
            s.rooms[index].contents,
            message{
                author: m.Name,
                content: m.Content,
            },
        )
    }
}

// チャットルームの新着メッセージをstreamを使い配信する
func (s *server) GetMessages(p *pb.MessagesRequest, stream pb.HelloGrpc_GetMessagesServer) error{
    // チャットルームの探索
    index, err := searchRooms(s.rooms, p.Id)
    if err != nil {
        return err
    }
    // 対象チャットルーム
    targetRoom := s.rooms[index]
    // 差を使って新着メッセージを検知する
    previousCount := len(targetRoom.contents)
    currentCount := 0
    // 無限ループ
    for {
        targetRoom = s.rooms[index]
        currentCount = len(targetRoom.contents)
        // 現在のmessageCountが前回より多ければ新着メッセージあり
        if previousCount < currentCount {
            msg, _ := latestMessage(targetRoom.contents)
            // クライアントへメッセージ送信
            if err := stream.Send(&pb.Message{Id: targetRoom.id, Name: msg.author, Content: msg.content}); err != nil {
				return err
			}
        }
        previousCount = currentCount
    }
}

func latestMessage(messages []message) (message, error) {
    length := len(messages)
    if length == 0 {
        return message{}, errors.New("Not found")
    }
    return messages[length - 1], nil
}

func buildRoomInfo(rooms []room) ([]*pb.RoomInfo) {
    r := make([]*pb.RoomInfo, 0)
    for _, v := range(rooms) {
        r = append(r, &pb.RoomInfo{Id: v.id, MessageCount: int32(len(v.contents))})
    }
    return r
}

func searchRooms(r []room, id string) (int, error) {
    for i, v := range(r) {
        if v.id == id {
            return i, nil
        }
    }
    return -1, errors.New("Not found")
}

func main() {
    // gRPC
    port := 10000
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
    if err != nil {
        log.Fatalf("lfailed to listen: %v", err)
    }
    log.Printf("Run server port: %d", port)
    grpcServer := grpc.NewServer()
    pb.RegisterHelloGrpcServer(grpcServer, &server{rooms: []room{}})
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

クライアント

Client-side streaming

// streamを使いサーバへ連続してチャットメッセージを送信する
func sendMessage(c pb.HelloGrpcClient, id string, name string) error {
    // 標準入力を使ってメッセージを入力
    stdin := bufio.NewScanner(os.Stdin)
    // サーバへstreamを渡す
    stream, err := c.SendMessage(context.Background())
    if err != nil {
        return err
    }
    // 無限ループを使ってで連続してメッセージ送信
    for {
        // 入力待ち
        stdin.Scan()
        text := stdin.Text()
        // サーバへSendRequest型のメッセージを送信
        if err := stream.Send(&pb.SendRequest{Id: id, Name: name, Content: text}); err != nil {
            log.Fatalf("Send failed: %v", err)
        }
        // /exitを入力すると終了
        if text == "/exit" {
            break
        }
    }
    // 接続終了処理
    _, err = stream.CloseAndRecv()
    if err != nil {
        return err
    }
    return nil
}

クライアント側はサーバサイドとは反転した(受信と送信が逆)実装です。ループを使って、標準入力をstream.Send()します。

接続終了時にはstream.CloseAndRecv()を忘れずに。

Server-side streaming

// streamを使いサーバから連続してメッセージを受信する
func getMessage(c pb.HelloGrpcClient, id string) error {
    req := &pb.MessagesRequest{Id: id}
    // サーバからstreamを受け取る
    stream, err := c.GetMessages(context.Background(), req)
    if err != nil {
        return err
    }
    // 無限ループ
    for {
        // サーバからのメッセージを受信
        msg, err := stream.Recv()
        if err == io.EOF {
            break
        }
        log.Printf("[%s] %s", msg.Name, msg.Content)
        if err != nil {
            return err
        }
    }
    return nil
}

サーバからのメッセージをループでstream.Recv()します。

全体

go:client/main.go
package main

import (
"bufio"
"context"
"io"
"log"
"os"
"time"

"google.golang.org/grpc"
pb "github.com/tokikokoko/hello_grpc/hellogrpc"

)

const (
address = "localhost:10000"
defaultName = "hoge"
)

func greetServer(ctx context.Context, c pb.HelloGrpcClient, name string) error {
r, err := c.GreetServer(ctx, &pb.GreetRequest{Name: name})
if err != nil {
return err
}
log.Printf("%s", r.Msg)
return nil
}

// チャットルームを作成するためのサーバのメソッドを呼び出す
func createRoom(ctx context.Context, c pb.HelloGrpcClient, id string) error {
r, err := c.AddRoom(ctx, &pb.RoomRequest{Id: id})
if err != nil {
return err
}
log.Printf("Created room. >> %s", r.Id)
return nil
}

// チャットルームの情報を取得するためのサーバのメソッドを呼び出す
func getRoom(ctx context.Context, c pb.HelloGrpcClient, id string) error {
r, err := c.GetRoomInfo(ctx, &pb.RoomRequest{Id: id})
if err != nil {
return err
}
log.Printf("Room information. >> id: %s, messageCount: %d", r.Id, r.MessageCount)
return nil
}

// チャットルームの一覧を取得するためのサーバのメソッドを呼び出す
func getRooms(ctx context.Context, c pb.HelloGrpcClient, name string) error {
r, err := c.GetRooms(ctx, &pb.Null{})
if err != nil {
return err
}
for _, v := range(r.Rooms) {
log.Printf("Name: %s, MessageCount: %d", v.Id, v.MessageCount)
}
return nil
}

// streamを使いサーバへ連続してチャットメッセージを送信する
func sendMessage(c pb.HelloGrpcClient, id string, name string) error {
// 標準入力を使ってメッセージを入力
stdin := bufio.NewScanner(os.Stdin)
// サーバへstreamを渡す
stream, err := c.SendMessage(context.Background())
if err != nil {
return err
}
// 無限ループを使ってで連続してメッセージ送信
for {
// 入力待ち
stdin.Scan()
text := stdin.Text()
// サーバへSendRequest型のメッセージを送信
if err := stream.Send(&pb.SendRequest{Id: id, Name: name, Content: text}); err != nil {
log.Fatalf("Send failed: %v", err)
}
// /exitを入力すると終了
if text == "/exit" {
break
}
}
// 接続終了処理
_, err = stream.CloseAndRecv()
if err != nil {
return err
}
return nil
}

// streamを使いサーバから連続してメッセージを受信する
func getMessage(c pb.HelloGrpcClient, id string) error {
req := &pb.MessagesRequest{Id: id}
// サーバからstreamを受け取る
stream, err := c.GetMessages(context.Background(), req)
if err != nil {
return err
}
// 無限ループ
for {
// サーバからのメッセージを受信
msg, err := stream.Recv()
if err == io.EOF {
break
}
log.Printf("[%s] %s", msg.Name, msg.Content)
if err != nil {
return err
}
}
return nil
}

func main() {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewHelloGrpcClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// case文で分岐
if len(os.Args) > 2 {
    switch os.Args[1] {
    case "greet":
        err := greetServer(ctx, c, os.Args[2])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    case "add":
        err := createRoom(ctx, c, os.Args[2])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    case "get":
        err := getRoom(ctx, c, os.Args[2])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    case "list":
        err := getRooms(ctx, c, os.Args[2])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    case "send":
        err := sendMessage(c, os.Args[2], os.Args[3])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    case "stream":
        err := getMessage(c, os.Args[2])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    case "chat":
        // goroutine(非同期処理)を使ってメッセージ受信・表示
        go getMessage(c, os.Args[2])
        // メッセージ送信はmainで実行
        err := sendMessage(c, os.Args[2], os.Args[3])
        if err != nil {
            log.Fatalf("Couldn't execute: %v", err)
        }
    default:
        log.Fatalf("Unknown command.")
    }
} else {
    log.Fatalf("Need arguments.")
}

}


## 動作確認
* サーバは`go run server/main.go`
* クライアント
  * `go run client/main.go add {追加するチャットルーム名}`
  * `go run client/main.go chat {接続するチャットルーム名} {表示名}`

![ezgif-1-489e8a177e3f.gif](https://qiita-image-store.s3.amazonaws.com/0/314367/d45e916d-994f-ab70-b0d2-b3668e3e20b8.gif)

## まとめ
streamを使って簡易なチャットのようなことができるツールを書いてみました。かなりざっくりとした実装で、

* ある程度メッセージ、若しくは接続が確立するとサーバが落ちる(おそらくデータの格納にスライスを利用しているため、スライスの容量を超過し、コピーが発生し、メモリアドレスを見失ってしまう)
    * 本来RedisとかDBとかにデータを格納したほうが楽でしょうね...
* チャット入力プロンプトがしょっぱい
* server-side streaming使用時の正しい接続のとじ方がわからなかった

等様々な問題を抱えています。gRPCには`bidirectional streaming RPC`という双方向のstream通信方式もあり、こちらを使えばもっときれいな実装になると思います。

次回は@KazuyaTomitaさんです!
39
31
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
39
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?