2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

あわてんぼうのPub/Sub 〜DelayThreshold待ち時間前にやってきた〜

Last updated at Posted at 2025-12-07

概要

Pub/Subのバッチング処理機能について、調べたところバッファリングする仕組みがありましたので、バッファリングをコントロールする事で少しはやくPub/Sub TopicにPublishできるのではないかと考えた検証結果となります。

背景: Pub/Subのバッチング機能

Google CloudのPub/Subには、複数のメッセージを効率的に送信するための「バッチング機能」があります。この機能により、個別に送信するよりもネットワーク効率を高め、スループットを向上させることができます。

Pub/Subのバッチングは、以下の3つの閾値のいずれかを満たすと発火します:

閾値 パラメータ デフォルト値 説明
メッセージ数 CountThreshold 100 バッチに含めるメッセージの数
データサイズ ByteThreshold 1 MB バッチの合計サイズ
待機時間 DelayThreshold 10ms メッセージを待つ最大時間

参考:Batch messaging | Pub/Sub | Google Cloud Documentation

問題点: 単一メッセージでも10ms待つ

デフォルト設定では、CountThreshold=100DelayThreshold=10ms となっています。
このため、単一メッセージを送信する場合でも、他のメッセージが来ることを期待して10ms待機してしまいます

メッセージ送信要求
 ↓
待機... (10ms) ← 他のメッセージが来るかも?
 ↓
実際に送信

デフォルト設定の実装を見てみる

Go SDKの実装を確認してみましょう。以下のようにデフォルト値が定義されています:

Publish()の内部実装: どこで10ms待つのか?

では、実際にPublisher.Publish()を呼んだときの挙動を確認します。
呼び出しフローは以下のようになっています:

1. Publisher.Publish(ctx, msg)
   ↓
2. PublishScheduler.Add(key, item, size)
   ↓
3. bundler.Add(item, size)
   ↓
4. bundler.add(item, size)
   ↓
5. 閾値チェック
   - CountThreshold に達した? → 即座に送信
   - ByteThreshold に達した? → 即座に送信
   - どちらも達していない? → time.AfterFunc(DelayThreshold, ...) でタイマー設定

Pub/Sub SDKは内部的にbundlerという汎用的なバッチング機構を使用しています。 bundler.add()の詳細は以下の通りです。

上記コードから、以下のロジックが分かります:

  1. 閾値に達した場合(CountThresholdまたはByteThreshold):
    • enqueueCurBundle()が即座に呼ばれる
  2. 閾値に達していない場合:
    • time.AfterFunc(b.DelayThreshold, ...)でタイマーが設定される
    • DelayThreshold(デフォルト10ms)後にtryHandleBundles()が呼ばれる

解決策: CountThresholdを動的に設定

解決策は非常にシンプルです。送信するメッセージ数に合わせて CountThreshold を動的に設定するだけです。

実装例

func PublishMessages(projectID, topicNameOrID string, messages [][]byte) error {
    ctx := context.Background()
  	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
   defer func() {
		if err := client.Close(); err != nil {
			// operation
		}
	}()

    publisher := client.Publisher(topicNameOrID)
    defer publisher.Stop()

    publisher.PublishSettings.CountThreshold = len(messages)         // メッセージ数に応じて変動
    publisher.PublishSettings.ByteThreshold = 1048576                // 余裕を持った容量
    publisher.PublishSettings.DelayThreshold = 50 * time.Millisecond // 余裕を持った遅延時間
    publisher.PublishSettings.EnableCompression = true               // デフォルトfalseなので、あわてんぼう仕様として有効化

    // メッセージを送信
    results := make([]*pubsub.PublishResult, 0, len(messages))
    for _, message := range messages {
        result := publisher.Publish(ctx, &pubsub.Message{
            Data: message,
        })
        results = append(results, result)
    }

    // 結果を確認
    errs := make([]error, 0, len(messages))
    for i, result := range results {
        if _, err := result.Get(ctx); err != nil {
            errs = append(errs, fmt.Errorf("message %d failed: %w", i, err))
        }
    }
	if 0 < len(errs) {
		return fmt.Errorf("failed to publish %d/%d messages: %v", len(errs), len(messages), errs)
	}

    return nil
}

効果測定

実測結果

以下は、東京リージョンのCloud Run jobからPub/Sub Topicにメッセージを送信した際の測定結果です。
想定よりも効果がある事がわかります。

## Cloud Logging

