共有サブスクリプション機能は MQTT 5.0 で導入され、実運用で広く使用されています。これはMQTT 5.0の新機能ですが、MQTT 3.1.1クライアントでも利用可能です。この記事では、共有サブスクリプションに焦点を当て、その仕組みとユースケースについて詳しく解説します。
MQTT 5.0を初めてご利用ですか?ぜひご覧ください
共有サブスクリプションとは
通常のサブスクリプションでは、メッセージを発行するたびに、すべての一致するサブスクライバーがコピーを受け取ります。サブスクライバーの消費速度がメッセージの生成速度に追いつかない場合、一部のメッセージを他のサブスクライバーに分散して負荷を共有することができません。単一のサブスクライバークライアントのパフォーマンス問題が、容易にシステム全体のメッセージングに影響を及ぼす可能性があります。
MQTT 5.0では共有サブスクリプションが導入され、特定のサブスクリプションを使用するクライアント間でメッセージの負荷を均等に分配できるようになりました。これは、2つのクライアントが同じサブスクリプションを共有している場合、サブスクリプションに一致する各メッセージが1つのクライアントにのみ配信されることを意味します。
共有サブスクリプションは、消費者に優れた水平スケーラビリティを提供し、高いスループットの処理を可能にするだけでなく、高可用性も実現します。たとえ1つのクライアントが切断されたり故障したりしても、同じサブスクリプションを共有する他のクライアントがメッセージの処理を継続できます。必要に応じて、元々そのクライアントに割り当てられていたメッセージフローを引き継ぐことも可能です。
共有サブスクリプションの仕組み
共有サブスクリプションを使用する場合、クライアントの基盤となるコードに変更を加える必要はありません。サブスクリプション時に命名規則に従ったトピックを使用するだけです:
$share/{Share Name}/{Topic Filter}
$share
は予約されたプレフィックスであり、サーバーはこれが共有サブスクリプショントピックであることを認識します。 {Topic Filter}
は実際にサブスクライブしたいトピックです。
中央の {Share Name}
はクライアントによって指定された文字列で、現在の共有サブスクリプションで使用される共有名を表します。通常、 {Share Name}
フィールドはグループ名やグループIDとも呼ばれ、理解しやすくなっています。
同じサブスクリプションを共有したいサブスクリプションセッションのグループは、同じ共有名を使用しなければなりません。したがって、 $share/consumer1/sport/#
と $share/consumer2/sport/#
は異なる共有サブスクリプショングループに属します。
メッセージが複数の共有サブスクリプショングループで使用されているフィルターに同時に一致する場合、サーバーは一致する各共有サブスクリプショングループからセッションを選択し、メッセージのコピーを送信します。これは、1つのトピックのメッセージに複数の異なるタイプのコンシューマが存在する場合に非常に有用です。
たとえ2つのサブスクリプションが同じ共有名 {Share Name}
を持っていても、それが同じ共有サブスクリプションであることを意味しません。 {Share Name}/{Topic Filter}
の組み合わせのみが共有サブスクリプショングループを一意に識別できます。以下のサブスクリプショントピックはすべて異なる共有サブスクリプショングループに属します:
$share/consumer1/sport/tennis/+
$share/consumer2/sport/tennis/+
$share/consumer1/sport/#
$share/comsumer1/finance/#
共有サブスクリプションと非共有サブスクリプションは互いに影響を与えません。メッセージが共有サブスクリプションと非共有サブスクリプションの両方に同時に一致する場合、サーバーは一致した非共有サブスクリプションの各クライアントにメッセージのコピーを送信し、さらに一致した共有サブスクリプショングループごとに1つのセッションにコピーを送信します。これらのサブスクリプションが同じクライアントからのものである場合、このクライアントはメッセージの複数のコピーを受け取る可能性があります。
共有サブスクリプションのロードバランシング戦略
共有サブスクリプションの核心は、サーバーがクライアント間でメッセージの負荷をどのように分配するかにあります。一般的なロードバランシング戦略には以下のものがあります:
- ランダム: 共有サブスクリプショングループ内のセッションをランダムに選択してメッセージを送信します。
- ラウンドロビン: 共有サブスクリプショングループ内のセッションに順番にメッセージを送信します。
- ハッシュ: フィールドのハッシュ結果に基づいてセッションを選択します。
- スティッキー: 共有サブスクリプショングループ内のセッションをランダムに選択してメッセージを送信します。この選択をセッションが終了するまで維持し、その後このプロセスを繰り返します。
- ローカルファースト: ランダムに選択しますが、メッセージ発行者と同じノード上のセッションを優先します。そのようなセッションが存在しない場合、リモートノード上で通常のランダム戦略に降格します。
ランダムおよびラウンドロビン戦略によって達成されるバランス効果は比較的似ているため、適用シナリオに大きな違いはありません。ただし、ランダム戦略の実際のバランス効果は、サーバーで使用されるランダムアルゴリズムの影響を受けることがよくあります。
実際のアプリケーションでは、メッセージが関連することがあります。例えば、同じ画像に属する複数のフラグメントは、複数のサブスクライバーに配信するのに明らかに適していません。この場合、クライアントIDまたはトピックのハッシュ戦略に基づいてセッションを選択する必要があります。これにより、同じ発行者またはトピックからのメッセージが共有サブスクリプショングループ内の同じセッションで常に処理されることが保証されます。もちろん、スティッキー戦略も同じ効果を持ちます。
ローカルファースト戦略は、ランダム戦略よりもクラスター環境での使用に適しています。ローカルサブスクライバーの選択を優先することで、メッセージの遅延を効果的に削減できます。ただし、この戦略を使用する前提は、発行者とサブスクライバーが各ノードに比較的均等に分散され、異なるサブスクライバー間でのメッセージ負荷の過度な差異を避けることです。
MQTT 3.1.1クライアントはどのように共有サブスクリプションを使用しますか?
MQTT 5.0のリリースよりもはるか前に、EMQXは多くのユーザーに採用されている共有サブスクリプションスキームを設計しました。MQTT 5.0の公式スキームと同様に、EMQXは以下の形式のトピックをMQTT 3.1.1における共有サブスクリプショントピックとして認識します:
$queue/{Topic File}
プレフィックス $queue
はこれが共有サブスクリプションであることを示し、{Topic Filter}
は実際にサブスクライブしたいトピックです。これはMQTT 5.0の $share/queue/{Topic Filter}
と同等であり、共有名が queue
に固定されていることを意味します。したがって、このスキームでは同じトピックフィルターを持つ複数の共有サブスクリプショングループをサポートしていません。
MQTT 5.0の共有サブスクリプションの命名規則である $share/{Share Name}/{Topic Filter}
は、MQTT 3.1.1でも有効なトピックであり、共有サブスクリプションのロジックは完全に MQTTブローカー に実装されているため、クライアントはサブスクリプションのトピック内容を変更するだけで済みます。したがって、まだMQTT 3.1.1を使用しているデバイスでも、MQTT 5.0が提供する共有サブスクリプションを直接利用できるようになりました。
共有サブスクリプションのユースケース
共有サブスクリプションの代表的なユースケースをいくつか紹介します:
- バックエンドの消費能力がメッセージ生成能力に一致しない場合、共有サブスクリプションを使用して複数のクライアントで負荷を分担できます。
- システムが高可用性を確保する必要がある場合、特に大量のメッセージが流入する重要なビジネスにおいて、共有サブスクリプションを使用して単一障害点を回避できます。
- 将来的にメッセージの流入が急速に増加し、コンシューマ側が水平スケーリングできる必要がある場合、共有サブスクリプションを使用して高いスケーラビリティを提供できます。
共有サブスクリプション使用の提案
共有サブスクリプショングループ内で同じQoSを使用する
MQTTでは、共有サブスクリプショングループ内のセッションが異なるQoSレベルを使用することが可能ですが、これにより同じグループ内の異なるセッションにメッセージを配信する際に品質保証が異なる結果となる可能性があります。これにより、問題が発生した際のデバッグが困難になることがあります。したがって、共有サブスクリプショングループ内では同じQoSを使用することが最善です。
セッション有効期限間隔を適切に設定する
共有サブスクリプションを永続セッションと一緒に使用することは一般的です。しかし、共有サブスクリプショングループ内のクライアントがオフラインになっても、そのセッションとサブスクリプションがまだ存在する限り、MQTTサーバーはこのセッションにメッセージの分配を継続することに注意する必要があります。
クライアントが障害やその他の理由で長時間オフラインになる可能性があることを考慮すると、セッション有効期限間隔が長すぎると、オフラインのクライアントにメッセージが配信され続けるため、多くのメッセージが処理されなくなります。
より良い選択肢は、セッションが有効期限切れでない場合でも、サブスクライバーがオフラインになった際にメッセージの負荷分配時にそのサブスクライバーを考慮しないようにすることです。この動作は通常のサブスクリプションとは異なりますが、MQTTプロトコルによって許可されています。
デモ
共有サブスクリプションの効果をよりよく示すために、MQTTコマンドラインクライアントツールである MQTTX CLI を使用してデモを行います。
3つのターミナルウィンドウを開き、以下のコマンドを使用して、無料のパブリックMQTTブローカー に接続し、それぞれ $share/consumer1/sport/+
、$share/consumer1/sport/+
、および $share/consumer2/sport/+
のトピックにサブスクライブする3つのクライアントを作成します:
mqttx sub -h 'broker.emqx.io' --topic '$share/consumer1/sport/+'
次に、新しいターミナルウィンドウを開き、以下のコマンドを使用して sport/tennis
トピックに6つのメッセージを発行します。ここでは --multiline
オプションを使用して、Enterキーを押すたびに複数のメッセージを送信します:
mqttx pub -h 'broker.emqx.io' --topic sport/tennis -s --stdin --multiline
EMQXにおける共有サブスクリプションのデフォルトのロードバランシング戦略は ラウンドロビン です。そのため、consumer1
グループ内の2つのサブスクライバーが交互に発行したメッセージを受信するのが見られます。一方、共有サブスクリプショングループ consumer2
にはサブスクライバーが1つしかいないため、すべてのメッセージを受信します:
これは単純な例に過ぎません。共有サブスクリプショングループにいつでも参加したり離脱したりして、EMQXが最新のサブスクリプションに応じてタイムリーに負荷を分配するかどうかを観察したり、EMQXを自分でインストールして異なるロードバランシング戦略の動作を観察したりすることもできます。
コードで共有サブスクリプション機能の使用方法を知りたい場合は、emqx/MQTT-Features-Example プロジェクトにPythonのサンプルコードを提供しています。このプロジェクトは、すべてのMQTT機能のサンプルコードを提供し、誰もがこれらの機能の使用方法を迅速に理解できるようにすることを目的としています。さらに、このプロジェクトへのさらなるサンプルコードの貢献も歓迎しています。