AWS
S3
CloudFront
lambda
Athena

CloudFrontのアクセスログを高速に集計する

CloudFrontのアクセスログ集計

サービスを構築するときにCDNであるCloudFrontを前面においてインフラ構築することが多いと思います。
この構成の場合、全てのアクセスはCloudFrontを経由するため、
CloudFrontのログ解析が極めて重要になります。(アクセスログがCloudFrontに集約される)
AWS Athenaを使うことで出力されたアクセスログファイルに対してSQLでのデータ集計が可能になります。
Athenaに関してパーティショニングがほぼ必須なのですが、
CloudFront+Athenaのパーティショニング構成に関して今ひとつまとまっている情報がなかったので
この記事にまとめておきます。
次図のような構成を想定しています。

スクリーンショット 2018-08-10 16.12.30.png

CloudFrontにてアクセスログファイルをS3バケットに格納するように設定します。
スクリーンショット 2018-08-10 15.04.07.png

この例の場合、S3のlogs-cloudfront/test.com/以下にCloudFrontアクセスログが保存されるようになります。

Athenaのパーティション作成用にログのS3格納パスを変更する

S3のログを集計するにはAthenaを使うことでログファイルに対してSQL文を実行することができます。
ただし、そのままのログに対してAthenaテーブルを作成すると膨大なログファイルをスキャンすることになり、
検索の時間がかかる上にスキャンしたデータ量分が課金対象となり、とてもお金がかかります。
そこでパーティション付きAthenaテーブルを作成し、スキャン量を減らし、お財布にも時間にも優しいようにする必要があります。

AthenaテーブルのパーティショニングにはHive準拠とそうでないフォーマットの2通りあります。
Hive準拠でない場合、Partitionを作成するのがかなり手間なのでHive準拠形式でAthenaテーブルを作成します。
参考:Amazon Athenaのパーティションを理解する #reinvent

AthenaでHive準拠のPartition付きテーブルを作成するには
前処理としてS3上のログファイルの格納パスを変更する必要があります。
今回は日付でPartitionを切りたいため、dt=YYYY-MM-DDのようなパスを作成します。

次のようなLambdaを作成します。
E1EDGUDFSENDDPはCloudFront IDです。
LambdaのランタイムはNode v8.10を使用しています。
logs-cloudfrontバケットからHive準拠パスに則ったlogs-cloudfront-partitionバケットにログファイルをコピーします。

'use strict';

const aws = require('aws-sdk');
const s3 = new aws.S3({ apiVersion: '2006-03-01' });

exports.handler = (event, context, callback) => {
    const source_bucket = event.Records[0].s3.bucket.name;
    const source_key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));

    // E1EDGUDFSENDDP.2006-01-02-00.XXXXXXXX.gzというファイルを、E1EDGUDFSENDDP/dt=2006-01-02/00-XXXXXXXX.gzに変換している
    const key_components = source_key.split(/[-\.]/);
    const extension = key_components.pop();
    const filename = key_components.pop();
    const hour = key_components.pop();
    const day = key_components.pop();
    const month = key_components.pop();
    const year = key_components.pop();
    key_components.push(`dt=${year}-${month}-${day}`);
    key_components.push(hour + '-' + filename + "." + extension);
    const renamed_key = key_components.join("/");

    const params = {
        Bucket: "logs-cloudfront-partition", 
        CopySource: source_bucket + "/" + source_key, 
        Key: renamed_key,
    };
    s3.copyObject(params, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
    });
};

Lambdaの実行IAMロールにはS3を操作するためにAmazonS3FullAccessのロールを追加する必要があります。
スクリーンショット 2018-08-10 15.31.02.png

以下のLambdaをS3のトリガーで実行させるように設定します。
Designerの左側のリストからS3を選択してトリガーを作成します。
バケットにはlogs-cloudfrontを指定します。
イベントタイプはログファイルが作成されたタイミングでLambdaを実行させたいので「オブジェクトの作成(すべて)」を指定します。
トリガーの有効化にはチェックを入れておきます。

スクリーンショット 2018-08-10 15.36.27.png

次のようなDesignerになれば設定完了です。
スクリーンショット 2018-08-10 15.35.13.png

Partition付きCloudFrontログ用のAthenaテーブルを作成

