1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ErrGroup: Goで並行処理を制する鍵

Posted at

Group58.png

Leapcell: ウェブホスティング、非同期タスク、Redisのための次世代サーバレスプラットフォーム

Go言語のerrgroupライブラリ:強力な並列処理制御ツール

errgroupは、公式のGoライブラリx内のユーティリティで、複数のgoroutinesを並列実行し、エラーを処理するために使用されます。これは、sync.WaitGroupをベースにerrgroup.Groupを実装しており、並列プログラミングにより強力な機能を提供します。

errgroupの利点

sync.WaitGroupと比較して、errgroup.Groupには以下の利点があります:

  1. エラー処理sync.WaitGroupgoroutinesの完了を待つだけで、戻り値やエラーを処理しません。一方、errgroup.Groupは直接戻り値を処理することはできませんが、goroutineがエラーに遭遇した場合、直ちに他の実行中のgoroutinesをキャンセルし、Waitメソッドで最初のnilでないエラーを返すことができます。
  2. コンテキストキャンセルerrgroupcontext.Contextと組み合わせて使用できます。goroutineがエラーに遭遇した場合、自動的に他のgoroutinesをキャンセルし、効果的にリソースを制御し、不要な作業を回避します。
  3. 並列プログラミングの簡素化errgroupを使用することで、エラー処理のための定型コードを削減できます。開発者は手動でエラー状態や同期ロジックを管理する必要がなく、並列プログラミングをより簡単かつ保守しやすくできます。
  4. 並列度の制限errgroupは、並列実行するgoroutinesの数を制限するインターフェイスを提供し、過負荷を回避することができます。これはsync.WaitGroupにはない機能です。

sync.WaitGroupの使用例

errgroup.Groupを紹介する前に、まずsync.WaitGroupの使い方を見てみましょう。

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/", 
    }
    var err error

    var wg sync.WaitGroup 

    for _, url := range urls {
        wg.Add(1) 

        go func() {
            defer wg.Done() 

            resp, e := http.Get(url)
            if e != nil { 
                err = e
                return
            }
            defer resp.Body.Close()
            fmt.Printf("fetch url %s status %s\n", url, resp.Status)
        }()
    }

    wg.Wait()
    if err != nil { 
        fmt.Printf("Error: %s\n", err)
    }
}

実行結果:

$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host

sync.WaitGroupの典型的なアイディオム:

var wg sync.WaitGroup

for ... {
    wg.Add(1)

    go func() {
        defer wg.Done()
        // do something
    }()
}

wg.Wait()

errgroup.Groupの使用例

基本的な使い方

errgroup.Groupの使用パターンはsync.WaitGroupと似ています。

package main

import (
    "fmt"
    "net/http"
    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/", 
    }

    var g errgroup.Group 

    for _, url := range urls {
        g.Go(func() error {
            resp, err := http.Get(url)
            if err != nil {
                return err 
            }
            defer resp.Body.Close()
            fmt.Printf("fetch url %s status %s\n", url, resp.Status)
            return nil 
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Printf("Error: %s\n", err)
    }
}

実行結果:

$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host

コンテキストキャンセル

errgrouperrgroup.WithContextを提供して、キャンセル機能を追加します。

package main

import (
    "context"
    "fmt"
    "net/http"
    "sync"
    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/", 
    }

    g, ctx := errgroup.WithContext(context.Background())

    var result sync.Map

    for _, url := range urls {
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err 
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err 
            }
            defer resp.Body.Close()

            result.Store(url, resp.Status)
            return nil 
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println("Error: ", err)
    }

    result.Range(func(key, value any) bool {
        fmt.Printf("fetch url %s status %s\n", key, value)
        return true
    })
}

実行結果:

$ go run examples/withcontext/main.go
Error:  Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK

http://www.somestupidname.com/ へのリクエストがエラーを報告したため、プログラムは http://www.golang.org/ へのリクエストをキャンセルしました。

並列度の制限

errgrouperrgroup.SetLimitを提供して、同時に実行されるgoroutinesの数を制限します。

package main

