Help us understand the problem. What is going on with this article?

GraphQL SubscriptionsとRedis PubSubを使ってリアルタイムチャットサーバーを作る

More than 1 year has passed since last update.

はじめに

今まで触ってみたいと思っていたGraphQLとRedisを使って、リアルタイムチャットサーバーを作ってみました。
wt4f2-5jcn0.gif

この記事では、主にGraphQLに重点を置いて実装を紹介していきます。
ソースコードはGitHubに上がっているので、そちらも合わせてご覧ください。
README.mdを見ればすぐにサーバーを建てることができるので、先に試してみるのも良いかもしれません。

アーキテクチャ

今回は以下のような構成になっています。
architecture_image.png

フロントエンドとサーバー間はGraphQLを用いた通信を行っています。
メッセージの送信やユーザーの作成は通常のGraphQLのMutationを、メッセージの受信は Websocket上で動作するGraphQL Subscriptionsを使用しています。

また、メッセージの配信にはRedisのPubSub機能を使っています。
これは、GraphQLサーバーがスケールアウトしたときに、チャットルームが別れないようにする対策です。 

GraphQL Subscriptionsとは

GraphQLは聞いたことがある人が多いと思います。
GraphQL SubscriptionsはPubSub通信の一つで、Websocket上でGraphQLのスキーマに従ったデータのやり取りを行う仕組みです。

素のWebsocketを使った通信に比べ、型安全な通信ができる他、gRPC等の他のプロトコルよりもブラウザで扱いやすいのが特徴です。

Subscriptions | Apollo Client

Redis PubSubとは

RedisはオンメモリのKVSとして有名ですが、PubSub機能を備えています。
今回はGraphQLサーバー間の同期ができるようにPubsub機能を利用しました。

私はRedisのPubSubを最近まで知らなかったのですが、大きいところだとLINE LIVEのチャットサーバーでもRedisのPubSub機能が使われているようです。
LINE LIVE のチャットが
30,000+/min のコメント投稿を捌くようになるまで

実装

今回GraphQLサーバーはGolangを使って実装します。

GraphQLのスキーマを定義する

まずは、GraphQLのスキーマを定義していきます。

graphql/schema.graphql
type Message {
    user: String!
    message: String!
}

type Mutation {
    #チャットを投稿する
    postMessage(user: String!, message: String!): Message 
    #ユーザーを作成する(作成されていないユーザーではメッセージの受信や投稿が出来ない)
    createUser(user: String!): String! 
}

type Query {
    users: [String!]!
}

# この部分でSubscriptionを定義しています。
# 引数を指定してSubscribeをすると、サーバーからPublishされるデータを受け取れます。
type Subscription {
    messagePosted(user: String!): Message!
    userJoined(user: String!): String!
}

スキーマからinterfaceを作成する

GraphQLのResolver(リクエストを処理する部分)の実装を一から行うのはしんどいです。
そこで、GraphQLのスキーマからGolangのinterfaceを出力してくれるツールを使用しましょう。
今回は、GraphQL Subscriptionsにも対応しているgqlgenを使用します。

go get -u github.com/99designs/gqlgen/cmd

まずは、gqlgenを実行するためのスクリプトを作成します。

graphql/scripts/gqlgen.go
package main

import "github.com/99designs/gqlgen/cmd"

func main() {
    cmd.Execute()
}

次に、そのスクリプトを実行します。

cd grapqh
go run scripts/gqlgen.go init

するといくつかのファイルが生成されます。一番重要なのは、resolver.goです。
生成直後はpanic()を起こすだけの実装がなされているので、これをinterfaceに合うように実装していきます。

Resolverを実装する

ここがサーバーの本質部分ですが、全文載せるとコード量が少し多いので抜粋していきます。
先にGitHubから全文を眺めた方が理解しやすいかもしれません。

また、実際のコードには書かれているエラー処理を行数削減のため省略しています。ご注意ください。

実体の生成

graphql/resolver.go
type Resolver struct {
    redisClient     *redis.Client //redisへアクセスするのに使用
    messageChannels map[string]chan Message
    userChannels    map[string]chan string
    mutex           sync.Mutex
}

