はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」12日目です。昨日は、VPCとセキュリティグループを設計し、MSKクラスターへのアクセスを許可する設定を行いました。
今日は、いよいよPythonアプリケーションを実行する環境としてEC2インスタンスを準備し、MSKクラスターに接続してみましょう。これにより、ローカルで動かしていたプロデューサーやコンシューマーを、AWS上で実行できるようになります。
1. EC2インスタンスの起動
AWSマネジメントコンソールにログインし、「EC2」サービスを開いて「インスタンスを起動」をクリックします。
ステップ1:名前とアプリケーションテンプレート(AMI)
-
名前:
Kafka-Python-Clientなど、わかりやすい名前を付けます。 - アプリケーションおよびOSイメージ(Amazon マシンイメージ): 「Amazon Linux 2023 AMI」を選択します。これは、AWSが提供する安定したLinuxOSです。
ステップ2:インスタンスタイプとキーペア
-
インスタンスタイプ:
t3.microを選択します。これは無料利用枠内で使える、開発・テストに十分なスペックのインスタンスです。 -
キーペア(ログイン): SSHでインスタンスに接続するためのキーペアを作成します。
- 「新しいキーペアを作成」をクリックし、
kafka-challenge-keyなどの名前を付けます。 - 「作成」をクリックすると、
.pemファイルがダウンロードされます。このファイルは大切に保管してください。
- 「新しいキーペアを作成」をクリックし、
ステップ3:ネットワーク設定
- VPC: MSKクラスターと同じVPC(通常はデフォルトVPC)を選択します。
- サブネット: 任意のサブネットを選択します。
-
ファイアウォール(セキュリティグループ): 「既存のセキュリティグループを選択」を選び、
KafkaClientEC2-SGのような名前の新しいセキュリティグループを作成します。- このセキュリティグループには、SSH接続(ポート22) と、MSKへの接続を許可する カスタムTCPルール(ポート9092) を設定する必要があります。SSHは「マイIP」に設定し、MSKへの接続は昨日の手順で設定したセキュリティグループIDを指定します。
ステップ4:高度な詳細
-
IAMインスタンスプロフィール: 9日目で作成したIAMロール(
KafkaClientEC2Role)を選択します。これにより、EC2インスタンスがMSKやS3に安全にアクセスできます。
すべての設定を確認し、「インスタンスを起動」をクリックします。起動には数分かかります。
2. EC2インスタンスへのSSH接続
インスタンスが起動したら、SSHで接続します。
- EC2コンソールのインスタンス一覧から、作成したインスタンスを選択します。
- 「接続」をクリックし、「SSHクライアント」タブに表示されている接続コマンドをコピーします。
ssh -i "kafka-challenge-key.pem" ec2-user@<public-ipv4-address> - ターミナルを開き、
.pemファイルを保存したディレクトリに移動します。 - まず、キーペアの権限を変更します。
chmod 400 kafka-challenge-key.pem - コピーしたコマンドを実行して、インスタンスに接続します。
これで、EC2インスタンスのシェルにログインできました。
3. Pythonアプリケーションのデプロイと接続テスト
EC2インスタンス上で、ローカルで作成したPythonプロデューサーを動かしてみましょう。
-
Pythonとpipのインストール
sudo yum update -y sudo yum install python3 -y sudo yum install python3-pip -y -
confluent-kafkaライブラリのインストールpip3 install confluent-kafka -
プロデューサースクリプトの作成
producer_aws.pyというファイルを作成し、5日目で作成したコードを貼り付けます。ただし、Kafkaブローカーのアドレスを、MSKクラスターのブートストラップブローカーのアドレスに置き換えます。# producer_aws.py from confluent_kafka import Producer import json conf = {'bootstrap.servers': '<your-msk-bootstrap-brokers>'} # ここをMSKのアドレスに置き換える producer = Producer(conf) data = {'message': 'Hello from AWS EC2!'} json_data = json.dumps(data) topic = "test-topic" producer.produce(topic, key="key1", value=json_data.encode('utf-8')) producer.flush() print(f"Message sent to MSK topic: {topic}") -
トピックの作成
MSKクラスターでtest-topicがまだ存在しない場合は、kafka-topics.shコマンドを使って作成します。MSKのブートストラップブローカーはAWS CLIで取得できます。# MSKのブートストラップブローカーを取得 BOOTSTRAP_BROKERS=$(aws msk describe-cluster --cluster-arn <your-cluster-arn> --query 'ClusterInfo.BrokerNodeGroupInfo.ClientSubnetInfo.BrokerEndpoints' --output text) # トピックを作成 /usr/bin/kafka-topics --create --topic test-topic --bootstrap-server $BOOTSTRAP_BROKERS -
スクリプトの実行
python3 producer_aws.py実行が成功し、エラーが出なければ、EC2インスタンスからMSKクラスターへの接続が完了し、データが送信されています。
まとめと次回予告
今日は、EC2インスタンスを起動し、IAMロールとセキュリティグループを適切に設定して、MSKクラスターにPythonアプリケーションから接続する方法を学びました。これで、ローカル開発環境からAWSクラウド環境への移行が完了しました。
いよいよ明日からは、この基盤の上でストリーミングデータ分析のパイプラインを構築していきます。
13日目: リアルタイムデータ収集:EC2上でWebログをKafkaに送信する
お楽しみに!