0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Go言語のWebSocketとチャネル通信のきそ

0
Posted at

Go言語のWebSocketとチャネル通信のきそ

学者用の備忘録まとめです。

1. サンプルコード:WebSocketチャットクライアント

func (c *client) read() {
    for {
        _, msg, err := c.socket.ReadMessage(); err == nil {
            c.room.forward <- msg
        } else {
            break
        }
    }
    c.socket.Close()
}

func (c *client) write() {
    for msg := range c.send {
        if err := c.socket.WriteMessage(websocket.TextMessage, msg); 
            err != nil {
            break
        }
    }
    c.socket.Close()
}

このシンプルなコードを紐解き、理解を深めます。

まずは背景となる概念から順に理解していきます。

2. Go言語の並行処理モデル:CSP

Go言語の並行処理は「CSP(Communicating Sequential Processes)」という並行計算モデルに基づいています。このモデルは、Tony Hoare教授によって1978年に発表された理論で、Go言語の設計者である Rob Pike氏らによって実践的にプログラミング言語に取り入れられました。

2.1 従来の並行処理との違い

従来の多くのプログラミング言語では、スレッド間の通信に共有メモリを使用し、ロック、ミューテックス、セマフォなどの同期プリミティブでアクセスを制御します:

// Javaでの共有変数と同期の例
class SharedCounter {
    private int count = 0;
    
    public synchronized void increment() {
        count++;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

一方、Goの並行モデルでは以下の原則が強調されます:

「共有メモリを使って通信するのではなく、通信によって共有メモリを実現する」

つまり、複数のプロセス(ゴルーチン)間では、明示的なデータのやり取り(チャネルを介した通信)を基本とし、暗黙的な共有状態を避けます。これにより、デッドロックやレースコンディションといった並行処理の難しい問題を回避しやすくなります。

3. ゴルーチン(Goroutine):軽量スレッド

ゴルーチンはGoの並行処理の基本単位で、他の言語でいうところのスレッドに近い概念です。しかし、従来のOSスレッドよりもはるかに軽量で、数千から数百万のゴルーチンを同時に実行できます。

3.1 ゴルーチンの特徴

  1. 軽量性: スタックサイズが初期値2KBから始まり、必要に応じて動的に拡張/縮小する
  2. スケジューリング: Go言語のランタイムが管理(OSスケジューラではない)
  3. 簡単な生成: go キーワードだけで関数を並行実行できる

3.2 ゴルーチンの起動

// 関数をゴルーチンとして実行
go someFunction()

// 無名関数をゴルーチンとして実行
go func() {
    // 処理
}()

この go キーワードがない場合、関数は通常の同期呼び出しになります。

3.3 ゴルーチンの終了

ゴルーチンは以下の場合に終了します:

  1. 関数の実行が完了した場合
  2. ランタイムエラー(パニック)が発生し、recover されなかった場合
  3. メインゴルーチン(プログラム本体)が終了した場合

注意点: ゴルーチンには直接的な終了メカニズムはありません。別のゴルーチンを強制終了するには、通常はチャネルを通じたシグナリングが必要です。

4. チャネル(Channel):ゴルーチン間の通信

チャネルは、ゴルーチン間でデータをやり取りするための「パイプライン」として機能します。これはGoの並行処理モデルの中核をなす概念です。

4.1 チャネルの基本

// int型のチャネル作成
ch := make(chan int)

// 送信操作
ch <- 42

// 受信操作
value := <-ch

チャネルは特定の型のデータのみをやり取りでき、型安全です。

4.2 チャネル演算子 <-

Goでは <- 演算子が、チャネルとの間の全ての操作に使われます:

  1. 送信操作: ch <- value

    • 値をチャネルに送り込む
    • 左から右へ値が流れるイメージ
  2. 受信操作: value := <-ch

    • チャネルから値を取り出す

他の言語では、送信と受信に異なる演算子(例: -><-)を使うこともありますが、Goでは一貫して <- のみを使います。この矢印は「データの流れる方向」を示しています。

4.3 チャネルの種類

4.3.1 無バッファチャネル

ch := make(chan int)  // バッファサイズを指定しない

特徴:

  • 送信操作:受信側がデータを取り出すまでブロック
  • 受信操作:送信側がデータを送るまでブロック
  • 同期通信のように機能する

4.3.2 バッファ付きチャネル

ch := make(chan int, 10)  // バッファサイズ10

特徴:

  • 送信操作:バッファが満杯になるまではブロックしない
  • 受信操作:バッファが空の場合はブロック
  • 部分的な非同期通信が可能

4.4 チャネルのクローズと範囲節

チャネルは明示的に閉じることができます:

close(ch)  // チャネルchを閉じる

チャネルがクローズされた後:

  • 送信操作 (ch <- value) はパニックを引き起こす
  • 受信操作は:
    • バッファに残っている値があればそれらを返す
    • バッファが空になると、チャネルの型のゼロ値と false を返す
val, ok := <-ch  // okはチャネルが開いているかどうかを示す
if !ok {
    // チャネルは閉じられている
}

for range ループとの組み合わせ:

for val := range ch {
    // valはチャネルから受信した値
    // チャネルが閉じられると、ループは自動的に終了する
}

これは write() メソッドで使われているパターンです:

for msg := range c.send {
    // c.sendチャネルが閉じられるまで実行し続ける
}

4.5 select文によるマルチチャネル操作

Goの select 文は複数のチャネル操作を同時に待ち受ける強力な構文です:

select {
case val := <-ch1:
    // ch1からデータを受信した場合の処理
case ch2 <- value:
    // ch2にデータを送信できた場合の処理
case <-time.After(1 * time.Second):
    // タイムアウト処理
default:
    // どのケースも準備できていない場合の処理(非ブロッキングモード)
}

select の主な特徴:

  • 複数のチャネル操作を同時に待ち受ける
  • 準備できたケースをランダムに選択(複数が準備できている場合)
  • default ケースがあれば、他のケースが準備できていなくてもブロックしない
  • すべてのケースがブロックされ default がなければ、いずれかのケースが準備できるまでブロック

チャットアプリケーションでの活用例:

func (c *client) monitorConnection() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case msg := <-c.send:
            // 通常のメッセージ送信
            err := c.socket.WriteMessage(websocket.TextMessage, msg)
            if err != nil {
                return
            }
        case <-ticker.C:
            // 定期的にpingを送信
            if err := c.socket.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
                return
            }
        case <-c.disconnect:
            // 切断信号を受信
            return
        }
    }
}

