概要
Pub/Subのバッチング処理機能について、調べたところバッファリングする仕組みがありましたので、バッファリングをコントロールする事で少しはやくPub/Sub TopicにPublishできるのではないかと考えた検証結果となります。
背景: Pub/Subのバッチング機能
Google CloudのPub/Subには、複数のメッセージを効率的に送信するための「バッチング機能」があります。この機能により、個別に送信するよりもネットワーク効率を高め、スループットを向上させることができます。
Pub/Subのバッチングは、以下の3つの閾値のいずれかを満たすと発火します:
| 閾値 | パラメータ | デフォルト値 | 説明 |
|---|---|---|---|
| メッセージ数 | CountThreshold |
100 | バッチに含めるメッセージの数 |
| データサイズ | ByteThreshold |
1 MB | バッチの合計サイズ |
| 待機時間 | DelayThreshold |
10ms | メッセージを待つ最大時間 |
参考:Batch messaging | Pub/Sub | Google Cloud Documentation
問題点: 単一メッセージでも10ms待つ
デフォルト設定では、CountThreshold=100、DelayThreshold=10ms となっています。
このため、単一メッセージを送信する場合でも、他のメッセージが来ることを期待して10ms待機してしまいます。
メッセージ送信要求
↓
待機... (10ms) ← 他のメッセージが来るかも?
↓
実際に送信
デフォルト設定の実装を見てみる
Go SDKの実装を確認してみましょう。以下のようにデフォルト値が定義されています:
Publish()の内部実装: どこで10ms待つのか?
では、実際にPublisher.Publish()を呼んだときの挙動を確認します。
呼び出しフローは以下のようになっています:
1. Publisher.Publish(ctx, msg)
↓
2. PublishScheduler.Add(key, item, size)
↓
3. bundler.Add(item, size)
↓
4. bundler.add(item, size)
↓
5. 閾値チェック
- CountThreshold に達した? → 即座に送信
- ByteThreshold に達した? → 即座に送信
- どちらも達していない? → time.AfterFunc(DelayThreshold, ...) でタイマー設定
Pub/Sub SDKは内部的にbundlerという汎用的なバッチング機構を使用しています。 bundler.add()の詳細は以下の通りです。
上記コードから、以下のロジックが分かります:
-
閾値に達した場合(CountThresholdまたはByteThreshold):
-
enqueueCurBundle()が即座に呼ばれる
-
-
閾値に達していない場合:
-
time.AfterFunc(b.DelayThreshold, ...)でタイマーが設定される - DelayThreshold(デフォルト10ms)後に
tryHandleBundles()が呼ばれる
-
解決策: CountThresholdを動的に設定
解決策は非常にシンプルです。送信するメッセージ数に合わせて CountThreshold を動的に設定するだけです。
実装例
func PublishMessages(projectID, topicNameOrID string, messages [][]byte) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer func() {
if err := client.Close(); err != nil {
// operation
}
}()
publisher := client.Publisher(topicNameOrID)
defer publisher.Stop()
publisher.PublishSettings.CountThreshold = len(messages) // メッセージ数に応じて変動
publisher.PublishSettings.ByteThreshold = 1048576 // 余裕を持った容量
publisher.PublishSettings.DelayThreshold = 50 * time.Millisecond // 余裕を持った遅延時間
publisher.PublishSettings.EnableCompression = true // デフォルトfalseなので、あわてんぼう仕様として有効化
// メッセージを送信
results := make([]*pubsub.PublishResult, 0, len(messages))
for _, message := range messages {
result := publisher.Publish(ctx, &pubsub.Message{
Data: message,
})
results = append(results, result)
}
// 結果を確認
errs := make([]error, 0, len(messages))
for i, result := range results {
if _, err := result.Get(ctx); err != nil {
errs = append(errs, fmt.Errorf("message %d failed: %w", i, err))
}
}
if 0 < len(errs) {
return fmt.Errorf("failed to publish %d/%d messages: %v", len(errs), len(messages), errs)
}
return nil
}
効果測定
実測結果
以下は、東京リージョンのCloud Run jobからPub/Sub Topicにメッセージを送信した際の測定結果です。
想定よりも効果がある事がわかります。
## Cloud Logging
INFO Optimized settings result average_ms=18.615"
INFO Default settings result average_ms=33.652"
| 設定 | CountThreshold | 平均レイテンシ | 改善効果 |
|---|---|---|---|
| デフォルト | 100 | 約33.65ms | - |
| 最適化後 | 1 | 約18.61ms | 約15ms削減 |
※ 実際の数値は環境やネットワーク状況により変動します
考察
順番を入れ替えて検証しました。結果的には5ms程度の改善という事で順番が大きく影響することの確認がとれましたが、それでも最適化後の方がより改善している事は明らかでした。
## Cloud Logging
INFO Default settings result average_ms=29.376
INFO Optimized settings result average_ms=23.502
| 設定 | CountThreshold | 平均レイテンシ | 改善効果 |
|---|---|---|---|
| デフォルト | 100 | 約29.27ms | - |
| 最適化後 | 1 | 約23.50ms | 約5ms削減 |
※ 実際の数値は環境やネットワーク状況により変動します
まとめ
Google Cloud のPub/Subの CountThreshold を動的に設定することで:
- 単一メッセージ送信時の10ms遅延を削減
- バッチ送信でも無駄な待ち時間をなくす
- リアルタイム性の向上
- EnableCompressionはデータ量が多い場合に効果があるはず。。。
システム全体のパフォーマンスを改善できます。
あわてんぼうなPub/Subは、クリスマス(のような繁忙期)も安心してプレゼント(通知)を届けられますね!
付録: ベンチマークコード
記事で紹介した効果を実際に検証するためのコードを用意しました。
package main
import (
"context"
"fmt"
"log/slog"
"time"
"cloud.google.com/go/pubsub/v2"
)
const (
projectID = "your-project-id"
topicNameOrID = "your-topic-name-or-id"
)
func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
slog.Error("Failed to create client", "error", err)
return
}
defer func() {
if err := client.Close(); err != nil {
// Log the error but do not override the main error return
}
}()
// ウォームアップ: 初回接続のオーバーヘッドを排除
warmup := client.Publisher(topicNameOrID)
result := warmup.Publish(ctx, &pubsub.Message{Data: []byte("warmup")})
_, _ = result.Get(ctx)
warmup.Stop()
iterations := 100
// Default設定でのテスト
runDefaultTest(ctx, client, topicNameOrID, iterations)
// Optimized設定でのテスト
runOptimizedTest(ctx, client, topicNameOrID, iterations)
}
func runDefaultTest(ctx context.Context, client *pubsub.Client, topicNameOrID string, iterations int) {
var total time.Duration
for i := 0; i < iterations; i++ {
messages := [][]byte{[]byte(fmt.Sprintf("test %d", i))}
publisher := client.Publisher(topicNameOrID)
start := time.Now()
for _, message := range messages {
r := publisher.Publish(ctx, &pubsub.Message{
Data: message,
})
if _, err := r.Get(ctx); err != nil {
slog.Error("Publish failed", "error", err)
}
}
total += time.Since(start)
publisher.Stop()
}
avg := total / time.Duration(iterations)
slog.Info("Default settings result",
"average_us", avg.Microseconds(),
"average_ms", fmt.Sprintf("%.3f", float64(avg.Microseconds())/1000),
)
}
func runOptimizedTest(ctx context.Context, client *pubsub.Client, topicNameOrID string, iterations int) {
var total time.Duration
for i := 0; i < iterations; i++ {
messages := [][]byte{[]byte(fmt.Sprintf("test %d", i))}
publisher := client.Publisher(topicNameOrID)
publisher.PublishSettings.CountThreshold = len(messages)
publisher.PublishSettings.ByteThreshold = 1048576
publisher.PublishSettings.DelayThreshold = 50 * time.Millisecond
publisher.PublishSettings.EnableCompression = true
start := time.Now()
for _, message := range messages {
r := publisher.Publish(ctx, &pubsub.Message{
Data: message,
})
if _, err := r.Get(ctx); err != nil {
slog.Error("Publish failed", "error", err)
}
}
total += time.Since(start)
publisher.Stop()
}
avg := total / time.Duration(iterations)
slog.Info("Optimized settings result",
"average_us", avg.Microseconds(),
"average_ms", fmt.Sprintf("%.3f", float64(avg.Microseconds())/1000),
)
}
コードの詳細
主なテスト内容:
- デフォルト設定での測定: CountThreshold=100(デフォルト)での送信時間
- 動的設定での測定: CountThreshold=len(messages)での送信時間