Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
32
Help us understand the problem. What are the problem?
@castaneai

goroutine を安全に止める方法

Goにはgoroutineという特定の関数を非同期で実行させられる便利な機能があります。
しかし、名前通り非同期処理なので実装によっては「裏でずっと無駄に動いていた」「気づいたら裏で増殖していた」といった事態になります。

Goに限らずあらゆるプログラムで言えることですが、 一度起動した非同期処理は終了までちゃんと面倒を見る ことが重要です。
もう少し具体的な例でいうと非同期処理はキャンセル(中断)を考慮しよう、というものが挙げられます。
本記事ではGoの非同期処理システムであるgoroutineを止める方法を解説していきます。

goroutineを止める方法はない?

いきなり結論からですが、goroutineを止める専用のAPIは存在しません。

では、どうすればgoroutineを止められるのでしょうか?
答えは簡単で goroutineで起動した関数をreturnして終わらせるだけです。
「いや、そんなことはわかってるよ!」って感じですね。
そういうことではなく、多くの人が知りたいのは 裏で動いているgoroutineをメインのスレッドからいい感じに中断させるにはどうすればいいか? だと思います。

ループしているgoroutineを止める

たとえば、次のように無限ループで何かを処理し続けるgoroutineがあるとします。
これをメイン処理から止めたい場合どうすればいいでしょうか?

package main

import (
    "time"
)

func main() {
    go loop()

    // 何かの処理

    // ここで loop() を止めたい

    time.Sleep(3 * time.Second)
    println("finish")
}

// 無限ループする関数
func loop() {
    for {
        println("loop...")
        time.Sleep(1 * time.Second)
    }
}

channelを使って中止を通知する

別のgoroutineに何かを通知したい場合、定番なのは channel を使う方法です。
流れとしては次のようになります。

  1. 中断通知用のchannelを作る
  2. goroutineに作ったchannelを渡す
  3. メイン処理からchannelを使って通知を送る(止まれ!)
  4. goroutineの中で通知を受け取ったら return で関数を終わらせる(止まった)
package main

import (
    "time"
)

func main() {
    cancel := make(chan struct{})
    go loop(cancel)

    // 何かの処理

    // ここで loop() を止める
    close(cancel)

    time.Sleep(3 * time.Second)
    println("finish")
}

// 無限ループする関数
func loop(cancel chan struct{}) {
    for {
        select {
        case <-cancel:
            // 中断通知が来た
            println("cancel")
            return
        default:
            println("loop...")
            time.Sleep(1 * time.Second)
        }

    }
}

実装のポイント

この実装にはいくつかポイントがあります。

  • 通知するだけなのでchannelの型は struct{} でよい
  • 通知を送る際、 channel にダミーの値を送る必要はなく、 close() でよい
  • select に default: をつけると 「どのcaseのchannelにも値が来てなかったとき」 を表現できる

それぞれについて軽く説明します。

channelの型は struct{} でよい

channelには「何の型を送信するか」という型指定ができますが、今回の場合「通知するだけ」なので、具体的な値を通知する必要はありません。
よって、サイズが0の struct{} を指定して「このchannelは具体的な値を運ばないよ(通知するだけだよ)」と明示しています。

通知を送る際は close() でよい

通常、channelへの送信は cancel <- struct{}{} のように値を生成して <- で送り込むといった方法を取るのですが、
本記事テーマの「中断を通知する」場合であれば close() でchannelごと閉じて大丈夫です。
なぜなら、close()を使うと 複数のgoroutineに一斉に通知できる からです。

詳細は次の記事が参考になります。

select に default: をつけると 「どのcaseのchannelにも値が来てなかったとき」 を表現できる

これは書いてある通りです。
よって「中断通知が来たらreturnしたいけど、来てない場合は普通に処理をする」といった場合はこの default: が使えます。

通知の応答を早くする

以上の実装でループするgoroutineを止めることができました。
しかし、この実装だと 中断通知を送ってすぐには止まりません。
なぜなら、selectによる中断のチェックは1秒毎にしか実行されないからです。

image.png

図のように、たとえばloopが開始してから 2.5秒 の時点で中断通知を送っても、実際にgoroutineが止まるのは 3秒 の時点になります。
最大1秒の遅延なら気にしないというケースであればこのままでも十分ですが、もし1秒のSleepではなくもっと長時間かかる処理が入っていたら 中断したはずなのに、全然goroutineが止まってくれない! という事態になります。

time.Sleepをtime.Tickerに置き換える

