LoginSignup
0
1

[AWS] AthenaでCloudTrailのS3データイベントを分析するクエリ

Last updated at Posted at 2023-06-07

CloudTrailにて取得したS3データイベントをAthenaで分析するクエリについて、ひたすらメモる。

やりたいこと

  • S3データイベントをサクッと分析したい。
  • CloudTrailのマネジメントコンソールだと、データイベントは分析できないからなんとかしたい。
  • データイベントの中のネストされた情報を分析したい。

前提

  • CloudTrailにてS3データイベントを取得する設定になっている。(S3サーバアクセスログではないほう)
  • Athenaサービスを有効化済み。
  • Athenaの利用権限を保有したプリンシパルでマネジメントコンソールにサインインしている。
  • 今回のクエリは、Organizations Trailではない通常のTrailを前提に記載。(データイベントをOrganizations Trailでとるケースはあまりないかなと思慮)
  • SQLあまり綺麗に書けた気がしないので、良い書き方があればコメント欄で教えていただけるとありがたいです。

S3データイベントをAthenaで分析するクエリ

Athenaテーブルの作成

まずは、Athneaテーブルを作る。データイベントは大量にあるから、パーティション付きのテーブルで検索コストを下げる。

Athena クエリ - テーブルの作成
CREATE EXTERNAL TABLE cloudtrail_logs_dataevents(
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                username: STRING>,
            ec2RoleDelivery: STRING,
            webIdFederationData: MAP<STRING,STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING,
    tlsDetails STRUCT<
        tlsVersion: STRING,
        cipherSuite: STRING,
        clientProvidedHostHeader: STRING>
)
COMMENT 'CloudTrail table for data event query test'
PARTITIONED BY (year string, month string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<trailバケット>/AWSLogs/<AwsAccountID>/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');

Partition Projection
今回はPartition Projectionは使わないけど、いちいち検索のたびに検索範囲をALTER TABLEで指定したくない場合は、Partition Projectionを使うと便利かも。

↓安定のクラメソブログさん
【全リージョン対応】CloudTrailのログをAthenaのPartition Projectionなテーブルで作る

CloudTrailデータイベントレコードの分析クエリs

検索対象のパーティションを指定する。調査したい対象の月のパーティションを作成して、以後のWHEREで指定することで、検索対象が無闇に拡大しないようにする。

ALTER TABLE cloudtrail_logs_devents 
    ADD PARTITION 
        (year='2023',month='06') 
        location 's3://<バケット名>/AWSLogs/<AwsAccountId>/CloudTrail/ap-northeast-1/2023/06/';

Patitionの作成を失敗したら、やり直す前に失敗したPartitionをドロップしないとエラーになる

ALTER TABLE cloudtrail_logs_devents DROP PARTITION (year='2023',month='06');

とりあえず、どんな内容か見てみる.

SELECT * FROM cloudtrail_logs_dataevents limit 10;

S3バケットへのAPIアクセス状況を集計する。
以下の例では、json_extractを使って、requestparametersからバケット名を抽出する。
調査対象のバケットが100個以上ある場合には、LIMITを調整する。

SELECT 
    json_extract(requestparameters, '$.bucketName') as bucket,
    count(*)
FROM 
    cloudtrail_logs_dataevents
WHERE 
    year = '2023' and month ='06'
GROUP BY 
    json_extract(requestparameters, '$.bucketName')
LIMIT 100;

特定S3バケットへのアクセスプリンシパルを集計する

SELECT 
    useridentity.arn,
    count(*)
FROM 
    cloudtrail_logs_dataevents
WHERE
    year = '2023' and month ='06'
    and cast(json_extract(requestparameters, '$.bucketName') as varchar) = 'target_backet'
GROUP BY useridentity.arn
LIMIT 100;

プリンシパル毎のアクセス状況を集計する

SELECT 
    useridentity.arn,
    count(*)
FROM 
    cloudtrail_logs_dataevents
WHERE 
    year = '2023' and month ='06'
GROUP BY 
    useridentity.arn
LIMIT 100;

特定プリンシパルがアクセスしたS3バケットの一覧を取得する

SELECT 
    json_extract(requestparameters, '$.bucketName') as bucket,
    count(*)
FROM 
    cloudtrail_logs_dataevents
WHERE 
    year = '2023' and month ='06'
    and useridentity.arn = 'arn:aws:sts::<AwsAccountId>:assumed-role/<role-name>/<Session-Name>'
GROUP BY 
    json_extract(requestparameters, '$.bucketName')
LIMIT 100;

特定プリンシパルがアクセス元として使用したIPアドレスの一覧を取得する

SELECT 
    sourceipaddress, count(*)
FROM 
    cloudtrail_logs_dataevents
WHERE 
    year = '2023' and month ='06'
    and useridentity.arn = 'arn:aws:sts::<AwsAccountId>:assumed-role/<role-name>/<Session-Name>'
GROUP BY
    sourceipaddress
ORDER BY
    sourceipaddress
LIMIT 100;

時間あたりの特定APIの実行数を集計する。
以下の例は、PutObjectの実行数を時間単位で集計。

SELECT 
    DATE_FORMAT(date_parse(eventtime,'%Y-%m-%dT%H:%i:%sZ'), '%Y-%m-%d %H') AS hour, 
    count(*)
FROM 
    cloudtrail_logs_dataevents
WHERE 
    year = '2023' 
    and month ='06'
    and eventname ='PutObject'
GROUP BY 
    DATE_FORMAT(date_parse(eventtime,'%Y-%m-%dT%H:%i:%sZ'), '%Y-%m-%d %H')
ORDER BY 
    hour;
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1