1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

メッセージングPF「Apache Pulsar」の使い方(クライアント編2)

Last updated at Posted at 2020-06-03

Yahoo! JAPAN Tech Blog向けに寄稿した記事を、会社の許可を得てこちらにも転載しています。
メッセージングPF「Apache Pulsar」の使い方(クライアント編2) - Yahoo! JAPAN Tech Blog

こんにちは。ヤフー株式会社システム統括本部の酒井です。
私は、過去の記事(第1回第2回)の執筆者と同じ、社内向けにキューイング、Pub-Sub、ストリーミングなどのメッセージングプラットフォームを提供するチームに所属しています。

前回はOSS Apache Pulsar(以降、Pulsarと記載します)のクライアントの基本的な使い方、いくつかの機能について紹介しました。今回も前回に引き続きクライアントの各クラスで設定可能な項目と前回紹介できなかった機能についてお話しします。

  • 各クラスの設定一覧
    • PulsarClientクラス
    • Producerクラス
    • Consumerクラス
  • 機能紹介
  • 認証・認可
  • Pulsar Schema
  • メッセージのバッチ送信

本稿ではPulsarのv2.5.0でJava版クライアントライブラリを用いて説明していきます。

ヤフー社内におけるPulsar活用状況

私の所属するチームはPulsarがOSS化される2016年9月頃から携わっており、Pulsarのコミッターも複数名在籍しています。
社内では、Pulsarを安定的にユーザーに利用していただけるように運用することに加えて、社内における需要・事例をもとに機能拡張・バグ修正等の開発を通じたOSSへの貢献もしています。

ここで運用するPulsarを社内のプロダクトが利用することにより、プロダクトを開発するエンジニアはメッセージングプラットフォームを個別に運用する必要がなくなります。
その結果、プロダクトは本来やるべきサービスやそれに伴うユーザー価値の創造に専念できるようになります。

ヤフーとPulsarの関わりについては、別記事を投稿しているのでこちらもご覧ください。

各クラスの設定一覧

ここではPulsarClient/Producer/Consumerクラスで指定可能な主な設定について紹介していきます。
Pulsarでは各クラスでbuilderパターンが採用されており、それぞれのクラスで以下のように指定することが可能です。

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
        
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/my-topic")
        .create();

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

これから紹介する設定についてもserviceUrltopicsubscriptionNameと同様に指定することが可能です。

また各クラスの紹介の最後にJavadocへのリンクを載せていますが、執筆段階でv2.5.0のリリース版のドキュメントが存在しないためv2.4.2のリンクになっています。

PulsarClientクラス

まずはPulsarClientクラスから見ていきます。
過去の記事ではserviceUrlのみ設定していましたが、その他にも認証やスレッド数の設定なども可能です。
(認証の設定方法に関しては、後半の機能紹介でより詳しく説明します)

設定項目 説明 デフォルト値
serviceUrl 接続先のBroker。 None
authentication 利用する認証プラグインとプラグインに渡すパラメータ。
Pulsarでは現在、TLS認証、 Kerberos認証など主に4つの認証に対応しています。
例:
authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls", "{\"tlsCertFile\":\"/path/to/my-role.cert.pem\",\"tlsKeyFile\":\"/path/to/my-role.key-pk8.pem\"}")
None
ioThreads Brokerとの接続の処理に利用するスレッド数。
Producer/Consumer/Readerを複数生成して利用する場合などに変更します。
1
listenerThreads メッセージリスナで利用するスレッド数。
メッセージリスナを設定したConsumerを複数生成する場合などに変更します。
1
connectionsPerBroker Brokerごとのコネクション数。
1つのBrokerにProducer/Consumer/Readerを複数接続する場合などに変更します。
1
tlsTrustCertsFilePath 信頼するルート証明書のパス。 None
tlsAllowInsecureConnection Brokerからの信頼できないTLS証明書での接続を許可するか。 false
statsInterval 各Producer/Consumer/Readerの統計情報を取得する間隔。 60秒

指定可能な全設定一覧はこちら。

Producerクラス

次にProducerクラスを見ていきます。
前回の記事でも紹介したメッセージの暗号化やパーティションドトピックでの設定以外にもバッチ送信や内部キュー周りの設定なども可能です。