time.Sleepは直感的でシンプルなので簡単な例だと使いたくなりますが中断を考慮すると微妙です。
なぜなら、time.Sleep自体を中断させることができないからです。

そこで、代わりに time.Ticker が使えます。
time.Ticker を使うと「1秒経過、1秒経過、1秒経過、…」といった定期タイマーをchannelで通知してくれます。
中断通知と1秒経過のタイマーがどちらもchannelになるので、selectで先に通知されたものを優先して処理できます。

package main

import (
    "time"
)

func main() {
    cancel := make(chan struct{})
    go loop(cancel)

    // 2.5秒待つ
    time.Sleep(2500 * time.Millisecond)

    // ここで loop() を止める
    close(cancel)

    time.Sleep(3 * time.Second)
    println("finish")
}

// 無限ループする関数
func loop(cancel chan struct{}) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-cancel:
            // 中断通知が来た
            println("cancel")
            return
        case <-ticker.C:
            // 1秒ごとのtickerが来た
            println("loop...")
        }

    }
}

これを実行すると、ちゃんと 2.5秒 経過の時点でgoroutineが止まります。

Sleep以外の処理も即座に中断したい場合

time.Sleep を安全に中断できましたが、実際のアプリケーションでは単純なSleepではなくもっと他の非同期処理をしているはずです。
具体的にはネットワーク通信やファイルのI/Oなどが多いと思います。

基本的な考え方は time.Sleep の置き換えと同じです。
長時間かかる可能性がある呼び出しは、すべて中断ができる形に置き換えていけばよいのです。

次のように、TCP通信で相手からデータを受け取るコードを実行してみます。

package main

import (
    "net"
    "time"
)

func main() {
    cancel := make(chan struct{})
    go loop(cancel)

    // 2.5秒待つ
    time.Sleep(2500 * time.Millisecond)

    // ここで loop() を止める
    close(cancel)

    time.Sleep(3 * time.Second)
    println("finish")
}