Query EditorにてPartition付きのCloudFrontログ用のAthenaテーブルを作成します。
参考:Amazon Athena で CloudFront のアクセスログを集計する
LOCATIONのE1EDGUDFSENDDPはCloudFront IDなのでお使いの環境のものに代えてください
LOCATION末尾の/は必須です。

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_log_partition (
  `date` date,
  `time` string,
  x_edge_location string,
  sc_bytes int,
  c_ip string,
  cs_method string,
  cs_host string,
  cs_uri_stem string,
  sc_status int,
  cs_referer string,
  cs_user_agent string,
  cs_uri_query string,
  cs_cookie string,
  x_edge_result_type string,
  x_edge_request_id string,
  x_host_header string,
  cs_protocol string,
  cs_bytes int,
  time_taken int,
  x_forwarded_for string,
  ssl_protocol string,
  ssl_cipher string,
  x_edge_response_result_type string,
  cs_protocol_version string
)
PARTITIONED BY  ( 
  dt string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES
 (
  'serialization.format' = '\t',
  'input.regex' = '\t'
 )
LOCATION 's3://logs-cloudfront-partition/test/com/E1EDGUDFSENDDP/';

各フィールドは次のようなデータが格納されています。

フィールド 説明
date イベントが発生した日付。yyyy-mm-dd形式。UTC。
time CloudFrontがリクエストへの対応を完了した時刻(UTC)。
x_edge_location リクエストを処理したエッジロケーション。3文字コードと割り当てられた数字で識別される。
sc_bytes CloudFront がリクエストに応答してビューワーに渡したデータの総バイト数
c_ip リクエストを送信したクライアントの IP アドレス。
cs_method リクエスト時のHTTPメソッド。
cs_host CloudFront ディストリビューションのドメイン名。
cs_uri_stem パスおよびオブジェクトを識別する URI 。
sc_status CloudFrontが応答したHTTP ステータスコード。応答前にクライアントが切断した場合は"000"を記録。
cs_referer リクエスト元のドメインの名前。
user_agent リクエスト内の User-Agent ヘッダーの値。
uri_query URI のクエリ文字列の部分 (ある場合)。ない場合には"-"を記録。
cookie リクエスト内の Cookie ヘッダー。ない場合には"-"を記録。
x_edge_result_type エッジロケーションへのCloudFrontのレスポンスの分類。例えば以下のようなものがある
・Hit:エッジキャッシュから応答
・RefreshHit:キャッシュの有効期限がきれていたため、オリジンに最新バージョンを問い合わせ
・Miss:リクエストをオリジンサーバに転送し、結果を応答
x_edge_request_id 要求を一意に識別する文字列。
x_host_header ビューワーによってこのリクエストの Host ヘッダーに追加された値。
cs_protocol リクエストのプロトコル。httpまたはhttps
cs_bytes リクエストに組み込まれたデータのバイト数
time_taken CloudFrontエッジサーバがリクエストを受け取ってから、レスポンスの最終バイトをエッジサーバーの出力キューに書き込むまでの秒数。
x_forwarded_for ビューワーがリクエストを送るのに HTTPプロキシやロードバランサーを使った場合、client_ipの値は、プロキシあるいはロードバランサーの IP アドレスとなる。その場合のリクエスト元のビューワーの IP アドレス。
ssl_protocol cs_protocolがhttpsの場合のSSLプロトコル。
ssl_cipher cs_protocolがhttpsの場合のSSL 暗号。
x_edge_response_result_type CloudFront がレスポンスをビューワーに返す直前に行った分類。x_edge_result_typeも参照。
cs_protocol_version リクエストで指定した HTTP バージョン。

以下のクエリ部でdtフィールドをPartitionに指定させることが必要です。

PARTITIONED BY  ( 
  dt string
)

実行するとcloudfront_log_partition(Partitioned)のテーブルが作成されます
(今回はlogs Database以下で実行しています)
スクリーンショット 2018-08-10 16.39.56.png

テーブルが作成されたら、次のクエリでPartitionのロードを行います。

MSCK REPAIR TABLE cloudfront_log_partition;

where句にてdtフィールドでのクエリ実行ができます。(パーティション指定検索)

SELECT * FROM "logs"."cloudfront_log_partition" where dt>='2018-08-10' limit 10;

定期的に自動的にパーティションを切る

ログファイルが追加されたら定期的にパーティションを再構築する必要があります。(MSCK REPAIR TABLEを実行する)
参考:lambda(python)で定期的にAthenaのMSCK REPAIR TABLEを行う

次の想定で自動的に指定のAthenaテーブルのパーティションを再構築します。
・databases: logs
・table: cloudfront_log_partition
・query_result_bucket: aws-athena-query-results-321498486874-ap-northeast-1(Athena作成時に自動生成されたBucket)

Lambdaで定期実行する処理を記述します。
ランタイム:NodeJS 8.10

const AWS = require('aws-sdk');
const athena = new AWS.Athena({apiVersion: '2017-05-18'});

exports.handler = (event, context, callback) => {
    const params = {
        QueryString: 'MSCK REPAIR TABLE cloudfront_log_partition;', // Athenaのクエリ
        QueryExecutionContext: {
            'Database': 'logs' // AthenaのDatabase名
        },
        ResultConfiguration: { 
            OutputLocation: 's3://aws-athena-query-results-321498486874-ap-northeast-1' // Athenaのクエリ実行ログが保存されるS3バケット(Athena作成時に自動生成) 
        }
    };
    athena.startQueryExecution(params, function(err, data) {
        if (err) return callback(null,error)
        callback(null, {
            statusCode: 200,
            headers: {
                "Access-Control-Allow-Origin" : "*" 
            },
            body :data.QueryExecutionId
        });
    });
};

IAMでLambdaロールにS3とAthenaの権限を付与します。
スクリーンショット 2018-08-15 16.27.26.png

CloudWatch Eventトリガーを追加して1日おきに実行するようにします。
スクリーンショット 2018-08-15 16.28.50.png

保存します。
スクリーンショット 2018-08-15 16.31.30.png

Google Botのアクセスログを集計する

試しにGoogle Botのアクセスログを集計してみます。
Google のクローラ(ユーザー エージェント)を参考に
UserAgentでアクセスしてきているGoogle Botを検出します。
1週間前分から集計するには、current_dateとinterval、date_formatを併用します。
Date and Time Functions and Operators

SELECT REGEXP_EXTRACT(cs_uri_stem, '/[^/]*/?') as page, date, cs_user_agent
FROM logs.cloudfront_log_partition
WHERE cs_user_agent LIKE '%Googlebot%' AND dt >= date_format((current_date - interval '7' day), '%Y-%m-%d')

スクリーンショット 2018-08-10 16.30.34.png