0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CloudTrail → CWL → Firehose + Lambda で S3 に 1 行 1 イベントで保存する(AwsConsoleSignIn / Others フォルダ分離)

0
Last updated at Posted at 2026-06-30

CloudTrail → CWL → Firehose + Lambda で S3 に 1 行 1 イベントで保存する(AwsConsoleSignIn / Others フォルダ分離)

はじめに

前の記事 では CloudTrail → CWL → Firehose → S3 という構成でログを振り分けた。
しかし CWL → Firehose をそのまま使うと S3 のファイルが 1 行に複数イベントが連結された状態になり、json.loads() で直接パースできない問題がある。

今回は Firehose の Lambda 変換を使ってこの問題を解決し、以下を同時に実現する。

  • AwsConsoleSignIn イベントと それ以外(AwsApiCall 等)を S3 フォルダで分離
  • S3 出力形式を NDJSON(1 行 1 イベント) にする
  • CloudFormation で一括構築する

CloudTrail 直接出力との比較

CloudTrail の Trail を作成すると、S3 に直接ログが出力される。このファイルと今回の構成(Firehose 経由)との フォーマットの違いを実際に確認した。

CloudTrail 直接出力(AWSLogs/<AccountId>/CloudTrail/ 配下)

# GZIP を 1 回展開して得られる JSON(整形表示)
{
  "Records": [
    {
      "eventVersion": "1.11",
      "eventType": "AwsApiCall",
      "eventName": "DescribeRegions",
      "eventTime": "2026-06-30T07:01:08Z",
      "eventSource": "ec2.amazonaws.com",
      ...
    },
    {
      "eventVersion": "1.11",
      "eventType": "AwsApiCall",
      "eventName": "GetRole",
      "eventTime": "2026-06-30T07:01:09Z",
      ...
    }
  ]
}
  • 1 ファイル = {"Records": [...]} の単一 JSON オブジェクト(N イベントを配列に格納)
  • GZIP は 1 層

Firehose + Lambda(本記事の構成)

# GZIP を 1 回展開して得られるテキスト(NDJSON)
{"eventVersion":"1.11","eventType":"AwsApiCall","eventName":"CreateRole","eventTime":"2026-06-30T05:27:12Z",...}
{"eventVersion":"1.11","eventType":"AwsApiCall","eventName":"GetRole","eventTime":"2026-06-30T05:27:30Z",...}
{"eventVersion":"1.11","eventType":"AwsConsoleSignIn","eventName":"ConsoleLogin","eventTime":"2026-06-30T05:37:13Z",...}
  • 1 行 = 1 イベント(NDJSON 形式)
  • GZIP は 1 層

個々のイベント JSON の構造

フィールド構造は直接出力と全く同じ。違いはラッパーのみ。

項目 CloudTrail 直接出力 Firehose + Lambda(本記事)
ファイル構造 {"Records": [event1, event2, ...]} event1\nevent2\n...
パース方法 data['Records'] で配列取得 json.loads(line) で 1 行ずつ
GZIP 層数 1 層 1 層
イベント JSON のフィールド CloudTrail スキーマ 同一Records ラッパーなし)
EventBridge エンベロープ なし なし
リアルタイム性 ~15 分遅延 ~1〜数分(CWL + Firehose バッファ経由)

sessionCredentialFromConsoletlsDetails などのオプションフィールドもそのまま含まれる。


アーキテクチャ

CloudTrail Trail(マルチリージョン・全リージョン集約)
  → S3(直接出力: AWSLogs/...)      ← {"Records":[...]} 形式
  → CloudWatch Logs
      ├── サブスクリプションフィルタ① { $.eventType = "AwsConsoleSignIn" }
      │     → Firehose(Lambda: DATA_MESSAGE → NDJSON)
      │         → S3: AWSConsoleSignIn/yyyy/MM/dd/  ← 1 行 1 イベント
      └── サブスクリプションフィルタ② { $.eventType != "AwsConsoleSignIn" }
            → Firehose(Lambda: DATA_MESSAGE → NDJSON)
                → S3: Others/yyyy/MM/dd/             ← 1 行 1 イベント

CWL は 1 ロググループあたり最大 2 つのサブスクリプションフィルタを設定できる。


CWL → Firehose でなぜ NDJSON にならないのか

CWL は Firehose に送る際、複数のイベントを DATA_MESSAGE という形式にバッチしてまとめて送る。

{
  "messageType": "DATA_MESSAGE",
  "logEvents": [
    { "message": "{...event1 JSON...}" },
    { "message": "{...event2 JSON...}" }
  ]
}

Firehose はこの DATA_MESSAGE を複数受け取ると、区切り文字なしで連結して S3 に書く。
さらに CWL 自身が GZIP 圧縮するため、Firehose の GZIP 圧縮と合わせて 二重 GZIP になる。

Lambda 変換でこれを解決する。


Lambda: DATA_MESSAGE → NDJSON

