0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

HowtelevisionAdvent Calendar 2022

Day 22

Goroutine はじめました

Last updated at Posted at 2022-11-28

この記事は Howtelevision Advent Calendar 2022 の22日目の記事です。

外資就活 のプッシュ通知処理を Go にリプレースする際に、Goroutine を利用して配信を並行処理するようにしたため、その実装を紹介します。
手探りで実装したため、Goroutine の同時実行数の制御などの考慮はできていなかったりしますが、実装して運用するに至ることはできたので備忘録も兼ねた記事となります。

FCM を利用してプッシュ通知配信

最大で20,000件の配信を行う必要があります。
なるべく一括で配信して FCM へのリクエスト数を減らしたかったため、 500件のメッセージを一括送信 をする API を利用しました。
また500件の一括配信を Goroutine を利用して並行処理しました。

実現したい事

  • Goroutine を利用した並行処理
  • 送信に失敗した FCM トークンの削除
  • エラーハンドリング

Goroutine では戻り値を取得することができませんが、channel を利用することで値を送受信することができます。
そのためエラーハンドリングに必要な情報と、送信に失敗したトークンを channel を介して送受信することにしました。

// 送受信する構造体
type BatchSendResult struct {
	FailedTokens []string
	Err          error
}

一括送信処理

channel を受け取りレスポンスを整形して送信します。
fcm という FCM クライアントをフィールドに持つ構造体のメソッドとして定義し、channel の他に FCM のリクエストに必要な情報も引数として受け取っています。

func (f fcm) BatchSendUpToMaxMessages(ch chan BatchSendResult, input BatchSendInput) {
    br, err := f.client.SendAll(context.Background(), input.messages)
	if err != nil {
		ch <- BatchSendResult{
			FailedTokens: nil,
			Err:          err,
		}
		return
	}

	if br.FailureCount > 0 {
		var failedTokens []string
		for i, resp := range br.Responses {
			if !resp.Success {
				failedTokens = append(failedTokens, input.messages[i].Token)
			}
		}
		ch <- BatchSendResult{
			FailedTokens: failedTokens,
			Err:          err,
		}
		return
	}

	ch <- BatchSendResult{
		FailedTokens: nil,
		Err:          err,
	}
}

エラーハンドリング

FCM へのリクエスト時点でエラーが返ってきた際のエラーハンドリングです。
NW エラーや不正なリクエストボディになっている場合にはエラーが返ってきます。
エラーが返ってきた時点で配信は行われていないため、配信失敗トークン: nil としてエラーの内容を添えて channel へ送信しています。
なお、FCM のクライアントが内部でリトライ処理を行なっているため、特に自身での実装はしていません。

if err != nil {
    ch <- BatchSendResult{
        FailedTokens: nil,
        Err:          err,
    }
    return
}

送信に失敗した FCM トークンの送信

FCM へのリクエストの戻り値である br には失敗数の内訳があるため、レスポンスから失敗したトークンを取り出してそのリストを送信します。
なお、レスポンスには配信に失敗したトークンの他に、配信に失敗したエラー内容も含まれていますが、トークンが何らかの理由で有効でないと判断し、個別のエラーに対応せずに削除したいため err (nil) を送信して握りつぶしています。

if br.FailureCount > 0 {
    var failedTokens []string
    for i, resp := range br.Responses {
        if !resp.Success {
            failedTokens = append(failedTokens, input.messages[i].Token)
        }
    }
    ch <- BatchSendResult{
        FailedTokens: failedTokens,
        Err:          err,
    }
    return
}

正常に一括配信が行われた場合の channel への送信

すべてのフィールドを nil で送信します。

ch <- BatchSendResult{
    FailedTokens: nil,
    Err:          err,
}

一括送信の呼び出し元の実装

送受信したい構造体の channel を作成し、一括送信したメッセージのリスト数分 Goroutine で一括送信処理を呼び出します。
またエラーハンドリングを実現したいとしましたが、エラーの際に処理を中断することで多重配信が行われることを避けたかったため、どのようなエラーが生じたかをログに刻むためにエラーを検知する程度の処理をしています。err の値を return する最低限の処理にとどめ、受け取った側でログに刻んでいます。
さらにタイムアウトを設けて、処理に遅延が生じた場合にプロセスが立ち上がり続けて負荷になることを避けています。

実際の実装から省略した部分が多いものになってはいますが概ね以下のようにしました。

const timeoutDuration = 5 * time.Minute

ch := make(chan BatchSendResult)
defer close(ch)

for _, message := range messages {
    go s.fcm.BatchSendUpToMaxMessages(ch, message)
}

timeout := time.After(timeoutDuration)
parallel := len(messages)
completed := 0
var failedTokens []string

push:
	for {
		select {
		case res := <-ch:
			failedTokens = append(failedTokens, res.FailedTokens...)

			if res.Err != nil {
				err = res.Err
			}
			completed++
			if completed == parallel {
				break push
			}
		case <-timeout:
			break push
		}
	}

db.Where("token IN (?)", failedTokens).Delete(&Token{})

return err

Goroutine での呼び出し

channel を作成し、一括送信の結果を送受信可能としています。

ch := make(chan BatchSendResult)
defer close(ch)

for _, message := range messages {
    go s.fcm.BatchSendUpToMaxMessages(ch, message)
}

並行処理した一括送信からの受信

例えば1,000件の配信が必要な場合は、1,000 / 500 = 2 で2つ並行で一括送信処理が走ります。
select で2つの channel からの値を受信して、値に応じて処理を行うようにしています。

後述するタイムアウトにならない場合は res のケースの処理が実行されます。
複数の channel から受信した配信失敗したトークンを連結させて、最終的に配信に失敗したトークンのリストを作成しています。

parallel := len(messages)
completed := 0
var failedTokens []string

push:
	for {
		select {
        case res := <-ch:
            failedTokens = append(failedTokens, res.FailedTokens...)

			if res.Err != nil {
				err = res.Err
			}
			completed++
			if completed == parallel {
				break push
			}
        case <-timeout:
            // ...
        }
    }

タイムアウトの設定

5分のタイムアウトを設けています。
func After(d Duration) <-chan Time となっており、timeout は channel なため5分経過すると timeout のケースが実行されて一括送信処理の受信を中断するようになっています。

const timeoutDuration = 5 * time.Minute
timeout := time.After(timeoutDuration)

push:
	for {
		select {
            // ...
        case <-timeout:
            break push
        }
    }

他の処理

配信に失敗して無効と判断したトークンの削除と、取得したエラーを return して一括送信の処理を終了しました。

さいごに

Goroutine は難解なイメージがあり、また実際に実装するにあたっても試行錯誤や考慮することが多かったです。
ただ今回自力でなんとか実装と運用をし、Goroutine の使い方や便利な点を理解でき、その後も Kubernetes の API を利用して複数の job を削除する実装では sync/errgroup を利用するなど、実装の幅を広げることができました。

ちなみに Go でリプレースしたいものは他にもあります!
ぜひ一緒に取り組んでくれるエンジニアの方、募集しております!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?