LoginSignup
2
0

More than 1 year has passed since last update.

サブスクリプションタイプKey_Sharedの紹介

Last updated at Posted at 2021-05-24

Apache Pulsarには、Exclusive,Failover,Sharedという3つのサブスクリプションタイプがありましたが、
v2.4.0からはこれらに加えてKey_Sharedという新しいサブスクリプションタイプが導入されました。
今回はこのKey_Sharedについて紹介します。

特徴

Key_Sharedを利用することで1つのサブスクリプションを複数Consumerで購読しながら、同じKeyを持つメッセージの順序保証ができます。
Sharedでは複数Consumerで購読できますが順序保証されないため、今までメッセージの順序を保証するためにはExclusiveまたはFailoverを利用し1つのサブスクリプションで1Consumerだけがメッセージを受信する必要がありました。

以下の図のようにKey_Sharedでの購読では同じKeyを持つメッセージは同じConsumerに配信することでKey単位での順序保証を実現しています。


出典:https://pulsar.apache.org/docs/en/2.7.1/concepts-messaging

今までのSharedでは以下のようにメッセージのKeyは関係なくラウンドロビンで各Consumerに配信されます。


出典:https://pulsar.apache.org/docs/en/2.7.1/concepts-messaging

試す

実際にKey_Sharedでメッセージを受信してみて、先述したように同じKeyのメッセージが同じConsumerに配信されることを確認していきます。
ここではJavaでの実装例を紹介します。

最初に2Consumerでサブスクリプションsub1を購読します。

for (int i = 0; i < 2; i++) {
    pulsarClient.newConsumer()
            .topic("persistent://my-tenant/my-ns/my-topic")
            .consumerName("consumerName-" + i)
            .subscriptionName("sub1")
            // サブスクリプションタイプKey_Sharedで購読
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener((cons, message) -> {
                System.out.println("Received: "
                        + new String(message.getData())
                        + " " + cons.getConsumerName()
                        + " " + message.getKey());
                cons.acknowledgeAsync(message);
            })
            .subscribe();
}

次にメッセージの送信です。
メッセージのKeyにはkey-0,1,2,3,4を順番に指定して送信します。

Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-tenant/my-ns/my-topic")
                // Key_Sharedではバッチ送信はオフにする
                .enableBatching(false) 
                // もしもKey_Sharedでバッチ送信を利用したい場合は、
                // enableBatching(true)かつBatcherBuilder.KEY_BASEDを指定することで
                // Keyごとの順序を保証しつつバッチ送信が可能
                // .enableBatching(true)
                // .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();

// メッセージを送信
for (int i = 0; i < 10; i++) {
    final String content = "my-message-" + i;
    producer.newMessage()
            .key(String.format("key-%d", i % 5)) // Keyを指定
            .value(content.getBytes())
            .send();
            // バッチ送信する場合は非同期で送信
            // .sendAsync();
}

上記のコードを実行してみます。結果は、
key-0,1,2のメッセージがconsumerName-1のConsumerに、
key-3,4のメッセージがconsumerName-0のConsumerに配信されました。

Received: my-message-0 consumerName-1 key-0
Received: my-message-1 consumerName-1 key-1
Received: my-message-2 consumerName-1 key-2
Received: my-message-3 consumerName-0 key-3
Received: my-message-4 consumerName-0 key-4
Received: my-message-5 consumerName-1 key-0
Received: my-message-6 consumerName-1 key-1
Received: my-message-7 consumerName-1 key-2
Received: my-message-8 consumerName-0 key-3
Received: my-message-9 consumerName-0 key-4

デフォルトの設定では、consumerNameに対して担当Keyが決まってくるため上記のコードを何度実行しても同じ結果が得られます。
次にconsumerNameを変えて再度実行してみます。

-            .consumerName("consumerName-" + i)
+            .consumerName("newConsumerName-" + i)

実行結果は以下のように、
key-0,3,4がnewConsumerName-0に、
key-1,2がnewConsumerName-1に配信されました。

Received: my-message-0 newConsumerName-0 key-0
Received: my-message-1 newConsumerName-1 key-1
Received: my-message-2 newConsumerName-1 key-2
Received: my-message-3 newConsumerName-0 key-3
Received: my-message-4 newConsumerName-0 key-4
Received: my-message-5 newConsumerName-0 key-0
Received: my-message-6 newConsumerName-1 key-1
Received: my-message-7 newConsumerName-1 key-2
Received: my-message-8 newConsumerName-0 key-3
Received: my-message-9 newConsumerName-0 key-4

最後に

Key_Sharedは、keyごとに順序保証をしつつ処理を並列に行いたい場合に非常に役立つ機能であることがわかっていただけたかなと思います。
興味持たれた方はぜひ試してみてください。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0