こんにちは。
普段はネットワークの組込み系F/W開発をしています。個人的にWebSocketで高効率なソケットサーバ作れないかと思い、GorillaのWebSocketライブラリ内のサンプルコードから設計思想を拝借しました。
私が理解した内容を纏めてみました。Go言語勉強して約2日で知見の浅い記事になってると思います。
間違い等あればご指摘ください。
##趣旨
Go言語のWebSocketサーバの効率よい設計を理解する。
前提知識
-
Go言語
プログラミング言語。 -
Gorilla
WebSocketやHTTPなどのライブラリ。下記記事が分かりやすいです。
https://qiita.com/gold-kou/items/99507d33b8f8ddd96e3a
参考文献
100万回のWebSocket接続とGo
Go言語におけるWebSocketの設計記事。
ゴルーチンとチャネル
ゴルーチンとチャネルについて分かりやすく解説されている。
goroutineとチャネルの動きを図を使って理解する(和訳)
goroutineとチャネルのメカニズムについて詳細に言及されている。
低レベルアクセスへの入り口(2):io.Reader前編
内部のioのメカニズムを理解するために参照。
Linux Programming、epollの話
ePollについての説明。
GoのWebSocketサーバが高効率に設計できる理由
特に意識をしなくても、Non-Blockingで設計できることです。
例えば、各コネクションを各スレッドを設計する場合、各コネクションのスレッドの受信待ち処理を都度実行することになるため、非効率です。
Go言語で設計する場合、goroutineを使い、ソケット受信処理を各コネクションのgoroutineで行うと思います。
作り方は、Blocking的な書き方に見えますが、内部的にはNon-Blockingの動作として振舞ってくれます。
goroutineで受信待ちしている間は、当該goroutineの処理を行いません。100万回のWebSocket接続とGo内にも言及されているのですが、Read処理を行う際、内部で(Linux環境なら)ePollを呼び出し、ディスクリプタの準備が整うまで待ちます。
そのため、Non-Blockingの様に、受信されない間はgoroutineの処理を止め、他のgoroutineを実行するという動作になるので、効率的に動作します。
また、クロスプラットフォームに対応しているため、OSに依存しない部分もよいです。
サンプルコードを理解する
Webサイトからリポジトリ取得
Gorillaのリポジトリをダウンロードしましょう。
https://github.com/gorilla/websocket
examples/chatの中身確認。
ファイル構成は下記の通り
- home.html : クライアントのhtml
- client.go : クライアントのgoroutine, WebSocketのコネクションハンドラ
- hub.go : WebSocketのコネクション管理
- main.go : メイン関数
メカニズム説明(WebSocketの接続フロー)
ソースコードの説明に入る前に、下記図でWebSocketの接続フローを記述します。
WebSocketの接続フローは、
- クライアントからWebSocketのコネクションの接続要求
- serveWs()でコネクションを受け取り、コネクション情報をHubのregisterメッセージに送信
(Hubでコネクション情報をmapに登録する。) - クライアントのreadPump(), writePump()のgoroutineを起動
となります。
メカニズム説明(WebSocketの送信フロー)
ソースコードの説明に入る前に、下記図でWebSocketの送信フローを記述します。
WebSocketの送信フローは、
- クライアントが、メッセージを送信する。
(該当のクライアントのreadPump()のc.conn.ReadMessage()がメッセージを受信する。) - 受信したメッセージをHubのbroadcastメッセージに送信する。
- Hubは、受信したメッセージを各クライアントのコネクション情報のマップから各クライアントのsendに対して送信。
- クライアントのwritePump()でsendメッセージを受け取り、各クライアントへメッセージを送信する。
となります。
###ソースコード理解
上記のメカニズムを頭に入れた状態でソースコードを見ていきましょう。
まずは、main.goから。
main.goのmain関数では、大きく分けて2つの処理を行っています。
- hub.run()のgoroutineを起動。
- HTTP, WebSocketのハンドラを定義
func main() {
flag.Parse()
hub := newHub()
go hub.run()
http.HandleFunc("/", serveHome)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
err := http.ListenAndServe(*addr, nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
次にClientの中身を見てみましょう。
serveWsでは、WebSocketのコネクション接続確立時に、
readPump(), writePump()のgoroutineを起動しています。
関数の意味は、下記の理解で良いです。
- readPump()がクライアントからの受信処理
- writePump()がクライアントへの送信処理
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
// Add queued chat messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
}
最後にHubの中身を見てみましょう。
main.goでは、hub.run()のgoroutineを起動していました。
この処理では、クライアントからメッセージを受け取り、メッセージに応じて処理を行います。
メッセージの種類は3種類
- register: クライアントから接続要求があった際に、WebSocketのコネクションをmapに登録します。
- unregister: クライアントからクローズ要求があった際に、WebSocketのコネクションをmapから除外します。
- broadcast: クライアントからメッセージを受信した際に、登録しているmapから各クライアントへメッセージを送信します。
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
終わりに
Go言語を勉強し始めて、2日程度と日が浅いですが、C++等と比べると大分、楽に設計でき、処理速度もある程度期待できるので、
今後も継続して勉強していきます。
特に、マルチスレッド感覚でNon-Blocking I/Oを実現してくれるのは、わかりやすいですね。
記載内容に間違い等あれば、ご指摘いただけると助かります。