今回の課題
AWSのGlueを使用して、
BigQuery ExportでBigQueryにエクスポートしたGAのログ生データをS3までエクスポートする。
BigQuery Exportはログデータのエクスポートに72時間の遅延があるため、
遅れてきたログデータまでS3にきちんと入れる方法を考えた。
前提
本記事では、Glueでどのようなコードを実行して上記の課題を実現したかを書くので、
以下は既に準備できているものとします。
- Glueに対して、必要な権限(S3を操作するための権限など)の付与
- ログデータのエクスポート先のS3バケットの用意
- BigQueryExportでのGAのログデータのBigQueryへのエクスポート設定
工夫したところ
Googleの公式ドキュメントにも記載がありますが、
BigQuery ExportでBigQueryに、GAのログデータをエクスポートすると、
エクスポート先のテーブルでは、最初のエクスポートから72時間以内の間で、遅延したログデータを入れ直すために何度か更新が行われます。
日次エクスポートのテーブル(events_YYYYMMDD)は、その日のイベントがすべて収集された後に作成されます。アナリティクスでは、テーブルの日付から最大 72 時間日次テーブルが更新されます。
-- 引用:https://support.google.com/analytics/answer/9358801?hl=ja
そちらに対応するために、EVENT_DATE
カラムが1~4日前のBigQueryのデータを、
毎日S3から削除したうえで入れ直すようにしました。
例えば、2023年9月29日からの3日間だと、以下のように処理を行います。
9月29日からの3日間のイメージ
以下のように、毎日データの削除とデータのロードを繰り返して、
遅延分のログデータも全てS3に入るようにします。
9月29日
- S3にある
EVENT_DATE
が、20230924
~20230928
のデータを削除。 - BigQueryにある
EVENT_DATE
が、20230924
~20230928
のデータをS3に入れる。
9月30日
- S3にある
EVENT_DATE
が、20230925
~20230929
のデータを削除。 - BigQueryにある
EVENT_DATE
が、20230925
~20230929
のデータをS3に入れる。
10月1日
- S3にある
EVENT_DATE
が、20230926
~20230930
のデータを削除。 - BigQueryにある
EVENT_DATE
が、20230926
~20230930
のデータをS3に入れる。
作成したコード
AWS Glueで、以下を毎日実行します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime, timedelta
# ジョブの初期設定を行う
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# 変数を用意
current_date = datetime.now()
s3_path = "s3://yamaguchi-glue-test/"
# Glueのjobを初期化
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# BigQuery Exportの最大72時間のログデータの遅延に対応するために、EVENT_DATEが1~4日前のS3内のオブジェクトを毎日削除する
for day_offset in range(4, 0, -1):
date = current_date - timedelta(days=day_offset)
dir_name = date.strftime("%Y%m%d")
s3_path = f"s3://yamaguchi-glue-test/EVENT_DATE={dir_name}/"
# ディレクトリ内のファイルを削除
glueContext.purge_s3_path(s3_path, options={"retentionPeriod": 0}, transformation_ctx="")
# BigQueryのデータを取得する
GoogleBigQueryConnector0242forAWSGlue30_node1695632489831 = glueContext.create_dynamic_frame.from_options(
connection_type="marketplace.spark",
connection_options={
"parentProject": "project-name",
"query": "SELECT EVENT_DATE,EVENT_TIMESTAMP,EVENT_NAME,EVENT_PARAMS,EVENT_PREVIOUS_TIMESTAMP... FROM `analytics_*********.events_*` WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 4 DAY)) AND FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))",
"connection_options": '{"path": "$outpath", "partitionKeys": ["type"]}',
"materializationDataset": "analytics_*********",
"viewsEnabled": "true",
"maxparallelism": "8",
"connectionName": "glue-yamaguchi-test",
},
transformation_ctx="GoogleBigQueryConnector0242forAWSGlue30_node1695632489831",
)
# 取得したBigQueryのログデータをS3に抽出する。
AmazonS3_node1695632829164 = glueContext.write_dynamic_frame.from_options(
frame=GoogleBigQueryConnector0242forAWSGlue30_node1695632489831,
connection_type="s3",
format="json",
connection_options={
"path": "s3://yamaguchi-glue-test",
"compression": "gzip",
"partitionKeys": ["EVENT_DATE"],
},
transformation_ctx="AmazonS3_node1695632829164",
)
job.commit()
↓こんな感じでS3にEVENT_DATE
毎に日別でディレクトリに出力されます。
コードについての解説
自分が実装時につまずいた箇所を解説として、書き起こしました。
1)毎日EVENT_DATEが1~4日前のディレクトリを削除
purge_s3_path
メソッドで、指定したディレクトリ内を、中のファイル毎全削除することができます。
options
引数のretentionPeriod
を削除したい感覚よりも短く指定しないと、
デフォルトだと7日間削除されないようになっているので注意です。
(ドキュメントの意味が少し分かりづらく、消したいディレクトリが消えなくて長時間つまづいた。)
# BigQuery Exportの最大72時間のログデータの遅延に対応するために、EVENT_DATEが1~4日前のS3内のオブジェクトを毎日削除する
for day_offset in range(4, 0, -1):
date = current_date - timedelta(days=day_offset)
dir_name = date.strftime("%Y%m%d")
s3_path = f"s3://yamaguchi-glue-test/EVENT_DATE={dir_name}/"
# ディレクトリ内のファイルを削除
glueContext.purge_s3_path(s3_path, options={"retentionPeriod": 0}, transformation_ctx="")
2)EVENT_DATE毎にディレクトリを作成する
この部分でEVENT_DATE毎にディレクトリを作成する処理をしています。
"partitionKeys": ["EVENT_DATE"]
の部分で、BigQueryのEVENT_DATE毎にディレクトリが分けられるようになります。
# 取得したBigQueryのログデータをS3に抽出する。
AmazonS3_node1695632829164 = glueContext.write_dynamic_frame.from_options(
frame=GoogleBigQueryConnector0242forAWSGlue30_node1695632489831,
connection_type="s3",
format="json",
connection_options={
"path": "s3://yamaguchi-glue-test",
"compression": "gzip",
"partitionKeys": ["EVENT_DATE"],
},
transformation_ctx="AmazonS3_node1695632829164",
)
まとめ
AWS Glue上のコードだけで、S3のディレクトリを消した上で、BigQuery→S3のデータのエクスポートができたので便利だなと感じた。
また、BigQueryに入れたGAのログデータが大容量にも関わらず、Glueで転送をすると処理が早くて便利だなと感じた。