4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache PulsarにおけるBacklogの概念とその挙動

Last updated at Posted at 2021-09-06

はじめに

Apache PulsarにおけるBacklogの概念とその挙動、および関連する機能について解説します。具体的にどのような設定があるか、設定時に注意すべき点などについても触れています。読者が「Backlogにまつわる挙動と設定を理解し、ユースケースに合わせた設定を行い、ストレージを使い果たすなどの問題を回避できるようになること」がこの記事の目的です。

注1. この記事ではApache Pulsarの概念及び基本的な用語(Topic, Producer, Consumer, Namespaceなど)の説明は省略します。Apache Pulsarの概要について知りたい方は以下の記事などが参考になると思います:

注2. 本稿ではApache Pulsar 2.7.3を前提にしています。

Ack, Cursor, Backlogの概念

Acknowledgement(以下Ack)とはConsumerがメッセージの処理を終えたときにBrokerに返すレスポンスのことで、日本語では"確認応答"などと訳されます。PulsarではSubscriptionごとの「どのメッセージがAck済みか」という情報をCursorと呼びます(ちなみにApache Kafkaユーザーの方はKafkaにおけるOffsetと同等のものと考えていただければ分かりやすいと思います)。またTopic内においてまだAckされていないメッセージの集合をBacklogと呼びます。

これらの関係を示したのが次の図です。
01.jpg
図1. Ack, Cursor, Backlog

この図はあるTopicにProducerが接続し、0から9までのメッセージがproduceされた状態を表しています。TopicにはConsumer-1がSubscription-1というSubscription名で接続しており、0から3までのメッセージは配信済みかつAck済みになっています。このときSubscription-1のCursorは次に読むメッセージである4を指しており、また4から9までのメッセージはまだAckされていないためSubscription-1にとってのBacklogとなります。

ちなみに図でSubscription-1に含まれているのはConsumer-1のみですが、1つのSubscriptionの中に複数のConsumerが含まれている場合があります。例えばSubscription Type: Sharedなどを利用すると、1つのSubscription中に複数のConsumerが含まれ、それらがラウンドロビンでメッセージを受け取るようになります。他にもいくつかSubscription Typeが存在しますが、この記事では詳細は省略します(気になった方は以前の記事を参照してください)。

次に、1つのTopicに複数のSubscriptionある場合はどのようになるでしょうか。それを表したのが次の図です。
02.jpg

図2. Subscriptionが複数ある場合

この図はSubscription-1が0から3まで、Subscription-2が0から6までのメッセージをAckしたときの様子を表しています。このとき、Subscription-1にとっては4-9がBacklogであり、Subscription-2にとっては7-9がBacklogになります。

Pulsarにおいて「すべてのSubscriptionでAck済み」であるメッセージは適宜削除されていきます。一方、「1つ以上のSubscriptionでAckが返されていない」メッセージは削除されずストレージに保持されます。

図2では、0-3のメッセージはSubscription-1, Subscription-2両方でAck済みのため、削除可能な状態になっています。対して4-6はSubscription-1で、7-9はSubscription-1とSubscription-2両方でAckが返されていないため、ストレージに保持されます。

このように1つでも"遅い"Subscriptionが存在するとそれに引きずられてストレージの使用量が増えることになります。この挙動がたびたび後述するBacklogの"詰まり"を引き起こす可能性があり、注意が必要です。

2種類のAck

Pulsarには2種類のAck方法が用意されています:

  1. Individually Ack
  2. Cumulative Ack

Individually Ackは1つ1つのメッセージに個別にAckを返す方法です。Javaのコードでは

consumer.acknowledge(message);

のように記述します。例えば0, 1, 3のメッセージはAckしたけれど2はまだ、のような状態を表現できます。

対してCumulative Ackはそれ"以前"のメッセージを一律でAckする方法です。Javaのコードでは

consumer.acknowledgeCumulative(message);

のように記述します。例えば3のメッセージに対してCumulative Ackをすると、0, 1, 2もAckされた扱いになります。

Cumulative Ackを利用することで後述のよくある問題の1つである"Ack hole"を防ぐことができます。ただし処理が失敗したメッセージについてわざとAckしないでおくことができない(後のメッセージをAkcした際に一緒にAckされてしまう)ため、これが必要な場合は必然的にIndividually Ackを使うことになると思います。またサブスクリプションタイプがSharedまたはKey_Sharedの場合、Cumulative Ackを利用することはできません。

