0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Cloud Pubsub でpublishした順序通り(順序指定)にpullする

Posted at

背景

Pubsubでpublishされたメッセージをpullを順序通りに取得したい

方針

  • subscribeを順序有効で作成し
  • publishの実装で以下3つを行った実装をする
    • enable_message_orderingをTrue
    • region指定
    • ordering_keyを指定

subscribe を 順序有効で作成

コンソールでsakuseijini 順序指定キーを使用してメッセージの順序を指定する にチェックするのが楽

publish の実装

参考

from google.cloud import pubsub_v1

project_id = "your-project"
topic_id = "my-topic"


# enable_message_ordering=Trueでoptionを作成する
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# 同じリージョンのapi_endpontを指定する. 今回はasia-northeast-1
client_options = {"api_endpoint": "asia-northeast1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options, client_options=client_options)
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data = "Message number {}".format(n)
    data = data.encode("utf-8")
    # ordering_keyは任意の文字列を指定。このvalueが同じ物同士が順位付される. (注意: 勘違いしてここに順序を意味する "1", "2", "3"などを入れないこと) 
    future = publisher.publish(topic_path, data, ordering_key='key1')
    print(future.result())

print(f"Published messages to {topic_path}.")

subscriberの実装(同期)

通常の同期pullと同様

from google.api_core import retry
from google.cloud import pubsub_v1

project_id = "your_project"
subscription_id = "my-sub"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 1

with subscriber:
    while True:
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": 1},
            timeout=10,
            retry=retry.Retry(deadline=300),
        )

        ack_ids = []
        for received_message in response.received_messages:
            print(f"Received: {received_message.message.data}.")
            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )

        print(
            f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
        )

以上でpublishされた順序通りにpullすることができる。

順序指定のデメリット

自分の手元で確認したところ、、、

順序指定を有効にすると、非同期pullでは1つずつしか処理できず(並列化できない)、同期pullでも同時複数の取得ができなくなるので、パフォーマンスを重視したpullシステムを作りたい場合は順序指定は慎重に導入する必要がある。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?