# cloudtrail-cwl-to-ndjson
import base64
import gzip
import io
import json

def handler(event, context):
    output = []
    for record in event['records']:
        # base64 デコード → CWL の GZIP 展開 → JSON パース
        compressed = base64.b64decode(record['data'])
        with gzip.GzipFile(fileobj=io.BytesIO(compressed)) as gz:
            payload = json.loads(gz.read())

        # CONTROL_MESSAGE(設定変更時のヘルスチェック)は書き込まない
        if payload['messageType'] != 'DATA_MESSAGE':
            output.append({
                'recordId': record['recordId'],
                'result': 'Dropped',
                'data': record['data'],
            })
            continue

        # logEvents の各要素を \n で結合して NDJSON にする
        # ev['message'] が CloudTrail イベントの JSON 文字列
        lines = '\n'.join(ev['message'] for ev in payload['logEvents']) + '\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(lines.encode()).decode(),
        })

    return {'records': output}

処理の流れ:

Firehose から受け取るデータ
  base64( gzip( DATA_MESSAGE JSON ) )
      ↓ base64 デコード
  gzip( DATA_MESSAGE JSON )
      ↓ gzip 展開
  {"messageType":"DATA_MESSAGE","logEvents":[{"message":"{event1}"},{"message":"{event2}"}]}
      ↓ logEvents を \n で結合
  {event1}\n{event2}\n
      ↓ base64 エンコード
  Firehose へ返す(Firehose が GZIP 圧縮して S3 へ書く)

CloudFormation テンプレート

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudTrail → CWL → Firehose + Lambda で NDJSON 形式で S3 に保存する

