Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
1
Help us understand the problem. What is going on with this article?
@rai_wtnb

「Go言語による並行処理」ポイントまとめ・下

Go言語による並行処理のポイントまとめ・下です。
go言語による並行処理

以下の記事の続きです!
15分で読める「Go言語による並行処理」ポイントまとめ・上

Goの並行処理パターン

拘束

・平行なコードで安全な操作をするには、①Mutex ②チャネル 以外に重要なのは
 ③イミュータブルなデータ
  → 各並行プロセスが同じデータを操作できないようにする
   = メモリ内のポインタの代わりに値のコピーを使う
 ④拘束により保護されたデータ
  → アドホック拘束レキシカル拘束がある

アドホック拘束
・規約(言語コミュニティ、職場のチーム、触っているコードベースなどにより指定されているもの)によって拘束する

data := make([]int, 4)

loopData := func(handleData chan<- int) {
    defer close(handleData)

    for i := range data {
        // dataにここからのみアクセス
        handleData <- data[i]
    }
}

handleData := make(chan int)
go loopData(handleData)

for num := range handleData {
    fmt.Println(num)
}

↑本来はdataはメインゴルーチンからもアクセスできるが規約によりloopDataからのみアクセスすることにしている
 → しかし多くの人がこのコードに着手するにつれ規約違反のコードが混在する恐れ

レキシカル拘束
・レキシカルスコープによりチャネルへの読み書きのうち必要な権限のみ公開する

prirntData := func(wg *sync.WaitGroup, data []byte) {
    defer wg.Done()

    var buff bytes.Buffer
    for _, b := range data {
        fmt.Fprintf(&buff, "%c", b)
    }
    fmt.Println(buff.String())
}

var wg sync.WaitGroup
wg.Add(2)

// printDataの後にdataスライスを宣言しているためprintDataは直接アクセスできない
data := []byte("golang")
go prirntData(&wg, data[:3])
go prirntData(&wg, data[3:])

wg.Wait()

// =>
// ang
// gol

↑あえて同期ではなく拘束を利用することでパフォーマンス向上とコードの可読性向上が見込める
(可読性向上の理由はレキシカル拘束を利用するとレキシカルスコープ内では同期なコードが書けるから)

for-selectループ

select

for-selectループを使う場面
 ①チャネルから繰り返しの変数を送出

// 繰り返し可能なものをチャネル上の変数に変換したい時
for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:
        return
    case stringCh <- s:
    }
}

 ②停止シグナルを待つ無限ループ

for {
    select {
    case <-done:
        return
    default:
        // 停止シグナルが来るまで処理継続
    }
}

orチャネル

・doneチャネルをひとまとめにしてそのうちひとつがcloseしたらまとめた全channelをcloseする
・複数モジュールを組み合わせる際の継ぎ目として利用すると便利

var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
    // 再帰関数orの停止条件(スライスが空の時nilチャネルを返す または 要素が1つのみのときはそれを返す)
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            // 再起的にorチャネルを作成
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

// 指定した時間経過後にcloseするチャネルを生成する関数
sig := func(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

start := time.Now()
<-or(
    sig(2*time.Hour),
    sig(5*time.Minute),
    sig(1*time.Second),
    sig(1*time.Hour),
    sig(1*time.Minute),
)
fmt.Printf("読込時間: %v\n", time.Since(start)) // => 読込時間: 1.005219375s

エラーハンドリング

・ゴルーチンがエラーを生成する場合、正常系の結果と強く結びつけて、正常系と同じ経路で渡されるべき
・メインゴルーチンが複数ゴルーチンから結果を取りまとめ、子ゴルーチンを継続させるか否か判断する、といったこともできる

// *http.Responseとerrorを囲む
type Result struct {
    Error    error
    Response *http.Response
}

checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)

    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp}

            select {
            case <-done:
                return
            // resultsチャネルに書き込む
            case results <- result:
            }
        }
    }()
    return results
}

done := make(chan interface{})
defer close(done)

