9
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZOZOAdvent Calendar 2024

Day 3

PubSubの順序指定しない場合、メッセージ受信が逆転する確率を調べる

Posted at

はじめに

🎄 本記事は ZOZO Advent Calendar 2024 シリーズ4の3日目です

Cloud Pub/Subを用いる際、構築するシステム要件によっては、
メッセージの処理順序を考慮する必要があります。
例えば、メッセージ内の主キーを用いてSubscriberでDBのレコードを挿入するか更新するかなどの処理です。

詳しくは順序指定の概要で紹介しますが、
Pub/Subはデフォルトではメッセージの受信の順序を保証していません。

「どれぐらいの頻度でメッセージの順序が逆転するんだろう?」と思ったので、調べてみます。

本記事では、単一Publisherで単一Subscriberの順序指定をしない場合において、
メッセージの処理が逆転する確率を調査してみます。

順序指定の概要

本記事の調査を行うために、そもそもPub/Subの順序指定における概要を調査します。

Pub/SubのSubscriberのデフォルトでは、メッセージの受信の順序を保証していません。
メッセージを順序に受信したい場合は、Order messagesの機能を用いることで実現できます。

Publisher側の設定

サブスクライバークライアントでメッセージを順に受信するには、順序指定キーを使用してメッセージをパブリッシュするようにパブリッシャークライアントを構成する必要があります。

引用通り、メッセージのPublish時に順序指定キーを使用します。
個人的に特筆すべき事項は以下です。

  • リージョン間の順序指定
    • Pub/SubにはGlobal endpointsとLocational endpointsの2種類のエンドポイントが存在します(参照
    • デフォルトではGlobal endpointsが使用されますが、順序指定を行う際は同一のリージョンPublishする必要があるので、Locational endpointsを使用する必要があります
  • 複数のPublisher間の順序指定
    • Publisherが複数ある場合、同一の順序指定キーを使用した同時Publishは避けるべきだそうです
    • つまり順序指定キーに対して、順次処理する1つのPublisherが望ましいですが、
      複数のPublisherで構成する場合は、順序指定キー別にロックするような構成が必要です

Subscriber側の設定

Subscriptionを作成する際に、[順序指定キーを使用してメッセージの順序を指定する] を設定します。

また下記のような考慮事項があります。

  • メッセージの再配信
    • Pub/Subでは、少なくとも1回の配信を保証されますので、重複したメッセージの配信の可能性があります
    • 重複したメッセージが再配信されると、確認済みメッセージを含む後続のメッセージも再配信されます
  • 確認応答の遅延とデッドレター トピック
    • 確認応答処理が遅延すると、他の順序指定キーのメッセージの配信が遅れることがあります
    • 即時に確認応答処理ができない場合は、デッドレタートピックを用いるなどが推奨されいます
  • Dataflow との統合
    • Dataflowを用いる際は、Dataflow独自の順序指定を保証した処理を行います
    • よって、順序指定オプションは指定しないようにしてください(パフォーマンスの低下の可能性がある)

調査

手順

最初にTopicの作成を行う。

gcloud pubsub topics create research_reversal_topic

次に、Subscription。

gcloud pubsub subscriptions create research_reversal_sub --topic=research_reversal_topic

ここで、 --enable-message-ordering の指定はしない。

次にPublisherの実装。

from google.cloud import pubsub_v1

project_id = "your-project-id"
topic_id = "research_reversal_topic"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

MESSAGE_SIZE = ここを変えながら複数試行する

for n in range(1, MESSAGE_SIZE + 1):
    data_str = f"{n}"
    data = data_str.encode("utf-8")
    future = publisher.publish(topic_path, data)
    while future.done() is False:
        pass
    print(f"Published message: {data_str}")

print(f"Published messages to {topic_path}.")

確実に順序通りPublishを行いため、Publishが終わるまでの待ち処理を入れている。

最後に、Subscriberの実装。

from google.api_core import retry
from google.cloud import pubsub_v1

project_id = "your-project-id"
subscription_id = "research_reversal_sub"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

received_messages = []


def pull_messages():
    global received_messages  # validate_reverse で使うため
    try:
        with subscriber:
            while True:
                response = subscriber.pull(
                    request={"subscription": subscription_path, "max_messages": 1},
                    timeout=10,
                    retry=retry.Retry(deadline=300),
                )

                ack_ids = []
                for received_message in response.received_messages:
                    data = received_message.message.data.decode("utf-8")
                    print(f"Received message: {data}")

                    current_message_number = int(data)
                    received_messages.append(current_message_number)

                    ack_ids.append(received_message.ack_id)

                if ack_ids:
                    subscriber.acknowledge(
                        request={"subscription": subscription_path, "ack_ids": ack_ids}
                    )

                print(
                    f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
                )
    except Exception as e:
        print(f"Error while pulling messages: {e}")
    finally:
        validate_reverse()


def validate_reverse():
    reverse_count = 0

    # 隣接する要素を比較して逆転を検出
    for i in range(1, len(received_messages)):
        if received_messages[i] < received_messages[i - 1]:
            reverse_count += 1
            print(
                f"Reversal detected: {received_messages[i - 1]} -> {received_messages[i]}"
            )

    print(f"Total reversals detected: {reverse_count}")
    return reverse_count


# メッセージを引き出す処理を実行
pull_messages()

結果

上記コードを実行した結果が下記。

実験No メッセージ数 逆転回数 割合(%)
1 100 0 0.00
2 100 2 2.00
3 100 1 1.00
4 100 7 7.00
5 100 1 1.00
6 1,000 25 2.50
7 1,000 19 1.90
8 1,000 20 2.00
9 1,000 18 1.80
10 1,000 17 1.70
11 10,000 141 1.41
12 10,000 176 1.76
13 10,000 156 1.56
14 10,000 176 1.76
15 10,000 153 1.53

またある程度、Subscriptionにメッセージが溜まった場合、比較的にPublish時間が最新のものが処理されることが確認できた。
例えば、メッセージ数が10,000の実験において、1~5,000番台は順調にpullしていたが、次に急に8,000番台のメッセージがpullされたなどである。(時間幅が大きい)

おわりに

割と順序通りではなく、驚き...😭
また、本記事ではSubscriberをすでに起動した状態で待機して、メッセージのPublishを行ったが、先にPublishして、Subscriptionにメッセージが溜まった状態でSubscriberを起動すると、逆転回数が多くなる傾向がみられた。
本記事では、簡単な処理を行うSubscriberだったので応答確認処理(ack)が早かったが、この辺りも実運用を考えると更に逆転が増えそう...😭

そもそもこの調査のモチベーションとして、
公式ドキュメントより、順序指定を行う際は考慮事項が多かったり、順序指定キーの粒度が荒かったりするとパフォーマンスが悪く1なるようなので、
「めっちゃ稀に逆転起きて、逆転する時間幅が狭い」なら許容できるケースあるのでは?と思い、そのある程度の基準値を調べようとおもったのがきっかけ。

しかし調査結果的に、逆転回数が多く、また逆転の時間幅が大きいので、順序を考慮する要件がある場合は、順序指定を用いた方が良さそう...

  1. https://christina04.hatenablog.com/entry/gcp-cloud-pubsub-ordering-key-concern

9
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?