参考:https://pulsar.apache.org/docs/en/concepts-messaging/#acknowledgement

Backlogの上限

NamespaceおよびTopicには

  • Backlogに溜めておけるメッセージ合計の上限
  • 上限に達したときの挙動

を設定することができます。

例えばpulsar-adminというCLIツールを使って下記のように設定することができます。(pulsar-admin含めAdmin APIの詳細について知りたい場合はこちらを参照してください)

注. この記事の以降の設定変更コマンド例ではNamespace単位で設定する場合のものを掲載します。Topic単位で設定する場合はnamespacesの部分をtopicsmy-tenant/my-nsの部分をpersistent://my-tenant/my-ns/my-topicに読み替えて実行してください。

$ bin/pulsar-admin namespaces set-backlog-quota --limit 10M --policy consumer_backlog_eviction my-tenant/my-ns

$ bin/pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
"destination_storage    BacklogQuota{limit=10485760, policy=consumer_backlog_eviction}"

"Backlog Quota"とはBacklogの上限および上限に達したときの挙動に関する設定項目です。set-backlog-quotaで値を設定し、get-backlog-quotasで設定された値を確認しています。

set-backlog-quotaで設定可能な項目は以下です:

オプション 説明 デフォルト値
-l, --limit 合計サイズの上限[bytes]。10M, 16Gのように指定可能。負数は無限(制限なし)を意味する。 broker.confのbacklogQuotaDefaultLimitGBの値
-p, --policy 上限に達したときの挙動。以下が指定可能:
  • producer_request_hold: Backlogの空きが出るまでproduceが停止される(タイムアウト値の設定次第でタイムアウトになる可能性もある)
  • producer_exception: produceが即座に失敗する
  • consumer_backlog_eviction: Backlogのメッセージが古いものから削除され始める
broker.confのbacklogQuotaDefaultRetentionPolicyの値

よくある問題: Backlogの"詰まり"

前述のpolicyを見ると分かる通り、Backlogが上限まで溜まった際はproduceか古いメッセージの保持のどちらかを諦めることになります(もちろん上限なしに設定することもできますが、HDDなどストレージのリソースが有限であることを考えると現実的には有限値を設定するケースがほとんどだと思われます)。

どちらが良いかはユースケースによりますが、例えば古いメッセージの破棄を許容できる場合はconsumer_backlog_evictionを、許容できない場合はproducer_request_holdまたはproducer_exceptionを設定することになるでしょう。

これらは上限に達したときのいわば"最後の手段"ですので、可能な限りBacklogを溜めないようにするのが望ましいです。しかし現実にはたびたび以下のような要因によりBacklogの"詰まり"が発生しがちです:

  • Ack hole
  • "遅い"Subscription

これらの詳細について次項で説明します。

Ack hole

この問題はIndividually Ackを利用している際に起こります。Individually Ackにより1つ1つのメッセージに個別にAckを返すことができますが、逆に言うとある特定のメッセージのAckがいつまでも返されない(以降のメッセージはAckされているが)という状態が発生する可能性があります。

このような不連続にAckが返されていない箇所のことを"Ack hole"と呼びます。ここで問題なのはAck holeが存在すると以降のメッセージすべてがAck済みかどうかに関わらずストレージに残り続けるという点です。
03.jpg
図3. Ack hole

上図はメッセージ3においてAckが返されていないことを表しています。このときメッセージ0から2はAck済みであるためストレージから削除可能です。しかし3以降のメッセージは3のAckが返されていないことに引きずられて削除されません(たとえ4以降がAck済みであってとしても)。

このような挙動になるのはPulsarのストレージとして利用されているApache Bookkeeperの仕様に依存しています。詳細は省略しますが、ストレージからAck済みのメッセージを1つずつ個別に消しているわけではなく、ある程度連続した単位で削除が行われています。

"遅い"Subscription

最初の章で説明した通り、「1つ以上のSubscriptionでAckが返されていない」メッセージはストレージから消えずに残ります。極端な例を言うと、1000個のSubscriptionがあり、そのうち999個のSubscriptionにおいてconsumeがproduceに追いついているとしても、1つのSubscriptionのconsume処理が遅れているだけで徐々にBacklogが溜まっていくことになります。