urls := []string{"https://note.com/muku_69", "https://badhost"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v\n", result.Error)
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

// =>
// Response: 200 OK
// error: Get "https://badhost": dial tcp: lookup badhost: no such host

パイプライン

・パイプラインのステージの性質
 1. 受取るものと返すものが同じ型

 2. ステージは引き回せるよう具体化していなければならない

 3. バッチ処理とストリーム処理
  ・バッチ処理 ... データをまとめて処理
  ・ストリーム処理 ... ステージが要素を1つ1つ受け渡しをする。

// 受け取ったスライスと同じ長さのバッファ付きチャネルを作成するジェネレーター
generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intCh := make(chan int, len(integers))

    go func() {
        defer close(intCh)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intCh <- i:
            }
        }
    }()

    return intCh
}

multiply := func(
    done <-chan interface{},
    intCh <-chan int,
    multiplier int,
) <-chan int {
    multipliedCh := make(chan int)
    go func() {
        defer close(multipliedCh)
        for i := range intCh {
            select {
            case <-done:
                return
            case multipliedCh <- i * multiplier:
            }
        }
    }()
    return multipliedCh
}

add := func(
    done <-chan interface{},
    intCh <-chan int,
    additive int,
) <-chan int {
    // additiveを足してチャネルを返す処理
    // ...
    return addedCh
}

done := make(chan interface{})
defer close(done)

intCh := generator(done, 1, 2, 3, 4)
// パイプライン作成(*2 → +1 → *2)
pipeline := multiply(done, add(done, multiply(done, intCh, 2), 1), 2)

for v := range pipeline {
    fmt.Println(v)
}

// =>
// 6
// 10
// 14
// 18

チャネルを使うことで各ステージを安全に並行処理ができ、またそれにより各ステージで入力値だけ待てば良いのですぐに出力を送れる

ファンアウト・ファンイン

ファンアウト
・パイプラインからの入力を扱うために複数ゴルーチンを起動するプロセス
ファンイン
・複数結果を1チャンネルに結合するプロセス

ファンアウト・ファンインを使うべき場合は?
→ 長時間実行かつそのステージが前の計算結果に依存していないとき

or-doneチャネル

・システムの全く異なる部分からチャネルを受け取るとき、ゴルーチンがキャンセルされた=読み込み先のチャネルがキャンセルされた、という意味にならない場合がある
→ doneチャネルも条件に入ったselect文で読み込み先のチャネルを囲む必要がある

orDone := func(done, c <-chan interface{}) <-chan interface{} {
    valCh := make(chan interface{})

    go func() {
        defer close(valCh)
        for {
            // select文で囲む
            select {
            case <-done:
                return

            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valCh <- v:
                case <-done:
                }
            }
        }
    }()
    return valCh
}

teeチャネル

・チャネルからの値を2つに分けて扱いとき
 ex. ユーザの入力コマンドを①実行、②後々検査するために記録する

・Unix系システムでのteeコマンドから由来
teeコマンドについて詳しくまとめました 【Linuxコマンド集】

tee := func(
    done <-chan interface{},
    in <-chan interface{},
) (_, _ <-chan interface{}) {
    out1 := make(chan interface{})
    out2 := make(chan interface{})

    go func() {
        defer close(out1)
        defer close(out2)

        // 前述したorDoneを使用
        for val := range orDone(done, in) {
            // コピー変数を用意
            var out1, out2 = out1, out2
            for i := 0; i < 2; i++ {
                select {
                case out1 <- val:
                    out1 = nil // チャネルへの書き込み終了後コピー変数にnilを代入して書き込みをブロック
                case out2 <- val:
                    out2 = nil // チャネルへの書き込み終了後コピー変数にnilを代入して書き込みをブロック
                }
            }
        }
    }()
    return out1, out2
}

bridgeチャネル

・複数のチャネルをまとめて1つのチャネルで扱うときに使用
<-chan <-chan interface{}<-chan interface{}として扱えるようにする

bridge := func(
    done <-chan interface{},
    chanCh <-chan <chan interface{},
) <-chan interface{} {
    // 全ての値を返すチャネル
    valCh := make(chan interface{})

    go func() {
        defer close(valCh)
        for { // 引数のchanChからチャネルを剥ぎ取りネストされたループへ渡す
            var ch <-chan interface{}
            select{
            case maybeCh, ok := <-chanCh:
                if ok == false {
                    return
                }
                ch = maybeCh
            case <-done:
                return
            }

            // 前述のorDoneを使用でdoneチャネルからの受信によるキャンセルに対応
            for val := range orDone(done, ch) {
                select {
                case valCh <- val:
                case <-done:
                }
            }
        }
    }()
    return valCh
}

