背景
Pubsubでpublishされたメッセージをpullを順序通りに取得したい
方針
- subscribeを順序有効で作成し
- publishの実装で以下3つを行った実装をする
- enable_message_orderingをTrue
- region指定
- ordering_keyを指定
subscribe を 順序有効で作成
コンソールでsakuseijini 順序指定キーを使用してメッセージの順序を指定する
にチェックするのが楽
publish の実装
参考
from google.cloud import pubsub_v1
project_id = "your-project"
topic_id = "my-topic"
# enable_message_ordering=Trueでoptionを作成する
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# 同じリージョンのapi_endpontを指定する. 今回はasia-northeast-1
client_options = {"api_endpoint": "asia-northeast1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options, client_options=client_options)
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data = "Message number {}".format(n)
data = data.encode("utf-8")
# ordering_keyは任意の文字列を指定。このvalueが同じ物同士が順位付される. (注意: 勘違いしてここに順序を意味する "1", "2", "3"などを入れないこと)
future = publisher.publish(topic_path, data, ordering_key='key1')
print(future.result())
print(f"Published messages to {topic_path}.")
subscriberの実装(同期)
通常の同期pullと同様
from google.api_core import retry
from google.cloud import pubsub_v1
project_id = "your_project"
subscription_id = "my-sub"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
NUM_MESSAGES = 1
with subscriber:
while True:
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": 1},
timeout=10,
retry=retry.Retry(deadline=300),
)
ack_ids = []
for received_message in response.received_messages:
print(f"Received: {received_message.message.data}.")
ack_ids.append(received_message.ack_id)
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
以上でpublishされた順序通りにpullすることができる。
順序指定のデメリット
自分の手元で確認したところ、、、
順序指定を有効にすると、非同期pullでは1つずつしか処理できず(並列化できない)、同期pullでも同時複数の取得ができなくなるので、パフォーマンスを重視したpullシステムを作りたい場合は順序指定は慎重に導入する必要がある。