実際によくあるのは、下記のような遅いどころか止まっている(すでに使われていない)Subscriptionが足を引っ張っているケースです:

  1. 開発フェイズにおいて"お試し用"のSubscriptionを作成(e.g. test-subs)
  2. 本番稼働用に別名のSubscriptionを作成(e.g. prod-subs-N)
  3. 開発フェイズが完了しtest-subsからConsumerがいなくなるが、test-subs自体は消えずに残っている
  4. test-subsにおいてconsumeが行われないためCursorが動かず、Backlogが溜まりやがて上限に達する
    04.jpg
    図4. 遅いSubscription

このような問題を防ぐには不要になったSubscriptionを忘れずに削除するか、後述するSubscriptionの有効期限を設定しておくのが良いです。

Backlog詰まりの予防策

上述のBacklogの詰まりを防ぐにはどうしたらよいか、その方策を以下で説明します。

メッセージに有効期限を設ける

NamespaceおよびTopicにはメッセージの**Time-to-live(TTL)**すなわち有効期限を設定することができます。例えばpulsar-adminコマンドを用いて次のように設定します:

$ bin/pulsar-admin namespaces set-message-ttl my-tenant/my-ns --messageTTL 60

$ bin/pulsar-admin namespaces get-message-ttl my-tenant/my-ns
60
オプション 説明 デフォルト値
-ttl, --messageTTL メッセージの有効期限[秒]。ただし0は無効(有効期限切れにならない)を表す 0

Brokerでは定期的にBacklog内のメッセージが有効期限切れであるかをチェックしており、期限が切れたメッセージはAckが返されたのと同じ状態になります。

この設定はメッセージのリアルタイム性が求められるユースケースにおいて特に有用です。有効化することで古くなったメッセージを定期的に廃棄し、Ack holeの発生を防げます。

Subscriptionに有効期限を設ける

Subscriptionにも有効期限を設定することができます。

$ bin/pulsar-admin namespaces set-subscription-expiration-time my-tenant/my-ns --time 60

$ bin/pulsar-admin namespaces get-subscription-expiration-time my-tenant/my-ns
60
オプション 説明 デフォルト値
-t, --time Subscriptionの有効期限[秒]。ただし0は無効(有効期限切れにならない)を表す 0

こちらも同様にBroker内で定期的にチェックが動いており、Consumerが1つも存在しないSubscriptionはこの時間経過後に自動的に削除されます。この設定により使われていないSubscriptionを定期的に廃棄し、Backlogが溜まることを防げます。

Backlogの状態を確認する

pulsar-adminstatsstats-internalといったコマンドを実行することで、現在のTopicの状態を確認することができます。

statsの例:

$ bin/pulsar-admin topics stats persistent://my-tenant/my-ns/my-topic

