はじめに
🎄 本記事は ZOZO Advent Calendar 2024 シリーズ4の10日目です
Cloud Pub/Subを用いる際、構築するシステム要件によっては、
メッセージの重複を許容するか検討する必要があります。
冪等性がないメッセージの処理の場合などです。
詳しくはExactly-onceの概要で紹介しますが、
Pub/Subは、Exactly-once と At Least Onceがあり、 Exactly-once の場合のみ一度限りの配信を保証し、デフォルトの設定である At Least Once ではメッセージが重複する可能性があります。
At Least Once を用いる際、
「どれぐらいの頻度でメッセージが重複する?」と思ったので、調べてみます。
本記事では、単一Publisherで単一SubscriberのExactly-onceを設定しない場合の、
メッセージが重複する確率を調査してみます。
Exactly-onceの概要
本記事の調査を行うために、Pub/SubのExactly-onceの概要を紹介します。
Pub/SubのSubscriberのデフォルトでは、 At Leaset Once であり、一回以上の配信を保証します。
メッセージの配信を一度のみにしたい場合は、 Exactly-once を設定する必要があります。
Pub/Sub では、Pub/Sub で定義された一意のメッセージ ID に基づき、クラウド リージョン内で exactly-once(1 回限り)の配信がサポートされます。
Topicの設定は不要で、Subscriberのみ設定します。
再配信と重複
再配信と重複は全く違うものです。
- 再配信
- メッセージに対してクライアントによる否定確認応答が行われた場合、または確認応答期限が切れる前にクライアントが確認応答期限を延長しなかった場合のいずれかか原因で発生することがある
- 再配信は、システムが期待する動作である(Ex. リトライ処理など)
- 重複
- 確認応答が成功した後、または確認応答期限が切れる前にメッセージが再送信された場合を指す
リージョンに関する考慮事項
Exactly-onceの保証は、Subscriberが同じリージョンのサービスに接続する場合にのみ適用されます。
Subscriberが複数のリージョンに分散されている場合、Exactly-onceが有効になっていても、重複する可能性があります。
デフォルトではGlobal endpointsが使用されますが、Exactly-onceを行う際は同一のリージョンPublishする必要があるので、Locational endpointsを使用する必要があります。
調査
手順
最初にTopicの作成を行う。
gcloud pubsub topics create research_duplication_topic
次に、Subscription。
gcloud pubsub subscriptions create research_duplication_sub --topic=research_duplication_topic --ack-deadline=600
ここで、 --enable-exactly-once-delivery
の指定はしない。
次にPublisherの実装。
from google.cloud import pubsub_v1
project_id = "your-project-id"
topic_id = "research_duplication_topic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
MESSAGE_SIZE = 100000
for n in range(1, MESSAGE_SIZE + 1):
data_str = f"{n}"
data = data_str.encode("utf-8")
future = publisher.publish(topic_path, data)
while future.done() is False:
pass
print(f"Published message: {data_str}")
print(f"Published messages to {topic_path}.")
最後に、Subscriberの実装。
from collections import Counter
from google.api_core import retry
from google.cloud import pubsub_v1
project_id = "your-project-id"
subscription_id = "research_duplication_sub"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
received_messages = []
def pull_messages():
global received_messages
try:
with subscriber:
while True:
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": 1},
timeout=10,
retry=retry.Retry(deadline=300),
)
ack_ids = []
for received_message in response.received_messages:
data = received_message.message.data.decode("utf-8")
print(f"Received message: {data}")
current_message_number = int(data)
received_messages.append(current_message_number)
ack_ids.append(received_message.ack_id)
if ack_ids:
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
except Exception as e:
print(f"Error while pulling messages: {e}")
finally:
validate_duplication()
def validate_duplication():
# カウンターを使って各値の出現回数を数える
message_counter = Counter(received_messages)
duplication_count = 0
# 重複がある値を探し、その重複回数を出力
print("\n--- Duplicated Messages ---")
for value, count in message_counter.items():
if count > 1: # 重複している場合
print(f"Value {value} is duplicated {count} times.")
duplication_count += 1
if duplication_count == 0:
print("No duplicates found.")
# メッセージを引き出す処理を実行
pull_messages()
結果
実験No | メッセージ数 | 重複回数 | 割合(%) |
---|---|---|---|
1 | 100,000 | 2 | 0.002 |
重複の出現頻度が低く、再現が難しい...
おわりに
重複に関して、稀に起きることが分かった。
本記事の調査モチベーションとして、通常のSubsriptionよりレイテンシーが高くなったり、リージョンに関する考慮をしたりと面倒なので、極端に重複が少なければ許容できるケースがあるのでは?と思ったのがきっかけ。
個人的な感想としては、デフォルトの設定であるAt Least Onceを用いて、Subscriberで冪等性なメッセージ処理を実装可能なら、それで充分かと思う。
また、本記事の調査実験後に同等の調査をした記事を見つけた。
この記事では同じように10万件で実験したようだが、重複は観測できなかった様子。
本記事と紹介記事の違いとしてはSubscriberのPull時のメッセージ数ぐらいで、本記事では1で紹介記事では1,000だった。
重複が起きやすい条件などは、別途調査したい。
参考