// 無限ループする関数
func loop(cancel chan struct{}) {
    for {
        select {
        case <-cancel:
            // 中断通知が来た
            println("cancel")
            return
        default:
            conn, err := net.Dial("tcp", "127.0.0.1:12345")
            if err != nil {
                println("dial failed")
                return
            }
            buf := make([]byte, 1024)
            // ネットワークの相手からデータを読み取る(なにかのデータが来るまでここでずっと待つ
            conn.Read(buf)
        }

    }

この 127.0.0.1:12345 は適当なアドレスですが、仮にこの通信相手がいつまでもデータを送ってくれない場合どうなるでしょうか?
メイン処理は 2.5秒 後に中断を通知したのに、 goroutineは止まってくれません。
コードのコメントにある通り、 conn.Read がデータを受信するまで待ち続けてしまうからです。
よってこの関数は time.Sleep と同じように中断可能に置き換えないと、メイン処理の中断通知を認識できません。

conn.Read を中断させるには 別のgoroutineから TCPコネクションを切断するしかないみたいです。
つまり、conn.Close を呼び出します。

具体的なコード例をあげるとこのようになります。
ずっと待ってしまう conn.Read をさらに別のgoroutineで実行して、loop goroutineは中断通知を待ち続け
中断通知が来たら conn.Close で切断して Read のgoroutineごとすべてを中断するという流れになっています。

package main

import (
    "fmt"
    "net"
    "time"
)

func main() {
    cancel := make(chan struct{})
    go loop(cancel)

    // 2.5秒待つ
    time.Sleep(2500 * time.Millisecond)

    // ここで loop() を止める
    close(cancel)

    time.Sleep(3 * time.Second)
    println("finish")
}

// 無限ループする関数
func loop(cancel chan struct{}) {
    conn, err := net.Dial("tcp", "127.0.0.1:12345")
    if err != nil {
        println("dial failed")
        return
    }

    go func() {
        buf := make([]byte, 1024)
        // ネットワークの相手からデータを読み取る(なにかのデータが来るまでここでずっと待つ
        conn.Read(buf)
    }()

    for {
        select {
        case <-cancel:
            // 中断通知が来た
            println("cancel")
            if err := conn.Close(); err != nil {
                fmt.Printf("failed to close connection: %+v", err)
            }
            return
        }
    }
}

コードが複雑化してきましたが、これでTCP通信も 2.5秒 後に中断することができました。

contextパッケージを使う

さて、ここまでchannelを使って中断を通知する方法を紹介してきました。
time.Sleep やネットワーク通信の読み書きなど、そのまま呼び出すと中断できないものは中断可能に置き換えることが重要とわかりました。

しかし、非同期処理を書く度に上のような中断可能性を考えるのはなかなかに難しいですし、コードも複雑化してしまいます。
また、channelの仕様的にやりづらい点もいくつか存在します。たとえば、 同じchannelを2回closeするとクラッシュする といった点です。

そこで 「あらゆるキャンセル処理を統一的に扱えるようにしよう!」 という考えからcontextパッケージ が生まれました。
contextパッケージの具体的な使い方はネット上にもたくさんあるのでここでは省略して、contextを使うとchannelに比べて何が良いのかという点を解説します。

書き方が統一される

contextパッケージを使った関数は基本的に 第1引数にcontext.Context型の引数を取る ようになっています。
これは強制ではありませんが、有名なGoのパッケージは基本これに従っています。
こういったルールがあることで、第1引数にcontext.Contextを取る関数は長時間かかる可能性があるんだな と認識できるので理解が楽になります。

たとえば、grpc-goの自動生成されたクライアントは次のようになっています。

type GreeterClient interface {
    // Sends a greeting
    SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

Contextは親から子に伝搬していく

context.Contextは親子関係を作ることができます。
大本の親のcontextが中断されると、それが子のcontextにも伝搬して伝わります。
よって、非同期処理の中でさらに非同期処理を呼び出して…といった階層的な処理を一括で中断できます。

Cloud Native Go Chapter 4. Cloud Native Patterns - The Context Packageにとても良い図があったので引用します。
一番最初のユーザーのリクエストを中止すると、同時に非同期で呼び出された他のサービスへのリクエストもすべて一括で中断される…といったイメージです。

image.png

Contextは中断通知を複数回送っても大丈夫

同じchannelに対して複数回 close() を呼ぶとクラッシュしてしまう(panicが起きる)という問題があるのですが、 context.WithCancel で生成される中断関数(context.CancelFunc) は複数回呼び出しても大丈夫な作りになっています。2回め以降の呼び出しは無視され何も起きません。

たとえば、上で紹介したTCP通信を中断する例をcontextに置き換え、2回中断を呼び出してみた例がこちらです。

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go loop(ctx)

    // 2.5秒待つ
    time.Sleep(2500 * time.Millisecond)

    // ここで loop() を止める
    cancel()

    // 2回呼び出しても大丈夫
    cancel()

    time.Sleep(3 * time.Second)
    println("finish")
}

// 無限ループする関数
func loop(ctx context.Context) {
    conn, err := net.Dial("tcp", "127.0.0.1:12345")
    if err != nil {
        println("dial failed")
        return
    }

    go func() {
        buf := make([]byte, 1024)
        // ネットワークの相手からデータを読み取る(なにかのデータが来るまでここでずっと待つ
        conn.Read(buf)
    }()

    for {
        select {
        case <-ctx.Done():
            // 中断通知が来た
            println("cancel")
            if err := conn.Close(); err != nil {
                fmt.Printf("failed to close connection: %+v", err)
            }
            return
        }
    }
}

time.SleepConn.Read はcontextに対応しないのか?

ここまで読むと、contextパッケージを使ったほうが良いのだなという認識になってくると思います。
しかし、前半にでてきた標準パッケージの time.SleepConn.Read は第1引数に context.Context を取らず従っていないように見えます。
長時間かかる可能性のある処理なのに、contextパッケージに対応しないのでしょうか?

これに関してはGo本家のissueでも熱く(?)議論されていました。

筆者自身もあまり詳しく読み込めてはいないのですが、contextパッケージは比較的最近に生まれたパッケージであり、 昔からずっと存在している標準の time.SleepConn.Read のAPIを破壊してまで置き換えるのは違う、アプリ側でラップした実装を自由に作れば良い。 という主張が優位なように見えます。

まとめ

「goroutineを安全に止めたい」という一見単純な課題ですが、停止の即座性などを考えると考慮すべきことは多いです。
長くなってしまいましたが、箇条書きでまとめると次のようになります。

  • goroutineはreturnしたら止まる(当たり前)
  • returnさせるためには、別goroutineから通知を送る必要がある
  • 即座に通知を受け取れるように、 time.SleepConn.Read といった長時間止まる可能性のあるAPI呼び出しを中断可能に置き換える必要がある
  • contextという非同期処理の中断に最適化されたパッケージがあり、できるだけこれに従って実装する
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
32
Help us understand the problem. What are the problem?