(中略)
"backlogSize" : 3072, <- Backlogにあるメッセージの合計バイト数
"publishers" : [ ],
"subscriptions" : {
    "sub1" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 3, <- Backlogにあるメッセージの数
(後略)

stats-internalの例:

$ bin/pulsar-admin topics stats-internal persistent://my-tenant/my-ns/my-topic

(中略)
"cursors" : {
    "sub1" : {
      "markDeletePosition" : "6990757:3",
      "readPosition" : "6990757:20",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 17,
      "cursorLedger" : 6990758,
      "cursorLedgerLastEntry" : 1,
      "individuallyDeletedMessages" : "[(6990757:4..6990757:8], (6990757:9..6990757:13], (6990757:14..6990757:19]]",
      "lastLedgerSwitchTimestamp" : "2019-04-09T13:52:46.827+09:00",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 17, <- 最も古いAckが返されていないメッセージから最新のメッセージまでに何個メッセージがあるか
(後略)

これらのコマンドは運用上非常によく使います。様々な情報が含まれるのですが、今回はBacklogの状態に関する項目のみ紹介します。

stats出力結果のbacklogSize

そのTopicのBacklogのメッセージの合計バイト数を表しています。この値が増加し続けている場合はそのTopicにあるSubscriptionのどれかが正しくAckを返せていないことを表します。この値がやがてBacklog Quotaのサイズ上限に達すると、Backlog Quotaのpolicyで定めた挙動を取ることになります(なのでその前にAckを返せていない原因を突き止めて解消することが望ましいです)。

stats出力結果のmsgBacklog

そのSubscriptionのBacklogのメッセージの数を表しています(ただし厳密な値ではなく多少のずれが起こり得ます。厳密な値を取得したい場合は-gpb, --get-precise-backlogというオプションを指定する必要があります)。この値が増加し続けている場合はそのSubscriptionのconsumeが追いついていない(遅い)ことを表しています。

stats-internal出力結果のnumberOfEntriesSinceFirstNotAckedMessage

そのSubscriptionにおいて最も古いAckが返されていないメッセージから最新のメッセージまでに何個メッセージがあるかを表しています。つまりAck holeが存在するとこの値が増え続けることになります。ちなみにConsumerが最新までの全てのメッセージに対してAckを返している状態(すなわちBacklogにメッセージが溜まっていない状態)の場合、この値は1になります(0ではなく)。backlogSizeは増え続けているがmsgBacklogは大きくないような場合、実はAck holeが発生しておりこの値が増え続けていることが多いです。

Ackを返しても消さずに残しておきたい

ここまですべてのSubscriptionからAck済みのメッセージは削除されるという前提で話をしてきましたが、Ack済みのメッセージをあえて残したいケースがあります。例えば障害が発生した際にメッセージを再度送り直したり、原因調査のためにとっておきたい、などです。このような場合はRetentionという設定が役に立ちます。

Retentionを設定すると、Ack済みのメッセージはBacklogからは消えますが、一定時間経過あるいは上限サイズに到達するまでストレージに残ります。設定はpulsar-adminコマンドを用いて次のように行います:

$ bin/pulsar-admin namespaces set-retention --size 1G --time 1d my-tenant/my-ns

$ bin/pulsar-admin namespaces get-retention my-tenant/my-ns
{
  "retentionTimeInMinutes" : 1440,
  "retentionSizeInMB" : 1024
}
オプション 説明 デフォルト値
-s, --size Retention size: 保持するメッセージの合計サイズの上限[Mbytes]。10M, 16Gのように指定可能。0は無効、負数は無限を意味する。 0
-t, --time Retention time: 保持するメッセージの経過時間の上限[分]。4h, 3d, 2wのように指定可能。0は無効、負数は無限を意味する。 0
05.jpg
図5. Retention

上図はBacklog/Retentionの関係を示したものです。Ack済みのメッセージはBacklogからは消えますがストレージに残ります(図の4,5,6)
。そしてそれらの合計サイズがRetention Sizeを上回る、あるいはRetention time以上経過すると古いものから保持対象から外され、削除されるようになります(図の0,1,2,3)。

保持しているメッセージを再送したい際はCursorのリセットを行います。例えばpulsar-adminコマンドでは下記のようになります:

// 時間で指定
$ bin/pulsar-admin topics reset-cursor --subscription sub1 --time 24h persistent://my-tenant/my-ns/my-topic

// メッセージIDで指定
bin/pulsar-admin topics reset-cursor --subscription sub1 --messageId 123:456 persistent://my-tenant/my-ns/my-topic
オプション 説明
-s, --subscription リセット対象のSubscription
-t, --time カーソルを戻す時間。4h, 3d, 2wのように指定可能。
-m, --messageId カーソルを戻すメッセージID。<ledgerId>:<entryId>の形式。

この操作によりCursorが指定した位置まで巻き戻り、メッセージがその場所から再送されることになります。

補足: メッセージIDについて

メッセージIDとはProducerが発行したメッセージに対して自動的に割り振られるクラスタ内で一意なIDです。produce時もしくはconsume時に下記のようにして値を確認することができます:

// produce時
MessageId mid1 = producer.send("test-message".getBytes());
System.out.println(mid1.toString()); // 送信されたメッセージIDが表示される

// consume時
Message msg1 = consumer.receive();
System.out.println(msg1.getMessageId().toString()); // 受信したメッセージIDが表示される

ここでメッセージIDのフォーマットは

<ledgerId>:<entryId>:<partitionIndex>:<batchIndex>

のように4つ数値(それぞれの意味はここでは割愛します)をコロンでつないだものになっています。ただしCursorのリセットの際に必要になるのは先頭の2つ

<ledgerId>:<entryId>

までです。

おわりに

以上、PulsarにおけるBacklogやCursorの仕組みについて説明してきました。Cumulative / IndividualなAckの2方式を選べたり、Backlog Quota, Retention, TTLなどユースケースに合わせて柔軟に設定できるのが特徴ですが、反面、理解が不十分なまま使うと思わぬ落とし穴にはまる可能性があります。この記事がこれからPulsarを使う方にとって少しでも助けになれば幸いです。

告知

2021/9/30(木)、Apache Pulsar Meetup Japan #4がオンライン開催されます!

この記事を読んで少しでもPulsarに興味を持っていただけた方がいれば、ぜひご参加ください。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?