0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Go言語における並行処理の基礎: ゴルーチンとチャネル

Posted at

Go言語における並行処理の基礎

「ゴルーチン(goroutine)」と「チャネル(channel)」の学習用の備忘録まとめです。

目次

  1. ゴルーチンの基本
  2. ゴルーチンの特性と制限
  3. チャネルの基本
  4. チャネルの種類と特性
  5. チャネルを使った同期パターン
  6. 高度な並行処理パターン
  7. よくある落とし穴と解決策
  8. ベストプラクティス

1. ゴルーチンの基本

1.1 ゴルーチンとは

ゴルーチンは、Go言語における軽量スレッドです。オペレーティングシステムのスレッドよりも遥かに軽量で、数千、数万のゴルーチンを同時に実行できます。

1.2 ゴルーチンの起動方法

ゴルーチンは、関数呼び出しの前にgoキーワードを付けるだけで起動できます:

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello, Goroutine!")
}

func main() {
    go sayHello() // ゴルーチンとして関数を実行
    
    // メイン関数がすぐに終了しないように少し待機
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Main function")
}

1.3 匿名関数を使ったゴルーチン

匿名関数(クロージャ)もゴルーチンとして実行できます:

go func() {
    fmt.Println("Anonymous function in goroutine")
}() // 即時実行のための括弧を忘れないようにする

1.4 ゴルーチンとメモリ消費

ゴルーチンは非常に軽量で、初期スタックサイズは約2KBです。必要に応じて拡張され、最大1GBまで成長可能です。これにより、数千のゴルーチンを同時に実行できます。

2. ゴルーチンの特性と制限

2.1 ゴルーチンの特性

  • 軽量性: OSスレッドより遥かに少ないリソースで動作
  • スケーラビリティ: 数千のゴルーチンを同時に実行可能
  • 簡易性: 単純なgoキーワードで起動可能

2.2 ゴルーチンの制限

  • 識別性の欠如: 個々のゴルーチンにIDはなく、直接参照できない
  • 制御の制限: 外部からゴルーチンを直接停止させる方法はない
  • 優先度制御なし: ゴルーチン間に優先度や親子関係はない
  • 終了検知の複雑さ: ゴルーチンの終了を検知するには別の仕組みが必要

2.3 ゴルーチンとクロージャの注意点

ループ内でゴルーチンを起動する際の一般的な落とし穴:

// 問題のあるコード
for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i) // すべてのゴルーチンが同じiを参照
    }()
}

上記のコードでは、すべてのゴルーチンが同じ変数iを参照するため、予期しない結果になります。正しい実装:

// 正しいコード
for i := 0; i < 5; i++ {
    i := i // ブロックスコープで新しい変数に代入
    go func() {
        fmt.Println(i) // 各ゴルーチンが独自のiを参照
    }()
}

// または引数として渡す
for i := 0; i < 5; i++ {
    go func(val int) {
        fmt.Println(val)
    }(i) // ループ変数をゴルーチンに渡す
}

3. チャネルの基本

3.1 チャネルとは

チャネルは、ゴルーチン間でデータを送受信するための通信パイプラインです。Go言語のチャネルは「送信者と受信者の同期」という重要な機能を提供します。

3.2 チャネルの宣言と初期化

// 整数型のチャネルを宣言
var ch chan int

// チャネルの初期化(makeを使用)
ch = make(chan int)

// 宣言と初期化を一度に行う
ch := make(chan int)

// バッファ付きチャネル(容量5)
bufferedCh := make(chan int, 5)

3.3 チャネルの基本操作

// チャネルへの送信
ch <- 42

// チャネルからの受信
value := <-ch

// 受信と値の使用
fmt.Println(<-ch)

// 受信して変数と確認フラグを取得
value, ok := <-ch // チャネルが閉じられていれば ok は false

3.4 チャネルのクローズ

close(ch) // チャネルを閉じる

チャネルを閉じると:

  • 閉じられたチャネルに送信すると、パニックが発生します
  • 閉じられたチャネルから受信しても、すでに送信されたデータはすべて受信できます
  • バッファ内のデータがすべて取り出された後、閉じられたチャネルからの受信は(zero_value, false)を返します

4. チャネルの種類と特性

4.1 バッファなしチャネル(同期チャネル)

バッファなしチャネルでは、送信と受信が同時に行われる必要があります:

ch := make(chan int) // バッファなしチャネル

// 送信側ゴルーチン
go func() {
    ch <- 42 // 受信側が準備できるまでブロック
    fmt.Println("値を送信しました")
}()

// 受信側
fmt.Println(<-ch) // 値を受信

バッファなしチャネルの特性:

  • 送信操作は対応する受信操作が行われるまでブロックする
  • 受信操作は対応する送信操作が行われるまでブロックする
  • ゴルーチン間の確実な同期を保証する

4.2 バッファ付きチャネル

