はじめに
🎄 本記事は ZOZO Advent Calendar 2024 シリーズ4の3日目です
Cloud Pub/Subを用いる際、構築するシステム要件によっては、
メッセージの処理順序を考慮する必要があります。
例えば、メッセージ内の主キーを用いてSubscriberでDBのレコードを挿入するか更新するかなどの処理です。
詳しくは順序指定の概要で紹介しますが、
Pub/Subはデフォルトではメッセージの受信の順序を保証していません。
「どれぐらいの頻度でメッセージの順序が逆転するんだろう?」と思ったので、調べてみます。
本記事では、単一Publisherで単一Subscriberの順序指定をしない場合において、
メッセージの処理が逆転する確率を調査してみます。
順序指定の概要
本記事の調査を行うために、そもそもPub/Subの順序指定における概要を調査します。
Pub/SubのSubscriberのデフォルトでは、メッセージの受信の順序を保証していません。
メッセージを順序に受信したい場合は、Order messagesの機能を用いることで実現できます。
Publisher側の設定
サブスクライバークライアントでメッセージを順に受信するには、順序指定キーを使用してメッセージをパブリッシュするようにパブリッシャークライアントを構成する必要があります。
引用通り、メッセージのPublish時に順序指定キーを使用します。
個人的に特筆すべき事項は以下です。
-
リージョン間の順序指定
- Pub/SubにはGlobal endpointsとLocational endpointsの2種類のエンドポイントが存在します(参照)
- デフォルトではGlobal endpointsが使用されますが、順序指定を行う際は同一のリージョンPublishする必要があるので、Locational endpointsを使用する必要があります
-
複数のPublisher間の順序指定
- Publisherが複数ある場合、同一の順序指定キーを使用した同時Publishは避けるべきだそうです
- つまり順序指定キーに対して、順次処理する1つのPublisherが望ましいですが、
複数のPublisherで構成する場合は、順序指定キー別にロックするような構成が必要です
Subscriber側の設定
Subscriptionを作成する際に、[順序指定キーを使用してメッセージの順序を指定する] を設定します。
また下記のような考慮事項があります。
-
メッセージの再配信
- Pub/Subでは、少なくとも1回の配信を保証されますので、重複したメッセージの配信の可能性があります
- 重複したメッセージが再配信されると、確認済みメッセージを含む後続のメッセージも再配信されます
-
確認応答の遅延とデッドレター トピック
- 確認応答処理が遅延すると、他の順序指定キーのメッセージの配信が遅れることがあります
- 即時に確認応答処理ができない場合は、デッドレタートピックを用いるなどが推奨されいます
-
Dataflow との統合
- Dataflowを用いる際は、Dataflow独自の順序指定を保証した処理を行います
- よって、順序指定オプションは指定しないようにしてください(パフォーマンスの低下の可能性がある)
調査
手順
最初にTopicの作成を行う。
gcloud pubsub topics create research_reversal_topic
次に、Subscription。
gcloud pubsub subscriptions create research_reversal_sub --topic=research_reversal_topic
ここで、 --enable-message-ordering
の指定はしない。
次にPublisherの実装。
from google.cloud import pubsub_v1
project_id = "your-project-id"
topic_id = "research_reversal_topic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
MESSAGE_SIZE = ここを変えながら複数試行する
for n in range(1, MESSAGE_SIZE + 1):
data_str = f"{n}"
data = data_str.encode("utf-8")
future = publisher.publish(topic_path, data)
while future.done() is False:
pass
print(f"Published message: {data_str}")
print(f"Published messages to {topic_path}.")
確実に順序通りPublishを行いため、Publishが終わるまでの待ち処理を入れている。
最後に、Subscriberの実装。
from google.api_core import retry
from google.cloud import pubsub_v1
project_id = "your-project-id"
subscription_id = "research_reversal_sub"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
received_messages = []
def pull_messages():
global received_messages # validate_reverse で使うため
try:
with subscriber:
while True:
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": 1},
timeout=10,
retry=retry.Retry(deadline=300),
)
ack_ids = []
for received_message in response.received_messages:
data = received_message.message.data.decode("utf-8")
print(f"Received message: {data}")
current_message_number = int(data)
received_messages.append(current_message_number)
ack_ids.append(received_message.ack_id)
if ack_ids:
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
except Exception as e:
print(f"Error while pulling messages: {e}")
finally:
validate_reverse()
def validate_reverse():
reverse_count = 0
# 隣接する要素を比較して逆転を検出
for i in range(1, len(received_messages)):
if received_messages[i] < received_messages[i - 1]:
reverse_count += 1
print(
f"Reversal detected: {received_messages[i - 1]} -> {received_messages[i]}"
)
print(f"Total reversals detected: {reverse_count}")
return reverse_count
# メッセージを引き出す処理を実行
pull_messages()
結果
上記コードを実行した結果が下記。
実験No | メッセージ数 | 逆転回数 | 割合(%) |
---|---|---|---|
1 | 100 | 0 | 0.00 |
2 | 100 | 2 | 2.00 |
3 | 100 | 1 | 1.00 |
4 | 100 | 7 | 7.00 |
5 | 100 | 1 | 1.00 |
6 | 1,000 | 25 | 2.50 |
7 | 1,000 | 19 | 1.90 |
8 | 1,000 | 20 | 2.00 |
9 | 1,000 | 18 | 1.80 |
10 | 1,000 | 17 | 1.70 |
11 | 10,000 | 141 | 1.41 |
12 | 10,000 | 176 | 1.76 |
13 | 10,000 | 156 | 1.56 |
14 | 10,000 | 176 | 1.76 |
15 | 10,000 | 153 | 1.53 |
またある程度、Subscriptionにメッセージが溜まった場合、比較的にPublish時間が最新のものが処理されることが確認できた。
例えば、メッセージ数が10,000の実験において、1~5,000番台は順調にpullしていたが、次に急に8,000番台のメッセージがpullされたなどである。(時間幅が大きい)
おわりに
割と順序通りではなく、驚き...😭
また、本記事ではSubscriberをすでに起動した状態で待機して、メッセージのPublishを行ったが、先にPublishして、Subscriptionにメッセージが溜まった状態でSubscriberを起動すると、逆転回数が多くなる傾向がみられた。
本記事では、簡単な処理を行うSubscriberだったので応答確認処理(ack)が早かったが、この辺りも実運用を考えると更に逆転が増えそう...😭
そもそもこの調査のモチベーションとして、
公式ドキュメントより、順序指定を行う際は考慮事項が多かったり、順序指定キーの粒度が荒かったりするとパフォーマンスが悪く1なるようなので、
「めっちゃ稀に逆転起きて、逆転する時間幅が狭い」なら許容できるケースあるのでは?と思い、そのある程度の基準値を調べようとおもったのがきっかけ。
しかし調査結果的に、逆転回数が多く、また逆転の時間幅が大きいので、順序を考慮する要件がある場合は、順序指定を用いた方が良さそう...