はじめに
Apache Pulsarでは、Consumerがトピックの購読を開始する際にサブスクリプションと呼ばれる識別子を指定する必要があります。Producerによってトピックにproduceされたメッセージは、それぞれのサブスクリプションに対して1つずつ配信されます。つまり、全てのConsumerが異なるサブスクリプションでトピックを購読していればメッセージがブロードキャストされる事になります。一方、全てのConsumerが同じサブスクリプションに属していればそれらの内どれか1つにメッセージが配信される事になり、負荷分散や冗長化を実現できます。PulsarのサブスクリプションはKafkaで言うConsumer Groupに当たるものと言えるでしょう。
ここからが本題となりますが、Pulsar 2.4.0からレプリケーテッドサブスクリプションと呼ばれる特殊なサブスクリプションの作成が可能になりました。今回はこのレプリケーテッドサブスクリプションの機能の概要からアーキテクチャまでを解説してみたいと思います。
なお、以降の記述は執筆時点での最新バージョンであるPulsar 2.8.0の実装に基づくものであり、将来のバージョンには当てはまらない可能性があります。また、Pulsar自体に関する詳しい説明は割愛しておりますので、そちらにご興味のある方は公式サイトや過去の記事を参照してください。
ジオレプリケーションとは
レプリケーテッドサブスクリプションについて解説する前に、まずはジオレプリケーションと呼ばれる機能を紹介しておく必要があります。
Pulsarのインスタンスは、1つ以上のクラスターから構成されます。通常、地理的に離れたデータセンター1つ1つをクラスターに対応させるのが一般的です。例えば、東日本のデータセンターには「jp-east」というクラスターを構築し、西日本のデータセンターには「jp-west」というクラスターを構築する、といった具合です。
さて、Pulsarにおけるトピックは何らかのネームスペースに所属する必要があります。このネームスペースを作成する際に1つ以上のクラスターを紐づける事ができます。
# ネームスペース「tenant1/ns1」にクラスター「jp-east」と「jp-west」を紐づける
$ bin/pulsar-admin namespaces create --clusters jp-east,jp-west tenant1/ns1
ネームスペースに紐づいたクラスターは後から変更も可能です。
# ネームスペース「tenant1/ns1」に紐づくクラスターを「jp-east」のみに変更する
$ bin/pulsar-admin namespaces set-clusters --clusters jp-east tenant1/ns1
jp-eastとjp-westが紐づけられたネームスペースでは、jp-east側にトピックを作成すると自動的にjp-west側でも同名のトピックが生成されます(その逆も然り)。そして、jp-east側にproduceされたメッセージは自動的に複製・転送され、jp-west側でもconsumeできます。これがジオレプリケーションと呼ばれる機能です。
ジオレプリケーションを利用したフェイルオーバー
ジオレプリケーションのユースケースの1つとして考えられるのがデータセンター単位でのフェイルオーバーです。
例えば、大規模な災害などで東日本のデータセンターが利用不能になったとします。BCP(事業継続計画)の観点から言えば、こうした事態が起きても西日本のデータセンターに処理を引き継いでサービスを継続できるのが望ましいでしょう。
障害発生時にProducerやConsumerが接続するクラスターを自動的に切り替える事自体は、GSLB(Global Server Load Balancing)といった技術を使えば容易に実現できます。問題は、Pulsarのサブスクリプションがクラスターごとに独立した存在であるという点です。各サブスクリプションは「Consumerにどこまでのメッセージを配信し、どこまでのメッセージに対してAck応答が行われたか」を示すカーソルという情報を持っているのですが、このカーソルの指す位置がクラスター間で同期されていないのです。
例として、jp-east側のトピックを「sub」というサブスクリプションで購読しているConsumerがいるとします。そして、Producerがm0からm3までの4つのメッセージをトピックにproduceし、その内m0とm1をConsumerが受け取ってAck応答していたとします。このタイミングでjp-eastクラスターで障害が発生し、接続先がjp-westクラスターに切り替わったらどうなるでしょうか。
Producer側は特に問題なく、jp-westクラスターへの接続が完了次第、引き続きメッセージm4とm5をproduceします。一方のConsumer側ですが、もしjp-west側にはまだ「sub」というサブスクリプションが存在していなかった場合、Consumerが接続した時点で作成されます。しかしこの場合、jp-west側のサブスクリプション作成前にproduceされたm0からm3までのメッセージはjp-west側では配信される事はありません。したがってConsumerはm2とm3を受け取れず、それよりも先にm4とm5のメッセージを受け取ってしまう事になります。
では、事前にjp-west側でも「sub」というサブスクリプションを作っておいた場合はどうでしょうか。この場合、障害発生前にはjp-west側にはConsumerが接続していないため、m0からm3までのメッセージは受信される事なくキューに溜まっていく事になります。障害発生後はConsumerはm0からm3までのメッセージを受け取り、その後にm4とm5を受け取ります。m0とm1は障害発生前にjp-east側でも既に受け取っているため、これらのメッセージは重複する事になります。
レプリケーテッドサブスクリプションとは
こうしたメッセージの欠落や重複といった課題を解決するために実装された機能がレプリケーテッドサブスクリプションです。
レプリケーテッドサブスクリプションを有効にするかどうかはConsumer側で指定できます。例えば、Javaのクライアントライブラリを使用している場合には次のようにすればレプリケーテッドサブスクリプションを有効にできます。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/test")
.subscriptionName("sub")
.replicateSubscriptionState(true) // レプリケーテッドサブスクリプションを有効化するための指定
.subscribe();
このようなConsumerがjp-east側に接続した場合、jp-west側でも自動的に同名のサブスクリプションが作成されます。そして、jp-east側でConsumerに受信されAck応答されたメッセージは、jp-west側でもAck応答された扱いとなり、キューから削除されます。これにより、jp-east側で障害が発生してjp-west側にConsumerが再接続した場合でも、(ほぼ)同じ位置からメッセージの受信を再開できます。
アーキテクチャ
一見すると単純に見えるレプリケーテッドサブスクリプションですが、そのアーキテクチャは意外に複雑です。ここからはレプリケーテッドサブスクリプションという機能がどのように実現されているか、その仕組みを見ていきます。
複数のクラスター間でカーソルを同期させたいとなった場合、単純に考えれば両方のクラスターから読み書き可能なストレージにカーソルの情報を都度書き込んで共有させればいいと考えるかもしれません。しかし、これを実現するのは現実的ではありませんでした。
まず、高いパフォーマンスを発揮でき、かつ全てのクラスターから利用可能なストレージがPulsarには存在しませんでした。Configuration Store(Global ZooKeeper) は全てのクラスターから読み書きできますが、あくまでメタデータの管理用であってあまり高いパフォーマンスは出せません。BookKeeperはシーケンシャルな書き込みに対して高いパフォーマンスを発揮しますが、単一のクラスターに閉じたデータしか扱えません。
また、カーソルはConsumerにAck応答された最も新しいメッセージのIDを保持していますが、メッセージIDはクラスター毎に完全に独立しています。つまり、jp-east側のメッセージと、それがジオレプリケーションされたjp-west側のメッセージでは両者のIDは一致しません。したがって、単純にクラスター共通のストレージにあるクラスターのカーソルが指す位置が書き込まれても、他のクラスターは自身のカーソルをどこに動かせばいいのかがわかりません。
このような状況の中考え出されたのが、マーカーと呼ばれる特殊なメッセージを導入し、それをトピックの通常のメッセージの合間に流すという手法でした。マーカーメッセージはConsumerに配信される事はありませんが、Broker内部でカーソルの位置を同期させるために使用されます。
レプリケーテッドサブスクリプションで使用される主要なマーカーメッセージは、
- ReplicatedSubscriptionsSnapshotRequest
- ReplicatedSubscriptionsSnapshotResponse
- ReplicatedSubscriptionsSnapshot
- ReplicatedSubscriptionsUpdate
の4つです。以降はそれぞれのマーカーメッセージの役割を見ていきます。
ReplicatedSubscriptionsSnapshotRequest
ReplicatedSubscriptionsSnapshotRequestは、あるクラスターが他のクラスターの現時点での最新(Ack応答済みかどうかは問わない)のメッセージのIDを教えてもらうためのメッセージです。このマーカーメッセージは、各クラスターのトピックにおいて一定間隔(デフォルトの設定では1秒間隔)で自動生成され、自分以外のクラスターに送信されます。
ReplicatedSubscriptionsSnapshotRequestの構造をJSONで表現すると次のようになります(実際にはJSONではなく、Protocol Buffersによりシリアライズされたバイナリデータです)。
{
"snapshot_id": "3fb212fc-784e-44ce-bb1e-117af24efa5e", // スナップショットIDと呼ばれるUUID文字列
"source_cluster": "jp-east" // 送信元クラスター名
}
ReplicatedSubscriptionsSnapshotResponse
ReplicatedSubscriptionsSnapshotRequestを受け取ったクラスターが、ローカルクラスターのトピックにおける最新のメッセージIDを送信元のクラスターに返信するためのメッセージがReplicatedSubscriptionsSnapshotResponseです。その構造は次の通りです。
{
"snapshot_id": "3fb212fc-784e-44ce-bb1e-117af24efa5e", // スナップショットID
"cluster": {
"cluster": "jp-west", // 送信元クラスター名
"message_id": { // 送信元クラスターの最新のメッセージID
"ledger_id": 200,
"entry_id": 67890
}
}
}
ReplicatedSubscriptionsSnapshot
他のクラスターからReplicatedSubscriptionsSnapshotResponseを受け取ったReplicatedSubscriptionsSnapshotRequestの送信元クラスターでは、ローカルのトピックに対してReplicatedSubscriptionsSnapshotメッセージをproduceします。このメッセージには、ローカルのトピックに流れてきたReplicatedSubscriptionsSnapshotResponseメッセージに対応するメッセージIDと、ReplicatedSubscriptionsSnapshotResponseに含まれていたリモートクラスターのトピックにおける最新のメッセージIDが含まれます。
{
"snapshot_id": "3fb212fc-784e-44ce-bb1e-117af24efa5e", // スナップショットID
"local_message_id": { // ローカルのReplicatedSubscriptionsSnapshotResponseのメッセージID
"ledger_id": 100,
"entry_id": 12345
},
"clusters": [
{
"cluster": "jp-west", // ReplicatedSubscriptionsSnapshotResponseの送信元クラスター名
"message_id": { // ReplicatedSubscriptionsSnapshotResponseの送信元クラスターの最新のメッセージID
"ledger_id": 200,
"entry_id": 67890
}
}
]
}
Broker内部に存在するConsumerへのメッセージ配信を担うディスパッチャーと呼ばれるコンポーネントは、ReplicatedSubscriptionsSnapshotメッセージが流れてきたら、それを一定個数分メモリにキャッシュします(デフォルトの設定では直近10個分をキャッシュ)。
ReplicatedSubscriptionsUpdate
そして、ConsumerからのAck応答によりカーソルが移動したタイミングで生成されるのがReplicatedSubscriptionsUpdateメッセージです。キャッシュされているReplicatedSubscriptionsSnapshotメッセージの中で「ローカルのカーソルが指す位置以前の local_message_id
を持つ最も新しいもの」を含んだReplicatedSubscriptionsUpdateメッセージが自分以外のクラスターに送信されます。
{
"subscription_name": "sub", // サブスクリプション名
"clusters": [
{
"cluster": "jp-west", // ReplicatedSubscriptionsSnapshotResponseの送信元クラスター名
"message_id": { // ReplicatedSubscriptionsSnapshotResponseの送信元クラスターの(当時の)最新のメッセージID
"ledger_id": 200,
"entry_id": 67890
}
}
]
}
これを受信したリモートクラスターは、そこに含まれる自身のクラスターのメッセージIDを取り出し、それ以前のメッセージを全てAck応答されたものとして扱います。これにより、クラスター間のカーソル位置の「ある程度は」同期されます。
課題
前述のアーキテクチャの説明を見て頂ければわかるように、現状のレプリケーテッドサブスクリプションによるクラスター間のカーソル位置の同期は完璧ではありません。ReplicatedSubscriptionsSnapshotの作成は最短でも1秒間隔でしか行われませんし、ReplicatedSubscriptionsSnapshot作成時点のメッセージIDまでローカルのカーソルが移動しないと他のクラスターのカーソルは移動しません。よって、Consumerの接続先のクラスターが切り替わった際にはわずかながらメッセージの重複が発生します。
また、マーカーメッセージの存在はトピックの統計情報に影響を与えます。Pulsarでは、トピックを流れているメッセージの平均サイズやスループット、まだAck応答されていないメッセージの件数などといった統計情報をREST APIやPrometheusで取得でき、監視などに役立てる事ができます。しかし、レプリケーテッドサブスクリプションが存在するトピックではマーカーメッセージの分が統計値に上乗せされるため、実態の把握がしにくくなるという欠点があります。
まとめ
レプリケーテッドサブスクリプションは、大規模災害時などにデータセンター単位でのシームレスなフェイルオーバーを実現する、平時には目立たないながらも極めて重要な機能です。また、そのアーキテクチャがなかなか興味深いのも特徴の一つだと思います。
現状ではまだいくつか未解決の課題がありますが、それに関しては今後の改良に期待したいと思います。