func newResolver(redisClient *redis.Client) *Resolver {
    return &Resolver{
        redisClient:     redisClient,
        messageChannels: map[string]chan Message{},
        userChannels:    map[string]chan string{},
        mutex:           sync.Mutex{},
    }
}

各ユーザーの接続はchannelを使って管理するので、構造体にchannelのmapを持たせます。

メッセージの投稿を実装

graphql/resolver.go
type mutationResolver struct{ *Resolver }

func (r *mutationResolver) PostMessage(ctx context.Context, user string, message string) (*Message, error) {
    isLogined, err := r.checkLogin(user)
    if !isLogined {
        return nil, errors.New("This user does not exists")
    }
    // ユーザー情報はAFK(Away From Keyboard)対策で60minで削除されるようにしている。
    // メッセージの投稿を行った場合はExpireまでの時間をリセットする。
    val, err := r.redisClient.SetXX(user, user, 60*time.Minute).Result()
    if val == false {
        return nil, errors.New("This user does not exists")
    }

    // 以下の部分で、[]byteに変換したMessageをredisのPubSubで配信しています。
    m := Message{
        User:    user,
        Message: message,
    }
    mb, err := json.Marshal(m)
    r.redisClient.Publish("room", mb)
    return &m, nil
}

Subscriptionを実装

graphql/resolver.go
type subscriptionResolver struct{ *Resolver }

func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan Message, error) {
    isLogined, err := r.checkLogin(user)
    if !isLogined {
        return nil, errors.New("This user has not been created")
    }

    messageChan := make(chan Message, 1)
    r.mutex.Lock()
    r.messageChannels[user] = messageChan
    r.mutex.Unlock()

    go func() {
        <-ctx.Done()
        r.mutex.Lock()
        delete(r.messageChannels, user)
        r.mutex.Unlock()
        r.redisClient.Del(user)
    }()
    return messageChan, nil
}

MessagePostedは各ユーザーごとのchannelを生成しreturnする必要があります。
このchannelにデータを流し込むと、各ユーザーにデータが配信されるので、後で使えるようにchannelをResolverで保持させます。

また、goroutineでは接続が切れたときの処理を記述しています。
不要になったchannelやredisのユーザーデータを削除しています。

Redisから配信されるメッセージを受け取る

graphql/resolver.go
func (r *Resolver) startSubscribingRedis() {
    go func() {
        pubsub := r.redisClient.Subscribe("room")
        defer pubsub.Close()

        for {
            msgi, err := pubsub.Receive()
            switch msg := msgi.(type) {
            case *redis.Message:
                // Convert recieved string to Message.
                m := Message{}
                if err := json.Unmarshal([]byte(msg.Payload), &m); err != nil {
                    log.Println(err)
                    continue
                }
                // Notify new message.
                r.mutex.Lock()
                for _, ch := range r.messageChannels {
                    ch <- m
                }
                r.mutex.Unlock()
            default:
            }
        }
    }()
}

redisから受け取ったメッセージがMessage型だった場合、構造体に変換してchannelに流し込みます。
channelに流し込まれた後は、ライブラリ側がよしなにクライアントにデータを送ってくれます(楽チン😊)

サーバーの作成

go run scripts/gqlgen.go initをした際に、graphql/server/server.goが自動的に生成されますが、今回はそれを削除し別のところに書き直しました。

理由としては、package mainをプロジェクトルートに持っていきたかったからです。

infrastructure/server.go
type GraphQLServer struct {
    redisClient *redis.Client
}

// NewGraphQLServer returns GraphQL server.
func NewGraphQLServer(redisClient *redis.Client) *GraphQLServer {
    return &GraphQLServer{
        redisClient: redisClient,
    }
}

