はじめに
AWSアカウント移行の際に、CloudTrail イベントに基づいてポリシーを生成する機能が諸事情で使用できない事がありました。最低限のIAMポリシーを考える上で参考になる情報が欲しく、過去数十日のCloudTrailのeventSourceの取得を検討しました。
本記事では、CloudTrailログからeventSourceとeventNameを抽出する方法について記載します。
環境
- Python:3.13
- AWS CLI:2系
ゴール
- あるAWSアカウントで記録されているCloudTrailのeventSourceとeventNameのデータを過去30日分取得する
1. Lambdaで実施
Lambda関数からCloudTrail APIをコールし、eventSourceとeventNameを取得します。
Lambda関数は最大実行時間が15分のため、長期間のログを取得する場合は注意が必要です。
Lambdaのソース
import json
import boto3
import datetime
from collections import defaultdict
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# 対象のIAMユーザー名
iam_entity = "hoge_user"
days_back = 10
logger.info(f"分析開始: {iam_entity}の過去{days_back}日間のアクティビティ")
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(days=days_back)
cloudtrail = boto3.client('cloudtrail')
# サービスごとのイベント収集用
service_events = defaultdict(set)
total_events = 0
try:
logger.info("CloudTrailからイベント取得開始")
paginator = cloudtrail.get_paginator('lookup_events')
for page in paginator.paginate(
LookupAttributes=[
{
'AttributeKey': 'Username',
'AttributeValue': iam_entity
}
],
StartTime=start_time,
EndTime=end_time
):
page_events = page['Events']
total_events += len(page_events)
logger.info(f"イベント取得: {len(page_events)}件")
for event in page_events:
if 'CloudTrailEvent' in event:
event_details = json.loads(event['CloudTrailEvent'])
if "eventName" in event_details and "eventSource" in event_details:
service = event_details["eventSource"].split(".")[0].replace("amazonaws.com", "").replace("-", "")
event_name = event_details["eventName"]
# サービスごとにイベントを分類
service_events[service].add(event_name)
# 結果作成
result = {}
for service, events in service_events.items():
event_list = [f"{service}:{event_name}" for event_name in events]
result[service] = sorted(event_list)
response = {
"アクセス分析結果": f"{iam_entity}の過去{days_back}日間のアクティビティ",
"取得イベント数": total_events,
"サービスごとのCloudTrailイベント": result
}
# ログに詳細出力
logger.info("========== 分析結果詳細 ==========")
logger.info(json.dumps(response, ensure_ascii=False, indent=2))
logger.info("=================================")
return response
except Exception as e:
logger.error(f"エラー発生: {str(e)}")
return {
"エラー": str(e)
}
ポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudtrail:LookupEvents"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
実行結果
以下のように取得できます。
{
"アクセス分析結果": "hoge_userの過去10日間のアクティビティ",
"取得イベント数": 516,
"サービスごとのCloudTrailイベント": {
"lambda": [
"lambda:CreateFunction20150331",
"lambda:GetFunction20150331v2",
"lambda:GetFunctionCodeSigningConfig",
"lambda:ListFunctions20150331",
...
],
"bedrock": [
"bedrock:CreateFoundationModelAgreement",
"bedrock:GetFoundationModelAvailability",
"bedrock:GetUseCaseForModelAccess",
"bedrock:InvokeModelWithResponseStream"
...
],
"logs": [
"logs:DescribeLogGroups",
"logs:GetLogEvents",
...
],
...
}
}
Lambdaのデベロッパーガイドには、以下の記載があったのでeventNameはIAMのアクションを考える上で参考になるものもありそうです。
CloudTrail ログファイルでは、 eventName に日付やバージョン情報が含まれる場合がありますが、同じ公開 API アクションを指しています。例えば、GetFunction のアクションが GetFunction20150331v2 として表示される場合があります。次のリストは、イベント名が API アクション名と異なるタイミングを指定します。
~~
・CreateFunction (イベント名: CreateFunction20150331)
~~
参考:https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/logging-using-cloudtrail.html#cloudtrail-management-events
2. AWS CLIコマンドを利用しローカルから実施
長期間(30日以上)のログを取得する場合や、Lambdaのタイムアウト制限を回避したい場合は、ローカル環境からAWS CLIコマンドを使用してeventSourceとeventNameを取得できます。
こちらは、リージョンの指定と一度に取得する件数に制限を加えてみました。
python cloudtrail_analyzer.py IAMユーザー名 --start-date YYYY-MM-DD --end-date YYYY-MM-DD --regions ap-northeast-1,us-east-1
ソース
import json
import subprocess
import datetime
from collections import defaultdict
import sys
# 個別のCloudTrailイベントを処理する関数
def process_cloudtrail_event(event, service_event_names):
event_details = json.loads(event['CloudTrailEvent'])
if "eventName" in event_details and "eventSource" in event_details:
service = event_details["eventSource"].split(".")[0].replace("amazonaws.com", "").replace("-", "")
event_name = event_details["eventName"]
# サービスごとにイベント名を分類
service_event_names[service].add(event_name)
# AWS CLIコマンドを実行して結果を取得する関数
def execute_aws_command(cmd):
try:
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
raise Exception(f"AWS CLIコマンド実行エラー: {stderr}")
return json.loads(stdout)
except Exception as e:
print(f"エラー発生: {e}")
sys.exit(1)
# 実行コマンド:python cloudtrail_analyzer.py IAMユーザー名 [日数]
def main():
iam_entity = sys.argv[1]
days_back = int(sys.argv[2]) if len(sys.argv) > 2 else 90
print(f"分析開始: {iam_entity}の過去{days_back}日間のアクティビティ")
# 期間の設定
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(days=days_back)
# サービスごとのイベント名収集用
service_event_names = defaultdict(set)
total_events = 0
# 時間範囲を分割して処理(例:10日ごと)
time_chunks = []
chunk_days = 10
chunk_start = start_time
while chunk_start < end_time:
chunk_end = min(chunk_start + datetime.timedelta(days=chunk_days), end_time)
time_chunks.append((chunk_start, chunk_end))
chunk_start = chunk_end
print(f"期間を{len(time_chunks)}チャンクに分割して処理します")
for i, (chunk_start, chunk_end) in enumerate(time_chunks):
chunk_start_str = chunk_start.strftime("%Y-%m-%dT%H:%M:%S")
chunk_end_str = chunk_end.strftime("%Y-%m-%dT%H:%M:%S")
print(f"チャンク {i+1}/{len(time_chunks)} 処理中: {chunk_start_str} から {chunk_end_str}")
# ページネーション処理
starting_token = None
page_count = 0
events_count = 0
while True:
# AWS CLIコマンド構築
base_cmd = f"aws cloudtrail lookup-events --lookup-attributes AttributeKey=Username,AttributeValue={iam_entity} --start-time {chunk_start_str} --end-time {chunk_end_str}"
if starting_token:
cmd = f"{base_cmd} --starting-token {starting_token}"
else:
cmd = base_cmd
# コマンド実行とデータ取得
data = execute_aws_command(cmd)
time.sleep(0.5)
events = data.get("Events", [])
events_count += len(events)
page_count += 1
if page_count % 10 == 0 or len(events) > 0:
print(f" ページ {page_count} 処理完了: 累計 {events_count} イベント")
# イベント処理
for event in events:
process_cloudtrail_event(event, service_event_names)
# トークンを取得
starting_token = data.get("NextToken")
# トークンがなければループ終了
if not starting_token:
break
total_events += events_count
print(f"チャンク {i+1} 完了: {events_count} イベント処理")
# 結果作成
result = {}
for service, event_names in service_event_names.items():
# CloudTrailのイベント名をサービス名とともに表示
event_list = [f"{service}:{event_name}" for event_name in event_names]
result[service] = sorted(event_list)
response = {
"アクセス分析結果": f"{iam_entity}の過去{days_back}日間のアクティビティ",
"取得イベント数": total_events,
"サービスごとのCloudTrailイベント": result
}
# 結果を保存
current_date = end_time.strftime('%Y%m%d')
output_file = f"cloudtrail_events_{iam_entity}_{current_date}_past{days_back}days.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(response, f, ensure_ascii=False, indent=2)
print(f"分析完了: 結果を {output_file} に保存しました")
# サービス数と総イベント数を表示
print(f"検出されたサービス数: {len(result)}")
print(f"合計イベント数: {total_events}")
# サービスごとのイベント名数を表示
for service, event_names in sorted(result.items()):
print(f"サービス {service}: {len(event_names)}イベント")
if __name__ == "__main__":
main()
実行結果
次のような結果がJSON形式で保存されます。
{
"アクセス分析結果": "ユーザー名の2025-01-01から2025-01-31までのアクティビティ",
"検索対象リージョン": ["ap-northeast-1", "us-east-1"],
"取得イベント数": 1054,
"サービスごとのCloudTrailイベント": {
"lambda": [
"lambda:CreateFunction20150331",
"lambda:GetFunction20150331v2",
"lambda:ListFunctions20150331",
"..."
],
"bedrock": [
"bedrock:CreateFoundationModelAgreement",
"bedrock:GetFoundationModelAvailability",
"bedrock:GetUseCaseForModelAccess",
"bedrock:InvokeModelWithResponseStream"
...
],
...
}
}