Go
Redis
Docker
GraphQL
nuxt.js

GraphQL SubscriptionsとRedis PubSubを使ってリアルタイムチャットサーバーを作る


はじめに

今まで触ってみたいと思っていたGraphQLとRedisを使って、リアルタイムチャットサーバーを作ってみました。

wt4f2-5jcn0.gif

この記事では、主にGraphQLに重点を置いて実装を紹介していきます。

ソースコードはGitHubに上がっているので、そちらも合わせてご覧ください。

README.mdを見ればすぐにサーバーを建てることができるので、先に試してみるのも良いかもしれません。


アーキテクチャ

今回は以下のような構成になっています。

architecture_image.png

フロントエンドとサーバー間はGraphQLを用いた通信を行っています。

メッセージの送信やユーザーの作成は通常のGraphQLのMutationを、メッセージの受信は Websocket上で動作するGraphQL Subscriptionsを使用しています。

また、メッセージの配信にはRedisのPubSub機能を使っています。

これは、GraphQLサーバーがスケールアウトしたときに、チャットルームが別れないようにする対策です。 


GraphQL Subscriptionsとは

GraphQLは聞いたことがある人が多いと思います。

GraphQL SubscriptionsはPubSub通信の一つで、Websocket上でGraphQLのスキーマに従ったデータのやり取りを行う仕組みです。

素のWebsocketを使った通信に比べ、型安全な通信ができる他、gRPC等の他のプロトコルよりもブラウザで扱いやすいのが特徴です。

Subscriptions | Apollo Client


Redis PubSubとは

RedisはオンメモリのKVSとして有名ですが、PubSub機能を備えています。

今回はGraphQLサーバー間の同期ができるようにPubsub機能を利用しました。

私はRedisのPubSubを最近まで知らなかったのですが、大きいところだとLINE LIVEのチャットサーバーでもRedisのPubSub機能が使われているようです。

LINE LIVE のチャットが
30,000+/min のコメント投稿を捌くようになるまで


実装

今回GraphQLサーバーはGolangを使って実装します。


GraphQLのスキーマを定義する

まずは、GraphQLのスキーマを定義していきます。


graphql/schema.graphql

type Message {

user: String!
message: String!
}

type Mutation {
#チャットを投稿する
postMessage(user: String!, message: String!): Message
#ユーザーを作成する(作成されていないユーザーではメッセージの受信や投稿が出来ない)
createUser(user: String!): String!
}

type Query {
users: [String!]!
}

# この部分でSubscriptionを定義しています。
# 引数を指定してSubscribeをすると、サーバーからPublishされるデータを受け取れます。
type Subscription {
messagePosted(user: String!): Message!
userJoined(user: String!): String!
}



スキーマからinterfaceを作成する

GraphQLのResolver(リクエストを処理する部分)の実装を一から行うのはしんどいです。

そこで、GraphQLのスキーマからGolangのinterfaceを出力してくれるツールを使用しましょう。

今回は、GraphQL Subscriptionsにも対応しているgqlgenを使用します。

go get -u github.com/99designs/gqlgen/cmd

まずは、gqlgenを実行するためのスクリプトを作成します。


graphql/scripts/gqlgen.go

package main

import "github.com/99designs/gqlgen/cmd"

func main() {
cmd.Execute()
}


次に、そのスクリプトを実行します。

cd grapqh

go run scripts/gqlgen.go init

するといくつかのファイルが生成されます。一番重要なのは、resolver.goです。

生成直後はpanic()を起こすだけの実装がなされているので、これをinterfaceに合うように実装していきます。


Resolverを実装する

ここがサーバーの本質部分ですが、全文載せるとコード量が少し多いので抜粋していきます。

先にGitHubから全文を眺めた方が理解しやすいかもしれません。



また、実際のコードには書かれているエラー処理を行数削減のため省略しています。ご注意ください。


実体の生成


graphql/resolver.go

type Resolver struct {

redisClient *redis.Client //redisへアクセスするのに使用
messageChannels map[string]chan Message
userChannels map[string]chan string
mutex sync.Mutex
}

func newResolver(redisClient *redis.Client) *Resolver {
return &Resolver{
redisClient: redisClient,
messageChannels: map[string]chan Message{},
userChannels: map[string]chan string{},
mutex: sync.Mutex{},
}
}


各ユーザーの接続はchannelを使って管理するので、構造体にchannelのmapを持たせます。


メッセージの投稿を実装


graphql/resolver.go

type mutationResolver struct{ *Resolver }

func (r *mutationResolver) PostMessage(ctx context.Context, user string, message string) (*Message, error) {
isLogined, err := r.checkLogin(user)
if !isLogined {
return nil, errors.New("This user does not exists")
}
// ユーザー情報はAFK(Away From Keyboard)対策で60minで削除されるようにしている。
// メッセージの投稿を行った場合はExpireまでの時間をリセットする。
val, err := r.redisClient.SetXX(user, user, 60*time.Minute).Result()
if val == false {
return nil, errors.New("This user does not exists")
}

// 以下の部分で、[]byteに変換したMessageをredisのPubSubで配信しています。
m := Message{
User: user,
Message: message,
}
mb, err := json.Marshal(m)
r.redisClient.Publish("room", mb)
return &m, nil
}



Subscriptionを実装


graphql/resolver.go

type subscriptionResolver struct{ *Resolver }

func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan Message, error) {
isLogined, err := r.checkLogin(user)
if !isLogined {
return nil, errors.New("This user has not been created")
}

messageChan := make(chan Message, 1)
r.mutex.Lock()
r.messageChannels[user] = messageChan
r.mutex.Unlock()

go func() {
<-ctx.Done()
r.mutex.Lock()
delete(r.messageChannels, user)
r.mutex.Unlock()
r.redisClient.Del(user)
}()
return messageChan, nil
}


