はじめに
Apache Pulsar is an open-source, distributed messaging and streaming platform built for the cloud.
Apache Pulsar(以降、Pulsar)とは、メッセージング・ストリーミングプラットフォームの一種である。
基本的なところは他のドキュメントに譲る。例えば拙著では メッセージングPF「Apache Pulsar」の使い方(入門編) を参考にしていただくと良いかもしれない。
本稿では、サブスクリプションタイプのうち Key_Shared についてその嬉しさと課題点を思うままに述べてみる。これは何らかの公式見解などではなく、あくまで私的な意見であることに注意されたい。
本稿はPulsarのユーザに向けた機能紹介をするものではないのでその点も注意されたい。
本稿は執筆開始時点でv3.3.2がリリースされていた。またv4.0.0はリリース前であった。
断りのない限りこれを前提に記述している。
Key_Sharedサブスクリプションタイプとは
サブスクリプションとは、簡単に言うとトピックに対するカーソルである。トピックのメッセージをどこまで購読したかを保持する。トピックとサブスクリプションは1 vs Nとなり得る。この点はPub/SubにおけるSubscriberと理解していただきたい。
Pusarの機能の一つとして、このサブスクリプションにはいくつかの機能別にタイプと呼ばれるものを提供している。拙著を引用すると次のとおりである。
- Exclusive
- 1つのサブスクリプションを1つのConsumerだけがConsumeできる
- Failover
- 1つのサブスクリプションに複数のConsumerが接続できる
- 最も優先度の高いConsumerだけがConsumeできるが、このConsumerがダウンしたとき次の優先度のConsumerがConsumeできるようになる
- Shared
- 1つのサブスクリプションを複数のConsumerがConsumeできる
- メッセージがラウンドロビンに近い形でConsumerに配信される
- Key_Shared
- 1つのサブスクリプションを複数のConsumerがConsumeできる
- ProducerはメッセージにKeyと呼ばれる識別子を付与して、同じKeyを持つメッセージは同じConsumerに配信される
- 同じKeyを持つメッセージの順序保証ができる
このうちShared/Key_SharedはいわゆるQueue semantics1と呼ばれるものだろう。トピックのメッセージを複数Consumerに分散して配信する。
Key_SharedはSharedと比較して更に制約がある。すなわちメッセージのKeyによって分散に制限をかけて、同じKeyは同じConsumerに配信されるようにする。これは何が嬉しいのだろうか。
端的に言うと次の点だろうか。
- Keyごとの順序保証
- 特定KeyのメッセージをひとつのConsumerで処理することにして、KeyについてPublishされた順にメッセージを処理できる
- 分散の制約がまったく無いとすると、特定メッセージ間に前後関係の文脈があったとしても別々のConsumerに配信されうる
- 文脈の例: ユーザAの操作(操作1をしてから操作2をした), 特定サーバのログ(ログ1が出てからログ2が出た)
- Keyによる分散の制御
- 順序保証の必要がなかったとしても、単に同じKeyを同じConsumerに配信したいという要件がある
ところで、Key_Sharedとは同じKeyは同じConsumerに配信する仕様をサブスクリプションタイプとして実現するものだが、その他にもPartitioned Topicのrouting mode2に依存して複数Consumerを接続しながら実現することもできる。
たとえば簡単な例だと、Partitioned Topicの各partitionに対してExclusiveで同名のサブスクリプション名のConsumerを接続する。結局のところConsumerから見るとトピック(partition)ごとに接続しているだけなのだが、Partitioned Topic自体から俯瞰してみると複数Consumerでメッセージを処理していることになる。
この他にもFailoverを使う例3など考えられるが、話の本筋ではないので割愛する。
いずれも、要するにpartitionごとにひとつのConsumerを接続することで実現するものである。
これとKey_Sharedは何が違うのだろうか。大きな点はpartitionを必要としないところだろう。partitionごとに一つのConsumerを接続させるということは、そのPartitioned Topicの一つのSubscriptionにとって接続可能なConsumerの上限はpartition数である。Consumerをスケールアウトさせるにはpartitionを増やす必要がある。
対してKey_Sharedはpartition数に関係なくConsumerへの配信の部分でKeyごとの順序保証のための制御をしている。つまり単一のトピックであってもConsumer数はスケールアウトさせることができる。
さらに一つKey_Sharedの仕様を紹介する。それはユーザの選択次第でどのKeyをどのConsumerに配信するかのマッピングのアルゴリズムを選択できるという点である。マッピングは次の二種類に大別される4。
- Auto-split
- Consumerが増減すると、マッピングを特定アルゴリズムに沿ってサーバ側で自動分割
- どのKeyが配信されるかが 接続途中で切り替わる 可能性がある
- Sticky
- Consumer接続時にマッピングを手動指定
- 最初から どのKeyが配信されるかが固定
Stickyは論理的には(partition数を変えないものとすると)partitionを使った方法と同じく、マッピングが固定される。マッピングが欠けると、その部分のメッセージはいずれのConsumerにも処理されない。マッピングの欠けがないようにConsumer全体をクライアント側で管理する必要がある。
Auto-splitはその点でいうと、マッピングをサーバ側で自動管理するために一見すると便利そうである。
Key_Sharedの課題点
ここまで素敵な言葉で説明してきたが、つまりはそれを実現するような実装をサーバサイドで持つことになるわけだ。ここに課題点があると考えている。
あくまで筆者の経験的なもので申し訳ないが、問題になるのは同じKeyを同じConsumerに配信するという仕様それ自体と順序保証の仕様5だと考えている。
以降は直近で改善が期待される前者について詳しく取り上げる。
同じKeyを同じConsumerに配信する仕様
これは特にマッピングアルゴリズムにAuto-splitを使用するときに見えてくる。端的に言うと、マッピングが切り替わるのだからConsumerの接続から切断まで常に同じKeyを同じConsumerに配信することは仕様上できないことになる。
ここで重要になるのは、そもそも何を保証するべきなのかだろう。Pulsarとしては次の戦略を取っている6。
- あるKeyは一つのConsumerで処理される
- Consumerが接続されるとマッピングが変わり、再送メッセージはそのConsumerに配信される
- もとのマッピングによって配信されたConsumerが持っているメッセージはそのまま処理でき、Ackを返したら再送はされない
- 再送が発生したら、先の仕様通り新しいマッピングのConsumerに配信される
- 新しく接続したConsumerは、接続した時点ですでに他Consumerに配信したメッセージすべてにAckが返されるまで新規メッセージを受信することはできない
- 受信できると、同一Keyのメッセージが再送されたときに順序が壊れる
非常に雑ながら例示する。2Consumerが接続している状態で、新たに1Consumerが接続するときを考える。
ここではあえてPulsarの用語・仕様に則らない表記をしている7。
# foo:n のとき、メッセージは foo のKeyをもつこととする
未配信: [key-a:1, key-b:2, key-a:3, key-a:4, key-a:5, ...]
マッピング: {key-a: c1, key-b: c2, ...}
c1 received: []
c2 received: []
-(いくつかメッセージを配信)->
未配信: [key-a:4, key-a:5, ...]
# これらはAckを返していないものとする
c1 received: [key-a:1, key-a:3]
c2 received: [key-b:2]
-(c3が接続)->
# key-aのマッピングが変更されたとする
マッピング: {key-a: c3, key-b: c2, ...}
c1 received: [key-a:1, key-a:3]
c2 received: [key-b:2]
c3 received: [] # すべての配信済みメッセージのAckが返されるまで新規メッセージを受信しない
-(c1, c2が全てにAck)->
未配信: [key-a:5, ...]
c1 received: []
c2 received: []
c3 received: [key-a:4]
ここで、条件を変えてc1がAckを返さずに切断したとすると次の通り。
-(c3が接続)->
# key-aのマッピングが変更されたとする
マッピング: {key-a: c3, key-b: c2, ...}
c1 received: [key-a:1, key-a:3]
c2 received: [key-b:2]
c3 received: [] # すべての配信済みメッセージのAckが返されるまで新規メッセージを受信しない
-(c1が切断)->
c2 received: [key-b:2]
c3 received: [key-a:1, key-a:3] # 再送メッセージは受信可能
-(c2, c3が全てにAck)->
未配信: [key-a:5, ...]
c2 received: []
c3 received: [key-a:4]
対して、たとえば先の配信停止の仕様がないとすると次のように遷移する可能性がある。
-(c3が接続)->
未配信: [key-a:5, ...]
# key-aのマッピングが変更されたとする
マッピング: {key-a: c3, key-b: c2, ...}
c1 received: [key-a:1, key-a:3]
c2 received: [key-b:2]
c3 received: [key-a:4] # 新規メッセージを受信
-(c1が切断)->
c2 received: [key-b:2]
c3 received: [key-a:4, key-a:1, key-a:3] # key-aに関する順序崩れ
しかし、ユーザとしてはこの制御はおそらく分かりづらいものだろう。Consumerを接続してもメッセージが全く配信されないという事象に遭遇してからこの仕様に気がついてもおかしくない。
それと、本来配信停止はすでに配信されてしまったがAckを返されていないKeyにのみ限定しても良いだろう。先の例だと key-b
のメッセージがどういう状態であるかは c3
にとって無関係である。
これが実現できれば、無駄な配信停止を省いて条件次第でスループットを若干向上させることに寄与するはずだ。
問題点解決のための動向
同じKeyを同じConsumerに配信する仕様については、仕様自体を変更するProposal(Pulsar Improvement Proposal; PIP)が提案されている。
PIP-379: Key_Shared Draining Hashes for Improved Message Ordering
ざっくりいうと、意図的に配信を止める仕様を変更してConsumerの追加・削除時にsubscription中でAck待ち状態になっているKeyはそれらのAckが開放されるまでいずれも配信できないようにするというものである。
配信を完全に止めてしまうという仕様が現行仕様(再送は可能)と比較して実装上の考慮点を削減8していると思われるのと、止めるのはKey(正確にはそのハッシュ)単位としているので先の問題(無駄な配信停止)を解決しているといえる。
また、統計情報もいくつか追加されている。ユーザ側でもデバッグが容易になることが期待されている。
個人的には興味深い反面まだ懸念が明らかになるほど使っていないこともあり、これらの最終的な評価は実際に利用されるようになってから改めてすることにしたい。
執筆開始時点ではまだこの修正はリリースされていなかったが、v4.0.0にて無事リリースされたようである。
What's New in Apache Pulsar 4.0#Enhanced Key_Shared Subscription: Scale Without Compromising Message Order
おわりに
Key_Sharedの仕組み自体は個人的には興味深い。
願わくば問題が取り除かれており、かつユーザ観点での混乱が少ないだろう仕様に改善されていくことを期待する。
-
https://pulsar.apache.org/docs/concepts-messaging/#routing-modes ↩
-
https://pulsar.apache.org/docs/concepts-messaging/#failover--partitioned-topics
FailoverではpartitionごとにConsumerが選択される。この背景知識の説明は本稿の本質から外れるので、あえてExclusiveのパターンを用いた。 ↩ -
https://github.com/apache/pulsar/pull/10762
たとえば上記はConsumerの接続・切断によるものではなく順序崩れを引き起こすパターンの一つであった。 ↩ -
https://pulsar.apache.org/docs/concepts-messaging/#preserving-order-of-processing ↩
-
順序にはMessageIdを用いたほうが正確だろう。未配信の列はbacklogとpendingAcks等を使って表現したほうが正確かもしれない。しかし、いずれも問題を理解するうえでは不要な概念なので省略する。 ↩
-
PIP-282: Change definition of the recently joined consumers position
余談だが筆者自身も1件はPIPとして修正を出したことがある。現行仕様に対して並行性の問題で先の仕様を遵守できない可能性があり、それは発生すると順序崩れを引き起こすものだった。それに対する対応である。
しかし、最終的にはリリースバージョンに取り込まれることはなかった。仕様がPIP-379に取って代わることで問題自体が消失したのである。 ↩