0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon MSKを試したい(DR Replication編)

Last updated at Posted at 2025-03-24

0 はじめに

前回までに東京リージョンにおけるMSKを構築した。
ミッションクリティカルな環境ではDRリージョンを用意することが要件に入ることが一般的だ。
大規模災害の発動時やプライマリリージョンのメンテナンス時に、セカンダリリージョンを利用するように切り替える際にほぼリアルタイムでデータ同期をしたいというのが需要だろう。
本記事では下記を前提条件としてMSKのリージョン間同期を試していく。

  • KafkaはMSKを利用する
  • プライマリリージョンは東京リージョンとする
  • セカンダリリージョンは大阪リージョンとする
  • プライマリリージョンとセカンダリリージョンでは同じAWSサービスを利用する
  • 本記事執筆時点(2025年3月現在)でGAされているサービスのみ利用可能とする

1 構成検討

「はじめに」章に記載した条件をもとに構成を検討する。
つまり下記のような便利なサービスは使えない。

  • Amazon MSK レプリケータ

この場合に利用できる手段は下記のオープンソースアプリケーションを利用しての構成が一般的となる。

  • Mirror Maker
  • Mirror Maker 2.0

1.1 Mirror Makerとは

Mirror MakerはApache KafkaのツールのひとつでKafka上のデータをミラーリングするために利用される。
Mirror Maker(便宜上、Mirror Maker 1.0(MM1.0)と呼称する)とその改良版であるMirror Maker 2.0(MM2.0)が存在する。MM1.0とMM2.0の大まかな違いは下記の通りだ。

MM1.0はProducer(いわゆるPub側)とConsumer(いわゆるSub側)を利用してデータの同期を行っていたが、MM2.0ではKafka Connectフレームワークを利用している。そのためM2.0はAct/Act構成への対応やメッセージ以外のACL設定の同期などより柔軟なツールとして改良された。

したがって今回はツールとしてMM2.0を利用していく。

1.2 構成

今回は下記のような前提条件のもと構成を考える。

  • Kafkaは東京リージョンと大阪リージョンにデプロイする
  • AWSサービスにて代替可能な設定は極力AWSサービスを利用する
  • KafkaはACt/Stbを想定する。ただし、通常時は東京リージョンからのみ利用する

2025年3月現在では大阪リージョンのMSK Connectは下記のように利用できない。
東京リージョン
image.png
大阪リージョン
image.png

よってMSK Connectは利用しない方針とするため、EC2上にMirror Maker 2のクラスターを構築して設定する。
また、今回はあくまで検証のため東京リージョンから大阪リージョンに対してのデータ同期のみ行い、Mirror Makerもスポットインスタンス上に構築する。

構成図は下記の通り。
MSK-Replication.drawio.png

2 構築

2.1 MSK構築

前回の記事などを参考に東京リージョンと大阪リージョンに構築する。
ただし下記のように設定変更している。
※東京リージョンと大阪リージョンで設定異なる箇所は大阪リージョンを括弧内に記載。

  • VPC CIDR
    10.0.0.0/16(10.1.0.0/16)
  • Subnet CIDR
    az-a:10.0.128.0/24(10.1.3.0/24)
    az-b:(10.1.4.0/24)
    az-c:10.0.129.0/24
  • AZ冗長化数
    2
  • クラスター名
    QiitaTokyo(QiitaOsaka)
  • セキュリティーグループ
    インバウンド許可元(9092):自身のSG、10.0.0.0/15
    ※大阪と東京でサブネットのCIDRの採番ルールが異なるのはただのミス。。。

2.2 Mirror Maker構築

東京→大阪のレプリケーションは、可能な限り東京リージョンの障害に巻き込まれないようにしたい。
よってMirror Maker用のEC2は大阪リージョンにて構築する。
OSはAmazon Linux 2023などのLinuxを利用する。
また、踏み台やクライアント(およびリバースプロキシ)を兼ねてパブリックサブネットにクライアントサーバも構築している。

2.2.1 EC2構築

下記に留意して2サーバ+1台を構築する。
Mirror Maker用の2サーバは大阪リージョンにデプロイし、クライアントは東京リージョンにデプロイした。
インスタンスタイプとしてMirror Maker 2は今回のテストでは大きな負荷がかからないためt3.smallを選定し、クライアントはTopic一覧の取得に大きなメモリサイズを求められたのでt3a.mediumまたはm7a.mediumっを選定した。(スポットインスタンスの空き次第で変わる)