import (
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    g.SetLimit(3)

    for i := 1; i <= 10; i++ {
        g.Go(func() error {
            fmt.Printf("Goroutine %d is starting\n", i)
            time.Sleep(2 * time.Second) 
            fmt.Printf("Goroutine %d is done\n", i)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Printf("Encountered an error: %v\n", err)
    }

    fmt.Println("All goroutines complete.")
}

実行結果:

$  go run examples/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.

試して開始

errgrouperrgroup.TryGoを提供して、タスクを試して開始します。これはerrgroup.SetLimitと組み合わせて使用する必要があります。

package main

import (
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    g.SetLimit(3)

    for i := 1; i <= 10; i++ {
        if g.TryGo(func() error {
            fmt.Printf("Goroutine %d is starting\n", i)
            time.Sleep(2 * time.Second) 
            fmt.Printf("Goroutine %d is done\n", i)
            return nil
        }) {
            fmt.Printf("Goroutine %d started successfully\n", i)
        } else {
            fmt.Printf("Goroutine %d could not start (limit reached)\n", i)
        }
    }

    if err := g.Wait(); err != nil {
        fmt.Printf("Encountered an error: %v\n", err)
    }

    fmt.Println("All goroutines complete.")
}

実行結果:

$ go run examples/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.

ソースコード解説

errgroupのソースコードは主に3つのファイルで構成されています:

コア構造

type token struct{}

type Group struct {
    cancel func(error)
    wg sync.WaitGroup
    sem chan token
    errOnce sync.Once
    err     error
}
  • token:信号を渡すための空の構造体で、並列度を制御するために使用されます。
  • Group
    • cancel:コンテキストがキャンセルされたときに呼び出される関数。
    • wg:内部で使用されるsync.WaitGroup
    • sem:並列コルーチンの数を制御する信号チャネル。
    • errOnce:エラーが1回だけ処理されることを保証します。
    • err:最初のエラーを記録します。

主なメソッド

  • SetLimit:並列度を制限します。
func (g *Group) SetLimit(n int) {
    if n < 0 {
        g.sem = nil
        return
    }
    if len(g.sem) != 0 {
        panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
    }
    g.sem = make(chan token, n)
}
  • Go:新しいコルーチンを開始してタスクを実行します。
func (g *Group) Go(f func() error) {
    if g.sem != nil {
        g.sem <- token{}
    }

    g.wg.Add(1)
    go func() {
        defer g.done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(g.err)
                }
            })
        }
    }()
}
  • Wait:すべてのタスクが完了するのを待ち、最初のエラーを返します。
func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel(g.err)
    }
    return g.err
}
  • TryGo:タスクを試して開始します。
func (g *Group) TryGo(f func() error) bool {
    if g.sem != nil {
        select {
        case g.sem <- token{}:
        default:
            return false
        }
    }

    g.wg.Add(1)
    go func() {
        defer g.done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(g.err)
                }
            })
        }
    }()
    return true
}

まとめ

errgroupは、公式の拡張ライブラリであり、sync.WaitGroupのベースにエラー処理機能を追加しており、同期、エラー伝播、コンテキストキャンセルなどの機能を提供します。そのWithContextメソッドはキャンセル機能を追加でき、SetLimitは並列度を制限でき、TryGoはタスクを試して開始できます。ソースコードは巧みに設計されており、参考に値するものです。

Leapcell: ウェブホスティング、非同期タスク、Redisのための次世代サーバレスプラットフォーム

最後に、Go言語をデプロイするのに最適なプラットフォームをおすすめします:Leapcell

barndpic.png

1. 多言語対応

  • JavaScript、Python、Go、またはRustで開発できます。

2. 無料で無制限のプロジェクトをデプロイ

  • 使用量に応じてのみ支払い — リクエストがなければ、請求もありません。

3. 抜群のコスト効率

  • 実行した分だけ請求され、アイドル時には請求されません。
  • 例:平均応答時間60ミリ秒で694万件のリクエストに対応可能で、費用は25ドル。

4. 合理化された開発者体験

  • 直感的なUIにより、簡単にセットアップできます。
  • 完全自動化されたCI/CDパイプラインとGitOpsの統合。
  • 実行時のメトリクスとログ記録により、アクション可能なインサイトを提供します。

5. 簡単なスケーラビリティと高パフォーマンス

  • 高い並列度を容易に処理できる自動スケーリング機能。
  • 運用オーバーヘッドはゼロ — 構築に集中できます。

Frame3-withpadding2x.png

ドキュメントでさらに詳しく探る!

LeapcellのTwitter:https://x.com/LeapcellHQ

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?