1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

GoでTopicごとにPubsubするWebsocket

Posted at

はじめに

こんにちは
クェスタ株式会社エンジニアの下澤です。

毎秒定期的に送られてくるデータをリアルタイム表示する開発があり、AppSyncを検討していたのですが、コストがかかるため、Websocketで代用しました。

PubsubをGolangで実装してみたので、簡単にシェアします

環境

Golanggorilla/websocket, echoを使います。

go version go1.16.5 darwin/amd64
github.com/gorilla/websocket v1.5.0
github.com/labstack/echo

Github

環境構築編

適当にファイルを作成し、Go環境の準備

mkdir lil-pubsub && cd lil-pubsub
go mod init <module name なんでも良き> # e.g. go mod init github.com/lil-shimon/lil-pubsub

gorilla/websocketechoの導入

go get github.com/gorilla/websocket
go get github.com/labstack/echo

まずは簡単なhttpでアクセスできるかのテストのための最低限の記述

mkdir server && touch server/main.go

main.go

package main

import (
	"net/http"

	"github.com/labstack/echo"
)

func healthCheck(c echo.Context) error {
	return c.String(http.StatusOK, "OK")
}

func main() {
	e := echo.New()

	e.GET("/healthcheck", healthCheck)

	e.Logger.Fatal(e.Start(":1323"))
}

http://localhost:1323/healthcheck
にアクセスしてOKと表示されているかを確認します。

以上で環境構築は終了です。

Pubsub作成編

ここからがWebsocket本番です。
まずはTopicごとにではなく、全クライアントにメッセージをPublishするWebsocketを構築します

websocket/client.goを作成し、クライアント情報を保持するClientの構造体を定義します。

package websocket

import "github.com/gorilla/websocket"

type Client struct {
	Ws *websocket.Conn
}

次に全てのクライアント情報を格納するRoomの構造体を作ります。
websocket/room.go

package websocket

type Rooms struct {
	Clients []*Client
}

Roomに接続してきたクライアントを追加するメソッド・取得するメソッドを定義します。
websocket/room.go

func (rooms *Rooms) AddClient(client *Client) {
	rooms.Clients = append(rooms.Clients, client)
}

func (room *Room) GetClients() []Client {
	var cs []Client

	for _, client := range room.Clients {
		cs = append(cs, *client)
	}

	return cs
}

クライアントの追加と取得はできたので、Roomに格納されているClientに対してメッセージをPublishするメソッドを定義します。

func (room *Room) Publish(msg []byte) {
	for _, client := range room.Clients {
		client.Send(msg) // この後に作る
	}
}

websocket/client.goに一つのクライアントに対してメッセージを送信するメソッドを追加します。

func (client *Client) Send (msg []byte) error {
	return client.Ws.WriteMessage(websocket.TextMessage, msg)
}

最後にこれらのメソッドを使うため、Websocket接続からメッセージ送信までを実行するメソッドを作成します。

websocket/handler.go

package websocket

import (
	"github.com/gorilla/websocket"
	"github.com/labstack/echo"
)

var upgrader = websocket.Upgrader{}
var rooms = Room{}

func ServeWs(c echo.Context) error {
     upgrader.CheckOrigin = func(r *http.Request) bool { return true }
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)

	if err != nil {
		c.Logger().Error(err)
	}

	defer ws.Close()

	client := &Client{Ws: ws}

	rooms.AddClient(client)

	for {
		_, msg, err := ws.ReadMessage()

		if err != nil {
			c.Logger().Error(err)
			break
		}

		rooms.Publish(msg)
	}

	return nil
}

少し解説します。
まず、最初のここでwebsocketに接続します。
一行目は"message":"websocket: request origin not allowed by Upgrader.CheckOrigin"}のエラーを回避するために追加してます。

upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) // 接続

その後、接続してきたクライアントを宣言し、Roomにそのクライアントを追加します

client := &Client{Ws: ws}
rooms.AddClient(client)

その下はメッセージを受信した時の処理です。

	for {
		_, msg, err := ws.ReadMessage() // メッセージを受信 messageType, message, error

		if err != nil {
			c.Logger().Error(err)
			break
		}

		rooms.Publish(msg) // メッセージをクライアントへ送信
	}

main.goにwebsocket用のRouteを追加します。

import (
	"net/http"

	"github.com/labstack/echo"
	"github.com/lil-shimon/lil-pubsub.git/server/websocket" // 追加
)

