LoginSignup
5
3

More than 5 years have passed since last update.

[AWS Glue]クエリベースでCloudfrontログを parquet & JSTにETL(+パーティション分割)する手順

Last updated at Posted at 2019-05-07

リマインドとして、下記のユースケースへの対応手順を記載します。
(awsコンソールの使い方等の、細かい部分は割愛)

ユースケース

・CloudfrontログをAthenaで分析したい
・Cloudfrontログ内の時間はUTCなので、あらかじめJSTに変更しておきたい
・Athenaでフルスキャンが起きないよう、データを日付でパーティション分割したい(分割できるフォーマットに変換したい)
・過去に集計した分も、再度ETLしてパーティションごとに上書きできるようにしたい

手順例

手順例の前提

・Cloudfrontログのテーブル定義の データベース & テーブル名: cloudfront_access_logs.app_log
・ETL後のparquetの出力先: s3://some-bucket/ETL_cloudfront_logs
・Glue Crawler名: clf_parquet_test
・保存方法: overwrite
・ETLの指針: 極力Spark SQLで済ます(ローカルなどでETLの検証がしやすい)

1. Cloudfrontのログ機能を有効にする

2. Cloudfrontログのテーブル定義を作成

下記あたりを参考に
https://qiita.com/ytanaka3/items/ad5e7d96bc425ff4c843

3. ETL出力後のparquetデータ のテーブル定義を更新するGlue Crawlerを追加

スクリーンショット 2019-05-07 13.23.17.png

4. 2.をparquet & JST にETL + 最後に3. のCrawlerを呼び出すGlue Jobを作成

[スクリプト]

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudfront_access_logs", table_name = "app_log", transformation_ctx = "datasource0")

## Transform
df = datasource0.toDF()
df.createOrReplaceTempView('tmp')

# クエリベースでETL (ログ内の時間のUTCをJSTに変換 + パーティション分割用にyyyy-MM-ddのカラムを追加)
df_sql = spark.sql(
'''
SELECT
date_format( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST'), 'yyyy-MM-dd') AS `dt`,
unix_timestamp( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST')) AS `timestamp_JST`,
from_utc_timestamp(concat( request_date, " ", request_time ), 'JST') AS `date_JST`,
  `x_edge_location`,
  `sc_bytes`,
  `client_ip`,
  `cs_method`,
  `cs_host`,
  `cs_uri_stem`,
  `sc_status`,
  `cs_referer`,
  `user_agent`,
  `uri_query`,
  `cookie`,
  `x_edge_result_type`,
  `x_edge_request_id`,
  `x_host_header`,
  `cs_protocol`,
  `cs_bytes`,
  `time_taken`,
  `x_forwarded_for`,
  `ssl_protocol`,
  `ssl_cipher`,
  `x_edge_response_result_type`,
  `cs_protocol_version`
FROM tmp
WHERE
cs_method != '' or cs_method is NOT NULL
''')
#df_sql.show() #確認用

# 追加したカラムでpartition分割し、overwrite
df_sql.repartition(*["dt"]).write.partitionBy(["dt"]).mode("overwrite").parquet("s3://some-bucket/ETL_cloudfront_logs", compression="gzip")

# Crawlerの開始
aws_glue_client = boto3.client('glue', region_name='us-east-1')
aws_glue_client.start_crawler(Name='clf_parquet_test')

job.commit()

あとは用途に応じて、S3のログ保存期間の設定・クエリによる取り込み期間の指定などを行う。

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3