0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CloudTrailログからeventSourceとeventNameをスクリプトで抽出する

Last updated at Posted at 2025-03-18

はじめに

AWSアカウント移行の際に、CloudTrail イベントに基づいてポリシーを生成する機能が諸事情で使用できない事がありました。最低限のIAMポリシーを考える上で参考になる情報が欲しく、過去数十日のCloudTrailのeventSourceの取得を検討しました。
本記事では、CloudTrailログからeventSourceとeventNameを抽出する方法について記載します。

環境

  • Python:3.13
  • AWS CLI:2系

ゴール

  • あるAWSアカウントで記録されているCloudTrailのeventSourceとeventNameのデータを過去30日分取得する

1. Lambdaで実施

Lambda関数からCloudTrail APIをコールし、eventSourceとeventNameを取得します。
Lambda関数は最大実行時間が15分のため、長期間のログを取得する場合は注意が必要です。

Lambdaのソース
import json
import boto3
import datetime
from collections import defaultdict
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # 対象のIAMユーザー名
    iam_entity = "hoge_user"
    days_back = 10
    
    logger.info(f"分析開始: {iam_entity}の過去{days_back}日間のアクティビティ")
    
    end_time = datetime.datetime.now()
    start_time = end_time - datetime.timedelta(days=days_back)
    
    cloudtrail = boto3.client('cloudtrail')
    
    # サービスごとのイベント収集用
    service_events = defaultdict(set)
    total_events = 0
    
    try:
        logger.info("CloudTrailからイベント取得開始")
        paginator = cloudtrail.get_paginator('lookup_events')
        for page in paginator.paginate(
            LookupAttributes=[
                {
                    'AttributeKey': 'Username',
                    'AttributeValue': iam_entity
                }
            ],
            StartTime=start_time,
            EndTime=end_time
        ):
            page_events = page['Events']
            total_events += len(page_events)
            logger.info(f"イベント取得: {len(page_events)}")
            
            for event in page_events:
                if 'CloudTrailEvent' in event:
                    event_details = json.loads(event['CloudTrailEvent'])
                    if "eventName" in event_details and "eventSource" in event_details:
                        service = event_details["eventSource"].split(".")[0].replace("amazonaws.com", "").replace("-", "")
                        event_name = event_details["eventName"]
                        # サービスごとにイベントを分類
                        service_events[service].add(event_name)
        
        # 結果作成
        result = {}
        for service, events in service_events.items():
            event_list = [f"{service}:{event_name}" for event_name in events]
            result[service] = sorted(event_list)
        
        response = {
            "アクセス分析結果": f"{iam_entity}の過去{days_back}日間のアクティビティ",
            "取得イベント数": total_events,
            "サービスごとのCloudTrailイベント": result
        }
        
        # ログに詳細出力
        logger.info("========== 分析結果詳細 ==========")
        logger.info(json.dumps(response, ensure_ascii=False, indent=2))
        logger.info("=================================")
        
        return response
        
    except Exception as e:
        logger.error(f"エラー発生: {str(e)}")
        return {
            "エラー": str(e)
        }
ポリシー
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "cloudtrail:LookupEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

実行結果

以下のように取得できます。

一部抜粋
{
    "アクセス分析結果": "hoge_userの過去10日間のアクティビティ",
    "取得イベント数": 516,
    "サービスごとのCloudTrailイベント": {
        "lambda": [
            "lambda:CreateFunction20150331",
            "lambda:GetFunction20150331v2",
            "lambda:GetFunctionCodeSigningConfig",
            "lambda:ListFunctions20150331",
            ...
        ],
       "bedrock": [
            "bedrock:CreateFoundationModelAgreement",
            "bedrock:GetFoundationModelAvailability",
            "bedrock:GetUseCaseForModelAccess",
            "bedrock:InvokeModelWithResponseStream"
            ...
        ],
        "logs": [
            "logs:DescribeLogGroups",
            "logs:GetLogEvents",
            ...
        ],
        ...
    }
}

Lambdaのデベロッパーガイドには、以下の記載があったのでeventNameはIAMのアクションを考える上で参考になるものもありそうです。

CloudTrail ログファイルでは、 eventName に日付やバージョン情報が含まれる場合がありますが、同じ公開 API アクションを指しています。例えば、GetFunction のアクションが GetFunction20150331v2 として表示される場合があります。次のリストは、イベント名が API アクション名と異なるタイミングを指定します。
~~
・CreateFunction (イベント名: CreateFunction20150331)
~~
参考:https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/logging-using-cloudtrail.html#cloudtrail-management-events

2. AWS CLIコマンドを利用しローカルから実施

長期間(30日以上)のログを取得する場合や、Lambdaのタイムアウト制限を回避したい場合は、ローカル環境からAWS CLIコマンドを使用してeventSourceとeventNameを取得できます。
こちらは、リージョンの指定と一度に取得する件数に制限を加えてみました。

実行コマンド例
python cloudtrail_analyzer.py IAMユーザー名 --start-date YYYY-MM-DD --end-date YYYY-MM-DD --regions ap-northeast-1,us-east-1
ソース
cloudtrail_analyzer.py
import json
import subprocess
import datetime
from collections import defaultdict
import sys