func main() {
	e := echo.New()

	e.GET("/healthcheck", healthCheck)
	e.GET("/ws", websocket.ServeWs) // 追加

	e.Logger.Fatal(e.Start(":1323"))
}

ws://localhost:1323/wsをテストします。
私はいつもwebsocketのテストでGoogle拡張機能のWebsocket Client Testを使っているので、今回もそちらを使ってテストします。


適当にメッセージを送信したら送信、受信できていることを確認できますね!

全クライアントでのPubはできました

TopicごとのPubsub編

Topicごとにメッセージを受け取れるように改善していきます
この様な形でパスパラメータにTopicを入れ、接続するWebsocketを想定しています

ws/{topic}

まず、main.goのrouteを変更

e.GET("/ws/:topic", websocket.ServeWs)

Topicを追加するため、クライアントとTopicを管理する構造体を新しく定義します。

type Subscription struct {
	Topic string
	Client *Client
}

Roomにそのサブスクを格納したいので、クライアントではなく、サブスクに修正

type Room struct {
	Subscription []*Subscription
}

クライアントの追加や取得をしていたメソッドをサブスクへ変更

func (room *Room) AddSubscription(subscription *Subscription) {
	room.Subscriptions = append(room.Subscriptions, subscription)
}

取得時はTopicを受け取り、一致するTopicを持っているサブスクを返す様にしています

func (room *Room) GetSubscription(topic string) []Subscription {
	var subs []Subscription

	for _, sub := range room.Subscriptions {
		if sub.Topic == topic {
			subs = append(subs, *sub)
		}
	}

	return subs
}

また、Publishメソッドも改良

func (room *Room) Publish(msg []byte, topic string) {
	subs := room.GetSubscription(topic)
	for _, sub := range subs {
		err := sub.Client.Send(msg)
		if err != nil {
			return
		}
	}
}

最後にWebsocket Handlerを修正します。

func ServeWs(c echo.Context) error {

	topic := c.Param("topic") // parameterのtopicを取得

	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)

	if err != nil {
		c.Logger().Error(err)
	}

	defer ws.Close()

	client := &Client{Ws: ws}

	rooms.AddSubscription(&Subscription{Topic: topic, Client: client}) // サブスクを登録する様に修正

	for {
		_, msg, err := ws.ReadMessage()

		if err != nil {
			c.Logger().Error(err)
			break
		}

		rooms.Publish(msg, topic) // PublishにTopicを渡す
	}

今回はトピックごとにメッセージ配信が切り分けられているかをテストしたいため、クライアントテストを追加

mkdir public && touch public/index.html

public/index.html

<!doctype html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>WebSocket</title>
</head>

<body>
<p>トピックを入力</p>
<label for="topic"></label><input type="text" id="topic"/>
<button type="button" id="button" onclick="sendMsg()">テスト</button>
<p id="output"></p>

<script>
    function sendMsg() {
        const topic = document.getElementById('topic').value;
        const loc = window.location;
        let uri = 'ws:';

        if (loc.protocol === 'https:') {
            uri = 'wss:';
        }
        uri += '//' + loc.host;
        uri += `/ws/${topic}`;

        console.log("uri: " + uri);
        const ws = new WebSocket(uri);

        ws.onopen = function () {
            console.log('Connected')
        }

        ws.onmessage = function (evt) {
            const out = document.getElementById('output');
            out.innerHTML += evt.data + '<br>';
        }

        const msg = {
          topic: topic,
          message: "topic is test"
        }

        setInterval(function () {
            ws.send(JSON.stringify(nvData));
        }, 1000);
    }
</script>
</body>

</html>

clientテストのindex.html用のRouteを追加

func main() {
	e := echo.New()

	e.GET("/healthcheck", healthCheck)
	e.GET("/ws/:topic", websocket.ServeWs)
	e.Static("/ws-client", "public") // 追加

	e.Logger.Fatal(e.Start(":1323"))
}

go run server/main.goでサーバーを立ち上げ、
http://localhost:1323/ws-client
にアクセス
Topicを入力し、送信

同じトピックでサブスクしているので、こちらでも受け取れています。

topicを変えると

こっちは違うトピックなので受け取りません

最後に

簡単にですが、GolangでPubsubを実装してみました。本来ならmessageTypeによってPublishやUnsubscriptionなどの他処理も入っていますが、今回は省略させていただきます。

GolangでWebsocketを開発するイメージでも掴めていただけたら幸いです!

参考

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?