キュー(待ち行列)

・パイプラインの処理が追いついていなくても処理すべき仕事を受け付けたい時

キューはプログラムの実行時間はほとんど改善しない
 →キュー導入の利点はステージのブロック時間を短くできること

・キューがシステム性能を向上させうる状況
 1. ステージ内のバッチ処理で時間を節約するとき
 2. ステージでの遅延がシステムにフィードバックループを発生させるとき

・メモリサイズの拡大の処理はコスト大
 →メモリサイズ拡大の回数を少なくすることで効率的に(=キューの活用で効率的に)

・キューが実装されるべき場所
 1. パイプラインの入口
 2. バッチ処理で効率的になるステージの中

contextパッケージ

・並行処理のキャンセルの通知に付随して追加情報(キャンセル理由や関数処理を終わらせるべきデッドラインがあるかなど)を伝達したいときに使用
・こちらも参照 → Go の context パッケージの使い方について整理してみた

type Context interface {
    // デッドラインが設定されている時はその時刻を返す
    // デッドラインがセットされていない時ok==falseを返す
    Deadline() (deadline time.Time, ok bool)

    // Contextがキャンセルまたはタイムアウト時にcloseされる
    Done() <-chan struct{}

    // ゴルーチンがキャンセルされた時にnon-nilな値を返す(キャンセル理由を返す)
    Err() error

    // keyに紐づいた値を返す
    // 設定した値がない時はnilを返す
    Value(key interface{}) interface{}
}
// 返されたキャンセル関数が呼ばれたときにdoneチャネルを閉じる新しいContextを返す
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 渡されたデッドライン時にdoneチャネルを閉じる新しいContextを返す
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
// 内部ではWithDeadlineを呼んでる
// 現在時刻を気にせずにdeadlineを管理するためのWrapper
func WithTimeout(parent Context, timeout time.Time) (Context, CancelFunc)
// 値を追加したContextを返す
func WithValue(parent Context, key, val interface{}) Context
// Contextの空インスタンスを作成する関数たち
func Background() Context
func TODO() Context // どのコンテキストを利用するか決まってない時に暫定的に使用

大規模開発での並行処理

エラー伝播

エラーが持つべき情報
1. 何が起きたか
2. いつどこでエラー発生したか
3. ユーザに表示するメッセージは上記2点を要約してわかりやすく表示
4. ユーザが次にすべき行動(さらにエラーについて情報を得る方法)

・モジュールの境界において、元のエラーに適切な情報を追加しつつ自分らのモジュールのエラー型に書き換える(このように内包する必要があるのは公開される関数やメソッド、有益なコンテキストを追加しうるときのみ)

・バグと既知のエッジケースの区別をしっかり行う

タイムアウトとキャンセル処理

並行処理のプロセスでタイムアウトを使うべきとき
1. システム飽和状態
 ・リクエスト処理の限界に来ているとき
 ・リクエストを保存するリソースがないとき
2. 新鮮でないデータが不必要である
 ・より新しいデータがくる、またはデータ処理に期限があるとき
3. デッドロック発生を防ぎたい

並行処理がキャンセルされるとき
1. タイムアウト
2. ユーザによるキャンセル
3. 親のキャンセル
4. 複製されたリクエスト
 ・複数の並行処理のプロセスにそのうちのどれかがレスポンスを早く返してくれることを望んで複製データを送り、最初の1つが帰ってきたとき他プロセスをキャンセルする

・並行処理のプロセスが割り込み可能になる期間を定め、これより長くかかる機能はそれ自身を割り込み可能にする

ハートビート(拍動)

並行処理プロセスが正常動作していることを外部に伝える方法

ハートビートの種類
1. 一定周期のハートビート
 ・他の処理を待っているような並行処理のコードに便利
2. 仕事単位ごとに鼓動を送信するハートビート
 ・テストなどで便利

複製されたリクエスト

・レスポンスをなるだけ速く受け取りたい時
→リクエストを複数ハンドラに対して複製、1番早く帰ってきた結果を使う

・複製する場合は異なるランタイム条件(プロセス、マシン、データストアへのパス、異なるデータストアへのアクセスなどをまとめたもの)のハンドラーにだけ複製すべき

・この手法は結果的に耐障害性、スケーラビリティももたらす

流量制限