バッファ付きチャネルは、指定した容量までのメッセージを格納できます:

ch := make(chan int, 3) // 容量3のバッファ付きチャネル

ch <- 1 // ブロックしない(バッファに空きがある)
ch <- 2 // ブロックしない
ch <- 3 // ブロックしない
// ch <- 4 // ここでブロック(バッファが満杯)

fmt.Println(<-ch) // 1を受信
ch <- 4 // 今度はブロックしない(バッファに空きができた)

バッファ付きチャネルの特性:

  • バッファが満杯でない限り、送信操作はブロックしない
  • バッファが空でない限り、受信操作はブロックしない
  • 一時的な処理速度の差を吸収するのに有用

4.3 単方向チャネル

関数のパラメータとして、送信専用または受信専用のチャネルを定義できます:

// 送信専用チャネルを引数に取る関数
func send(ch chan<- int) {
    ch <- 42
    // <-ch // コンパイルエラー:送信専用チャネルから受信できない
}

// 受信専用チャネルを引数に取る関数
func receive(ch <-chan int) {
    val := <-ch
    fmt.Println(val)
    // ch <- 42 // コンパイルエラー:受信専用チャネルに送信できない
}

func main() {
    ch := make(chan int) // 双方向チャネル
    go send(ch)     // 双方向チャネルを送信専用として渡す
    receive(ch)     // 双方向チャネルを受信専用として渡す
}

単方向チャネルの利点:

  • 型安全性の向上
  • コードの意図が明確になる
  • コンパイル時の安全性検証

5. チャネルを使った同期パターン

5.1 待機グループ(WaitGroup)との組み合わせ

sync.WaitGroupは複数のゴルーチンの完了を待つための仕組みです:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 関数終了時にカウンタをデクリメント
    fmt.Printf("Worker %d starting\n", id)
    // 何か処理
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1) // ゴルーチンごとにカウンタをインクリメント
        go worker(i, &wg)
    }
    
    wg.Wait() // すべてのゴルーチンが完了するまで待機
    fmt.Println("All workers done")
}

5.2 チャネルを使った完了通知

func main() {
    done := make(chan bool)
    
    go func() {
        // 何か処理
        fmt.Println("Task completed")
        done <- true // 完了を通知
    }()
    
    <-done // タスクの完了を待機
    fmt.Println("Main function continues")
}

5.3 タイムアウトの実装

func main() {
    ch := make(chan string)
    
    go func() {
        // 時間のかかる処理
        time.Sleep(2 * time.Second)
        ch <- "Task result"
    }()
    
    select {
    case result := <-ch:
        fmt.Println("Received:", result)
    case <-time.After(1 * time.Second):
        fmt.Println("Timeout: operation took too long")
    }
}

6. 高度な並行処理パターン

6.1 Select文による複数チャネルの監視

select文は複数のチャネル操作から最初に準備できたものを実行します:

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "one"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received", msg2)
        }
    }
}

6.2 コンテキストを使った中断

contextパッケージを使用すると、ゴルーチンをグレースフルに中断できます:

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker: received cancellation signal")
            return
        default:
            fmt.Println("Worker: doing work")
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    go worker(ctx)
    
    // 3秒後に中断
    time.Sleep(3 * time.Second)
    cancel()
    
    // ワーカーが終了するのを少し待つ
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Main: worker cancelled")
}

6.3 ワーカープールパターン

複数のワーカーゴルーチンで処理を分散するパターン:

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(time.Second) // シミュレートされた処理時間
        results <- j * 2 // 結果を送信
    }
}

func main() {
    const numJobs = 5
    const numWorkers = 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    // ワーカーの起動
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }
    
    // ジョブの送信
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 結果の収集
    for a := 1; a <= numJobs; a++ {
        fmt.Println("Result:", <-results)
    }
}

7. よくある落とし穴と解決策

7.1 チャネルの閉じ忘れ

チャネルが不要になったら閉じる必要があります。特にrangeでチャネルを反復処理する場合、チャネルが閉じられなければ無限ループになります:

// 問題のあるコード
func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    // closeし忘れ
}

// 正しいコード
func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // チャネルを閉じる
}

func consumer(ch <-chan int) {
    for n := range ch {
        fmt.Println(n)
    }
    // chが閉じられないと、ここには到達しない
}

7.2 閉じられたチャネルへの送信

閉じられたチャネルに送信するとパニックが発生します:

ch := make(chan int)
close(ch)
ch <- 1 // パニック: "send on closed channel"

安全な送信のためのイディオム:

func safeSend(ch chan int, value int) (closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()
    ch <- value // パニックが発生する可能性がある
    return false
}

より良い方法は、送信側がチャネルを閉じ、受信側が閉じられたかどうかを確認するパターンを使用することです:

// 送信側
func producer(ch chan int, done chan struct{}) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // 生産完了を通知
    <-done    // 消費側の完了を待つ
}

