0
0

Kinesis data streams、lambda、S3を活用したWebアプリのリアルタイムデータ解析基盤構築

Last updated at Posted at 2024-07-04

はじめに

本記事では、AWSの各種サービス(S3、ECS Fargate、ECR、IAM、Lambda、Kinesis Data Streams)を活用して、Webアプリのリアルタイムデータを収集・解析するデータ分析基盤の構築方法について説明します。現代のWebアプリケーションは、ユーザーの行動やシステムの状態をリアルタイムで監視・分析することが求められており、この基盤はそのニーズに応えるものです。

aws.png

以下の作成したリポジトリです。

アーキテクチャの概要

  • Webアプリケーション: ユーザーの行動やシステムイベントをKinesis Data Streamsに送信
  • Kinesis Data Streams: リアルタイムでデータを収集し、処理を待つデータのストリームを提供
  • AWS Lambda: Kinesis Data Streamsからデータを取り出し、必要な前処理や変換を行います
  • ECS Fargate: コンテナ化されたデータ処理アプリケーションを実行し、リアルタイムでデータを解析
  • Amazon S3: 解析結果や必要な中間データを格納します
  • Amazon ECR: ECS Fargateで使用するDockerイメージを格納
  • IAM: 各サービスのセキュリティ設定とアクセス権限を管理

目的

Webアプリのリアルタイムデータを収集し、効率的に分析するための基盤を構築します。

機能一覧

  • データ収集機能

実装方法

Webアプリのリアルタイムデータ解析基盤の具体的な実装手順を説明します。構成要素として、webserver、IAM、Kinesis、Lambdaについて順に解説します。

webserver

まず、WebサーバーからKinesis Data Streamsにデータを送信する仕組みを実装します。以下の例では、Expressを使用したWebサーバーを構築し、ユーザーのリクエスト毎、Kinesis Data Streamsに送信します。

webserver/app/main.js
const express = require('express');
const AWS = require('aws-sdk');
const app = express();

const kinesis = new AWS.Kinesis({ region: 'ap-northeast-1' });
const streamName = process.env.KINESIS_STREAM_NAME || 'my-kinesis-stream';
const usernames = ["John Smith", "Emma Brown", "David Lee", "john_doe", "jane_doe"];

// Kinesisにデータを送信する関数
function sendDataToKinesis(data) {
    const params = {
        Data: JSON.stringify(data),
        PartitionKey: 'partitionKey',
        StreamName: streamName
    };

    kinesis.putRecord(params, (err, data) => {
        if (err) {
            console.error('Error sending data to Kinesis:', err);
        } else {
            console.log('Successfully sent data to Kinesis:', data);
        }
    });
}

// 各ルートの設定
const routes = [
    { path: '/', action: 'index', message: 'ホーム画面です\n' },
    { path: '/about', action: 'about', message: 'これはAboutページです\n' },
    { path: '/contact', action: 'contact', message: 'これはContactページです\n' },
    { path: '/products', action: 'products', message: 'これはProductsページです\n' },
    { path: '/services', action: 'services', message: 'これはServicesページです\n' },
    { path: '/blog', action: 'blog', message: 'これはBlogページです\n' },
    { path: '/faq', action: 'faq', message: 'これはFAQページです\n' },
    { path: '/portfolio', action: 'portfolio', message: 'これはPortfolioページです\n' }
];

// 各ルートでKinesisにデータを送信
routes.forEach(route => {
    app.get(route.path, (req, res) => {
        const randomIndex = Math.floor(Math.random() * usernames.length);

        // Kinesisに送信するデータ
        const data = {
            name: usernames[randomIndex],
            path: route.path,
            action: route.action,
            timestamp: new Date().toISOString()
        };

        // Kinesisにデータを送信
        sendDataToKinesis(data);

        // 画面送信
        res.send(route.message);
    });
});

app.listen(3000, () => {
    console.log('Server is running on port 3000');
});

IAM

次に、AWSリソースにアクセスするためのIAMロールとポリシーを設定します。適切な権限を付与することで、各サービスが相互に連携できるようにします。

IAMポリシーの作成

以下は、Lambdaに必要な権限を持つIAMポリシーです。

infra/modules/iam/aws_iam_role.tf
resource "aws_iam_role" "lambda_role" {
  name = "lambda_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com",
        },
      },
    ],
  })

  tags = {
    Name = "${var.app_name}-lambda-iam-role"
  }
}

IAMロールを作成し、上記のポリシーをアタッチします。