・流量制限 ... 一定時間の間に可能なリソースへのアクセス回数を制限すること

・流量制限することで悪意ある行為(大量アクセス、DDoS攻撃)や高負荷の操作によるデススパイラルを防ぐ

・ex. あるAPIにアクセスするドライバ((golang.org/x/time/rateパッケージのトークンバケットによる制限))↓

// ダミーAPI
type APIConnection struct {
    rateLimiter *rate.Limiter
}
func Open() *APIConnection {
    return &APIConnection{
        // 全てのAPI接続に対し1秒に1イベントという流量制限
        rateLimiter: rate.NewLimiter(rate.Limit(1), 1),
    }
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
    // 流量制限後リクエストの完結に必要な数のアクセストークンが揃うまで待機
    if err := a.rateLimiter.Wait(ctx); err != nil { return err }
    // 処理
    // ...
    return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    // 流量制限後リクエストの完結に必要な数のアクセストークンが揃うまで待機
    if err := a.rateLimiter.Wait(ctx); err != nil { return err }
    // 処理
    // ...
    return nil
}
defer log.Printf("Done.")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)

apiConnection := Open()

var wg sync.WaitGroup
wg.Add(20)

for i := 0; i < 10; i++ {
    go func() {
        defer wg.Done()
        err := apiConnection.ReadFile(context.Background())
        if err != nil {
            log.Printf("cannot ReadFile: %v", err)
        }
        log.Printf("ReadFile")
    }()
}
for i := 0; i < 10; i++ {
    go func() {
        defer wg.Done()
        err := apiConnection.ResolveAddress(context.Background())
        if err != nil {
            log.Printf("cannot ResolveAddress: %v", err)
        }
        log.Printf("ResolveAddress")
    }()
}
wg.Wait()

golang.org/x/time/rateについて↓

// あるイベントの頻度を制限する(1秒に何回)
type Limit float64
// 引数のrは速度、bはバケットの深さ(トークン数)
func NewLimiter(r Limit, b int) *Limiter
// 最小のタイムインターバルを指定するとLimitへ変換してくれる
func Every(interval time.Duration) Limit

// WaitN(ctx, 1)を呼んだことになる(WaitNの第二引数を1で呼んだことになる)
func (lim *Limiter) Wait(ctx context.Context)
// アクセストークンを得られるまでリクエストを拒否する
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

不健全なゴルーチンを直す(再起動)

・デーモンのようなプロセスには複数の長時間稼働し続けるゴルーチンがつきもの
・ゴルーチンが外部の助けなしに回復できない状態になる時がある
→回復する仕組みが必要

// 監視と再起動ができるゴルーチンのシグネチャを定義
type startGorutineFn func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (heartbeat <-chan interface{})


// ゴルーチンの健全性を監視する管理人(Steaward)を生成
newSteaward := func(
    timeout time.Duration,
    startGoroutine startGorutineFn,
) startGorutineFn { // 管理人(Steward)自身もstartGorutineFnを返す
    return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
        heartbeat := make(chan interface{})
        go func() {
            defer close(heartbeat)

            var wardDone chan interface{}
            var wardHeartbeat <-chan interface{}

            // 管理人が監視するゴルーチンである中庭(ward)を生成
            // クロージャを定義
            startWard := func() {
                wardDone = make(chan interface{})

                wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2)
            }
            startWard()
            pulse := time.Tick(pulseInterval)

        monitorLoop:
            for {
                timeoutSignal := time.After(timeout)

                for {
                    // 管理人(Steward)が自身の鼓動を確実に外へ送信できるようにしている
                    select {
                    case <-pulse:
                        select {
                        case heartbeat <- struct{}{}:
                        default:
                        }
                    case <-wardHeartbeat:
                        continue monitorLoop
                    case <-timeoutSignal:
                        log.Println("管理人: 中庭に異常あり; 再起動します")
                        // 既存の中庭(ward)を停止して新しい中庭(ward)を起動させる
                        close(wardDone)
                    }
                }
            }
        }()
        return heartbeat
    }
}

参考

https://golang.org/pkg/context
https://qiita.com/convto/items/0f9e844a1775b80cdbd9

1
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
rai_wtnb
バックエンドエンジニア志望の文系大学4年生です。 https://note.com/muku_69 / https://qiita.com/rai_wtnb

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
1
Help us understand the problem. What is going on with this article?