セキュリティグループにて最低限次を設定する。

  • SSHとMSK間の9092ポートの許可
  • rest用の8083ポートの許可

また次の許可ポリシーを含むIAMプロファイルの設定を行う。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:ap-northeast-1:<アカウントID>:cluster/QiitaTokyo/*",
                "arn:aws:kafka:ap-northeast-3:<アカウントID>:cluster/QiitaOsaka/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:ap-northeast-1:<アカウントID>:topic/QiitaTokyo/*",
                "arn:aws:kafka:ap-northeast-3:<アカウントID>:topic/QiitaOsaka/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:ap-northeast-1:<アカウントID>:group/QiitaTokyo/*",
                "arn:aws:kafka:ap-northeast-3:<アカウントID>:group/QiitaOsaka/*"
            ]
        }
    ]
}

2.2.2 kafka clientのインストール

下記記事の#3 ブローカー接続を同様に行う。
※1 2025年3月時点のMSKの推奨のKafkaバージョンは3.6.0なため、その前提で記載をしている。別バージョンをMSKで利用している場合は下記のアーカイブから、対応するバージョンを持ってくる。
https://archive.apache.org/dist/kafka/
※2 ブローカー数やTopic名、クラスター名など変更しているので、適宜修正しながら実施する。

2.2.3 接続確認

大阪リージョンと東京リージョンにそれぞれTopicを作成したら、producer接続とConsumer接続、およびTopic一覧を確認する。

  • Topic作成(前節にて実施)
# ./kafka-topics.sh --create --bootstrap-server b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098  --command-config client.properties --replication-factor 2 --partitions 1 --topic TestTopic
  • Producer接続
# ./kafka-console-producer.sh --broker-list b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic TestTopic

1行ずつメッセージを入力できる。

[root@ip-10-0-2-221 bin]# ./kafka-console-producer.sh --broker-list b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic TestTopic
>Hi
>TestTopic
  • Consumer接続
# ./kafka-console-consumer.sh --bootstrap-server b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --consumer.config client.properties --topic TestTopic --from-beginning

Producerにて入力したメッセージがほぼリアルタイムで取得される。

[root@ip-10-0-2-221 bin]# ./kafka-console-consumer.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --consumer.config client.properties --topic TestTopic --from-beginning
Hi
TestTopic
  • Topic一覧
# export KAFKA_HEAP_OPTS="-Xmx2g -Xms512m"
# ./kafka-topics.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --list --command-config client.properties

一覧が取得できる。

[root@ip-10-1-3-90 bin]# ./kafka-topics.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098 --list --command-config client.properties
__amazon_msk_canary
__consumer_offsets
heartbeats
TestTopic

2.2.4 Mirror Maker設定

connect-mirror-maker.propertiesファイルを設定する。
今回は下記のような設定とした。
(設定の簡易な説明は日本語コメントにて記載している。)

# Licensed to the Apache Software Foundation (ASF) under A or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

# specify any number of cluster aliases
clusters = tokyo, osaka #今回はリージョン名をクラスター名として使う。クラスター名.トピック名のようにTopicが同期されるので考えて命名する。

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
#MSKのエンドポイントをそれぞれ記載する。
tokyo.bootstrap.servers = b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098
osaka.bootstrap.servers = b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098

#GroupIDを記載する。Mirror Makerのクラスターを判別する。
group.id=mm2-cluster

# enable and configure individual replication flows
#東京から大阪へのレポリケーションを有効化
tokyo->osaka.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
#東京から大阪へのレポリケーション対象のTopicを正規表現で記載。(今回はすべて)
tokyo->osaka.topics = .*

#大阪から東京へは下記のコメントアウトのように記載。
#osaka->tokyo.enabled = true
#osaka->tokyo.topics = .*

# Setting replication factor of newly created remote topics
# ブローカー数を指定。
replication.factor=2

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# TOPIC内部パラメーターのブローカー数を指定。
checkpoints.topic.replication.factor=2
heartbeats.topic.replication.factor=2
offset-syncs.topic.replication.factor=2

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# ストレージ内部パラメーターのブローカー数を指定。
offset.storage.replication.factor=2
status.storage.replication.factor=2
config.storage.replication.factor=2

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
# オフセットのレプリケーションも有効化。
tokyo->osaka.sync.group.offsets.enabled = true

# 同期されるトピックにエイリアス名を付与しない場合の設定。今回はうまく動作しなかった。
#tokyo->osaka.rename.topics=false
#replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplication

# IAMを用いた認証の設定
tokyo.security.protocol=SASL_SSL
tokyo.sasl.mechanism=AWS_MSK_IAM
tokyo.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
tokyo.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
tokyo.session.timeout.ms=10000
tokyo.heartbeat.interval.ms=5000
osaka.security.protocol=SASL_SSL
osaka.sasl.mechanism=AWS_MSK_IAM
osaka.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
osaka.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
osaka.session.timeout.ms=10000
osaka.heartbeat.interval.ms=5000

#Topics Interval
refresh.topics.interval.seconds=60
refresh.topics.interval.seconds=60
sync.group.offsets.interval.seconds=60
emit.checkpoints.interval.seconds=5

#タスク数上限
tasks.max = 8

# REST API の設定(Worker のホスト名または IP アドレス)
rest.host.name=10.1.3.90 #1号機の場合
rest.port=8083

# 専用モードで動作
dedicated.mode.enable.internal.rest=true

設定の詳細は下記を参考にすると良い。
https://docs.redhat.com/ja/documentation/red_hat_streams_for_apache_kafka/2.3/html/configuring_amq_streams_on_openshift/assembly-mirrormaker-str#topic_configuration_synchronization
https://docs.aws.amazon.com/ja_jp/whitepapers/latest/amazon-msk-migration-guide/mirrormaker-2.0-mm2.html

設定ができたらクラスターを組む2台のサーバーにてそれぞれ稼働させる。

# ./connect-mirror-maker.sh /opt/kafka/kafka_2.13-3.6.0/config/connect-mirror-maker.properties

起動ログが流れるが下記のようなWorkerやConnecterの完了ログが流れ始めたら稼働に成功している。

[2025-03-24 07:56:59,585] INFO [Worker clientId=tokyo->osaka, groupId=tokyo-mm2] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2331)
[2025-03-24 07:56:59,876] INFO [Worker clientId=osaka->tokyo, groupId=osaka-mm2] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2331)
[2025-03-24 07:57:04,855] INFO [MirrorSourceConnector|worker] refreshing topics took 73 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2025-03-24 07:57:07,157] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 60 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)

2.2.5 動作確認

2.2.3のコマンド手順にて動作確認する。

Topic001をQiitaTokyoクラスタにて作成し、メッセージを書き込むとQiitaOsakaクラスタに反映されることが確認できる。
下記の例ではQiitaTokyoのTopic001がQiitaOsakaのtokyo.Topic001に反映されるのが確認できる。

Qiita Tokyo
[root@ip-10-0-2-221 bin]# ./kafka-console-producer.sh --broker-list b-2.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.qiitatokyo.<環境固有>.c4.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic Topic001
>Hi
>Tokyo Topic 001
>

Qiita Osaka
[root@ip-10-0-2-221 bin]# ./kafka-console-consumer.sh --bootstrap-server b-1.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.<環境固有>.c4.kafka.ap-northeast-3.amazonaws.com:9098 --consumer.config client.properties --topic tokyo.Topic001 --from-beginning
Hi
Tokyo Topic 001

Topic一覧を表示することでTopic自体ががレプリケートされていることが確認できる。

[root@ip-10-1-3-90 bin]# ./kafka-topics.sh --bootstrap-server b-1.qiitaosaka.jim3wg.c4.kafka.ap-northeast-3.amazonaws.com:9098,b-2.qiitaosaka.jim3wg.c4.kafka.ap-northeast-3.amazonaws.com:9098 --list --command-config client.properties
__amazon_msk_canary
__consumer_offsets
heartbeats
mm2-configs.tokyo.internal
mm2-offsets.tokyo.internal
mm2-status.tokyo.internal
tokyo.TestTopic
tokyo.Topic001
tokyo.checkpoints.internal
tokyo.heartbeats

また、クラスタを組んでいるMirror Makerの一方が終了した場合には、10分ほどの後にもう一方にフェイルオーバーされる。

3 まとめ

MSKは残念なことに大阪リージョンで有効でないサービスが多い。レプリケーションもマネージドサービスを利用して行うことが2025年3月現在ではできない。
自身で構築すると運用コストがかかるので、MSKサービスの大阪リージョンでの拡充を切に願います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?