0 はじめに
前回までに東京リージョンにおけるMSKを構築した。
ミッションクリティカルな環境ではDRリージョンを用意することが要件に入ることが一般的だ。
大規模災害の発動時やプライマリリージョンのメンテナンス時に、セカンダリリージョンを利用するように切り替える際にほぼリアルタイムでデータ同期をしたいというのが需要だろう。
本記事では下記を前提条件としてMSKのリージョン間同期を試していく。
- KafkaはMSKを利用する
- プライマリリージョンは東京リージョンとする
- セカンダリリージョンは大阪リージョンとする
- プライマリリージョンとセカンダリリージョンでは同じAWSサービスを利用する
- 本記事執筆時点(2025年3月現在)でGAされているサービスのみ利用可能とする
1 構成検討
「はじめに」章に記載した条件をもとに構成を検討する。
つまり下記のような便利なサービスは使えない。
- Amazon MSK レプリケータ
この場合に利用できる手段は下記のオープンソースアプリケーションを利用しての構成が一般的となる。
- Mirror Maker
- Mirror Maker 2.0
1.1 Mirror Makerとは
Mirror MakerはApache KafkaのツールのひとつでKafka上のデータをミラーリングするために利用される。
Mirror Maker(便宜上、Mirror Maker 1.0(MM1.0)と呼称する)とその改良版であるMirror Maker 2.0(MM2.0)が存在する。MM1.0とMM2.0の大まかな違いは下記の通りだ。
MM1.0はProducer(いわゆるPub側)とConsumer(いわゆるSub側)を利用してデータの同期を行っていたが、MM2.0ではKafka Connectフレームワークを利用している。そのためM2.0はAct/Act構成への対応やメッセージ以外のACL設定の同期などより柔軟なツールとして改良された。
したがって今回はツールとしてMM2.0を利用していく。
1.2 構成
今回は下記のような前提条件のもと構成を考える。
- Kafkaは東京リージョンと大阪リージョンにデプロイする
- AWSサービスにて代替可能な設定は極力AWSサービスを利用する
- KafkaはACt/Stbを想定する。ただし、通常時は東京リージョンからのみ利用する
2025年3月現在では大阪リージョンのMSK Connectは下記のように利用できない。
東京リージョン
大阪リージョン
よってMSK Connectは利用しない方針とするため、EC2上にMirror Maker 2のクラスターを構築して設定する。
また、今回はあくまで検証のため東京リージョンから大阪リージョンに対してのデータ同期のみ行い、Mirror Makerもスポットインスタンス上に構築する。
2 構築
2.1 MSK構築
前回の記事などを参考に東京リージョンと大阪リージョンに構築する。
ただし下記のように設定変更している。
※東京リージョンと大阪リージョンで設定異なる箇所は大阪リージョンを括弧内に記載。
- VPC CIDR
10.0.0.0/16(10.1.0.0/16) - Subnet CIDR
az-a:10.0.128.0/24(10.1.3.0/24)
az-b:(10.1.4.0/24)
az-c:10.0.129.0/24 - AZ冗長化数
2 - クラスター名
QiitaTokyo(QiitaOsaka) - セキュリティーグループ
インバウンド許可元(9092):自身のSG、10.0.0.0/15
※大阪と東京でサブネットのCIDRの採番ルールが異なるのはただのミス。。。
2.2 Mirror Maker構築
東京→大阪のレプリケーションは、可能な限り東京リージョンの障害に巻き込まれないようにしたい。
よってMirror Maker用のEC2は大阪リージョンにて構築する。
OSはAmazon Linux 2023などのLinuxを利用する。
また、踏み台やクライアント(およびリバースプロキシ)を兼ねてパブリックサブネットにクライアントサーバも構築している。
2.2.1 EC2構築
下記に留意して2サーバ+1台を構築する。
Mirror Maker用の2サーバは大阪リージョンにデプロイし、クライアントは東京リージョンにデプロイした。
インスタンスタイプとしてMirror Maker 2は今回のテストでは大きな負荷がかからないためt3.smallを選定し、クライアントはTopic一覧の取得に大きなメモリサイズを求められたのでt3a.mediumまたはm7a.mediumっを選定した。(スポットインスタンスの空き次第で変わる)
セキュリティグループにて最低限次を設定する。
- SSHとMSK間の9092ポートの許可
- rest用の8083ポートの許可
また次の許可ポリシーを含むIAMプロファイルの設定を行う。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:<アカウントID>:cluster/QiitaTokyo/*",
"arn:aws:kafka:ap-northeast-3:<アカウントID>:cluster/QiitaOsaka/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:<アカウントID>:topic/QiitaTokyo/*",
"arn:aws:kafka:ap-northeast-3:<アカウントID>:topic/QiitaOsaka/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:<アカウントID>:group/QiitaTokyo/*",
"arn:aws:kafka:ap-northeast-3:<アカウントID>:group/QiitaOsaka/*"
]
}
]
}
2.2.2 kafka clientのインストール
下記記事の#3 ブローカー接続を同様に行う。
※1 2025年3月時点のMSKの推奨のKafkaバージョンは3.6.0なため、その前提で記載をしている。別バージョンをMSKで利用している場合は下記のアーカイブから、対応するバージョンを持ってくる。
https://archive.apache.org/dist/kafka/
※2 ブローカー数やTopic名、クラスター名など変更しているので、適宜修正しながら実施する。
2.2.3 接続確認
大阪リージョンと東京リージョンにそれぞれTopicを作成したら、producer接続とConsumer接続、およびTopic一覧を確認する。
- Topic作成(前節にて実施)
# ./kafka-topics.sh --create --bootstrap-server b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --command-config client.properties --replication-factor 2 --partitions 1 --topic TestTopic
- Producer接続
# ./kafka-console-producer.sh --broker-list b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic TestTopic
1行ずつメッセージを入力できる。
[root@ip-10-0-2-221 bin]# ./kafka-console-producer.sh --broker-list b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic TestTopic
>Hi
>TestTopic
- Consumer接続
# ./kafka-console-consumer.sh --bootstrap-server b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --consumer.config client.properties --topic TestTopic --from-beginning
Producerにて入力したメッセージがほぼリアルタイムで取得される。
[root@ip-10-0-2-221 bin]# ./kafka-console-consumer.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --consumer.config client.properties --topic TestTopic --from-beginning
Hi
TestTopic
- Topic一覧
# export KAFKA_HEAP_OPTS="-Xmx2g -Xms512m"
# ./kafka-topics.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --list --command-config client.properties
一覧が取得できる。
[root@ip-10-1-3-90 bin]# ./kafka-topics.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098 --list --command-config client.properties
__amazon_msk_canary
__consumer_offsets
heartbeats
TestTopic
2.2.4 Mirror Maker設定
connect-mirror-maker.propertiesファイルを設定する。
今回は下記のような設定とした。
(設定の簡易な説明は日本語コメントにて記載している。)
# Licensed to the Apache Software Foundation (ASF) under A or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = tokyo, osaka #今回はリージョン名をクラスター名として使う。クラスター名.トピック名のようにTopicが同期されるので考えて命名する。
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
#MSKのエンドポイントをそれぞれ記載する。
tokyo.bootstrap.servers = b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098
osaka.bootstrap.servers = b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098
#GroupIDを記載する。Mirror Makerのクラスターを判別する。
group.id=mm2-cluster
# enable and configure individual replication flows
#東京から大阪へのレポリケーションを有効化
tokyo->osaka.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
#東京から大阪へのレポリケーション対象のTopicを正規表現で記載。(今回はすべて)
tokyo->osaka.topics = .*
#大阪から東京へは下記のコメントアウトのように記載。
#osaka->tokyo.enabled = true
#osaka->tokyo.topics = .*
# Setting replication factor of newly created remote topics
# ブローカー数を指定。
replication.factor=2
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# TOPIC内部パラメーターのブローカー数を指定。
checkpoints.topic.replication.factor=2
heartbeats.topic.replication.factor=2
offset-syncs.topic.replication.factor=2
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# ストレージ内部パラメーターのブローカー数を指定。
offset.storage.replication.factor=2
status.storage.replication.factor=2
config.storage.replication.factor=2
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
# オフセットのレプリケーションも有効化。
tokyo->osaka.sync.group.offsets.enabled = true
# 同期されるトピックにエイリアス名を付与しない場合の設定。今回はうまく動作しなかった。
#tokyo->osaka.rename.topics=false
#replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplication
# IAMを用いた認証の設定
tokyo.security.protocol=SASL_SSL
tokyo.sasl.mechanism=AWS_MSK_IAM
tokyo.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
tokyo.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
tokyo.session.timeout.ms=10000
tokyo.heartbeat.interval.ms=5000
osaka.security.protocol=SASL_SSL
osaka.sasl.mechanism=AWS_MSK_IAM
osaka.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
osaka.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
osaka.session.timeout.ms=10000
osaka.heartbeat.interval.ms=5000
#Topics Interval
refresh.topics.interval.seconds=60
refresh.topics.interval.seconds=60
sync.group.offsets.interval.seconds=60
emit.checkpoints.interval.seconds=5
#タスク数上限
tasks.max = 8
# REST API の設定(Worker のホスト名または IP アドレス)
rest.host.name=10.1.3.90 #1号機の場合
rest.port=8083
# 専用モードで動作
dedicated.mode.enable.internal.rest=true
設定の詳細は下記を参考にすると良い。
https://docs.redhat.com/ja/documentation/red_hat_streams_for_apache_kafka/2.3/html/configuring_amq_streams_on_openshift/assembly-mirrormaker-str#topic_configuration_synchronization
https://docs.aws.amazon.com/ja_jp/whitepapers/latest/amazon-msk-migration-guide/mirrormaker-2.0-mm2.html
設定ができたらクラスターを組む2台のサーバーにてそれぞれ稼働させる。
# ./connect-mirror-maker.sh /opt/kafka/kafka_2.13-3.6.0/config/connect-mirror-maker.properties
起動ログが流れるが下記のようなWorkerやConnecterの完了ログが流れ始めたら稼働に成功している。
[2025-03-24 07:56:59,585] INFO [Worker clientId=tokyo->osaka, groupId=tokyo-mm2] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2331)
[2025-03-24 07:56:59,876] INFO [Worker clientId=osaka->tokyo, groupId=osaka-mm2] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2331)
[2025-03-24 07:57:04,855] INFO [MirrorSourceConnector|worker] refreshing topics took 73 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2025-03-24 07:57:07,157] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 60 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
2.2.5 動作確認
2.2.3のコマンド手順にて動作確認する。
Topic001をQiitaTokyoクラスタにて作成し、メッセージを書き込むとQiitaOsakaクラスタに反映されることが確認できる。
下記の例ではQiitaTokyoのTopic001がQiitaOsakaのtokyo.Topic001に反映されるのが確認できる。
[root@ip-10-0-2-221 bin]# ./kafka-console-producer.sh --broker-list b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic Topic001
>Hi
>Tokyo Topic 001
>
[root@ip-10-0-2-221 bin]# ./kafka-console-consumer.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098 --consumer.config client.properties --topic tokyo.Topic001 --from-beginning
Hi
Tokyo Topic 001
Topic一覧を表示することでTopic自体ががレプリケートされていることが確認できる。
[root@ip-10-1-3-90 bin]# ./kafka-topics.sh --bootstrap-server b-1.qiitaosaka.jim3wg.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.jim3wg.c4.kafka.ap-northeast-3.amazonaws.com:9098 --list --command-config client.properties
__amazon_msk_canary
__consumer_offsets
heartbeats
mm2-configs.tokyo.internal
mm2-offsets.tokyo.internal
mm2-status.tokyo.internal
tokyo.TestTopic
tokyo.Topic001
tokyo.checkpoints.internal
tokyo.heartbeats
また、クラスタを組んでいるMirror Makerの一方が終了した場合には、10分ほどの後にもう一方にフェイルオーバーされる。
3 まとめ
MSKは残念なことに大阪リージョンで有効でないサービスが多い。レプリケーションもマネージドサービスを利用して行うことが2025年3月現在ではできない。
自身で構築すると運用コストがかかるので、MSKサービスの大阪リージョンでの拡充を切に願います。