Resources:

  # S3: CloudTrail 直接出力バケット
  TrailBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  TrailBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref TrailBucket
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: AWSCloudTrailAclCheck
            Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: s3:GetBucketAcl
            Resource: !GetAtt TrailBucket.Arn
          - Sid: AWSCloudTrailWrite
            Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: s3:PutObject
            Resource: !Sub ${TrailBucket.Arn}/AWSLogs/${AWS::AccountId}/*
            Condition:
              StringEquals:
                s3:x-amz-acl: bucket-owner-full-control

  # CloudWatch Logs グループ
  TrailLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/cloudtrail/management-events
      RetentionInDays: 30

  # IAM: CloudTrail → CWL
  CloudTrailCWLRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: CWLAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !GetAtt TrailLogGroup.Arn

  # CloudTrail Trail(マルチリージョン)
  ManagementTrail:
    Type: AWS::CloudTrail::Trail
    DependsOn: TrailBucketPolicy
    Properties:
      TrailName: management-events
      S3BucketName: !Ref TrailBucket
      IsLogging: true
      IsMultiRegionTrail: true        # ← 全リージョンを ap-northeast-1 に集約
      IncludeGlobalServiceEvents: true
      EnableLogFileValidation: true
      CloudWatchLogsLogGroupArn: !GetAtt TrailLogGroup.Arn
      CloudWatchLogsRoleArn: !GetAtt CloudTrailCWLRole.Arn
      EventSelectors:
        - ReadWriteType: All
          IncludeManagementEvents: true

  # S3: フィルタ済みログバケット(NDJSON)
  SigninBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub cloudtrail-filtered-${AWS::AccountId}
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  # Lambda: CWL DATA_MESSAGE → NDJSON 変換
  CwlToNdjsonRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  CwlToNdjson:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: cloudtrail-cwl-to-ndjson
      Runtime: python3.12
      Handler: index.handler
      Role: !GetAtt CwlToNdjsonRole.Arn
      Timeout: 60
      Code:
        ZipFile: |
          import base64, gzip, io, json
          def handler(event, context):
              output = []
              for record in event['records']:
                  compressed = base64.b64decode(record['data'])
                  with gzip.GzipFile(fileobj=io.BytesIO(compressed)) as gz:
                      payload = json.loads(gz.read())
                  if payload['messageType'] != 'DATA_MESSAGE':
                      output.append({'recordId': record['recordId'], 'result': 'Dropped', 'data': record['data']})
                      continue
                  lines = '\n'.join(ev['message'] for ev in payload['logEvents']) + '\n'
                  output.append({'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(lines.encode()).decode()})
              return {'records': output}

  # IAM: Firehose → S3 + Lambda
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: S3Access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt SigninBucket.Arn
                  - !Sub ${SigninBucket.Arn}/*
        - PolicyName: LambdaAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !GetAtt CwlToNdjson.Arn

  # Firehose①: AwsConsoleSignIn → AWSConsoleSignIn/
  SigninDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: console-signin-filter
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt SigninBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        Prefix: 'AWSConsoleSignIn/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
        ErrorOutputPrefix: 'errors/signin/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 1
        CompressionFormat: GZIP
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt CwlToNdjson.Arn

  # Firehose②: AwsConsoleSignIn 以外 → Others/
  OthersDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: console-others-filter
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt SigninBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        Prefix: 'Others/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
        ErrorOutputPrefix: 'errors/others/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/'
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 1
        CompressionFormat: GZIP
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt CwlToNdjson.Arn

  # IAM: CWL → Firehose
  CWLToFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: !Sub logs.${AWS::Region}.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FirehoseAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource:
                  - !GetAtt SigninDeliveryStream.Arn
                  - !GetAtt OthersDeliveryStream.Arn

  # CWL サブスクリプションフィルタ①: AwsConsoleSignIn のみ
  ConsoleLoginFilter:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: ManagementTrail
    Properties:
      LogGroupName: !Ref TrailLogGroup
      FilterName: ConsoleLoginOnly
      FilterPattern: '{ $.eventType = "AwsConsoleSignIn" }'
      DestinationArn: !GetAtt SigninDeliveryStream.Arn
      RoleArn: !GetAtt CWLToFirehoseRole.Arn

  # CWL サブスクリプションフィルタ②: AwsConsoleSignIn 以外
  OthersFilter:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: ManagementTrail
    Properties:
      LogGroupName: !Ref TrailLogGroup
      FilterName: OthersOnly
      FilterPattern: '{ $.eventType != "AwsConsoleSignIn" }'
      DestinationArn: !GetAtt OthersDeliveryStream.Arn
      RoleArn: !GetAtt CWLToFirehoseRole.Arn

Outputs:
  TrailBucketName:
    Description: CloudTrail 全管理イベント S3 バケット(直接出力)
    Value: !Ref TrailBucket
  SigninBucketName:
    Description: AwsConsoleSignIn / Others 振り分け済みログ S3 バケット(NDJSON)
    Value: !Ref SigninBucket

デプロイ

aws cloudformation deploy \
  --template-file cloudtrail-cwl-firehose.yaml \
  --stack-name CloudTrailCWLFirehoseStack \
  --capabilities CAPABILITY_IAM \
  --region ap-northeast-1

検証結果

AwsConsoleSignIn(コンソールログイン 2 回)

aws s3 ls s3://cloudtrail-filtered-<AccountId>/AWSConsoleSignIn/2026/06/30/
# → console-signin-filter-2-2026-06-30-05-39-40-xxxx.gz  800 B
import gzip, json

with gzip.open('console-signin-filter-2-xxxx.gz', 'rb') as f:
    raw = f.read().decode('utf-8')

lines = [l for l in raw.splitlines() if l.strip()]
print(len(lines))  # → 2(2 回分のログイン)

for line in lines:
    e = json.loads(line)
    print(e['eventType'], e['eventName'], e['userIdentity']['userName'], e['eventTime'])
# → AwsConsoleSignIn ConsoleLogin hijiri 2026-06-30T05:37:13Z
# → AwsConsoleSignIn ConsoleLogin hijiri 2026-06-30T05:37:26Z

✅ 1 行 = 1 ログインイベントAwsConsoleSignIn のみを確認。

Others(AwsApiCall 209 件)

lines = ...  # 同様に展開
print(len(lines))  # → 209

from collections import Counter
print(Counter(json.loads(l)['eventType'] for l in lines))
# → {'AwsApiCall': 209}

✅ AwsConsoleSignIn の混入なし


NDJSON の解析スクリプト

import gzip, json

def parse_ndjson_file(path: str) -> list[dict]:
    """Firehose + Lambda が出力した NDJSON ファイルを解析する"""
    with gzip.open(path, 'rb') as f:
        raw = f.read().decode('utf-8')
    return [json.loads(line) for line in raw.splitlines() if line.strip()]

# AWSConsoleSignIn の例
signin_events = parse_ndjson_file('console-signin-filter-2-xxxx.gz')
for e in signin_events:
    # e が直接 CloudTrail イベント(Records ラッパーなし)
    print(e['eventType'], e['userIdentity']['userName'], e['eventTime'])

CloudTrail 直接出力({"Records": [...]} 形式)との違いは data['Records'] で配列を取り出す必要があるかどうかのみで、個々のイベント JSON の構造は同一


⚠️ 注意点

注意点 内容
CWL 遅延 CloudTrail → CWL 転送に数秒〜数十秒の遅延がある
CONTROL_MESSAGE CWL が送信するヘルスチェックメッセージ。サブスクリプションフィルタの作成・更新時や CWL の内部動作で不定期に発生する。Lambda で Dropped にするため S3 には書かれない
サブスクリプションフィルタ上限 1 ロググループあたり最大 2 つ。本構成で上限に達する
Firehose バッファリング 最小 60 秒 or 1MB でフラッシュ。リアルタイム性が必要な場合は CWL → Lambda 直接呼び出しを検討
ConsoleLogin のリージョン AwsConsoleSignIn は通常 us-east-1 で記録される(IAM がグローバルサービスのため、サインインエンドポイントが us-east-1 に固定)。IsMultiRegionTrail: true にすることで全リージョンを ap-northeast-1 に集約でき、漏れなく収集できる

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?