# 個別のCloudTrailイベントを処理する関数
def process_cloudtrail_event(event, service_event_names):
    event_details = json.loads(event['CloudTrailEvent'])
    if "eventName" in event_details and "eventSource" in event_details:
        service = event_details["eventSource"].split(".")[0].replace("amazonaws.com", "").replace("-", "")
        event_name = event_details["eventName"]
        # サービスごとにイベント名を分類
        service_event_names[service].add(event_name)


# AWS CLIコマンドを実行して結果を取得する関数
def execute_aws_command(cmd):
    try:
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = process.communicate()
        
        if process.returncode != 0:
            raise Exception(f"AWS CLIコマンド実行エラー: {stderr}")
        
        return json.loads(stdout)
    except Exception as e:
        print(f"エラー発生: {e}")
        sys.exit(1)


# 実行コマンド:python cloudtrail_analyzer.py IAMユーザー名 [日数]
def main():
    iam_entity = sys.argv[1]
    days_back = int(sys.argv[2]) if len(sys.argv) > 2 else 90
    
    print(f"分析開始: {iam_entity}の過去{days_back}日間のアクティビティ")
    
    # 期間の設定
    end_time = datetime.datetime.now()
    start_time = end_time - datetime.timedelta(days=days_back)
    
    # サービスごとのイベント名収集用
    service_event_names = defaultdict(set)
    total_events = 0
    
    # 時間範囲を分割して処理(例:10日ごと)
    time_chunks = []
    chunk_days = 10
    chunk_start = start_time
    
    while chunk_start < end_time:
        chunk_end = min(chunk_start + datetime.timedelta(days=chunk_days), end_time)
        time_chunks.append((chunk_start, chunk_end))
        chunk_start = chunk_end
    
    print(f"期間を{len(time_chunks)}チャンクに分割して処理します")
    
    for i, (chunk_start, chunk_end) in enumerate(time_chunks):
        chunk_start_str = chunk_start.strftime("%Y-%m-%dT%H:%M:%S")
        chunk_end_str = chunk_end.strftime("%Y-%m-%dT%H:%M:%S")
        
        print(f"チャンク {i+1}/{len(time_chunks)} 処理中: {chunk_start_str} から {chunk_end_str}")
        
        # ページネーション処理
        starting_token = None
        page_count = 0
        events_count = 0
        
        while True:
            # AWS CLIコマンド構築
            base_cmd = f"aws cloudtrail lookup-events --lookup-attributes AttributeKey=Username,AttributeValue={iam_entity} --start-time {chunk_start_str} --end-time {chunk_end_str}"
            
            if starting_token:
                cmd = f"{base_cmd} --starting-token {starting_token}"
            else:
                cmd = base_cmd
            
            # コマンド実行とデータ取得
            data = execute_aws_command(cmd)
            time.sleep(0.5)
            
            events = data.get("Events", [])
            events_count += len(events)
            page_count += 1
            
            if page_count % 10 == 0 or len(events) > 0:
                print(f"  ページ {page_count} 処理完了: 累計 {events_count} イベント")
            
            # イベント処理
            for event in events:
                process_cloudtrail_event(event, service_event_names)
            
            # トークンを取得
            starting_token = data.get("NextToken")
            
            # トークンがなければループ終了
            if not starting_token:
                break
                
        total_events += events_count
        print(f"チャンク {i+1} 完了: {events_count} イベント処理")
    
    # 結果作成
    result = {}
    for service, event_names in service_event_names.items():
        # CloudTrailのイベント名をサービス名とともに表示
        event_list = [f"{service}:{event_name}" for event_name in event_names]
        result[service] = sorted(event_list)
    
    response = {
        "アクセス分析結果": f"{iam_entity}の過去{days_back}日間のアクティビティ",
        "取得イベント数": total_events,
        "サービスごとのCloudTrailイベント": result
    }
    
    # 結果を保存
    current_date = end_time.strftime('%Y%m%d')
    output_file = f"cloudtrail_events_{iam_entity}_{current_date}_past{days_back}days.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(response, f, ensure_ascii=False, indent=2)
    
    print(f"分析完了: 結果を {output_file} に保存しました")
    
    # サービス数と総イベント数を表示
    print(f"検出されたサービス数: {len(result)}")
    print(f"合計イベント数: {total_events}")
    
    # サービスごとのイベント名数を表示
    for service, event_names in sorted(result.items()):
        print(f"サービス {service}: {len(event_names)}イベント")


if __name__ == "__main__":
    main()

実行結果

次のような結果がJSON形式で保存されます。

一部抜粋
{
  "アクセス分析結果": "ユーザー名の2025-01-01から2025-01-31までのアクティビティ",
  "検索対象リージョン": ["ap-northeast-1", "us-east-1"],
  "取得イベント数": 1054,
  "サービスごとのCloudTrailイベント": {
    "lambda": [
      "lambda:CreateFunction20150331",
      "lambda:GetFunction20150331v2",
      "lambda:ListFunctions20150331",
      "..."
    ],
    "bedrock": [
      "bedrock:CreateFoundationModelAgreement",
      "bedrock:GetFoundationModelAvailability",
      "bedrock:GetUseCaseForModelAccess",
      "bedrock:InvokeModelWithResponseStream"
      ...
    ],
    ...
  }
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?