1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ - 12日目: EC2インスタンスを準備して、MSKクラスターに接続する

Posted at

はじめに

「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」12日目です。昨日は、VPCとセキュリティグループを設計し、MSKクラスターへのアクセスを許可する設定を行いました。

今日は、いよいよPythonアプリケーションを実行する環境としてEC2インスタンスを準備し、MSKクラスターに接続してみましょう。これにより、ローカルで動かしていたプロデューサーやコンシューマーを、AWS上で実行できるようになります。


1. EC2インスタンスの起動

AWSマネジメントコンソールにログインし、「EC2」サービスを開いて「インスタンスを起動」をクリックします。

ステップ1:名前とアプリケーションテンプレート(AMI)

  • 名前: Kafka-Python-Clientなど、わかりやすい名前を付けます。
  • アプリケーションおよびOSイメージ(Amazon マシンイメージ): 「Amazon Linux 2023 AMI」を選択します。これは、AWSが提供する安定したLinuxOSです。

ステップ2:インスタンスタイプとキーペア

  • インスタンスタイプ: t3.microを選択します。これは無料利用枠内で使える、開発・テストに十分なスペックのインスタンスです。
  • キーペア(ログイン): SSHでインスタンスに接続するためのキーペアを作成します。
    • 「新しいキーペアを作成」をクリックし、kafka-challenge-keyなどの名前を付けます。
    • 「作成」をクリックすると、.pemファイルがダウンロードされます。このファイルは大切に保管してください。

ステップ3:ネットワーク設定

  • VPC: MSKクラスターと同じVPC(通常はデフォルトVPC)を選択します。
  • サブネット: 任意のサブネットを選択します。
  • ファイアウォール(セキュリティグループ): 「既存のセキュリティグループを選択」を選び、KafkaClientEC2-SGのような名前の新しいセキュリティグループを作成します。
    • このセキュリティグループには、SSH接続(ポート22) と、MSKへの接続を許可する カスタムTCPルール(ポート9092) を設定する必要があります。SSHは「マイIP」に設定し、MSKへの接続は昨日の手順で設定したセキュリティグループIDを指定します。

ステップ4:高度な詳細

  • IAMインスタンスプロフィール: 9日目で作成したIAMロール(KafkaClientEC2Role)を選択します。これにより、EC2インスタンスがMSKやS3に安全にアクセスできます。

すべての設定を確認し、「インスタンスを起動」をクリックします。起動には数分かかります。


2. EC2インスタンスへのSSH接続

インスタンスが起動したら、SSHで接続します。

  1. EC2コンソールのインスタンス一覧から、作成したインスタンスを選択します。
  2. 「接続」をクリックし、「SSHクライアント」タブに表示されている接続コマンドをコピーします。
    ssh -i "kafka-challenge-key.pem" ec2-user@<public-ipv4-address>
    
  3. ターミナルを開き、.pemファイルを保存したディレクトリに移動します。
  4. まず、キーペアの権限を変更します。
    chmod 400 kafka-challenge-key.pem
    
  5. コピーしたコマンドを実行して、インスタンスに接続します。

これで、EC2インスタンスのシェルにログインできました。


3. Pythonアプリケーションのデプロイと接続テスト

EC2インスタンス上で、ローカルで作成したPythonプロデューサーを動かしてみましょう。

  1. Pythonとpipのインストール

    sudo yum update -y
    sudo yum install python3 -y
    sudo yum install python3-pip -y
    
  2. confluent-kafkaライブラリのインストール

    pip3 install confluent-kafka
    
  3. プロデューサースクリプトの作成
    producer_aws.pyというファイルを作成し、5日目で作成したコードを貼り付けます。ただし、Kafkaブローカーのアドレスを、MSKクラスターのブートストラップブローカーのアドレスに置き換えます

    # producer_aws.py
    from confluent_kafka import Producer
    import json
    
    conf = {'bootstrap.servers': '<your-msk-bootstrap-brokers>'} # ここをMSKのアドレスに置き換える
    producer = Producer(conf)
    
    data = {'message': 'Hello from AWS EC2!'}
    json_data = json.dumps(data)
    topic = "test-topic"
    
    producer.produce(topic, key="key1", value=json_data.encode('utf-8'))
    producer.flush()
    
    print(f"Message sent to MSK topic: {topic}")
    
  4. トピックの作成
    MSKクラスターでtest-topicがまだ存在しない場合は、kafka-topics.shコマンドを使って作成します。MSKのブートストラップブローカーはAWS CLIで取得できます。

    # MSKのブートストラップブローカーを取得
    BOOTSTRAP_BROKERS=$(aws msk describe-cluster --cluster-arn <your-cluster-arn> --query 'ClusterInfo.BrokerNodeGroupInfo.ClientSubnetInfo.BrokerEndpoints' --output text)
    
    # トピックを作成
    /usr/bin/kafka-topics --create --topic test-topic --bootstrap-server $BOOTSTRAP_BROKERS
    
  5. スクリプトの実行

    python3 producer_aws.py
    

    実行が成功し、エラーが出なければ、EC2インスタンスからMSKクラスターへの接続が完了し、データが送信されています。


まとめと次回予告

今日は、EC2インスタンスを起動し、IAMロールとセキュリティグループを適切に設定して、MSKクラスターにPythonアプリケーションから接続する方法を学びました。これで、ローカル開発環境からAWSクラウド環境への移行が完了しました。

いよいよ明日からは、この基盤の上でストリーミングデータ分析のパイプラインを構築していきます。

13日目: リアルタイムデータ収集:EC2上でWebログをKafkaに送信する

お楽しみに!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?