設定項目 説明 デフォルト値
topic メッセージの送信先トピック。 None
producerName Producer名。
統計情報での表示などに利用されます。
None
sendTimeout 設定したtimeout値を過ぎてもBrokerからメッセージのackが返ってこない場合にエラーになります。 30秒
blockIfQueueFull Producerの内部キューがいっぱいになった時に、
trueの場合はsend, sendAsyncメソッドをブロックします。
falseの場合はsend, sendAsyncメソッドが失敗しProducerQueueIsFullErrorの例外を投げます。

maxPendingMessagesの値が内部キューのサイズです。
false
maxPendingMessages 保留中のメッセージを保持するProducerの内部キューのサイズ。
保留中のメッセージとは、例えばBrokerからのackを待っているメッセージです。

またデフォルトでは、(blockIfQueueFullがfalseなので)キューがいっぱいになるとsend, sendAsyncメソッドの全ての呼び出しが失敗になります。
1000
maxPendingMessagesAcrossPartitions パーティション全体での保留中のメッセージを保持するProducerの内部キューのサイズ。

パーティションドトピックを利用している場合はこちらの値にも気をつけましょう。
50000
messageRoutingMode パーティションドトピックでのメッセージのルーティングモード。
指定できるモードは、
- MessageRoutingMode.RoundRobinPartition
- MessageRoutingMode.UseSinglePartition
- MessageRoutingMode.CustomPartition
の3つです。
MessageRoutingMode.RoundRobinPartition
cryptoKeyReader メッセージの暗号化機能でキーストアにアクセスするためのCryptoKeyReaderインターフェースの実装クラス。 None
addEncryptionKey 暗号化キー。 None
cryptoFailureAction 暗号化に失敗した時のアクション。
- ProducerCryptoFailureAction.FAIL (メッセージの送信に失敗します)
- ProducerCryptoFailureAction.SEND (暗号化されていないメッセージを送信します)
ProducerCryptoFailureAction.FAIL
batchingEnabled メッセージのバッチ送信を有効にするか。 true
batchingMaxPublishDelay バッチ送信の最大送信間隔。 1ミリ秒
batchingMaxMessages 1度にバッチ送信する最大メッセージ数。 1000
compressionType メッセージの圧縮形式。
指定できる形式は、
- CompressionType.LZ4
- CompressionType.ZLib
- CompressionType.ZSTD
- CompressionType.SNAPPY
の4つです。
None

指定可能な全設定一覧はこちら。
https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/ProducerBuilder.html

Consumerクラス

最後にConsumerクラスを見ていきます。
過去の記事でもサブスクリプションやack、暗号化などの設定をしていましたが、それ以外にも内部キューやdead letter topicのポリシーなどの設定が可能です。

設定項目 説明 デフォルト値
consumerName Consumer名。
統計情報での表示などに利用されます。
None
topic メッセージの受信先トピック。 None
topics メッセージの受信先トピックをリストで複数指定します。

ただし同一のネームスペース配下のトピックのみ指定できます。
None
topicsPattern メッセージの受信先トピックを正規表現で複数指定します。

ただし同一のネームスペース配下のトピックのみ指定できます。
None
subscriptionName サブスクリプション名。 None
subscriptionType サブスクリプションタイプ。
指定可能なサブスクリプションタイプは、
- SubscriptionType.Exclusive
- SubscriptionType.Shared
- SubscriptionType.Failover
- SubscriptionType.Key_Shared
の4つです。
SubscriptionType.Exclusive
receiverQueueSize Consumerの内部キューのサイズ。
ConsumerはBrokerからの受信メッセージを内部キューに保持します。

アプリケーションのメッセージ処理速度とトピックへのメッセージの送信速度などを考慮して設定するようにしましょう。
例えば処理速度に対してサイズが小さすぎるとBrokerからのメッセージの配信待ちが発生してしまいパフォーマンスが落ちる可能性があります。
また通常よりも処理に時間がかかるメッセージがあった場合には、そのConsumerの内部キューのメッセージはなかなか処理されずに残り続けてしまいます。こちらはサブスクリプションタイプがSharedで複数Consumerで分散処理する場合には注意しましょう。
1000
maxTotalReceiverQueueSizeAcrossPartitions パーティション全体での受信メッセージを保持するConsumerの内部キューのサイズです。

パーティションドトピックを利用している場合はこちらの値にも気をつけましょう。
50000
ackTimeout 未ackなメッセージのtimeout値。
内部キューからConsumerがメッセージを受け取ってから指定したtimeout値を超えると未ackなメッセージは再送されます。

デフォルトではackTimeoutは無効になっており、時間経過ではメッセージは再送されません。
0
negativeAckRedeliveryDelay negative ackされたメッセージはこのnegativeAckRedeliveryDelayが経過した後に再送されます。 1分
messageListener メッセージリスナ。
アプリケーションはメッセージリスナを通してメッセージを受信します。