このパターンはチャネルを使った複雑な状態管理や、タイムアウト、キャンセル処理などに非常に有効です。

4.6 チャネル方向の制限

チャネル型は方向を制限することができます:

// 送信専用チャネル
func send(ch chan<- int) {
    ch <- 42
    // value := <-ch  // コンパイルエラー:受信できない
}

// 受信専用チャネル
func receive(ch <-chan int) {
    value := <-ch
    // ch <- 42  // コンパイルエラー:送信できない
}

これにより、関数が意図した操作だけを行うことを型システムレベルで保証できます。

4.7 チャネルのゼロ値とnil チャネルの挙動

チャネルのゼロ値は nil です。nil チャネルの挙動を理解することは重要です:

var ch chan int // 宣言のみ、ゼロ値はnil

nil チャネルの特性:

  1. 送信操作 (ch <- value) - 永久ブロック
  2. 受信操作 (<-ch) - 永久ブロック
  3. close操作 (close(ch)) - パニック

この特性を利用した一般的なパターン:

func selectiveReceive(ch1, ch2 chan int) int {
    // チャネルを動的に無効化する例
    for {
        select {
        case v := <-ch1:
            // 条件を満たしたらch1を無効化
            if someCondition(v) {
                ch1 = nil // 以降、このselect文ではch1からは受信しない
                continue
            }
            return v
        case v := <-ch2:
            return v
        }
    }
}

このテクニックは複雑なイベント処理やステートマシンの実装に役立ちます。

5. サンプルコードの詳細解析

これまでの概念を踏まえて、WebSocketクライアントコードを詳しく分析しましょう。

5.1 clientの構造

コードを理解するには、まずclient構造体がどのように定義されているかを推測する必要があります:

type client struct {
    socket *websocket.Conn   // WebSocketコネクション
    send   chan []byte       // 送信用チャネル
    room   *room             // 所属するチャットルーム
}

type room struct {
    forward chan []byte      // メッセージ転送用チャネル
    // その他のフィールド
}

5.2 read() メソッドの解説

func (c *client) read() {
    for {
        _, msg, err := c.socket.ReadMessage(); err == nil {
            c.room.forward <- msg
        } else {
            break
        }
    }
    c.socket.Close()
}

このメソッドは:

  1. 無限ループでWebSocketからのメッセージを継続的に読み取ります
  2. エラーがない場合、受信したメッセージをルームのforwardチャネルに送信します
  3. エラーが発生した場合、ループを抜けてWebSocketを閉じます

ポイント: c.socket.ReadMessage() の返り値は3つあります:

  1. メッセージタイプ(ここでは使用していないため _ で無視)
  2. メッセージ本体 (msg)
  3. エラー情報 (err)

この関数はGoの複数戻り値を活用した例で、式 err == nilif 条件の一部として直接使う簡潔な書き方も特徴的です。

