はじめに
今まで触ってみたいと思っていたGraphQLとRedisを使って、リアルタイムチャットサーバーを作ってみました。
この記事では、主にGraphQLに重点を置いて実装を紹介していきます。
ソースコードはGitHubに上がっているので、そちらも合わせてご覧ください。
README.md
を見ればすぐにサーバーを建てることができるので、先に試してみるのも良いかもしれません。
アーキテクチャ
フロントエンドとサーバー間はGraphQLを用いた通信を行っています。
メッセージの送信やユーザーの作成は通常のGraphQLのMutationを、メッセージの受信は Websocket上で動作するGraphQL Subscriptionsを使用しています。
また、メッセージの配信にはRedisのPubSub機能を使っています。
これは、GraphQLサーバーがスケールアウトしたときに、チャットルームが別れないようにする対策です。
GraphQL Subscriptionsとは
GraphQLは聞いたことがある人が多いと思います。
GraphQL SubscriptionsはPubSub通信の一つで、Websocket上でGraphQLのスキーマに従ったデータのやり取りを行う仕組みです。
素のWebsocketを使った通信に比べ、型安全な通信ができる他、gRPC等の他のプロトコルよりもブラウザで扱いやすいのが特徴です。
Redis PubSubとは
RedisはオンメモリのKVSとして有名ですが、PubSub機能を備えています。
今回はGraphQLサーバー間の同期ができるようにPubsub機能を利用しました。
私はRedisのPubSubを最近まで知らなかったのですが、大きいところだとLINE LIVEのチャットサーバーでもRedisのPubSub機能が使われているようです。
LINE LIVE のチャットが
30,000+/min のコメント投稿を捌くようになるまで
実装
今回GraphQLサーバーはGolangを使って実装します。
GraphQLのスキーマを定義する
まずは、GraphQLのスキーマを定義していきます。
type Message {
user: String!
message: String!
}
type Mutation {
#チャットを投稿する
postMessage(user: String!, message: String!): Message
#ユーザーを作成する(作成されていないユーザーではメッセージの受信や投稿が出来ない)
createUser(user: String!): String!
}
type Query {
users: [String!]!
}
# この部分でSubscriptionを定義しています。
# 引数を指定してSubscribeをすると、サーバーからPublishされるデータを受け取れます。
type Subscription {
messagePosted(user: String!): Message!
userJoined(user: String!): String!
}
スキーマからinterfaceを作成する
GraphQLのResolver(リクエストを処理する部分)の実装を一から行うのはしんどいです。
そこで、GraphQLのスキーマからGolangのinterfaceを出力してくれるツールを使用しましょう。
今回は、GraphQL Subscriptionsにも対応しているgqlgenを使用します。
go get -u github.com/99designs/gqlgen/cmd
まずは、gqlgen
を実行するためのスクリプトを作成します。
package main
import "github.com/99designs/gqlgen/cmd"
func main() {
cmd.Execute()
}
次に、そのスクリプトを実行します。
cd grapqh
go run scripts/gqlgen.go init
するといくつかのファイルが生成されます。一番重要なのは、**resolver.go
**です。
生成直後はpanic()
を起こすだけの実装がなされているので、これをinterfaceに合うように実装していきます。
Resolverを実装する
ここがサーバーの本質部分ですが、全文載せるとコード量が少し多いので抜粋していきます。
先にGitHubから全文を眺めた方が理解しやすいかもしれません。
また、実際のコードには書かれているエラー処理を行数削減のため省略しています。ご注意ください。
実体の生成
type Resolver struct {
redisClient *redis.Client //redisへアクセスするのに使用
messageChannels map[string]chan Message
userChannels map[string]chan string
mutex sync.Mutex
}
func newResolver(redisClient *redis.Client) *Resolver {
return &Resolver{
redisClient: redisClient,
messageChannels: map[string]chan Message{},
userChannels: map[string]chan string{},
mutex: sync.Mutex{},
}
}
各ユーザーの接続はchannelを使って管理するので、構造体にchannelのmapを持たせます。
メッセージの投稿を実装
type mutationResolver struct{ *Resolver }
func (r *mutationResolver) PostMessage(ctx context.Context, user string, message string) (*Message, error) {
isLogined, err := r.checkLogin(user)
if !isLogined {
return nil, errors.New("This user does not exists")
}
// ユーザー情報はAFK(Away From Keyboard)対策で60minで削除されるようにしている。
// メッセージの投稿を行った場合はExpireまでの時間をリセットする。
val, err := r.redisClient.SetXX(user, user, 60*time.Minute).Result()
if val == false {
return nil, errors.New("This user does not exists")
}
// 以下の部分で、[]byteに変換したMessageをredisのPubSubで配信しています。
m := Message{
User: user,
Message: message,
}
mb, err := json.Marshal(m)
r.redisClient.Publish("room", mb)
return &m, nil
}
Subscriptionを実装
type subscriptionResolver struct{ *Resolver }
func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan Message, error) {
isLogined, err := r.checkLogin(user)
if !isLogined {
return nil, errors.New("This user has not been created")
}
messageChan := make(chan Message, 1)
r.mutex.Lock()
r.messageChannels[user] = messageChan
r.mutex.Unlock()
go func() {
<-ctx.Done()
r.mutex.Lock()
delete(r.messageChannels, user)
r.mutex.Unlock()
r.redisClient.Del(user)
}()
return messageChan, nil
}
MessagePosted
は各ユーザーごとのchannelを生成しreturnする必要があります。
このchannelにデータを流し込むと、各ユーザーにデータが配信されるので、後で使えるようにchannelをResolver
で保持させます。
また、goroutineでは接続が切れたときの処理を記述しています。
不要になったchannelやredisのユーザーデータを削除しています。
Redisから配信されるメッセージを受け取る
func (r *Resolver) startSubscribingRedis() {
go func() {
pubsub := r.redisClient.Subscribe("room")
defer pubsub.Close()
for {
msgi, err := pubsub.Receive()
switch msg := msgi.(type) {
case *redis.Message:
// Convert recieved string to Message.
m := Message{}
if err := json.Unmarshal([]byte(msg.Payload), &m); err != nil {
log.Println(err)
continue
}
// Notify new message.
r.mutex.Lock()
for _, ch := range r.messageChannels {
ch <- m
}
r.mutex.Unlock()
default:
}
}
}()
}
redisから受け取ったメッセージがMessage
型だった場合、構造体に変換してchannelに流し込みます。
channelに流し込まれた後は、ライブラリ側がよしなにクライアントにデータを送ってくれます(楽チン😊)
サーバーの作成
go run scripts/gqlgen.go init
をした際に、graphql/server/server.go
が自動的に生成されますが、今回はそれを削除し別のところに書き直しました。
理由としては、package main
をプロジェクトルートに持っていきたかったからです。
type GraphQLServer struct {
redisClient *redis.Client
}
// NewGraphQLServer returns GraphQL server.
func NewGraphQLServer(redisClient *redis.Client) *GraphQLServer {
return &GraphQLServer{
redisClient: redisClient,
}
}
// Serve starts GraphQL server.
func (s *GraphQLServer) Serve(route string, port int) error {
mux := http.NewServeMux()
mux.Handle(
route,
handler.GraphQL(graphql.NewExecutableSchema(graphql.NewGraphQLConfig(s.redisClient)),
handler.WebsocketUpgrader(websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}),
),
)
mux.Handle("/", handler.Playground("GraphQL playground", route))
handler := cors.AllowAll().Handler(mux)
return http.ListenAndServe(fmt.Sprintf(":%d", port), handler)
}
type config struct {
RedisURL string `envconfig:"REDIS_URL"`
Port int `envconfig:"PORT"`
}
func main() {
var config config
err := envconfig.Process("", &config)
client, err := infrastructure.NewRedisClient(config.RedisURL)
defer client.Close()
s := infrastructure.NewGraphQLServer(client)
log.Fatal(s.Serve("/query", config.Port))
}
それぞれの構造体を生成してサーバーを起動します。
GraphQLサーバーとRedisサーバーをdocker-composeで起動する
最後にDockerを使って、サーバーを起動します。
FROM golang:1.11.4
WORKDIR /go/src/github.com/p1ass/graphql-redis-realtime-chat
COPY . .
ENV GO111MODULE=on
RUN go get github.com/pilu/fresh
version: '3'
volumes:
unsync:
services:
api:
build: .
volumes:
- ./:/go/src/github.com/p1ass/graphql-redis-realtime-chat
- unsync:/go/src/github.com/p1ass/graphql-redis-realtime-chat/frontend
ports:
- '8080:8080'
depends_on:
- redis
command: fresh
environment:
REDIS_URL: 'redis:6379'
PORT: '8080'
redis:
image: redis:latest
ports:
- '6379:6379'
fresh
を使ってホットリロード環境で開発ができるようにしています。
また、不要なディレクトリがマウントされるのを防ぐためにunsync
というダミーのvolumeを作成しています。
後は立ち上げたら完了です。
docker-compose up
ブラウザで、http://localhost:【PORT】
を開くと、GraphQL Playgroundが開き、QueryやMutation、Subscriptionが試せます。
おまけ : Nuxt.jsのクライアントを作る
GraphQL PlaygroundでもSubscriptionを含めすべて試すことが出来ますが、どうせならWebアプリっぽくしたいということでNuxt.jsでクライアントを作りました。
実装は以下の記事を参考にさせていただきました。
GraphQL と Nuxt.js でチャットを作る
ほとんど実装は同じなのですが、一つだけ違う点があるので紹介しておきます。
Simple subscription
Simple subscription | Vue Apollo
GraphQLのSubscriptionは別のQueryと同じ型のデータをやり取りすることが多いです。
しかし、今回のpostMessage
はどのQueryとも紐付いていません。
その場合、通常のsubscribeToMore
を用いた実装ではなく、$subscribe
を使った実装を行います。
import SMessagePosted from '@/apollo/subscriptions/messagePosted.gql'
apollo: {
// Queryに紐付いているSubscription
users: {
query: QUsers,
subscribeToMore: {
document: SUserJoined,
variables() {
return {
user: this.user
}
},
updateQuery: (prev, { subscriptionData }) => {
// do something
}
}
},
// Queryに紐付いていないSubscription
$subscribe: {
messagePosted: {
query: SMessagePosted,
variables() {
return {
user: this.user
}
},
result(res) {
this.messages.unshift(res.data.messagePosted)
}
}
}
}
まとめ
- GraphQL Subscriptionsを使うと、型に沿ったリアルタイム通信ができる
- Redis PubSubを使うと複数台サーバーの時にデータの同期ができる
- Queryと紐付かないSubscriptionを作成するときはクライアントの実装に注意しよう(vue-apolloの場合)