CloudTrail → CWL → Firehose + Lambda で S3 に 1 行 1 イベントで保存する(AwsConsoleSignIn / Others フォルダ分離)
はじめに
前の記事 では CloudTrail → CWL → Firehose → S3 という構成でログを振り分けた。
しかし CWL → Firehose をそのまま使うと S3 のファイルが 1 行に複数イベントが連結された状態になり、json.loads() で直接パースできない問題がある。
今回は Firehose の Lambda 変換を使ってこの問題を解決し、以下を同時に実現する。
-
AwsConsoleSignInイベントと それ以外(AwsApiCall等)を S3 フォルダで分離 - S3 出力形式を NDJSON(1 行 1 イベント) にする
- CloudFormation で一括構築する
CloudTrail 直接出力との比較
CloudTrail の Trail を作成すると、S3 に直接ログが出力される。このファイルと今回の構成(Firehose 経由)との フォーマットの違いを実際に確認した。
CloudTrail 直接出力(AWSLogs/<AccountId>/CloudTrail/ 配下)
# GZIP を 1 回展開して得られる JSON(整形表示)
{
"Records": [
{
"eventVersion": "1.11",
"eventType": "AwsApiCall",
"eventName": "DescribeRegions",
"eventTime": "2026-06-30T07:01:08Z",
"eventSource": "ec2.amazonaws.com",
...
},
{
"eventVersion": "1.11",
"eventType": "AwsApiCall",
"eventName": "GetRole",
"eventTime": "2026-06-30T07:01:09Z",
...
}
]
}
- 1 ファイル =
{"Records": [...]}の単一 JSON オブジェクト(N イベントを配列に格納) - GZIP は 1 層
Firehose + Lambda(本記事の構成)
# GZIP を 1 回展開して得られるテキスト(NDJSON)
{"eventVersion":"1.11","eventType":"AwsApiCall","eventName":"CreateRole","eventTime":"2026-06-30T05:27:12Z",...}
{"eventVersion":"1.11","eventType":"AwsApiCall","eventName":"GetRole","eventTime":"2026-06-30T05:27:30Z",...}
{"eventVersion":"1.11","eventType":"AwsConsoleSignIn","eventName":"ConsoleLogin","eventTime":"2026-06-30T05:37:13Z",...}
- 1 行 = 1 イベント(NDJSON 形式)
- GZIP は 1 層
個々のイベント JSON の構造
フィールド構造は直接出力と全く同じ。違いはラッパーのみ。
| 項目 | CloudTrail 直接出力 | Firehose + Lambda(本記事) |
|---|---|---|
| ファイル構造 | {"Records": [event1, event2, ...]} |
event1\nevent2\n... |
| パース方法 |
data['Records'] で配列取得 |
json.loads(line) で 1 行ずつ |
| GZIP 層数 | 1 層 | 1 層 |
| イベント JSON のフィールド | CloudTrail スキーマ |
同一(Records ラッパーなし) |
| EventBridge エンベロープ | なし | なし |
| リアルタイム性 | ~15 分遅延 | ~1〜数分(CWL + Firehose バッファ経由) |
sessionCredentialFromConsole・tlsDetails などのオプションフィールドもそのまま含まれる。
アーキテクチャ
CloudTrail Trail(マルチリージョン・全リージョン集約)
→ S3(直接出力: AWSLogs/...) ← {"Records":[...]} 形式
→ CloudWatch Logs
├── サブスクリプションフィルタ① { $.eventType = "AwsConsoleSignIn" }
│ → Firehose(Lambda: DATA_MESSAGE → NDJSON)
│ → S3: AWSConsoleSignIn/yyyy/MM/dd/ ← 1 行 1 イベント
└── サブスクリプションフィルタ② { $.eventType != "AwsConsoleSignIn" }
→ Firehose(Lambda: DATA_MESSAGE → NDJSON)
→ S3: Others/yyyy/MM/dd/ ← 1 行 1 イベント
CWL は 1 ロググループあたり最大 2 つのサブスクリプションフィルタを設定できる。
CWL → Firehose でなぜ NDJSON にならないのか
CWL は Firehose に送る際、複数のイベントを DATA_MESSAGE という形式にバッチしてまとめて送る。
{
"messageType": "DATA_MESSAGE",
"logEvents": [
{ "message": "{...event1 JSON...}" },
{ "message": "{...event2 JSON...}" }
]
}
Firehose はこの DATA_MESSAGE を複数受け取ると、区切り文字なしで連結して S3 に書く。
さらに CWL 自身が GZIP 圧縮するため、Firehose の GZIP 圧縮と合わせて 二重 GZIP になる。
Lambda 変換でこれを解決する。
Lambda: DATA_MESSAGE → NDJSON
# cloudtrail-cwl-to-ndjson
import base64
import gzip
import io
import json
def handler(event, context):
output = []
for record in event['records']:
# base64 デコード → CWL の GZIP 展開 → JSON パース
compressed = base64.b64decode(record['data'])
with gzip.GzipFile(fileobj=io.BytesIO(compressed)) as gz:
payload = json.loads(gz.read())
# CONTROL_MESSAGE(設定変更時のヘルスチェック)は書き込まない
if payload['messageType'] != 'DATA_MESSAGE':
output.append({
'recordId': record['recordId'],
'result': 'Dropped',
'data': record['data'],
})
continue
# logEvents の各要素を \n で結合して NDJSON にする
# ev['message'] が CloudTrail イベントの JSON 文字列
lines = '\n'.join(ev['message'] for ev in payload['logEvents']) + '\n'
output.append({
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(lines.encode()).decode(),
})
return {'records': output}
処理の流れ:
Firehose から受け取るデータ
base64( gzip( DATA_MESSAGE JSON ) )
↓ base64 デコード
gzip( DATA_MESSAGE JSON )
↓ gzip 展開
{"messageType":"DATA_MESSAGE","logEvents":[{"message":"{event1}"},{"message":"{event2}"}]}
↓ logEvents を \n で結合
{event1}\n{event2}\n
↓ base64 エンコード
Firehose へ返す(Firehose が GZIP 圧縮して S3 へ書く)
CloudFormation テンプレート
AWSTemplateFormatVersion: '2010-09-09'
Description: CloudTrail → CWL → Firehose + Lambda で NDJSON 形式で S3 に保存する
Resources:
# S3: CloudTrail 直接出力バケット
TrailBucket:
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
TrailBucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref TrailBucket
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: AWSCloudTrailAclCheck
Effect: Allow
Principal:
Service: cloudtrail.amazonaws.com
Action: s3:GetBucketAcl
Resource: !GetAtt TrailBucket.Arn
- Sid: AWSCloudTrailWrite
Effect: Allow
Principal:
Service: cloudtrail.amazonaws.com
Action: s3:PutObject
Resource: !Sub ${TrailBucket.Arn}/AWSLogs/${AWS::AccountId}/*
Condition:
StringEquals:
s3:x-amz-acl: bucket-owner-full-control
# CloudWatch Logs グループ
TrailLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/cloudtrail/management-events
RetentionInDays: 30
# IAM: CloudTrail → CWL
CloudTrailCWLRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: cloudtrail.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: CWLAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogStream
- logs:PutLogEvents
Resource: !GetAtt TrailLogGroup.Arn
# CloudTrail Trail(マルチリージョン)
ManagementTrail:
Type: AWS::CloudTrail::Trail
DependsOn: TrailBucketPolicy
Properties:
TrailName: management-events
S3BucketName: !Ref TrailBucket
IsLogging: true
IsMultiRegionTrail: true # ← 全リージョンを ap-northeast-1 に集約
IncludeGlobalServiceEvents: true
EnableLogFileValidation: true
CloudWatchLogsLogGroupArn: !GetAtt TrailLogGroup.Arn
CloudWatchLogsRoleArn: !GetAtt CloudTrailCWLRole.Arn
EventSelectors:
- ReadWriteType: All
IncludeManagementEvents: true
# S3: フィルタ済みログバケット(NDJSON)
SigninBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub cloudtrail-filtered-${AWS::AccountId}
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
# Lambda: CWL DATA_MESSAGE → NDJSON 変換
CwlToNdjsonRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
CwlToNdjson:
Type: AWS::Lambda::Function
Properties:
FunctionName: cloudtrail-cwl-to-ndjson
Runtime: python3.12
Handler: index.handler
Role: !GetAtt CwlToNdjsonRole.Arn
Timeout: 60
Code:
ZipFile: |
import base64, gzip, io, json
def handler(event, context):
output = []
for record in event['records']:
compressed = base64.b64decode(record['data'])
with gzip.GzipFile(fileobj=io.BytesIO(compressed)) as gz:
payload = json.loads(gz.read())
if payload['messageType'] != 'DATA_MESSAGE':
output.append({'recordId': record['recordId'], 'result': 'Dropped', 'data': record['data']})
continue
lines = '\n'.join(ev['message'] for ev in payload['logEvents']) + '\n'
output.append({'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(lines.encode()).decode()})
return {'records': output}
# IAM: Firehose → S3 + Lambda
FirehoseRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: S3Access
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !GetAtt SigninBucket.Arn
- !Sub ${SigninBucket.Arn}/*
- PolicyName: LambdaAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Resource: !GetAtt CwlToNdjson.Arn
# Firehose①: AwsConsoleSignIn → AWSConsoleSignIn/
SigninDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: console-signin-filter
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt SigninBucket.Arn
RoleARN: !GetAtt FirehoseRole.Arn
Prefix: 'AWSConsoleSignIn/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
ErrorOutputPrefix: 'errors/signin/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 1
CompressionFormat: GZIP
ProcessingConfiguration:
Enabled: true
Processors:
- Type: Lambda
Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt CwlToNdjson.Arn
# Firehose②: AwsConsoleSignIn 以外 → Others/
OthersDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: console-others-filter
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt SigninBucket.Arn
RoleARN: !GetAtt FirehoseRole.Arn
Prefix: 'Others/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
ErrorOutputPrefix: 'errors/others/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 1
CompressionFormat: GZIP
ProcessingConfiguration:
Enabled: true
Processors:
- Type: Lambda
Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt CwlToNdjson.Arn
# IAM: CWL → Firehose
CWLToFirehoseRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: !Sub logs.${AWS::Region}.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: FirehoseAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- firehose:PutRecord
- firehose:PutRecordBatch
Resource:
- !GetAtt SigninDeliveryStream.Arn
- !GetAtt OthersDeliveryStream.Arn
# CWL サブスクリプションフィルタ①: AwsConsoleSignIn のみ
ConsoleLoginFilter:
Type: AWS::Logs::SubscriptionFilter
DependsOn: ManagementTrail
Properties:
LogGroupName: !Ref TrailLogGroup
FilterName: ConsoleLoginOnly
FilterPattern: '{ $.eventType = "AwsConsoleSignIn" }'
DestinationArn: !GetAtt SigninDeliveryStream.Arn
RoleArn: !GetAtt CWLToFirehoseRole.Arn
# CWL サブスクリプションフィルタ②: AwsConsoleSignIn 以外
OthersFilter:
Type: AWS::Logs::SubscriptionFilter
DependsOn: ManagementTrail
Properties:
LogGroupName: !Ref TrailLogGroup
FilterName: OthersOnly
FilterPattern: '{ $.eventType != "AwsConsoleSignIn" }'
DestinationArn: !GetAtt OthersDeliveryStream.Arn
RoleArn: !GetAtt CWLToFirehoseRole.Arn
Outputs:
TrailBucketName:
Description: CloudTrail 全管理イベント S3 バケット(直接出力)
Value: !Ref TrailBucket
SigninBucketName:
Description: AwsConsoleSignIn / Others 振り分け済みログ S3 バケット(NDJSON)
Value: !Ref SigninBucket
デプロイ
aws cloudformation deploy \
--template-file cloudtrail-cwl-firehose.yaml \
--stack-name CloudTrailCWLFirehoseStack \
--capabilities CAPABILITY_IAM \
--region ap-northeast-1
検証結果
AwsConsoleSignIn(コンソールログイン 2 回)
aws s3 ls s3://cloudtrail-filtered-<AccountId>/AWSConsoleSignIn/2026/06/30/
# → console-signin-filter-2-2026-06-30-05-39-40-xxxx.gz 800 B
import gzip, json
with gzip.open('console-signin-filter-2-xxxx.gz', 'rb') as f:
raw = f.read().decode('utf-8')
lines = [l for l in raw.splitlines() if l.strip()]
print(len(lines)) # → 2(2 回分のログイン)
for line in lines:
e = json.loads(line)
print(e['eventType'], e['eventName'], e['userIdentity']['userName'], e['eventTime'])
# → AwsConsoleSignIn ConsoleLogin hijiri 2026-06-30T05:37:13Z
# → AwsConsoleSignIn ConsoleLogin hijiri 2026-06-30T05:37:26Z
✅ 1 行 = 1 ログインイベント、AwsConsoleSignIn のみを確認。
Others(AwsApiCall 209 件)
lines = ... # 同様に展開
print(len(lines)) # → 209
from collections import Counter
print(Counter(json.loads(l)['eventType'] for l in lines))
# → {'AwsApiCall': 209}
✅ AwsConsoleSignIn の混入なし。
NDJSON の解析スクリプト
import gzip, json
def parse_ndjson_file(path: str) -> list[dict]:
"""Firehose + Lambda が出力した NDJSON ファイルを解析する"""
with gzip.open(path, 'rb') as f:
raw = f.read().decode('utf-8')
return [json.loads(line) for line in raw.splitlines() if line.strip()]
# AWSConsoleSignIn の例
signin_events = parse_ndjson_file('console-signin-filter-2-xxxx.gz')
for e in signin_events:
# e が直接 CloudTrail イベント(Records ラッパーなし)
print(e['eventType'], e['userIdentity']['userName'], e['eventTime'])
CloudTrail 直接出力({"Records": [...]} 形式)との違いは data['Records'] で配列を取り出す必要があるかどうかのみで、個々のイベント JSON の構造は同一。
⚠️ 注意点
| 注意点 | 内容 |
|---|---|
| CWL 遅延 | CloudTrail → CWL 転送に数秒〜数十秒の遅延がある |
| CONTROL_MESSAGE | CWL が送信するヘルスチェックメッセージ。サブスクリプションフィルタの作成・更新時や CWL の内部動作で不定期に発生する。Lambda で Dropped にするため S3 には書かれない |
| サブスクリプションフィルタ上限 | 1 ロググループあたり最大 2 つ。本構成で上限に達する |
| Firehose バッファリング | 最小 60 秒 or 1MB でフラッシュ。リアルタイム性が必要な場合は CWL → Lambda 直接呼び出しを検討 |
| ConsoleLogin のリージョン |
AwsConsoleSignIn は通常 us-east-1 で記録される(IAM がグローバルサービスのため、サインインエンドポイントが us-east-1 に固定)。IsMultiRegionTrail: true にすることで全リージョンを ap-northeast-1 に集約でき、漏れなく収集できる |