5.3 write() メソッドの解説

func (c *client) write() {
    for msg := range c.send {
        if err := c.socket.WriteMessage(websocket.TextMessage, msg); 
            err != nil {
            break
        }
    }
    c.socket.Close()
}

このメソッドは:

  1. for ... range 構文を使って c.send チャネルからメッセージを継続的に受信します
  2. 受信したメッセージをWebSocketに書き込みます
  3. エラーが発生した場合、ループを抜けてWebSocketを閉じます

for msg := range c.send は、チャネルが閉じられるまでチャネルからの値を継続的に受け取るイディオムです。これはチャネルのクローズを検出して自動的にループを終了するGoの洗練された機能です。

5.4 full-duplex通信モデル

この実装で特筆すべき点は、Goのゴルーチンとチャネルを使用して、WebSocketの全二重(full-duplex)通信モデルを自然に表現している点です:

            │   WebSocket   │
            │   Connection  │
            └───────┬───────┘
                    │
        ┌───────────┴───────────┐
        │                       │
┌───────▼────────┐     ┌────────▼───────┐
│  read()        │     │  write()       │
│  Goroutine     │     │  Goroutine     │
└───────┬────────┘     └────────▲───────┘
        │                       │
        │                       │
┌───────▼────────┐     ┌────────┴───────┐
│ room.forward   │     │   c.send       │
│ Channel        │     │   Channel      │
└───────┬────────┘     └────────▲───────┘
        │                       │
        │                       │
        └───────────►───────────┘
           (to other clients)

この設計により:

  1. 読み取りと書き込みが互いにブロックすることなく同時に行われます
  2. 受信したメッセージは room.forward チャネルを介して他のクライアントに配信されます
  3. 送信すべきメッセージは c.send チャネルを介して書き込みゴルーチンに渡されます

これはGoの並行モデルがWebSocketのような双方向通信プロトコルと非常に相性が良いことを示しています。ネットワーク接続で発生する自然な非同期性を、Go言語のゴルーチンとチャネルが効果的に扱っています。

6. 無限ループとエラーハンドリング

両方のメソッドで無限ループが使われていますが、これは意図的な設計です:

  • read(): WebSocketの接続が維持されている限り、継続的にメッセージを読み取ります
  • write(): チャネルが閉じられる、またはエラーが発生するまで、送信すべきメッセージを処理し続けます

エラーハンドリングも重要です:

  • WebSocket接続のエラー(切断など)が発生した場合、ループを終了してリソースをきれいに解放します

このパターンは、接続指向のプロトコルを扱う際の一般的なアプローチです。

7. 並行実行のパターン

これらのメソッドは別々のゴルーチンで実行されることを想定しています:

// クライアント作成時
client := &client{
    socket: conn,
    send:   make(chan []byte),
    room:   r,
}

// 別々のゴルーチンでread()とwrite()を起動
go client.read()
go client.write()

このパターンにより:

  1. メッセージの読み取りと書き込みが並行して行われます
  2. 一方がブロックされても、もう一方は独立して動作できます
  3. 双方向の通信が効率的に処理されます

8. コンテキスト(Context)によるキャンセル伝播

Go 1.7以降で導入された context パッケージは、チャネルと組み合わせて使うことで、ゴルーチン間でのキャンセル信号の伝播、タイムアウト、値の受け渡しを実現します。

func (c *client) readWithContext(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            // 親コンテキストがキャンセルされた
            c.socket.Close()
            return
        default:
            // 通常のメッセージ読み取り処理
            _, msg, err := c.socket.ReadMessage()
            if err != nil {
                return
            }
            c.room.forward <- msg
        }
    }
}

コンテキストは以下のような場面で特に有効です:

  1. リクエストのスコープ管理: HTTPリクエストごとにコンテキストを作成し、そのリクエストに関連するすべてのゴルーチンにキャンセル信号を伝播
  2. タイムアウト設定: 処理に制限時間を設ける
  3. 値の伝播: リクエストIDなどの値をゴルーチン間で共有
// タイムアウト付きコンテキストの作成
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // 必ず呼び出す

// コンテキスト付きの処理起動
go client.readWithContext(ctx)

コンテキストはGoの並行処理パターンにおいて、複数のゴルーチンを適切に管理・終了させるための重要な要素です。

9. 他言語との比較

Goの並行処理モデルをより理解するために、他の言語での同様の実装と比較してみましょう。

9.1 Javaとの比較

// Javaのスレッドとキューを使った類似実装
class Client {
    private final BlockingQueue<byte[]> messageQueue = new LinkedBlockingQueue<>();
    
