はじめに
Apache kafkaは高可用、高信頼の分散メッセージングシステムを実現しており、クラスタがダウンしたり、データをロストすることはほとんど発生することはないと思います。
ただし、災害やネットワーク側の障害等でクラスタが利用できなることは考えられるので、災害対策用のDRサイトを準備することもあります。
kafkaではクラスタ間でレプリーケーションを実現するMirror Makerというレプリケーションツールが標準で同梱されています。
Mirror Makerはkafkaとは別プロセスで起動するアプリケーションで、ミラー元のconsumerとなり指定されたトピックをconsumeし、ミラー先のトピックにpublishします。これはミラー元とは非同期の処理になるため、2つのクラスタ間の全メッセージは完全に一致させることはできません。
ただし、Mirror Makerを複数プロセスで起動させることで、ラグを少なくすることはできるようです。
今回はこのMirror Makerをインストールし、レプリケーションできることを確認していきます。
なお、DRサイトはMirror Makerの一つのユースケースではありますが、他にも遠隔地の別アプリケーションのためにクラスタのコピーを作成するなど、色々なユースケースがあります。
環境
環境・ソフトウェアは以下を使用します。
- CentOS 7.5(firewalld, SELinuxは無効化)
- kafka 2.2.1
kafkaのインストールはここでは記載しませんが、1台構成であれば以下のサイトを見れば簡単にインストールできます。(クラスタ構成も簡単ですが)
Mirror Makerの準備
Mirror Makerは以下のサイトを参考に構築していきます。
とはいえ、kafkaに同梱されているので、実際はプロパティファイルを作成して起動するだけになります。
Mirror Makerを起動する前に、ミラー元、ミラー先のzookeeperとkafkaを起動しておきます。
$ cd /opt/kafka/
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
また、ミラー元、ミラー先にトピック(test)を作成します。
後でbest practiceにもでてきますが、ミラー先にもトピックは最初に作成しておくのが良いみたいです。
$ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test
Mirror Makerの起動
ミラー先のサーバの/opt/kafka/config以下にMirror Maker用に次の2ファイルを作成します。
- mirror-consumer.properties(ミラー元への接続用)
- mirror-producer.properties(ミラー先への接続用)
mirror-consumer.propertiesは以下のように設定しています。
# ミラー元kafkaサーバへの接続情報。複数サーバの場合は","区切りで指定
bootstrap.servers=kafkaserver1:9092
# consumerのグループID
group.id=mirror-consumer-group
exclude.internal.topics=true
client.id=mirror_maker_consumer
mirror-producer.propertiesは以下のように設定しています。
bootstrap.servers=kafkaserver2:9092
acks=1
batch.size=100
client.id=mirror_maker_producer
設定ファイルを作成したら、後は以下のコマンドでMirror Makerを起動するだけです。
Mirror Makerは接続できない状態の動作を考慮し、ミラー先のクラスタで起動しておくのが推奨です。
$ cd /opt/kafka
$ bin/kafka-mirror-maker.sh --consumer.config config/mirror-consumer.properties --producer.config config/mirror-producer.properties --whitelist test
レプリケーション対象のトピックのリストは--whitelistオプションで指定します。このオプションは正規表現が使用できます。今回はtestトピックだけなのでそのまま指定しています。*を指定すれば全てのトピックが対象になります。
確認
確認のためミラー元でkafka-console-producer.shを使用してtestトピックにメッセージをpublishしてみます。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>a
>b
>c
>
ミラー元で接続されているconsumer-groupを確認します。
# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
mirror-consumer-group
console-consumer-87140
上で表示されているmirror-consumer-groupがMirror Makerのconsumer-groupです。
メッセージをpublishした後にmirror-consumer-groupのoffsetを確認した結果が以下のとおりです。
LAGが"0"であれば全てのメッセージの同期がとれているということになります。
# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mirror-consumer-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 8 8 0 mirror-consumer-group-0-3787bef8-7ff8-461e-a95c-c598ed0b05f7 /192.168.10.135 mirror-consumer-group-0
test 1 9 9 0 mirror-consumer-group-0-3787bef8-7ff8-461e-a95c-c598ed0b05f7 /192.168.10.135 mirror-consumer-group-0
test 2 9 9 0 mirror-consumer-group-0-3787bef8-7ff8-461e-a95c-c598ed0b05f7 /192.168.10.135 mirror-consumer-group-0
トピックのベストプラクティス
以下のページにトピック作成のベストプラクティスについて記載がありました。
Create topics in target cluster
If you have consumers that are going to consume data from target cluster and your parallelism requirement for a consumer is same as your source cluster, Its important that you create a same topic in target cluster with same no.of partitions.
Example:
If you have a topic name called “click-logs” with 6 partitions in source cluster , make sure you have same no.of partitions in the target cluster. If you are using a target cluster as more of a backup, not active this might not need to be same.
If users didn’t create a topic in target cluster, producer in mirrormaker will attempt to create a topic and target cluster broker will create a topic with configured num.partitions and num.replicas, this may not be the partitions and replication that the user wants.
要はミラー先にあらかじめトピックを作成しておきましょうということです。
フェイルオーバー後
フェイルオーバーした後は、旧アクティブは新スタンバイとしての役割にします。
旧アクティブから新アクティブに切り替えた時、データの欠損が生じている可能性があり、そのまま旧アクティブを新スタンバイとはしません。
一回すべてのデータをクリアし、新スタンバイとして利用することになると思います。
今回は試してはみませんが。
MirrorMaker以外の選択肢
有償ですが、confluentの"Confluent Replicator"というのがありました。
以下にMirror Makerとの比較がありますが、Mirror Makerより機能が多いようです。
最後に
クラスタ間でレプリケーションを実施した手順の紹介でした。
本番環境で実際に採用するには、以下のようなことの検討も必要でしょう。
- Mirror Makerの複数インスタンス起動
- Mirror Makerのチューニング
- フェイルオーバー後にメッセージの欠損がでないようにアプリケーションがconsumeするoffsetの設定
- メッセージが欠損した場合のリカバリ方法の検討
- メッセージが重複した場合の検討(そもそも最初からべき等性が考慮された設計になっていること)
- etc