0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GoWatchを作りながら学ぶGoの並行処理 #2 — Worker Poolパターンでgoroutineを制御する

0
Last updated at Posted at 2026-04-12

はじめに

前回はgoroutineとchannelの基本を学びました。
前回記事

  • goroutineは go キーワードで起動できる軽量な並行実行の単位
  • channelはgoroutine間で値を安全に渡すための通路

今回はその知識を活かして、GoWatchの中核となる Worker Poolパターン を解説します。


goroutineを無制限に起動する問題

前回学んだようにgoroutineは非常に軽量です。では、監視対象のURLが増えるたびにgoroutineを1つずつ起動すればよいのでしょうか。

// ❌ URLごとにgoroutineを起動する素朴な実装
for _, url := range urls {
    go check(url)
}

少数のURLであれば問題ありません。しかし監視対象が100件、1000件と増えたとき、同じ数だけgoroutineが同時に起動します。goroutineは軽量とはいえ無制限に起動すれば以下の問題が起きます。

  • メモリを大量に消費する
  • 外部サービスへのリクエストが集中してサーバーに負荷をかける
  • リソースの枯渇によりシステム全体が不安定になる

並行処理の数を意図的に制御する仕組みが必要です。


Worker Poolパターンとは

Worker Poolパターンは、あらかじめ決まった数のworker(goroutine)を起動しておき、jobsをchannelで渡すことで並列数を制御するパターンです。

[送信側]
    │ jobsをchannelに送信
    ▼
  jobs channel
    │
    ├─ [worker 1]
    ├─ [worker 2]  ← 常にN個のworkerだけが動いている
    └─ [worker 3]
    │
    ▼
  results channel
    │
    ▼
[受信側]
    │ 結果を処理

workerの数を固定することで「同時に処理できるjobの上限」を制御できます。jobsがworkerの数より多くても、channelがバッファとして機能するため処理が詰まることなく順番にさばいていきます。


GoWatchでの実装

Checkerの構造体

GoWatchでは Checker 構造体がWorker Poolを管理しています。

type Checker struct {
    db          *store.Store
    hub         *websocket.Hub
    jobs        chan string
    results     chan CheckResult
    workerCount int
}

func New(db *store.Store, hub *websocket.Hub, workerCount int) *Checker {
    return &Checker{
        db:          db,
        hub:         hub,
        jobs:        make(chan string, workerCount),
        results:     make(chan CheckResult, workerCount),
        workerCount: workerCount,
    }
}

jobsresults はどちらも workerCount をバッファサイズとしています。workerが処理できる数だけ先行して受け付けられるようにするためです。

Start() — workerを起動する

func (c *Checker) Start(ctx context.Context) {
    for i := 0; i < c.workerCount; i++ {
        go c.worker(ctx)
    }

    go c.resultLoop(ctx)
    go c.tickerLoop(ctx)
}

Start() が呼ばれると workerCount の数だけ worker goroutineを起動します。以降、workerの数は増えも減りもしません。

worker() — jobsを受け取って処理する

func (c *Checker) worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case url, ok := <-c.jobs:
            if !ok {
                return
            }
            result := c.check(ctx, url)
            c.results <- result
        }
    }
}

各workerは jobs channelを監視し続け、URLが送られてきたらHTTPリクエストを送信して結果を results channelに流します。ctx.Done() を監視しているのはアプリケーション終了時に安全にworkerを止めるためです。これはシリーズ#3で詳しく解説します。

tickerLoop() — 定期的にjobsを送信する

func (c *Checker) tickerLoop(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            targets, err := c.db.ListTargets(ctx)
            if err != nil {
                continue
            }
            for _, t := range targets {
                c.jobs <- t.URL
            }
        }
    }
}

30秒ごとにDBから監視対象URLを取得し、1件ずつ jobs channelに送信します。workerは受け取った順に処理するため、一度に大量のHTTPリクエストが走ることはありません。


まとめ

この記事で学んだことは3つです。

  • goroutineを無制限に起動するとリソースが枯渇するリスクがある
  • Worker Poolパターンはworkerの数を固定することで並列数を制御する
  • GoWatchでは jobs / results channelを介してworkerに仕事を渡している

次回 はcontextを取り上げます。ctx.Done() が何者なのか、GoWatchのGraceful Shutdownでcontextがどう機能しているかを実装コードを交えて解説します。


参考


関連記事リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?