メッセージリスナの使い方は前回の記事参照。
None
cryptoKeyReader メッセージの暗号化機能でキーストアにアクセスするためのCryptoKeyReaderインターフェースの実装クラス。 None
cryptoFailureAction 復号に失敗した時のアクション。
- ConsumerCryptoFailureAction.FAIL (復号が成功するまでメッセージの受信に失敗します)
- ConsumerCryptoFailureAction.DISCARD (暗黙的にackが返され、アプリケーションには配信されません)
- ConsumerCryptoFailureAction.CONSUME (暗号化されたメッセージがアプリケーションに配信されます。復号はアプリケーション側で行います)
ConsumerCryptoFailureAction.FAIL
replicateSubscriptionState replicated subscription機能を有効にします。

この機能についてはこちら
false(無効)
deadLetterPolicy dead letter topic機能のポリシー。

この機能についてはこちら
None

指定可能な全設定一覧はこちら。
https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/ConsumerBuilder.html

機能紹介

この章では前回の記事で紹介できなかった認証・認可、Pulsar Schema、バッチ送信の3機能を紹介していきます。

認証・認可

Pulsarでは認証・認可を設定することが可能です。
認証では、TLS、Token、Kerberos認証などのプラグインが用意されています(独自プラグインを生成することも可能)。
認可では、テナントに対してadmin権限、ネームスペースに対してproduce/consume権限を付与できます。
また全ての権限を持つスーパーユーザーを設定できます。

今回はToken認証のプラグインを利用して、認証・認可の設定方法について解説していきます。

Token認証とは

PulsarのToken認証はJSON Web Tokens (RFC-7519)をベースにしたセキュリティトークンを用いた認証です。
利用者は通常、管理者(または自動化サービス)からトークン文字列を受け取ります。

署名付きのJWTは以下のような文字列です。

eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY

これをPulsarClientクラスで以下のようにセットします。

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .authentication(
        AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY")
    .build();

では早速ですが、次から実際に試していきます。

事前準備

standaloneモードで試していきますので、Pulsar v2.5.0のダウンロード・展開を行ってください。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-bin.tar.gz
$ tar xfz apache-pulsar-2.5.0-bin.tar.gz
$ cd apache-pulsar-2.5.0

鍵とトークンの発行

Token認証を利用するには鍵とトークンを発行する必要があります。
JWTではトークンの生成と検証のために2種類の鍵をサポートしています。

  • 対称 : 1つの秘密鍵でトークンの生成と検証を行う方式
  • 非対称 : 秘密鍵でトークンを生成し、公開鍵でトークンの検証を行う方式

今回は後者の秘密鍵と公開鍵を利用した方式で試していきます。
まずは鍵を生成します。

$ bin/pulsar tokens create-key-pair \
            --output-private-key my-private.key \
            --output-public-key my-public.key

実際に本番環境で利用する際には、秘密鍵(my-private.key)は安全な場所に保管しておき、管理者が新しいトークンを発行する時にのみ利用します。
公開鍵(my-public.key)は、全てのBrokerに配布してトークンの検証に利用します。こちらはセキュリティ上の懸念なしに、公開して共有できます。

作成した秘密鍵を利用してtest-userとsuper-userというロールのトークンをそれぞれ発行します。
下記コマンドを実行するとトークンが発行され、それぞれ出力されます。ここではファイルに書き込んでいます。

$ bin/pulsar tokens create --private-key file:///path/to/my-private.key \
            --subject test-user > test-user-token.txt

$ bin/pulsar tokens create --private-key file:///path/to/my-private.key \
            --subject super-user > super-user-token.txt

Brokerの認証・認可の設定

次にToken認証を有効にしてBrokerをstandaloneモードで起動します。
まずconf/standalone.confを下記のように修正します。

# 認証・認可を有効化
authenticationEnabled=true
authorizationEnabled=true

# Token認証プラグインのproviderを指定
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

# 公開鍵を指定
tokenPublicKey=file:///path/to/my-public.key

# スーパーユーザーにsuper-userロールを指定
superUserRoles=super-user

# Broker内部でBrokerにリクエストする際に利用する認証プラグイン・トークンを指定
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=file:///path/to/super-user-token.txt

修正が完了したら、Brokerをstandaloneモードで起動します。

$ bin/pulsar standalone

ロールへの権限付与

super-userロールに対してはconf/standalone.confでスーパーユーザーに設定しました。
ここではtest-userロールに対して、admin/produce/consume権限を付与していきます。

初めにconf/client.confを下記のように修正して、pulsar-adminコマンドをtest-userロールで実行するようにします。

# Brokerにリクエストする際に利用する認証プラグイン・トークンを指定
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=file:///path/to/test-user-token.txt

修正が完了したら、まずはテナント作成をします。この時テナントに対してadmin権限の設定も同時に可能です。
テナントの作成はスーパーユーザーで実行する必要があるため、
super-userロールで実行するようにauth-paramsオプションでファイルの指定を変更しています。

# my-tenantテナントを作成。test-userロールにadmin権限を付与
$ bin/pulsar-admin --auth-params file:///path/to/super-user-token.txt \
            tenants create my-tenant \
            --admin-roles test-user

次にネームスペースを作成します。
ここからはadmin権限があれば行えるので、admin権限のあるtest-userロールで実行していきます。

$ bin/pulsar-admin namespaces create my-tenant/my-namespace

作成したmy-tenant/my-namespaceネームスペース配下のトピックへのproduce, consume権限をtest-userロールに付与します。
my-tenantテナントにadmin権限があってもproduce, consume権限がないとメッセージの送受信はできません。

$ bin/pulsar-admin namespaces grant-permission my-tenant/my-namespace \
            --role test-user \
            --actions produce,consume

これで権限の設定は完了です。
最後にtest-userロールにそれぞれの権限が正しく付与されているか確認してみます。

# adminRolesにtest-userロールがあることを確認
$ bin/pulsar-admin tenants get my-tenant
{
  "adminRoles" : [ "test-user" ],
  "allowedClusters" : [ "standalone" ]
}

# produce, consume権限がtest-userロールに付与されていることを確認
$ bin/pulsar-admin namespaces permissions my-tenant/my-namespace
{
  "test-user" : [ "produce", "consume" ]
}

produce/consumeの実行

最後にmy-tenant/my-namespace配下のトピックにproduce/consumeしていきます。

最初はpulsar-clientコマンドを利用してproduce/consumeしてみます。
このコマンドもconf/client.confの設定を利用するためtest-userロールでproduce/consumeできます。
まずはconsumeします。

# persistent://my-tenant/my-namespace/my-topic1トピックに
# サブスクリプションsub1で購読、5つメッセージを受信するまで待機します。
$ bin/pulsar-client consume -s sub1 -n 5 persistent://my-tenant/my-namespace/my-topic1

次に別ターミナルでproduceします。
produce後にConsumer側でメッセージを受信できていることを確認してください。

$ bin/pulsar-client produce -m m1,m2,m3,m4,m5 persistent://my-tenant/my-namespace/my-topic1

次は実際にサンプルコードを実装して実行してみます。
このサンプルコードではProducer/Consumerを生成し5つのメッセージをそれぞれ送受信します。

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;

class ProduceConsumeWithAuth {

    public static void main(String[] args) throws Exception {

        PulsarClient pulsarClient = PulsarClient.builder()
                // Token認証プラグインとtest-userロールのトークンが書き込まれているファイルを指定
                .authentication(AuthenticationToken.class.getName(), "file:///path/to/test-user-token.txt")
                // トークン文字列を直接指定することでAuthenticationFactory.token()を利用することも可能
             // .authentication(AuthenticationFactory.token("eyJhbGci....."))
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic("persistent://my-tenant/my-namespace/my-topic2")
                .subscriptionName("sub1")
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-tenant/my-namespace/my-topic2")
                .create();

        for (int i = 0; i < 5; i++) {
            // メッセージを送信する
            producer.send(String.format("my-message-%d", i).getBytes());
        }

        for (int i = 0; i < 5; i++) {
            // メッセージを受信する
            final Message<byte[]> message = consumer.receive();
            System.out.println("Received message: " + new String(message.getData()));

            // Brokerにackを返す
            consumer.acknowledge(message);
        }

        producer.close();
        consumer.close();
        pulsarClient.close();
    }
}

サンプルコード実行後、メッセージが正しく受信できていれば下記ログが出力されます。

Received message: my-message-0
Received message: my-message-1
Received message: my-message-2
Received message: my-message-3
Received message: my-message-4

認証・認可についての紹介は、以上です。
さらに詳細など興味持っていただけた方は下記ドキュメントも参考にしていただければと思います。

この後のPulsar Schema/バッチ送信を試す際に認証・認可を有効にしていると、
public/defaultネームスペースへのproduce/consume権限がtest-userロールにないためエラーになります。
もしもこのまま続けて試される場合にはconf/standalone.confを修正して認証・認可を無効にして試してください。
standloneモードで立ち上げているプロセスの再起動も必要です。

# 認証・認可を無効化
authenticationEnabled=false
authorizationEnabled=false

Pulsar Schema

Pulsarではスキーマを指定してメッセージを送受信できます。
この機能を使うことによりデータを表現するクラスを独自にシリアライズ・デシリアライズする必要がなくクライアントライブラリに任せることができます。
ここではSTRINGとJSONをスキーマに指定しての利用例を紹介します。

STRING

まずはSTRINGをスキーマに指定してメッセージを送受信するシンプルな例を見ていきます。
Producer側ではSchema.STRINGを指定してProducerを生成、TypedMessageBuilder::value()の引数に文字列を渡してメッセージを送信します。

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
    .topic("persistent://public/default/topic-schema-string")
    .create();

producer.newMessage().value("string-schema-my-message").send();

Consumer側でも同様にSchema.STRINGを指定してConsumerを生成します。
ジェネリクスで指定した型でメッセージを受信するためにMessage::getValue()を利用します。

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
    .topic("persistent://public/default/topic-schema-string")
    .subscriptionName("sub1")
    .subscribe();

Message<String> msg = consumer.receive();

System.out.println("Received: msg.getValue() = " + msg.getValue()); 

// 今まで通りgetData()で取得
System.out.println("Received: msg.getData()  = " + new String(msg.getData()));

Consumer, Producerの順にコードを実行するとConsumer側では以下のログが出力されます。

Received: msg.getValue() = string-schema-my-message
Received: msg.getData()  = string-schema-my-message

JSON

次にJSONでの利用例を見ていきます。
初めに下記のようなUserクラスを定義します。

class User {
  int age;
  String name;
  
  User(){}
  
  User(int age, String name){
    this.age = age;
    this.name = name;
  }

  public int getAge() {
    return age;
  }
  
  public String getName() { 
    return name;
  }

  public void setAge(int age) {
    this.age = age;
  }
  
  public void setName(String name) {
    this.name = name;
  }
}

Producer生成時にJSONSchemaを指定し、TypedMessageBuilder::value()の引数にジェネリクスで指定したUserクラスのインスタンスを渡します。

Producer<User> producer = pulsarClient.newProducer(JSONSchema.of(User.class))
    .topic("persistent://public/default/topic-schema-json")
    .create();

User user = new User(28, "Bob");
producer.newMessage().value(user).send();

Consumer側でも同様にJSONSchemaを指定し、Message::getValue()でメッセージをUserクラスのインスタンスとして受け取ることができます。
Message::getData()で取得しString型に変換した場合は、JSONデータ型の文字列として受け取ります。

Consumer<User> consumer = pulsarClient.newConsumer(JSONSchema.of(User.class))
    .topic("persistent://public/default/topic-schema-json")
    .subscriptionName("sub1")
    .subscribe();

Message<User> msg = consumer.receive();

System.out.println("Received: msg.getValue().getAge()  = " + msg.getValue().getAge()); 
System.out.println("Received: msg.getValue().getName() = " + msg.getValue().getName()); 

// 今まで通りgetData()で取得
System.out.println("Received: msg.getData()            = " + new String(msg.getData()));

Consumer, Producerの順にコードを実行するとConsumer側では以下のログが出力されます。

Received: msg.getValue().getAge()  = 28
Received: msg.getValue().getName() = Bob
Received: msg.getData()            = {"age":28,"name":"Bob"}

Pulsar Schemaでは、他にもAVROやProtobufを型に指定できます。
またversion管理なども可能です。
より詳しく知りたいと思われた方はこちら。

メッセージのバッチ送信

最後に紹介するのは、メッセージのバッチ送信です。
Pulsarでは一度に複数メッセージをまとめてBrokerに送信することが可能で、これによりパフォーマンスの向上が期待できます。
ただしその反面、機能の性質上送信で遅延が発生します。実際に利用する際は、最大送信間隔/最大同時送信数をアプリケーションの要件に応じて設定してください。

ここでは、非同期メソッドのProducer::sendAsync()を利用してメッセージのバッチ送信をします。
同期メソッドのProducer::send()では仕様上基本的にバッチ送信されません。

初めにメッセージを送信するトピックにサブスクリプションを作成します。
ここではsub1というサブスクリプションを作成しています。

$ ./bin/pulsar-admin topics create-subscription \
        -s sub1 \
        persistent://public/default/topic-enable-batching

デフォルトでバッチ送信は有効になっていますが、今回は明示的に有効にしています。
また最大送信間隔/最大同時送信数も設定しています。

    Producer<byte[]> producer = pulsarClient.newProducer()
        // バッチ送信を明示的に有効化
        .enableBatching(true)
        // メッセージ送信まで最大1秒の遅延
        .batchingMaxPublishDelay(1,TimeUnit.SECONDS)
        // 最大500メッセージを同時に送信する
        .batchingMaxMessages(500)
        .topic("persistent://public/default/topic-enable-batching")
        .create();

   // 600件メッセージを送信する
   for(int i=0;i<600;i++) {
       final String message = "my-message-" + i;
       producer.newMessage().value(message.getBytes()).sendAsync();
   }
   // 全メッセージの送信完了を待ちます
   producer.flush();

次に送信したメッセージを確認してみます。
pulsar-adminコマンドのpeek-messagesを使うことで(メッセージの購読位置を移動させずに)指定したサブスクリプションのバックログを覗き見できます。
X-Pulsar-num-batch-messageが同時に送信したメッセージ数です。
600件送信しましたが、batchingMaxMessagesを500に設定していたため500件と100件に分かれてバッチ送信されたことがわかります。

# サブスクリプションsub1のバックログにたまっている501件のメッセージをpeek。
$ ./bin/pulsar-admin topics peek-messages \
        -s sub1 \
        -n 501 \
        persistent://public/default/topic-enable-batching

# 1件目のメッセージ
Batch Message ID: 404:12:0
Tenants:
{
  "X-Pulsar-num-batch-message" : "500",
  "publish-time" : "2020-04-08T11:40:58.025+09:00"
}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6d 79 2d 6d 65 73 73 61 67 65 2d 30             |my-message-0    |
+--------+-------------------------------------------------+----------------+
-------------------------------------------------------------------------
.
.
# 501件目のメッセージ
Batch Message ID: 404:13:0
Tenants:
{
  "X-Pulsar-num-batch-message" : "100",
  "publish-time" : "2020-04-08T11:40:58.217+09:00"
}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6d 79 2d 6d 65 73 73 61 67 65 2d 35 30 30       |my-message-500  |
+--------+-------------------------------------------------+----------------+
-------------------------------------------------------------------------

次にバッチ送信を無効にしてメッセージを送信してみます。
最初に先ほどと同じようにサブスクリプションを作成します。

$ ./bin/pulsar-admin topics create-subscription \
        -s sub1 \
        persistent://public/default/topic-disable-batching

Producerでバッチ送信を無効にしてメッセージを送信

    Producer<byte[]> producer = pulsarClient.newProducer()
        // バッチ送信を無効化
        .enableBatching(false)
        .topic("persistent://public/default/topic-disable-batching")
        .create();

先ほどと同様にメッセージを見てみます。
バッチ送信ではないため、X-Pulsar-num-batch-messageがありません。メッセージが1つ1つ送信されたことがわかります。

$ ./bin/pulsar-admin topics peek-messages \
        -s sub1 \
        persistent://public/default/topic-disable-batching

Message ID: 434:0
Tenants:
{
  "publish-time" : "2020-04-08T11:48:42.516+09:00"
}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6d 79 2d 6d 65 73 73 61 67 65 2d 30             |my-message-0    |
+--------+-------------------------------------------------+----------------+

バッチ送信の紹介は以上です。
特にパフォーマンスが求められるようなアプリケーションでは、この機能をうまく利用することで性能要件が満たせるかもしれません。

おわりに

第3回目は前回に引き続きクライアントについて紹介しました。
ブログの前半では各クラスの設定一覧、後半では前回紹介できなかった3つの機能について紹介しました。
Pulsarにはまだ紹介できていない機能がたくさんあります。
興味を持たれた方はぜひ公式ドキュメントでいろいろな機能を探してみてください。
またこちらでも引き続きPulsarについて紹介していきますので、次回もよろしくお願いします。

告知

日本時間の7/17 24:30(7/17 8:30 PDT)からPULSAR SUMMIT VIRTUAL CONFERENCE 2020が2日間Zoom上で開催されます。
日本では深夜の時間帯になってしまいますが、参加費無料でPulsarの利用事例、技術的な話などさまざまなセッションを聴くことができるので、ぜひ参加いただければと思います。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?