リマインドとして、下記のユースケースへの対応手順を記載します。
(awsコンソールの使い方等の、細かい部分は割愛)
ユースケース
・CloudfrontログをAthenaで分析したい
・Cloudfrontログ内の時間はUTCなので、あらかじめJSTに変更しておきたい
・Athenaでフルスキャンが起きないよう、データを日付でパーティション分割したい(分割できるフォーマットに変換したい)
・過去に集計した分も、再度ETLしてパーティションごとに上書きできるようにしたい
手順例
手順例の前提
・Cloudfrontログのテーブル定義の データベース & テーブル名: cloudfront_access_logs.app_log
・ETL後のparquetの出力先: s3://some-bucket/ETL_cloudfront_logs
・Glue Crawler名: clf_parquet_test
・保存方法: overwrite
・ETLの指針: 極力Spark SQLで済ます(ローカルなどでETLの検証がしやすい)
1. Cloudfrontのログ機能を有効にする
2. Cloudfrontログのテーブル定義を作成
下記あたりを参考に
https://qiita.com/ytanaka3/items/ad5e7d96bc425ff4c843
3. ETL出力後のparquetデータ のテーブル定義を更新するGlue Crawlerを追加

4. 2.をparquet & JST にETL + 最後に3. のCrawlerを呼び出すGlue Jobを作成
[スクリプト]
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudfront_access_logs", table_name = "app_log", transformation_ctx = "datasource0")
## Transform
df = datasource0.toDF()
df.createOrReplaceTempView('tmp')
# クエリベースでETL (ログ内の時間のUTCをJSTに変換 + パーティション分割用にyyyy-MM-ddのカラムを追加)
df_sql = spark.sql(
'''
SELECT
date_format( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST'), 'yyyy-MM-dd') AS `dt`,
unix_timestamp( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST')) AS `timestamp_JST`,
from_utc_timestamp(concat( request_date, " ", request_time ), 'JST') AS `date_JST`,
`x_edge_location`,
`sc_bytes`,
`client_ip`,
`cs_method`,
`cs_host`,
`cs_uri_stem`,
`sc_status`,
`cs_referer`,
`user_agent`,
`uri_query`,
`cookie`,
`x_edge_result_type`,
`x_edge_request_id`,
`x_host_header`,
`cs_protocol`,
`cs_bytes`,
`time_taken`,
`x_forwarded_for`,
`ssl_protocol`,
`ssl_cipher`,
`x_edge_response_result_type`,
`cs_protocol_version`
FROM tmp
WHERE
cs_method != '' or cs_method is NOT NULL
''')
# df_sql.show() #確認用
# 追加したカラムでpartition分割し、overwrite
df_sql.repartition(*["dt"]).write.partitionBy(["dt"]).mode("overwrite").parquet("s3://some-bucket/ETL_cloudfront_logs", compression="gzip")
# Crawlerの開始
aws_glue_client = boto3.client('glue', region_name='us-east-1')
aws_glue_client.start_crawler(Name='clf_parquet_test')
job.commit()
あとは用途に応じて、S3のログ保存期間の設定・クエリによる取り込み期間の指定などを行う。