MessagePostedは各ユーザーごとのchannelを生成しreturnする必要があります。

このchannelにデータを流し込むと、各ユーザーにデータが配信されるので、後で使えるようにchannelをResolverで保持させます。

また、goroutineでは接続が切れたときの処理を記述しています。

不要になったchannelやredisのユーザーデータを削除しています。


Redisから配信されるメッセージを受け取る


graphql/resolver.go

func (r *Resolver) startSubscribingRedis() {

go func() {
pubsub := r.redisClient.Subscribe("room")
defer pubsub.Close()

for {
msgi, err := pubsub.Receive()
switch msg := msgi.(type) {
case *redis.Message:
// Convert recieved string to Message.
m := Message{}
if err := json.Unmarshal([]byte(msg.Payload), &m); err != nil {
log.Println(err)
continue
}
// Notify new message.
r.mutex.Lock()
for _, ch := range r.messageChannels {
ch <- m
}
r.mutex.Unlock()
default:
}
}
}()
}


redisから受け取ったメッセージがMessage型だった場合、構造体に変換してchannelに流し込みます。

channelに流し込まれた後は、ライブラリ側がよしなにクライアントにデータを送ってくれます(楽チン😊)


サーバーの作成

go run scripts/gqlgen.go initをした際に、graphql/server/server.goが自動的に生成されますが、今回はそれを削除し別のところに書き直しました。

理由としては、package mainをプロジェクトルートに持っていきたかったからです。


infrastructure/server.go

type GraphQLServer struct {

redisClient *redis.Client
}

// NewGraphQLServer returns GraphQL server.
func NewGraphQLServer(redisClient *redis.Client) *GraphQLServer {
return &GraphQLServer{
redisClient: redisClient,
}
}

// Serve starts GraphQL server.
func (s *GraphQLServer) Serve(route string, port int) error {
mux := http.NewServeMux()
mux.Handle(
route,
handler.GraphQL(graphql.NewExecutableSchema(graphql.NewGraphQLConfig(s.redisClient)),
handler.WebsocketUpgrader(websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}),
),
)
mux.Handle("/", handler.Playground("GraphQL playground", route))
handler := cors.AllowAll().Handler(mux)
return http.ListenAndServe(fmt.Sprintf(":%d", port), handler)
}



main.go

type config struct {

RedisURL string `envconfig:"REDIS_URL"`
Port int `envconfig:"PORT"`
}

func main() {
var config config
err := envconfig.Process("", &config)

client, err := infrastructure.NewRedisClient(config.RedisURL)
defer client.Close()

s := infrastructure.NewGraphQLServer(client)
log.Fatal(s.Serve("/query", config.Port))
}


それぞれの構造体を生成してサーバーを起動します。


GraphQLサーバーとRedisサーバーをdocker-composeで起動する

最後にDockerを使って、サーバーを起動します。

FROM golang:1.11.4

WORKDIR /go/src/github.com/naoki-kishi/graphql-redis-realtime-chat
COPY . .
ENV GO111MODULE=on

RUN go get github.com/pilu/fresh


docker-compose.yml

version: '3'

volumes:
unsync:
services:
api:
build: .
volumes:
- ./:/go/src/github.com/naoki-kishi/graphql-redis-realtime-chat
- unsync:/go/src/github.com/naoki-kishi/graphql-redis-realtime-chat/frontend
ports:
- '8080:8080'
depends_on:
- redis

command: fresh

environment:
REDIS_URL: 'redis:6379'
PORT: '8080'
redis:
image: redis:latest
ports:
- '6379:6379'


freshを使ってホットリロード環境で開発ができるようにしています。

また、不要なディレクトリがマウントされるのを防ぐためにunsyncというダミーのvolumeを作成しています。

後は立ち上げたら完了です。

docker-compose up

ブラウザで、http://localhost:【PORT】を開くと、GraphQL Playgroundが開き、QueryやMutation、Subscriptionが試せます。


おまけ : Nuxt.jsのクライアントを作る

GraphQL PlaygroundでもSubscriptionを含めすべて試すことが出来ますが、どうせならWebアプリっぽくしたいということでNuxt.jsでクライアントを作りました。

実装は以下の記事を参考にさせていただきました。

GraphQL と Nuxt.js でチャットを作る

ほとんど実装は同じなのですが、一つだけ違う点があるので紹介しておきます。


Simple subscription

Simple subscription | Vue Apollo

GraphQLのSubscriptionは別のQueryと同じ型のデータをやり取りすることが多いです。

しかし、今回のpostMessageはどのQueryとも紐付いていません。

その場合、通常のsubscribeToMoreを用いた実装ではなく、$subscribeを使った実装を行います。

import SMessagePosted from '@/apollo/subscriptions/messagePosted.gql'

apollo: {
// Queryに紐付いているSubscription
users: {
query: QUsers,
subscribeToMore: {
document: SUserJoined,
variables() {
return {
user: this.user
}
},
updateQuery: (prev, { subscriptionData }) => {
// do something
}
}
},

// Queryに紐付いていないSubscription
$subscribe: {
messagePosted: {
query: SMessagePosted,
variables() {
return {
user: this.user
}
},
result(res) {
this.messages.unshift(res.data.messagePosted)
}
}
}
}


まとめ


  • GraphQL Subscriptionsを使うと、型に沿ったリアルタイム通信ができる

  • Redis PubSubを使うと複数台サーバーの時にデータの同期ができる

  • Queryと紐付かないSubscriptionを作成するときはクライアントの実装に注意しよう(vue-apolloの場合)


参考