    public void startReading() {
        new Thread(() -> {
            try {
                while (true) {
                    byte[] message = socket.readMessage();
                    if (message == null) break;
                    room.forward(message);
                }
            } catch (Exception e) {
                // エラー処理
            } finally {
                socket.close();
            }
        }).start();
    }
    
    public void startWriting() {
        new Thread(() -> {
            try {
                while (true) {
                    byte[] message = messageQueue.take();  // ブロッキング操作
                    socket.writeMessage(message);
                }
            } catch (Exception e) {
                // エラー処理
            } finally {
                socket.close();
            }
        }).start();
    }
}

9.2 TypeScriptとの比較

// 型定義
type MessageType = Uint8Array;
type MessageResolver = (message: MessageType) => void;

interface WebSocket {
    send(data: MessageType): Promise<void>;
    close(): void;
}

interface Room {
    forward(message: MessageType): void;
}

class Client {
    private socket: WebSocket;
    private room: Room;
    private messageQueue: MessageType[] = [];
    private resolvers: MessageResolver[] = [];
    
    constructor(socket: WebSocket, room: Room) {
        this.socket = socket;
        this.room = room;
    }
    
    async read(): Promise<void> {
        try {
            while (true) {
                const message: MessageType = await this.receiveMessage();
                this.room.forward(message);
            }
        } catch (err: unknown) {
            // エラー処理
            console.error('Read error:', err);
        } finally {
            this.socket.close();
        }
    }
    
    async write(): Promise<void> {
        try {
            while (true) {
                const message: MessageType = await this.waitForMessage();
                await this.socket.send(message);
            }
        } catch (err: unknown) {
            // エラー処理
            console.error('Write error:', err);
        } finally {
            this.socket.close();
        }
    }
    
    // メッセージキューの管理(チャネルの代わり)
    queueMessage(message: MessageType): void {
        if (this.resolvers.length > 0) {
            const resolve = this.resolvers.shift()!;
            resolve(message);
        } else {
            this.messageQueue.push(message);
        }
    }
    
    waitForMessage(): Promise<MessageType> {
        if (this.messageQueue.length > 0) {
            return Promise.resolve(this.messageQueue.shift()!);
        }
        return new Promise<MessageType>(resolve => {
            this.resolvers.push(resolve);
        });
    }
    
    // この実装では省略されている受信メソッド
    private receiveMessage(): Promise<MessageType> {
        // 実際の実装ではWebSocketからメッセージを受信する処理
        return Promise.resolve(new Uint8Array(0));
    }
}

これらの比較から見えてくるGoの特徴:

  1. 言語組み込みの並行プリミティブ: 他言語では別途ライブラリや複雑なパターンが必要
  2. シンプルさ: ゴルーチンとチャネルだけで複雑な非同期処理を表現できる
  3. 型安全性: チャネルを介したデータのやり取りも型安全
  4. エラー処理の統一性: 例外ではなく、明示的なエラー値の返却
  5. コード量の少なさ: 同等の機能を実現するコードが非常にコンパクト

10. Go言語チャネルのユースケース

チャネルは特に以下のような場面で効果的です:

  1. ワーカープール: 複数のワーカーゴルーチンに仕事を分配
  2. パイプライン: 一連の処理ステップをつなげる
  3. シグナリング: ゴルーチン間の実行タイミングを制御
  4. ミューテックスの代替: 共有状態へのアクセスを同期する
  5. イベント駆動システム: イベントの発行と購読

11. まとめ

Go言語のチャネルとゴルーチンを使った並行処理は、WebSocketのような双方向通信に非常に適しています。この記事で説明した概念をまとめると:

  • チャネル (<- 演算子) を使ってゴルーチン間で安全にデータをやり取り
  • 無限ループで継続的にメッセージを処理(エラー発生まで)
  • 読み取りと書き込みを別々のゴルーチンで並行処理
  • select文で複数のチャネル操作を同時に待ち受け
  • コンテキストでゴルーチン間のキャンセル伝播やタイムアウトを管理
  • チャネルのクローズを使ったシグナリングパターン
  • nilチャネルの特性を活用した高度な制御

これらの機能を組み合わせることで、Goは並行処理を驚くほどシンプルかつ強力に実装できます。

Goの「通信によって共有する」というアプローチは、並行プログラミングの複雑さを大幅に軽減し、デッドロックやレースコンディションのような一般的な問題を回避しやすくします

12. さらなる学習のために

Goの並行処理についてさらに学ぶためのリソース:

  1. Effective Go - Concurrency
  2. Go by Example: Goroutines
  3. Go by Example: Channels
  4. The Go Programming Language (書籍) - 特に第8章
  5. Concurrency is not Parallelism (Rob Pike のトーク)
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?