はじめに
「手を動かして学ぶ!KafkaとPythonによるAWSストリーミングデータ分析入門【30日チャレンジ】」13日目です。前回は、EC2インスタンスを起動し、MSKクラスターに接続する準備が整いました。
今日は、PythonアプリケーションをEC2インスタンスにデプロイし、実際にKafkaにデータを送信してみましょう。ローカルで書いたコードをクラウド環境で動かすための具体的な手順を解説します。
1. アプリケーションをEC2に転送する
ローカルで開発したPythonスクリプトをEC2インスタンスに転送する方法はいくつかありますが、ここではscpコマンドを使うのが最も簡単です。
scpコマンドは、SSH接続を使ってファイルを安全に転送するためのコマンドです。
コマンド例
ローカルPCのターミナルで、以下のコマンドを実行してください。
scp -i "kafka-challenge-key.pem" producer_aws.py ec2-user@<public-ipv4-address>:/home/ec2-user/
-
scp:コマンド名 -
-i "kafka-challenge-key.pem":SSH接続で使うキーペアを指定します。 -
producer_aws.py:転送するローカルのファイル名です。 -
ec2-user@<public-ipv4-address>:接続先のEC2インスタンスのユーザー名とパブリックIPアドレスです。 -
:/home/ec2-user/:転送先のEC2インスタンス内のディレクトリパスです。
このコマンドを実行すると、producer_aws.pyファイルがEC2インスタンスの/home/ec2-user/ディレクトリに転送されます。
2. EC2インスタンス上でアプリケーションを実行する
ファイルがEC2に転送されたら、SSHでEC2インスタンスに接続し、アプリケーションを実行してみましょう。
ステップ1:SSHでEC2に接続
ローカルPCのターミナルで、以下のコマンドを実行してEC2インスタンスにログインします。
ssh -i "kafka-challenge-key.pem" ec2-user@<public-ipv4-address>
ステップ2:アプリケーションの実行
EC2インスタンスにログインしたら、ファイルが転送されたディレクトリに移動し、Pythonスクリプトを実行します。
cd /home/ec2-user
python3 producer_aws.py
実行が成功すると、ローカルで実行したときと同じように「Message sent to MSK topic: test-topic」というメッセージが表示されます。これで、AWS上でPythonアプリケーションが正常に動作し、MSKクラスターにデータが送信されたことが確認できました。
3. スクリプトをより実用的にする
これまでのスクリプトは単一のメッセージしか送信しませんでしたが、より実用的なアプリケーションとして、複数のメッセージを連続的に送信するように修正してみましょう。これは、Webサーバーのログやセンサーデータなどを模倣するのに役立ちます。
以下のコードをlog_producer.pyとして作成し、先ほどの手順でEC2インスタンスに転送してください。
# log_producer.py
from confluent_kafka import Producer
import json
import time
import random
import uuid
# Kafkaブローカーの設定
conf = {'bootstrap.servers': '<your-msk-bootstrap-brokers>'}
# Producerインスタンスの作成
producer = Producer(conf)
# 送信先のトピック
topic = "web-logs"
# ログデータを生成して連続的に送信
for i in range(100):
user_id = str(uuid.uuid4())
event_type = random.choice(['page_view', 'add_to_cart', 'purchase', 'login'])
timestamp = int(time.time())
log_data = {
'user_id': user_id,
'event_type': event_type,
'timestamp': timestamp,
'ip_address': '192.168.1.' + str(random.randint(1, 255))
}
json_data = json.dumps(log_data)
try:
producer.produce(topic, key=user_id, value=json_data.encode('utf-8'))
print(f"Sent log message {i}: {json_data}")
except Exception as e:
print(f"Failed to produce message: {e}")
time.sleep(0.5)
# バッファ内のすべてのメッセージを送信
producer.flush()
print("All messages sent.")
このスクリプトを実行する前に、web-logsというトピックをMSKクラスターに作成しておく必要があります。
EC2インスタンス上で、以下のコマンドを実行してトピックを作成してください。
(MSKクラスターのブートストラップブローカーのアドレスは、12日目の手順で取得したものを環境変数として設定していると仮定します)
/usr/bin/kafka-topics --create --topic web-logs --bootstrap-server <your-msk-bootstrap-brokers>
トピックが作成されたら、log_producer.pyを実行します。
python3 log_producer.py
これで、EC2インスタンスからMSKクラスターに、連続的なログデータが送信され続けます。
まとめと次回予告
今日は、ローカルで開発したPythonアプリケーションをEC2インスタンスにデプロイし、AWS環境で実行する方法を学びました。これにより、ストリーミングデータ分析のパイプラインの「データ収集」部分がクラウド上で実現できました。
明日は、この収集したデータをAWSのストレージサービスに保存する方法を学びます。
- 14日目: ローカルで動かしたPythonアプリをAWSで実行してみよう
お楽しみに!