0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CloudTrailイベントのS3保管を自前で実装した話

Posted at

はじめに

最近、CloudTrailイベントをS3に保存する処理を自前で実装する機会がありました。
通常あまりそのような機会はないかもしれませんが、その経験で得た学びを共有したいと思います :writing_hand:

CloudTrailとは

CloudTrailはAWSアカウントに対する操作履歴を記録・保管してくれるサービスです。
AWSに対する一つ一つの操作が、CloudTrailに個々のイベントとして記録されます。

CloudTrailイベントは、AWS SDK、AWS Management Console、コマンドラインツール、およびその他の AWS サービスを通じて行われた API と非 API アカウントアクティビティの両方の履歴を提供します。

90日間であればマネジメントコンソールから無料でイベントを参照することができます。

CloudTrail では、お客様のアカウントの過去 90 日分の管理イベントを無料で閲覧、検索、ダウンロードすることができます。

概要をちゃんと知りたい方にはBlackBelt「AWS CloudTrail 基礎編: AWS CloudTrail の役割と証跡」がお勧めです。

AWSを使い始めたら大抵最初にすることの一つに 「CloudTrailの証跡を設定すること」 があります。
証跡とは、指定したS3バケットまたはCloudWatch LogsにCloudTrailイベントを保存する機能です。
LookupEvents APIを用いてイベント情報を取得することもできますが、証跡はマネージドにイベントを取得・保管してくれます。

つまり通常は、監査目的でAWS操作履歴を長期保管するような場合、自身でAPIを用いてイベントを取得する必要はありません :beginner:

こんな状況に遭遇しました

  • マルチアカウント環境にて、特定のアカウントにあるS3バケットにCloudTrailのログを集約する構成になっていました。
  • S3のバケットポリシーが適切に設定されておらず、ログ配信エラーになっていました。

image.png

なお、バケットポリシーを修正することにより、直近30日分のイベントについては再配信されます。

証跡を不適切な設定 (S3 バケットに到達できない状態など) にすると、CloudTrail は 30 日間、S3 バケットへのログファイルの再配信を試みます。

  • つまり、誤ったポリシーが設定された日~30日前までのイベントがS3に保管されていない状況にありました。
  • ただ幸い保管できていない期間は直近90日以内であり、イベント情報自体はCloudTrailサービス内に残っている状況でした。

過去のイベントを取得する手段

先述のとおり90日間であればイベントは参照可能です。
取得する手段としては下記の2パターンが考えられます。

① マネジメントコンソールからダウンロード

  • オペレーションは簡単
  • アカウントやリージョンの対象数が多いと、その分マネコン操作が発生するので大変
  • イベント数が多い場合、マネジメントコンソールのセッションがタイムアウトする

② LookupEvents API

  • 自動化によりオペレーションを簡略化できる
  • 実装しないといけない

今回の対象は下記のとおりでした。

  • アカウント数:14
  • リージョン数:17

もう、迷うことなく②を選択しました :neutral_face:

実装

ということでLookupEvents APIでCloudTrailイベントを一括取得する仕組みを実装しました。
全体の構成要素は下記のとおりです。右側のAWSアカウントとリージョンは複数あることを示しています。

image.png

CDKで各種リソースのデプロイを行いました。CloudFormation StackSetsを用いることにより、スイッチ用のIAMロールを一括で作成することができます。

image.png

StackSetsに関して少し補足しておきます。

  • CloudFormation StackSetsを利用する場合、デプロイ先のアカウントにCloudFormation実行用のロールが必要です。
  • ロールの種類としては、Organizations環境下であれば自動配備される サービスマネージド許可 と、自前で事前にロールを用意しておく必要のある セルフサービス許可 の2種類があります。
  • 今回のシチュエーションでは、取得したイベントを保存するS3バケットの所属AWSアカウントがCloudFormation StackSetsの委任管理者であったため、サービスマネージド許可を利用することができ、各アカウントに事前にロールを用意する必要がありませんでした。

CloudTrailイベントの取得とS3へのアップロードはLambdaで行いました。また、対象のリージョンすべてに対して並列にイベント取得を行うためにStep Functionsを利用しました。

image.png

Step Functionsの中身はこのような感じです。

image.png

実装のポイントは下記のとおりです。

  • イベント取得対象期間は1回のLambda実行ごとに30分間
  • Mapステートを利用することで、対象リージョンすべて(17リージョン)に対して並列にLambdaを実行
  • Lambda内の処理において、5分単位でイベントをgzip圧縮してS3アップロード