INFO Optimized settings result average_ms=18.615"
INFO Default   settings result average_ms=33.652"
設定 CountThreshold 平均レイテンシ 改善効果
デフォルト 100 約33.65ms -
最適化後 1 約18.61ms 約15ms削減

※ 実際の数値は環境やネットワーク状況により変動します

考察

順番を入れ替えて検証しました。結果的には5ms程度の改善という事で順番が大きく影響することの確認がとれましたが、それでも最適化後の方がより改善している事は明らかでした。

## Cloud Logging

INFO Default   settings result average_ms=29.376
INFO Optimized settings result average_ms=23.502
設定 CountThreshold 平均レイテンシ 改善効果
デフォルト 100 約29.27ms -
最適化後 1 約23.50ms 約5ms削減

※ 実際の数値は環境やネットワーク状況により変動します

まとめ

Google Cloud のPub/Subの CountThreshold を動的に設定することで:

  • 単一メッセージ送信時の10ms遅延を削減
  • バッチ送信でも無駄な待ち時間をなくす
  • リアルタイム性の向上
  • EnableCompressionはデータ量が多い場合に効果があるはず。。。

システム全体のパフォーマンスを改善できます。
あわてんぼうなPub/Subは、クリスマス(のような繁忙期)も安心してプレゼント(通知)を届けられますね!

付録: ベンチマークコード

記事で紹介した効果を実際に検証するためのコードを用意しました。

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

const (
	projectID     = "your-project-id"
	topicNameOrID = "your-topic-name-or-id"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		slog.Error("Failed to create client", "error", err)
		return
	}
	defer func() {
		if err := client.Close(); err != nil {
			// Log the error but do not override the main error return
		}
	}()

	// ウォームアップ: 初回接続のオーバーヘッドを排除
	warmup := client.Publisher(topicNameOrID)
	result := warmup.Publish(ctx, &pubsub.Message{Data: []byte("warmup")})
	_, _ = result.Get(ctx)
	warmup.Stop()

	iterations := 100

	// Default設定でのテスト
	runDefaultTest(ctx, client, topicNameOrID, iterations)

	// Optimized設定でのテスト
	runOptimizedTest(ctx, client, topicNameOrID, iterations)
}

func runDefaultTest(ctx context.Context, client *pubsub.Client, topicNameOrID string, iterations int) {
	var total time.Duration
	for i := 0; i < iterations; i++ {
		messages := [][]byte{[]byte(fmt.Sprintf("test %d", i))}
		publisher := client.Publisher(topicNameOrID)

		start := time.Now()
		for _, message := range messages {
			r := publisher.Publish(ctx, &pubsub.Message{
				Data: message,
			})
			if _, err := r.Get(ctx); err != nil {
				slog.Error("Publish failed", "error", err)
			}
		}
		total += time.Since(start)

		publisher.Stop()
	}
	avg := total / time.Duration(iterations)
	slog.Info("Default settings result",
		"average_us", avg.Microseconds(),
		"average_ms", fmt.Sprintf("%.3f", float64(avg.Microseconds())/1000),
	)
}

func runOptimizedTest(ctx context.Context, client *pubsub.Client, topicNameOrID string, iterations int) {
	var total time.Duration
	for i := 0; i < iterations; i++ {
		messages := [][]byte{[]byte(fmt.Sprintf("test %d", i))}
		publisher := client.Publisher(topicNameOrID)

		publisher.PublishSettings.CountThreshold = len(messages)
		publisher.PublishSettings.ByteThreshold = 1048576
		publisher.PublishSettings.DelayThreshold = 50 * time.Millisecond
		publisher.PublishSettings.EnableCompression = true

		start := time.Now()
		for _, message := range messages {
			r := publisher.Publish(ctx, &pubsub.Message{
				Data: message,
			})
			if _, err := r.Get(ctx); err != nil {
				slog.Error("Publish failed", "error", err)
			}
		}
		total += time.Since(start)

		publisher.Stop()
	}
	avg := total / time.Duration(iterations)
	slog.Info("Optimized settings result",
		"average_us", avg.Microseconds(),
		"average_ms", fmt.Sprintf("%.3f", float64(avg.Microseconds())/1000),
	)
}

コードの詳細

主なテスト内容:

  • デフォルト設定での測定: CountThreshold=100(デフォルト)での送信時間
  • 動的設定での測定: CountThreshold=len(messages)での送信時間
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?