LoginSignup
4
2

More than 3 years have passed since last update.

MSK(Amazon Managed Streaming for Apache Kafka)へのトピック登録

Last updated at Posted at 2019-12-19

1.はじめに

 Amazon MSK(Amazon Managed Streaming for Apache Kafka)は、Apache Kafkaのマネージドサービスです。Amazon MSKのトピック登録含めた設定変更は、Amazon CLI経由で行うことができます。
 デフォルトでは、Kafka Brokerに対して、Kafka Producerはトピックを生成することができせん。なぜならば、トピックの自動生成をいONにするパラメータであるauto.create.topics.enable,が trueとなっていないためです。
 したがって、本投稿では、まずはAmazon CLI経由でトピックの有効化設定を行い、その後Kafka Producer経由でトピックを登録する手順を解説します。

2.設定方法

2-1.セキュリティグループの設定

 Amazon CLI経由でAmazon MSKを操作するためには、Kafka Clusterを構築するさいに、Amazon CLIで操作する端末(Amazon CLI端末)をアクセス可能なセキュリティグループに所属させる必要があります。
 Kafka Clusterを構築するさいには、Amazon CLI端末が含まれるセキュリティグループをアクセス対象として含めるように注意してください。

2-2.arnの確認

 
 Amazon Management Consolから、Amazon MSKのarnを確認します。この投稿のためだけに作成したので、arnは既に有効値とはなっていないです。

aaaaa.PNG

この場合、arnは以下の値となります。

arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2

2-3.bootstrap-brokers(Zookeeper)の接続先を確認

describe-clusterコマンドで、bootstrap-brokers(Zookeeper)の接続先を確認する。この場合、"z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181"となります。

aws kafka describe-cluster --region ap-northeast-1 --cluster-arn arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2
{
    "ClusterInfo": {
        "EncryptionInfo": {
            "EncryptionInTransit": {
                "ClientBroker": "TLS", 
                "InCluster": true
            }, 
            "EncryptionAtRest": {
                "DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:196585472650:key/d94c7909-3392-482a-908d-d25f9a31abb9"
            }
        }, 
        "BrokerNodeGroupInfo": {
            "BrokerAZDistribution": "DEFAULT", 
            "ClientSubnets": [
                "subnet-05e68568d43a7c6d3", 
                "subnet-091efbcc876bdefb1", 
                "subnet-05a73c6877da30604"
            ], 
            "StorageInfo": {
                "EbsStorageInfo": {
                    "VolumeSize": 1
                }
            }, 
            "SecurityGroups": [
                "sg-0204c59a2c6992390", 
                "sg-0fc8fe66209887218", 
                "sg-0c4b89523751d689c"
            ], 
            "InstanceType": "kafka.m5.large"
        }, 
        "ClusterName": "Amazon-MSK-Test", 
        "CurrentBrokerSoftwareInfo": {
            "KafkaVersion": "2.3.1", 
            "ConfigurationRevision": 1, 
            "ConfigurationArn": "arn:aws:kafka:ap-northeast-1:196585472650:configuration/SalesClusterConfiguration2/9c66b8dc-836e-4348-a2a3-b2bd19717b0b-2"
        }, 
        "Tags": {
            "Name": "Kafka-Cluster"
        }, 
        "CreationTime": "2019-12-19T02:09:06.63Z", 
        "NumberOfBrokerNodes": 3, 
        "ZookeeperConnectString": "z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181", 
        "State": "ACTIVE", 
        "CurrentVersion": "K2EUQ1WTGCTBG2", 
        "ClusterArn": "arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2", 
        "EnhancedMonitoring": "DEFAULT", 
        "OpenMonitoring": {
            "Prometheus": {
                "NodeExporter": {
                    "EnabledInBroker": false
                }, 
                "JmxExporter": {
                    "EnabledInBroker": false
                }
            }
        }
    }
}

2-4.設定ファイルの作成

 Amazon CLI端末の任意のディレクトリに、設定変更用のファイルを作成します。設定ファイルのフォーマットは、Amazon MSKが指定した形式にする必要があります。ここを参照してください。

/home/ec2-user/test/configuration.txt
auto.create.topics.enable = true
message.max.bytes = 73400320

2-5.Amazon MSK公式のpythonツールの準備

設定ファイルと同一ディレクトリ階層に、以下のpythonファイルを作成します。今回の場合は、configuration.txtと同一ディレクトリであるhome/ec2-user/に配置します。
pythonファイルの雛形は、ここを参照してください。

/home/ec2-user/unction.py
import boto3

client = boto3.client('kafka')

config_file = open('/home/ec2-user/test/configuration.txt', 'r')

server_properties = config_file.read()

response = client.create_configuration(
            #この値はAWS アカウントでユニークにする必要があります。
            Name='SalesClusterConfiguration3',
                Description='The configuration to use on all sales clusters.',
                    KafkaVersions=['1.1.1', '2.1.0','2.3.1'],
                        ServerProperties=server_properties
                        )
print(response)

 boto3ライブラリがない場合は、pipコマンドでインストールしてください。

pip install boto3

2-6.設定反映

pythonコマンドを実行することで設定を反映することができます。以下の通り、'HTTPStatusCode': 200が返信された場合、設定変更が可能です。

python function.py 
{u'LatestRevision': {u'CreationTime': datetime.datetime(2019, 12, 19, 2, 42, 53, 2000, tzinfo=tzlocal()), u'Description': u'The configuration to use on all sales clusters.', u'Revision': 1}, u'Arn': u'arn:aws:kafka:ap-northeast-1:196585472650:configuration/SalesClusterConfiguration3/30ffe529-2733-486d-bfed-1f1641db9edf-2', u'CreationTime': datetime.datetime(2019, 12, 19, 2, 42, 53, 2000, tzinfo=tzlocal()), u'Name': u'SalesClusterConfiguration3', 'ResponseMetadata': {'RetryAttempts': 0, 'HTTPStatusCode': 200, 'RequestId': '939d9052-a6f7-4fa4-b66f-91fe737a8057', 'HTTPHeaders': {'x-amzn-requestid': '939d9052-a6f7-4fa4-b66f-91fe737a8057', 'x-amz-cf-pop': 'NRT12-C2', 'content-length': '347', 'via': '1.1 4cb3df5349fbb69c930b315b7d0a5272.cloudfront.net (CloudFront)', 'x-cache': 'Miss from cloudfront', 'x-amz-apigw-id': 'E7iC8G5xNjMFeFw=', 'x-amzn-trace-id': 'Root=1-5dfae3ac-0cd2e93c1fe9319eb9166bc2;Sampled=0', 'connection': 'keep-alive', 'x-amz-cf-id': 'zSz8M9gaSNl4bIuNUdbDfY_Sm4cEWCoZF3RfuAX22aQBZxHJOUI4ug==', 'date': 'Thu, 19 Dec 2019 02:42:53 GMT', 'content-type': 'application/json'}}}

2-7.トピック登録

Kafka Producerからトピックを登録します。Created topic "XXXXXXX".と結果が返ってきたらトピック生成完了です。

kafka-topics --zookeeper "z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181" --create --topic first-test --partitions 3 --replication-factor 3
Created topic "first-test".

以上

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2