はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」14日目です。前回は、PythonアプリケーションをEC2インスタンスにデプロイし、AWS上で動作させる準備をしました。
今日は、その続きとして、ローカルで動かしていたPythonアプリケーションを、AWSの環境で実際に実行してみましょう。これまでの知識を統合し、ローカル開発からクラウド環境への移行を完了させます。
1. ローカルで作成したPythonアプリをEC2に転送
まず、ローカル環境で作成したPythonアプリケーションを、AWSのEC2インスタンスに転送する必要があります。scpコマンドを使って、安全にファイルを転送しましょう。
前回作成したproducer_async.py(6日目)とconsumer.py(7日目)を、ローカルPCのターミナルからEC2インスタンスにコピーします。
コマンド例
# プロデューサーのスクリプトを転送
scp -i "kafka-challenge-key.pem" producer_async.py ec2-user@<your-ec2-public-ip>:/home/ec2-user/
# コンシューマーのスクリプトを転送
scp -i "kafka-challenge-key.pem" consumer.py ec2-user@<your-ec2-public-ip>:/home/ec2-user/
これらのコマンドを実行すると、2つのファイルがEC2インスタンスの/home/ec2-user/ディレクトリに転送されます。
2. EC2インスタンス上でアプリケーションを実行
ファイルがEC2に転送されたら、SSHでインスタンスにログインし、アプリケーションを実行してみましょう。
ステップ1:SSHでEC2に接続
ローカルPCのターミナルで、以下のコマンドを実行してEC2インスタンスにログインします。
ssh -i "kafka-challenge-key.pem" ec2-user@<your-ec2-public-ip>
ステップ2:スクリプトの修正
ローカルで作成したスクリプトは、Kafkaブローカーのアドレスとして127.0.0.1:9092がハードコードされています。これを、MSKクラスターのブートストラップブローカーのアドレスに書き換える必要があります。
EC2インスタンス上で、producer_async.pyとconsumer.pyのbootstrap.serversを、MSKクラスターのブートストラップブローカーのアドレスに修正してください。
# producer_async.py および consumer.py の修正箇所
# 修正前
conf = {'bootstrap.servers': '127.0.0.1:9092'}
# 修正後
conf = {'bootstrap.servers': '<your-msk-bootstrap-brokers>'}
viやnanoなどのエディタを使ってファイルを編集します。
vi producer_async.py
ステップ3:プロデューサーの実行
まず、プロデューサーを実行して、MSKクラスターにデータを送信します。
python3 producer_async.py
コンソールに「Attempting to send message...」というログが表示され、正常にデータが送信されていることが確認できます。
ステップ4:コンシューマーの実行
次に、別のSSHセッションでEC2インスタンスに接続し、コンシューマーを実行します。
新しいターミナルウィンドウを開き、再度EC2にログインします。
ssh -i "kafka-challenge-key.pem" ec2-user@<your-ec2-public-ip>
コンシューマーを実行します。
python3 consumer.py
実行すると、プロデューサーが送信したデータがリアルタイムで受信され、コンソールに表示されるはずです。これで、AWS上でプロデューサーとコンシューマーが正常に動作し、MSKクラスターを介してデータがやり取りされていることが証明されました。
3. デプロイと実行の自動化
今回は手動でファイル転送と実行を行いましたが、本番環境ではこのような作業は自動化されます。
-
Git: バージョン管理システムを使ってコードを管理し、EC2インスタンス上で
git cloneしてコードを取得します。 - CI/CDツール: JenkinsやAWS CodePipelineなどのツールを使って、コードのビルド、テスト、デプロイを自動化します。
- コンテナ化: Dockerを使ってアプリケーションをコンテナ化し、Amazon ECSやEKSといったコンテナサービス上で実行することも一般的です。
このチャレンジの後半では、これらの概念にも少しずつ触れていきます。
まとめと次回予告
今日は、ローカルで作成したPythonアプリケーションをAWSのEC2インスタンスにデプロイし、MSKクラスターに接続して実行する方法を学びました。これで、ローカル開発環境で培ったスキルをそのままクラウド環境で活用できることが理解できたでしょう。
明日からは、この基盤をベースに、ストリーミングデータ分析パイプラインをさらに発展させていきます。
15日目: リアルタイムデータ収集:EC2上でWebログをKafkaに送信する
お楽しみに!