はじめに
本記事では、AWSの各種サービス(S3、ECS Fargate、ECR、IAM、Lambda、Kinesis Data Streams)を活用して、Webアプリのリアルタイムデータを収集・解析するデータ分析基盤の構築方法について説明します。現代のWebアプリケーションは、ユーザーの行動やシステムの状態をリアルタイムで監視・分析することが求められており、この基盤はそのニーズに応えるものです。
以下の作成したリポジトリです。
アーキテクチャの概要
- Webアプリケーション: ユーザーの行動やシステムイベントをKinesis Data Streamsに送信
- Kinesis Data Streams: リアルタイムでデータを収集し、処理を待つデータのストリームを提供
- AWS Lambda: Kinesis Data Streamsからデータを取り出し、必要な前処理や変換を行います
- ECS Fargate: コンテナ化されたデータ処理アプリケーションを実行し、リアルタイムでデータを解析
- Amazon S3: 解析結果や必要な中間データを格納します
- Amazon ECR: ECS Fargateで使用するDockerイメージを格納
- IAM: 各サービスのセキュリティ設定とアクセス権限を管理
目的
Webアプリのリアルタイムデータを収集し、効率的に分析するための基盤を構築します。
機能一覧
- データ収集機能
実装方法
Webアプリのリアルタイムデータ解析基盤の具体的な実装手順を説明します。構成要素として、webserver、IAM、Kinesis、Lambdaについて順に解説します。
webserver
まず、WebサーバーからKinesis Data Streamsにデータを送信する仕組みを実装します。以下の例では、Expressを使用したWebサーバーを構築し、ユーザーのリクエスト毎、Kinesis Data Streamsに送信します。
const express = require('express');
const AWS = require('aws-sdk');
const app = express();
const kinesis = new AWS.Kinesis({ region: 'ap-northeast-1' });
const streamName = process.env.KINESIS_STREAM_NAME || 'my-kinesis-stream';
const usernames = ["John Smith", "Emma Brown", "David Lee", "john_doe", "jane_doe"];
// Kinesisにデータを送信する関数
function sendDataToKinesis(data) {
const params = {
Data: JSON.stringify(data),
PartitionKey: 'partitionKey',
StreamName: streamName
};
kinesis.putRecord(params, (err, data) => {
if (err) {
console.error('Error sending data to Kinesis:', err);
} else {
console.log('Successfully sent data to Kinesis:', data);
}
});
}
// 各ルートの設定
const routes = [
{ path: '/', action: 'index', message: 'ホーム画面です\n' },
{ path: '/about', action: 'about', message: 'これはAboutページです\n' },
{ path: '/contact', action: 'contact', message: 'これはContactページです\n' },
{ path: '/products', action: 'products', message: 'これはProductsページです\n' },
{ path: '/services', action: 'services', message: 'これはServicesページです\n' },
{ path: '/blog', action: 'blog', message: 'これはBlogページです\n' },
{ path: '/faq', action: 'faq', message: 'これはFAQページです\n' },
{ path: '/portfolio', action: 'portfolio', message: 'これはPortfolioページです\n' }
];
// 各ルートでKinesisにデータを送信
routes.forEach(route => {
app.get(route.path, (req, res) => {
const randomIndex = Math.floor(Math.random() * usernames.length);
// Kinesisに送信するデータ
const data = {
name: usernames[randomIndex],
path: route.path,
action: route.action,
timestamp: new Date().toISOString()
};
// Kinesisにデータを送信
sendDataToKinesis(data);
// 画面送信
res.send(route.message);
});
});
app.listen(3000, () => {
console.log('Server is running on port 3000');
});
IAM
次に、AWSリソースにアクセスするためのIAMロールとポリシーを設定します。適切な権限を付与することで、各サービスが相互に連携できるようにします。
IAMポリシーの作成
以下は、Lambdaに必要な権限を持つIAMポリシーです。
resource "aws_iam_role" "lambda_role" {
name = "lambda_role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com",
},
},
],
})
tags = {
Name = "${var.app_name}-lambda-iam-role"
}
}
IAMロールを作成し、上記のポリシーをアタッチします。
# IAMポリシーの作成(Lambda用)
resource "aws_iam_policy" "lambda_policy" {
name = "lambda_policy"
description = "IAM policy for Lambda to access Kinesis and S3"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams",
],
Resource = "*",
},
{
Effect = "Allow",
Action = [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:ListBucket"
],
Resource = [
"arn:aws:s3:::${var.s3_bucket_name}",
"arn:aws:s3:::${var.s3_bucket_name}/*"
],
},
],
})
}
resource "aws_iam_role_policy_attachment" "lambda_role_attachment" {
role = aws_iam_role.lambda_role.name
policy_arn = aws_iam_policy.lambda_policy.arn
}
Kinesis
Kinesis Data Streamsを設定し、Webサーバーからのデータを収集します。
resource "aws_kinesis_stream" "main_stream" {
name = "main-stream"
shard_count = 1
retention_period = 24
shard_level_metrics = [
"IncomingBytes",
"OutgoingBytes",
]
stream_mode_details {
stream_mode = "PROVISIONED"
}
tags = {
Name = "${var.app_name}-kinenis"
}
}
Lambda
最後に、Kinesis Data Streamsからデータを取り出し、処理するLambda関数を作成します。この関数はデータを解析し、結果をS3に保存します。
Lambda関数の作成
Lambda関数のコードは以下の通りです。
import boto3
import base64
import os
import datetime
s3 = boto3.client('s3')
bucket_name = os.environ['BUCKET_NAME']
today = datetime.date.today()
formatted_date = today.strftime("%Y-%m-%d")
def lambda_handler(event, context):
object_key = 'data_from_kinesis/record_{}.csv'.format(formatted_date)
# S3に既存のファイルがあるか確認する
try:
existing_object = s3.get_object(Bucket=bucket_name, Key=object_key)
existing_data = existing_object['Body'].read().decode('utf-8')
except s3.exceptions.NoSuchKey:
existing_data = ''
# Kinesisストリームからのイベントデータを処理する
for record in event['Records']:
# KinesisのデータはBase64でエンコードされているため、デコードする
payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print("Decoded payload: " + payload)
# payloadをCSVフォーマットに変換する
new_row = payload.split(',')
# 既存のデータに新しい行を追加する
existing_data += ','.join(new_row) + '\n'
# 更新したデータをS3に保存する
s3.put_object(
Bucket=bucket_name,
Key=object_key,
Body=existing_data.encode('utf-8')
)
return {
'statusCode': 200,
'body': 'Data processed successfully!'
}
Lambda 関数のログを格納するための CloudWatch Logs グループを作成します。
resource "aws_cloudwatch_log_group" "lambda" {
name = "/aws/lambda/${var.lambda_function_name}"
}
KinesisストリームをトリガーとしてLambda関数を起動するためのイベントソースマッピングを作成します。
resource "aws_lambda_event_source_mapping" "main" {
event_source_arn = var.kinesis_stream_arn
function_name = aws_lambda_function.main.arn
starting_position = "LATEST"
}
KinesisストリームにLambda関数を呼び出す権限を付与します。
resource "aws_lambda_permission" "allow_kinesis_invoke" {
statement_id = "AllowExecutionFromKinesis"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.main.function_name
principal = "kinesis.amazonaws.com"
source_arn = var.kinesis_stream_arn
}
Lambda関数そのものを定義します。
data "archive_file" "lambda" {
type = "zip"
source_dir = "./modules/lambda/src/in"
output_path = "./modules/lambda/src/out/lambda_function_payload.zip"
}
resource "aws_lambda_function" "main" {
filename = "./modules/lambda/src/out/lambda_function_payload.zip"
function_name = var.lambda_function_name
description = "lambda_function"
role = var.lambda_iam_role
architectures = ["x86_64"]
handler = "index.lambda_handler"
source_code_hash = data.archive_file.lambda.output_base64sha256
timeout = 30
runtime = "python3.9"
depends_on = [aws_cloudwatch_log_group.lambda]
environment {
variables = {
BUCKET_NAME = var.s3_bucket_name
}
}
tags = {
Name = "${var.app_name}-lamdba"
}
}
結果
s3にCSVが作成されました。
まとめ
この記事では、AWSの各種サービス(S3、ECS Fargate、ECR、IAM、Lambda、Kinesis Data Streams)を活用してWebアプリのリアルタイムデータ解析基盤を構築する具体的な手順を解説しました。この基盤を利用することで、リアルタイムなデータ解析と結果の保存が可能となり、Webアプリケーションの性能やユーザー体験を向上させることができます。