Amazon MSK(Managed Streaming for Apache Kafka)とは
Amazon MSKとはAWSが提供するApache Kafkaのマネージドサービスです。
Apache KafkaのClusterをマネージドサービスとして提供しています。
brokerの数やKafkaのバージョン等を指定するだけで構築でき、マネージドサービスなので実行環境などを気にする必要がないというメリットもあります。
VPC内のサブネットを選択することで作製ができ、2つ以上のサブネットがないと構築できない為、デフォルトでマルチAZとなります。
以下公式説明です。
Amazon MSK は、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを簡単に構築および実行できようにする完全マネージド型のサービスです。 Apache Kafka は、リアルタイムのストリーミングデータパイプラインおよびアプリケーションを構築するためのオープンソースプラットフォームです。Amazon MSK では、ネイティブ Apache Kafka API を使用し、データレイクへの入力、データベースとの間での変更のストリーミング、機械学習および分析アプリケーションの強化を行うことができます。
なお、Apache Kafkaについては以下をご参照ください。
LambdaとMSKの連携について
Lambda実行のトリガーとしてAmazon MSK(以下MSK)が選択できます。
EC2をクライアントとしてMSKにメッセージを送り、SASL/SCRAM認証でLambdaを実行するまで試してみます。
前提:
- クライアントEC2:javaインストール済み、kafkaダウンロード済み(MSKで指定したバージョンをダウンロードしてください。)
- MSK:アクセスコントロール方法→SASL/SCRAM認証,Version2.6.2,SGはEC2からの通信を許可
- Lambda:NatGateway経由でアクセス1
構成自体は以下を参考にしています。
SecretsManagerでユーザ名/パスワードの作成・Clusterへの接続
今回は以下を参考に、SASL/SCRAM 認証を用いてクライアント認証を行います。そのため、まずSecretsManagerでシークレットを作成します。
SecretsManagerでクライアント認証用のユーザ名/パスワードを作成します。
- 「その他シークレット」を選択し、「プレーンテキスト」から以下のようにユーザ名とパスワードを入力
- シークレット名は「AmazonMSK_MyClusterSecret」のようにプレフィックスが必要
- デフォルトのKMSキーは使用できないため、新規にCMKを作成するか既存のCMKを使用するしかないようです。また、エイリアスがあるとエラーが出るため、削除しています。
{
"username": "alice",
"password": "alice-secret"
}
シークレット作成後、EC2にSSHし認証します。
まず、シークレットをクラスターに関連付けます。
$ aws kafka batch-associate-scram-secret \
--cluster-arn <Clusterのarn> \
--secret-arn-list <上記シークレットのarn>
次に、以下で新規トピックを作成します。
--replication-factor
などは適宜自環境に合うように変更してください。
$ aws kafka describe-cluster \
--region ap-northeast-1 \
--cluster-arn <Clusterのarn>
> ...
"ZookeeperConnectString": "<ZookeeperConnectString>"
...
$ cd kafka_2.12-2.6.2/bin
$ ./kafka-topics.sh \
--create \
--zookeeper "<ZookeeperConnectString>"\
--replication-factor 2 \
--partitions 1 \
--topic TestTopic
> Created topic TestTopic.
構成ファイル(users_jaas.conf)を作成・exportします。
usernameとpasswordはSecretsManagerで設定した値に置換してください。
$ vim users_jaas.conf
$ export KAFKA_OPTS=-Djava.security.auth.login.config=users_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};
JDKキーストアファイルをJVMから/tmpにコピーします。
<JDKFolder>はご自身の環境の値を入れてください。
$ ls /usr/lib/jvm/
> java-1.8.0-openjdk-...
...
$ cp /usr/lib/jvm/<JDKFolder>/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
プロパティファイルを作成
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
最後にBootstrapBrokerStringを取得して、MSKに適当なメッセージを送信します。
$ aws kafka get-bootstrap-brokers \
--region ap-northeast-1 \
--cluster-arn <Clusterのarn>
>
{
"BootstrapBrokerStringSaslScram": "<BootstrapBrokerStringSaslScram>"
}
$ ./kafka-console-producer.sh \
--broker-list <BootstrapBrokerStringSaslScram> \
--topic TestTopic \
--producer.config client_sasl.properties
> ここにメッセージを入力できます。
LambdaのトリガーにMSKを設定
次にLambdaのトリガーにMSKを設定します。
AWS CLIから設定します。
$ aws lambda create-event-source-mapping \
--event-source-arn <Clusterのarn> \
--topics TestTopic \
--source-access-configurations Type=SASL_SCRAM_512_AUTH,URI=<シークレットのURI> \
--starting-position LATEST \
--function-name <LambdaのFunction名>
設定完了後プロデューサーからメッセージを送ると...
$ ./kafka-console-producer.sh \
--broker-list <BootstrapBrokerStringSaslScram> \
--topic TestTopic \
--producer.config client_sasl.properties
> SampleMessage
>
Lambdaが実行されます。
2021-09-09T17:38:30.028+09:00 START RequestId: XXX Version: $LATEST
2021-09-09T17:38:30.031+09:00 2021-09-09T08:38:30.031Z XXX INFO MSKから来たMessage: SampleMessage
2021-09-09T17:38:30.032+09:00 END RequestId: XXX
2021-09-09T17:38:30.032+09:00 REPORT RequestId: XXX Duration: 1.20 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 65 MB
Lambdaが実行されない場合は、Lambdaにアタッチされているセキュリティグループ等構成を再度確認してみてください。
(私はセキュリティグループの設定が間違っていおり、Lambdaのトリガー欄に「PROBLEM: Connection error. Please check your event source connection configuration.」というメッセージが出ていました。)