// Serve starts GraphQL server.
func (s *GraphQLServer) Serve(route string, port int) error {
    mux := http.NewServeMux()
    mux.Handle(
        route,
        handler.GraphQL(graphql.NewExecutableSchema(graphql.NewGraphQLConfig(s.redisClient)),
            handler.WebsocketUpgrader(websocket.Upgrader{
                CheckOrigin: func(r *http.Request) bool {
                    return true
                },
            }),
        ),
    )
    mux.Handle("/", handler.Playground("GraphQL playground", route))
    handler := cors.AllowAll().Handler(mux)
    return http.ListenAndServe(fmt.Sprintf(":%d", port), handler)
}

main.go
type config struct {
    RedisURL string `envconfig:"REDIS_URL"`
    Port     int    `envconfig:"PORT"`
}

func main() {
    var config config
    err := envconfig.Process("", &config)

    client, err := infrastructure.NewRedisClient(config.RedisURL)
    defer client.Close()

    s := infrastructure.NewGraphQLServer(client)
    log.Fatal(s.Serve("/query", config.Port))
}

それぞれの構造体を生成してサーバーを起動します。

GraphQLサーバーとRedisサーバーをdocker-composeで起動する

最後にDockerを使って、サーバーを起動します。

FROM golang:1.11.4

WORKDIR /go/src/github.com/p1ass/graphql-redis-realtime-chat
COPY . .
ENV GO111MODULE=on

RUN go get github.com/pilu/fresh
docker-compose.yml
version: '3'
volumes:
    unsync:
services:
    api:
        build: .
        volumes:
            - ./:/go/src/github.com/p1ass/graphql-redis-realtime-chat
            - unsync:/go/src/github.com/p1ass/graphql-redis-realtime-chat/frontend
        ports:
            - '8080:8080'
        depends_on:
            - redis

        command: fresh

        environment:
            REDIS_URL: 'redis:6379'
            PORT: '8080'
    redis:
        image: redis:latest
        ports:
            - '6379:6379'

freshを使ってホットリロード環境で開発ができるようにしています。
また、不要なディレクトリがマウントされるのを防ぐためにunsyncというダミーのvolumeを作成しています。
後は立ち上げたら完了です。

docker-compose up

ブラウザで、http://localhost:【PORT】を開くと、GraphQL Playgroundが開き、QueryやMutation、Subscriptionが試せます。

おまけ : Nuxt.jsのクライアントを作る

GraphQL PlaygroundでもSubscriptionを含めすべて試すことが出来ますが、どうせならWebアプリっぽくしたいということでNuxt.jsでクライアントを作りました。

実装は以下の記事を参考にさせていただきました。
GraphQL と Nuxt.js でチャットを作る

ほとんど実装は同じなのですが、一つだけ違う点があるので紹介しておきます。

Simple subscription

Simple subscription | Vue Apollo

GraphQLのSubscriptionは別のQueryと同じ型のデータをやり取りすることが多いです。
しかし、今回のpostMessageはどのQueryとも紐付いていません。
その場合、通常のsubscribeToMoreを用いた実装ではなく、$subscribeを使った実装を行います。

import SMessagePosted from '@/apollo/subscriptions/messagePosted.gql'

apollo: {
    // Queryに紐付いているSubscription
    users: {
        query: QUsers,
        subscribeToMore: {
            document: SUserJoined,
            variables() {
                return {
                    user: this.user
                }
            },
            updateQuery: (prev, { subscriptionData }) => {
                // do something
            }
        }
    },

    // Queryに紐付いていないSubscription
    $subscribe: {
        messagePosted: {
            query: SMessagePosted,
            variables() {
                return {
                    user: this.user
                }
            },
            result(res) {
                this.messages.unshift(res.data.messagePosted)
            }
        }
    }
}

まとめ

  • GraphQL Subscriptionsを使うと、型に沿ったリアルタイム通信ができる
  • Redis PubSubを使うと複数台サーバーの時にデータの同期ができる
  • Queryと紐付かないSubscriptionを作成するときはクライアントの実装に注意しよう(vue-apolloの場合)

参考

p1ass
サーバサイドエンジニア。最近はGoをよく書いています。 ブログ : https://blog.p1ass.com
https://p1ass.com
dena_coltd
    Delight and Impact the World
https://dena.com/jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away