コードの抜粋を記載します。

CDKのStep Functions部分実装コード
import {
  Duration,
  aws_lambda as lambda,
  aws_logs as logs,
  RemovalPolicy,
  aws_stepfunctions as sfn,
  aws_stepfunctions_tasks as tasks,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export interface SfnProps {
  readonly systemName: string;
  readonly func: lambda.IFunction;
}

export class Sfn extends Construct {
  constructor(scope: Construct, id: string, props: SfnProps) {
    super(scope, id);

    const logGroup = new logs.LogGroup(this, 'LogGroup', {
      retention: logs.RetentionDays.ONE_WEEK,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    const initState = sfn.Pass.jsonata(this, '初期化', {
      assign: {
        current_time: '{% $states.input.start_time %}',
        end_time: '{% $states.input.end_time %}',
        account_id: '{% $states.input.account_id %}',
        regions: '{% $states.input.regions %}',
      },
    });

    const checkTimeRangeState = sfn.Choice.jsonata(this, '時間範囲の確認');

    const lambdaInvokeTask = tasks.LambdaInvoke.jsonata(
      this,
      'Lambda呼び出し',
      {
        lambdaFunction: props.func,
        taskTimeout: sfn.Timeout.duration(Duration.minutes(15)),
        payload: sfn.TaskInput.fromObject({
          start_time: '{% $current_time %}',
          end_time: '{% $fromMillis($toMillis($current_time) + 1800000) %}',
          account_id: '{% $account_id %}',
          region: '{% $states.input %}',
        }),
      }
    );

    const mapState = sfn.Map.jsonata(this, 'リージョンごとの処理', {
      items: sfn.ProvideItems.jsonata('{% $regions %}'),
    });

    const timeUpdateState = sfn.Pass.jsonata(this, '時間の更新', {
      assign: {
        current_time: '{% $fromMillis($toMillis($current_time) + 1800000) %}',
      },
    });

    const doneState = sfn.Succeed.jsonata(this, '完了');

    const chain = sfn.Chain.start(initState).next(
      checkTimeRangeState
        .when(
          sfn.Condition.jsonata(
            '{% $toMillis($current_time) < $toMillis($end_time) %}'
          ),
          mapState
            .itemProcessor(lambdaInvokeTask)
            .next(timeUpdateState)
            .next(checkTimeRangeState)
        )
        .otherwise(doneState)
    );

    const sfnMachine = new sfn.StateMachine(this, 'StateMachine', {
      stateMachineName: `${props.systemName}-machine`,
      definitionBody: sfn.DefinitionBody.fromChainable(chain),
      timeout: Duration.days(2),
      logs: {
        destination: logGroup,
        level: sfn.LogLevel.ERROR,
        includeExecutionData: true,
      },
    });
  }
}
LambdaのPythonコード
import gzip
import json
import logging
import os
import random
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client("s3")
sts_client = boto3.client("sts")

S3_BUCKET = os.environ["S3_BUCKET"]
IAM_ROLE_NAME = os.environ["IAM_ROLE_NAME"]
INTERVAL_MINUTES = os.environ["INTERVAL_MINUTES"]


def lambda_handler(event, context):
    """
    Lambda関数のメインハンドラー
    """
    try:
        start_time = event["start_time"]  # UTC時刻文字列 (例: "2022-01-01T00:00:00Z")
        end_time = event["end_time"]  # UTC時刻文字列 (例: "2022-01-01T23:59:59Z")
        account_id = event["account_id"]  # 対象アカウントID
        target_region = event["region"]  # 対象リージョン

        logger.info(
            f"Processing CloudTrail logs for account: {account_id}, region: {target_region}, time range: {start_time} to {end_time}"
        )

        start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
        end_dt = datetime.fromisoformat(end_time.replace("Z", "+00:00"))

        cloudtrail_client = assume_role_and_get_cloudtrail_client(
            account_id, target_region, IAM_ROLE_NAME
        )

        current_time = start_dt
        while current_time < end_dt:
            next_time = min(
                current_time + timedelta(minutes=float(INTERVAL_MINUTES)), end_dt
            )

            logger.info(f"Processing logs from {current_time} to {next_time}")

            events = get_cloudtrail_events(cloudtrail_client, current_time, next_time)

            if events:
                upload_compressed_logs(
                    s3_client,
                    events,
                    S3_BUCKET,
                    account_id,
                    target_region,
                    current_time,
                )
                logger.info(
                    f"Uploaded {len(events)} events for time range {current_time} to {next_time}"
                )
            else:
                logger.info(
                    f"No events found for time range {current_time} to {next_time}"
                )

            current_time = next_time

        return

    except Exception as e:
        logger.error(f"Error processing CloudTrail logs: {str(e)}")
        return {"statusCode": 500, "body": json.dumps({"error": str(e)})}


def assume_role_and_get_cloudtrail_client(account_id: str, region: str, role_name: str):
    """
    指定されたアカウントのIAMロールにスイッチロールしてCloudTrailクライアントを取得
    """
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"

    try:
        logger.info(f"Assuming role: {role_arn}")

        response = sts_client.assume_role(
            RoleArn=role_arn, RoleSessionName="cloudtrail-log-processor"
        )

        credentials = response["Credentials"]

        cloudtrail_client = boto3.client(
            "cloudtrail",
            region_name=region,
            aws_access_key_id=credentials["AccessKeyId"],
            aws_secret_access_key=credentials["SecretAccessKey"],
            aws_session_token=credentials["SessionToken"],
        )

        logger.info(f"Successfully assumed role: {role_arn}")
        return cloudtrail_client

    except Exception as e:
        logger.error(f"Failed to assume role {role_arn}: {str(e)}")
        raise


def get_cloudtrail_events(
    cloudtrail_client, start_time: datetime, end_time: datetime
) -> List[Dict[str, Any]]:
    """
    CloudTrailからイベントを取得(Throttling回避のための固定ウェイト+リトライ)
    """
    events = []
    max_retries = 5
    safe_interval_sec = 0.6

    try:
        paginator = cloudtrail_client.get_paginator("lookup_events")
        page_iterator = paginator.paginate(
            StartTime=start_time,
            EndTime=end_time,
            PaginationConfig={
                "PageSize": 50,
                "MaxItems": 10000,
            },
        )

        for page_number, page in enumerate(page_iterator, start=1):
            for attempt in range(max_retries):
                try:
                    if "Events" in page:
                        for event in page["Events"]:
                            event_data = convert_event_to_serializable(event)
                            events.append(event_data)
                    break
                except ClientError as e:
                    if e.response["Error"]["Code"] == "ThrottlingException":
                        sleep_time = (2**attempt) + random.uniform(0, 1)
                        logger.warning(
                            f"[Page {page_number}] ThrottlingException: retrying in {sleep_time:.2f} seconds..."
                        )
                        time.sleep(sleep_time)
                    else:
                        raise
            else:
                raise Exception(
                    f"[Page {page_number}] Max retries exceeded due to repeated ThrottlingExceptions"
                )

            time.sleep(safe_interval_sec)

        logger.info(f"Retrieved {len(events)} events from CloudTrail")
        return events

    except Exception as e:
        logger.error(f"Error retrieving CloudTrail events: {str(e)}")
        raise


def convert_event_to_serializable(event: Dict[str, Any]) -> Dict[str, Any]:
    """
    CloudTrailイベントをJSON serializable形式に変換
    """
    serializable_event = {}

    for key, value in event.items():
        if isinstance(value, datetime):
            serializable_event[key] = value.isoformat()
        elif key == "CloudTrailEvent":
            # CloudTrailEventは既にJSON文字列なので、パースして辞書として保存
            try:
                serializable_event[key] = json.loads(value)
            except json.JSONDecodeError:
                serializable_event[key] = value
        else:
            serializable_event[key] = value

    return serializable_event


def upload_compressed_logs(
    s3_client,
    events: List[Dict[str, Any]],
    bucket: str,
    account_id: str,
    region: str,
    timestamp: datetime,
):
    """
    イベントログをGZ圧縮してS3にアップロード
    """
    try:
        json_data = json.dumps(events, indent=2, ensure_ascii=False)

        compressed_data = gzip.compress(json_data.encode("utf-8"))

        timestamp_str = timestamp.strftime("%Y%m%dT%H%M%SZ")
        date_path = timestamp.strftime("%Y/%m/%d")

        object_key = (
            f"AWSLogs/{account_id}/CloudTrail/{region}/{date_path}/"
            f"{account_id}_CloudTrail_{region}_{timestamp_str}.json.gz"
        )

        s3_client.put_object(
            Bucket=bucket,
            Key=object_key,
            Body=compressed_data,
            ContentType="application/json",
            ContentEncoding="gzip",
        )

        logger.info(
            f"Successfully uploaded compressed log to s3://{bucket}/{object_key}"
        )

    except Exception as e:
        logger.error(f"Error uploading compressed logs to S3: {str(e)}")
        raise

Step Functionsの実行にはシェルスクリプトを利用しました。

Step Functions実行用シェルスクリプト
#!/bin/bash

# 引数チェック
if [ "$#" -ne 4 ]; then
  echo "Usage: $0 <aws_profile> <account_id> <start_time> <end_time>"
  echo "Example: $0 my-profile 012345678912 2025-04-17T15:00:00Z 2025-04-18T15:00:00Z"
  exit 1
fi

# 引き数の取得
aws_profile="$1"
account_id="$2"
start_time="$3"
end_time="$4"

# ステートマシン実行名を生成
start_date=$(echo "$start_time" | sed -E 's/T/-/; s/:/-/g; s/Z//')
end_date=$(echo "$end_time"   | sed -E 's/T/-/; s/:/-/g; s/Z//')
timestamp=$(date '+%Y%m%d%H%M%S')
execution_name="${account_id}_${start_date}_${end_date}_${timestamp}"

# 該当の環境で有効化されているリージョンを対象とする
regions=$(cat <<EOF
[
  "ap-northeast-1",

  ~省略~
  
  "us-east-1"
]
EOF
)

# 実行対象のステートマシンARN
statemachine_arn="xxx"

# JSON を組み立てて Step Functions を実行
aws stepfunctions start-execution \
  --profile "$aws_profile" \
  --state-machine-arn "$statemachine_arn" \
  --name "$execution_name" \
  --input "$(jq -n \
    --arg start_time "$start_time" \
    --arg end_time "$end_time" \
    --arg account_id "$account_id" \
    --argjson regions "$regions" \
    '{start_time: $start_time, end_time: $end_time, account_id: $account_id, regions: $regions}'
  )"

考慮点

ここまでにご紹介した実装方式でCloudTrailイベントを一括取得するにあたり、いくつか考慮すべきポイントがありました。

  • LookupEventsクォータ
  • Step Functionsイベント上限数
  • Lambdaタイムアウト

LookupEventsクォータ

LookupEventsはアカウント内のリージョンごとに秒間あたり2回のリクエストまでしか実行できません。また、この上限は緩和もできません。

image.png

そのため、ThrottlingExceptionをキャッチしてリトライする仕組みを入れました。

Step Functionsイベント上限数

Step Functionsのイベント上限は 25,000 です。30分単位でLambda実行するようにしましたが、Step Functions実行時のパラメータで指定する取得対象期間が長すぎるとイベント数が増え、もしかするとイベント上限に抵触する可能性もあるかもしれないため、注意が必要です。

Lambdaタイムアウト

CloudTrailイベントを取得するメインの処理にはLambdaを利用したため、1回のLambda実行はタイムアウト上限15分以内に収める必要があります。

Lambda は、コードを一定時間実行してからタイムアウトします。タイムアウトとは、Lambda 関数がタイムアウトするまでの最大実行時間です。この設定のデフォルト値は 3 秒ですが、最大値の 900 秒 (15 分) まで 1 秒単位で調整できます。

なお、15分超過してタイムアウトするとErrorsメトリクスが記録されますが、その Errorsメトリクスの記録時刻は関数が呼び出された時刻である ということは認識しておくと良いと思います。個人的にあまり見ないパターンだったので、今回遭遇して、あれ?となりました。

エラーメトリクスのタイムスタンプは、エラーが発生した時点ではなく、関数が呼び出された時間を反映していることに注意してください。

さいごに

例えばCloudTrailの特定のデータイベントを用途に合わせて取得するようなシチュエーションであればたまにあるのかもしれませんが、今回ご紹介したようなシチュエーションは障害でもなければ遭遇しないかと思います。そのためあまり大っぴらにできることでもありませんが・・・その取得をする過程でいろいろと学んだことがあったため、この記事を書こうと思いました。
どなたかの参考になればうれしく思います。

弊社では一緒に働く仲間を募集中です:sparkles:

現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!

募集内容等詳細は、是非採用サイトをご確認ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?