こんにちは!フリーランスエンジニアのこたろうです。
今回は、sync.Mutex
を使ってWebSocketクライアントをスレッドセーフに管理する方法について解説します。
なぜMutexが必要か?
WebSocketサーバーでは、複数のゴルーチンが同時にクライアントリストにアクセスする可能性があります:
- 新規クライアントの接続
- クライアントの切断
- メッセージのブロードキャスト
これらの同時アクセスを制御しないと、データ競合が発生する可能性があります。
実装例
クライアントマネージャーの構造体
type ClientManager struct {
clients map[*Client]bool
mutex sync.Mutex
}
type Client struct {
ID string
Conn *websocket.Conn
}
func NewClientManager() *ClientManager {
return &ClientManager{
clients: make(map[*Client]bool),
}
}
クライアントの追加
func (m *ClientManager) AddClient(client *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.clients[client] = true
log.Printf("Client added: %s. Total clients: %d", client.ID, len(m.clients))
}
クライアントの削除
func (m *ClientManager) RemoveClient(client *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
if _, exists := m.clients[client]; exists {
delete(m.clients, client)
client.Conn.Close()
log.Printf("Client removed: %s. Total clients: %d", client.ID, len(m.clients))
}
}
メッセージのブロードキャスト
func (m *ClientManager) Broadcast(message []byte) {
m.mutex.Lock()
defer m.mutex.Unlock()
for client := range m.clients {
err := client.Conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Printf("Error broadcasting to client %s: %v", client.ID, err)
// エラー時はクライアントを削除
delete(m.clients, client)
client.Conn.Close()
}
}
}
実際の使用例
func main() {
manager := NewClientManager()
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Upgrade error: %v", err)
return
}
client := &Client{
ID: generateID(), // UUIDなどでユニークなID生成
Conn: conn,
}
// クライアントを追加
manager.AddClient(client)
// クライアントが切断したら削除
defer manager.RemoveClient(client)
// メッセージの受信処理
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("Read error: %v", err)
break
}
// 受信したメッセージを全クライアントにブロードキャスト
manager.Broadcast(message)
}
})
log.Fatal(http.ListenAndServe(":8080", nil))
}
エラーハンドリング
コネクション切断の検出
func (m *ClientManager) handleConnection(client *Client) {
defer m.RemoveClient(client)
for {
_, _, err := client.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
log.Printf("Unexpected close error: %v", err)
}
break
}
}
}
クライアント数の取得
func (m *ClientManager) GetClientCount() int {
m.mutex.Lock()
defer m.mutex.Unlock()
return len(m.clients)
}
まとめ
sync.Mutex
を使用することで:
- スレッドセーフなクライアント管理が可能
- データ競合を防止
- 安全なメッセージブロードキャスト
注意点:
- Mutexのロック時間は最小限に
- デッドロックを防ぐためにdefer文を活用
- エラーハンドリングを適切に実装
この実装により、安全で効率的なWebSocketサーバーを構築できます。