// 受信側
func consumer(ch <-chan int, done chan struct{}) {
    for {
        val, ok := <-ch
        if !ok {
            // チャネルが閉じられたら終了
            break
        }
        fmt.Println(val)
    }
    close(done) // 消費完了を通知
}

7.3 デッドロックの防止

デッドロックは、すべてのゴルーチンがブロックされた状態で、互いに待ち合わせになる状況です:

// デッドロックを引き起こすコード
func main() {
    ch := make(chan int)
    ch <- 1 // ブロックする(受信側がない)
    fmt.Println(<-ch) // 実行されることはない
}

デッドロックを防ぐためのヒント:

  1. ブロッキング操作はゴルーチン内で行う
  2. チャネルの送受信を適切にペアリングする
  3. タイムアウトを設定する
  4. select文でブロッキング操作を組み合わせる
// デッドロックを防ぐコード
func main() {
    ch := make(chan int)
    
    // 送信をゴルーチンで行う
    go func() {
        ch <- 1
    }()
    
    fmt.Println(<-ch) // 正常に受信できる
}

7.4 チャネルのリーク

ゴルーチンがチャネルの送受信でブロックされたままだと、そのゴルーチンはリークします:

// リークを引き起こすコード
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch // チャネルが閉じられないとブロックされ続ける
        fmt.Println(val)
    }()
    // ここでchにデータを送信せず、閉じることもしない
}

リークを防ぐためには:

  1. チャネルを適切に閉じる
  2. コンテキストやタイムアウトを使用する
  3. 処理を早期に終了できるようにする
// リークを防ぐコード
func noLeak() {
    ch := make(chan int)
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            fmt.Println("Timed out waiting for value")
            return
        }
    }()
    
    // 何か処理
    // ...
    
    close(ch) // ゴルーチンがブロックされないようにチャネルを閉じる
}

8. ベストプラクティス

8.1 チャネルの所有権

チャネルの所有権の原則:

  • チャネルを作成したゴルーチンが「所有者」となる
  • 所有者だけがチャネルを閉じる責任を持つ
  • 所有者だけが書き込み可能なチャネルに書き込む
  • 所有者は必要に応じてチャネルを閉じる
func owner() {
    ch := make(chan int) // 所有者がチャネルを作成
    
    // 所有者以外には受信専用として渡す
    for i := 0; i < 5; i++ {
        go worker(i, ch)
    }
    
    // 所有者がデータを送信
    for i := 0; i < 10; i++ {
        ch <- i
    }
    
    // 所有者がチャネルを閉じる
    close(ch)
}

func worker(id int, ch <-chan int) {
    for val := range ch {
        fmt.Printf("Worker %d received %d\n", id, val)
    }
}

8.2 エラー処理

エラーをチャネル経由で伝達する方法:

func worker(jobs <-chan int, results chan<- struct{ index, result int, err error }) {
    for j := range jobs {
        if j < 0 {
            results <- struct{ index, result int; err error }{j, 0, fmt.Errorf("invalid job: %d", j)}
            continue
        }
        
        // 処理
        results <- struct{ index, result int; err error }{j, j * 2, nil}
    }
}

8.3 コンテキストの活用

ゴルーチンのライフサイクル管理にコンテキストを使用する:

func processRequest(ctx context.Context, data string) (string, error) {
    resultCh := make(chan string)
    errCh := make(chan error)
    
    go func() {
        result, err := heavyProcess(data)
        if err != nil {
            errCh <- err
            return
        }
        resultCh <- result
    }()
    
    select {
    case result := <-resultCh:
        return result, nil
    case err := <-errCh:
        return "", err
    case <-ctx.Done():
        return "", ctx.Err() // タイムアウトまたはキャンセル
    }
}

func main() {
    // 3秒のタイムアウトを設定
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    
    result, err := processRequest(ctx, "input data")
    if err != nil {
        fmt.Println("Error:", err)
        return
    }
    fmt.Println("Result:", result)
}

8.4 チャネルサイズの決定

  • バッファなしチャネル: ゴルーチン間の同期が重要な場合
  • 小さなバッファ付きチャネル: ごく短時間の処理速度の差を吸収する場合
  • 大きなバッファ付きチャネル: バースト的なデータ処理や、生産者と消費者の速度差が大きい場合

適切なバッファサイズの選択は、アプリケーションの特性によって異なります。パフォーマンステストを行って最適なサイズを決定することをお勧めします。

まとめ

重要なポイント:

  1. ゴルーチンは軽量: 数千のゴルーチンを同時に実行可能
  2. チャネルは通信の要: 「通信によって共有する、共有するために通信しない」
  3. 同期の重要性: 適切な同期メカニズムを使用することでデータ競合やデッドロックを防止
  4. パターンの活用: ワーカープールやタイムアウトなどの確立されたパターンを活用する
  5. 所有権の明確化: チャネルの所有権を明確にし、適切に閉じる責任を持つ
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?