infra/modules/iam/aws_iam_policy.tf
# IAMポリシーの作成(Lambda用)
resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "IAM policy for Lambda to access Kinesis and S3"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "kinesis:GetRecords",
          "kinesis:GetShardIterator",
          "kinesis:DescribeStream",
          "kinesis:ListStreams",
        ],
        Resource = "*",
      },
      {
        Effect = "Allow",
        Action = [
          "s3:PutObject",
          "s3:PutObjectAcl",
          "s3:GetObject",
          "s3:ListBucket"
        ],
        Resource = [
          "arn:aws:s3:::${var.s3_bucket_name}",
          "arn:aws:s3:::${var.s3_bucket_name}/*"
        ],
      },
    ],
  })
}
infra/modules/iam/aws_iam_policy_attachment.tf
resource "aws_iam_role_policy_attachment" "lambda_role_attachment" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

Kinesis

Kinesis Data Streamsを設定し、Webサーバーからのデータを収集します。

infra/modules/kinesis/aws_kinesis_stream.tf
resource "aws_kinesis_stream" "main_stream" {
  name             = "main-stream"
  shard_count      = 1
  retention_period = 24

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]

  stream_mode_details {
    stream_mode = "PROVISIONED"
  }

  tags = {
    Name = "${var.app_name}-kinenis"
  }
}

Lambda

最後に、Kinesis Data Streamsからデータを取り出し、処理するLambda関数を作成します。この関数はデータを解析し、結果をS3に保存します。

Lambda関数の作成

Lambda関数のコードは以下の通りです。

infra/modules/lambda/src/in/index.py
import boto3
import base64
import os
import datetime

s3 = boto3.client('s3')
bucket_name = os.environ['BUCKET_NAME']

today = datetime.date.today()
formatted_date = today.strftime("%Y-%m-%d")

def lambda_handler(event, context):
    object_key = 'data_from_kinesis/record_{}.csv'.format(formatted_date)

    # S3に既存のファイルがあるか確認する
    try:
        existing_object = s3.get_object(Bucket=bucket_name, Key=object_key)
        existing_data = existing_object['Body'].read().decode('utf-8')
    except s3.exceptions.NoSuchKey:
        existing_data = ''

    # Kinesisストリームからのイベントデータを処理する
    for record in event['Records']:
        # KinesisのデータはBase64でエンコードされているため、デコードする
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        print("Decoded payload: " + payload)

        # payloadをCSVフォーマットに変換する
        new_row = payload.split(',')

        # 既存のデータに新しい行を追加する
        existing_data += ','.join(new_row) + '\n'

    # 更新したデータをS3に保存する
    s3.put_object(
        Bucket=bucket_name,
        Key=object_key,
        Body=existing_data.encode('utf-8')
    )

    return {
        'statusCode': 200,
        'body': 'Data processed successfully!'
    }

Lambda 関数のログを格納するための CloudWatch Logs グループを作成します。

infra/modules/lambda/aws_cloudwatch_log_group.tf
resource "aws_cloudwatch_log_group" "lambda" {
  name = "/aws/lambda/${var.lambda_function_name}"
}

KinesisストリームをトリガーとしてLambda関数を起動するためのイベントソースマッピングを作成します。

infra/modules/lambda/aws_lambda_event_source_mapping.tf
resource "aws_lambda_event_source_mapping" "main" {
  event_source_arn  = var.kinesis_stream_arn
  function_name     = aws_lambda_function.main.arn
  starting_position = "LATEST"
}

KinesisストリームにLambda関数を呼び出す権限を付与します。

infra/modules/lambda/aws_lambda_permission.tf
resource "aws_lambda_permission" "allow_kinesis_invoke" {
  statement_id  = "AllowExecutionFromKinesis"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.main.function_name
  principal     = "kinesis.amazonaws.com"
  source_arn    = var.kinesis_stream_arn
}

Lambda関数そのものを定義します。

infra/modules/lambda/aws_lambda_function.tf
data "archive_file" "lambda" {
  type        = "zip"
  source_dir  = "./modules/lambda/src/in"
  output_path = "./modules/lambda/src/out/lambda_function_payload.zip"
}

resource "aws_lambda_function" "main" {
  filename         = "./modules/lambda/src/out/lambda_function_payload.zip"
  function_name    = var.lambda_function_name
  description      = "lambda_function"
  role             = var.lambda_iam_role
  architectures    = ["x86_64"]
  handler          = "index.lambda_handler"
  source_code_hash = data.archive_file.lambda.output_base64sha256
  timeout          = 30
  runtime          = "python3.9"
  depends_on       = [aws_cloudwatch_log_group.lambda]

  environment {
    variables = {
      BUCKET_NAME = var.s3_bucket_name
    }
  }
  tags = {
    Name = "${var.app_name}-lamdba"
  }
}

結果

s3にCSVが作成されました。

csv.png

まとめ

この記事では、AWSの各種サービス(S3、ECS Fargate、ECR、IAM、Lambda、Kinesis Data Streams)を活用してWebアプリのリアルタイムデータ解析基盤を構築する具体的な手順を解説しました。この基盤を利用することで、リアルタイムなデータ解析と結果の保存が可能となり、Webアプリケーションの性能やユーザー体験を向上させることができます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0