0
0

【データ基盤構築/AWS Glue】BigQuery Exportのログデータの遅延に対応したBigQuery→S3のデータパイプラインの作成

Last updated at Posted at 2023-10-02

今回の課題

AWSのGlueを使用して、
BigQuery ExportでBigQueryにエクスポートしたGAのログ生データをS3までエクスポートする。

BigQuery Exportはログデータのエクスポートに72時間の遅延があるため、
遅れてきたログデータまでS3にきちんと入れる方法を考えた。

前提

本記事では、Glueでどのようなコードを実行して上記の課題を実現したかを書くので、
以下は既に準備できているものとします。

  • Glueに対して、必要な権限(S3を操作するための権限など)の付与
  • ログデータのエクスポート先のS3バケットの用意
  • BigQueryExportでのGAのログデータのBigQueryへのエクスポート設定

工夫したところ

Googleの公式ドキュメントにも記載がありますが、
BigQuery ExportでBigQueryに、GAのログデータをエクスポートすると、
エクスポート先のテーブルでは、最初のエクスポートから72時間以内の間で、遅延したログデータを入れ直すために何度か更新が行われます。

日次エクスポートのテーブル(events_YYYYMMDD)は、その日のイベントがすべて収集された後に作成されます。アナリティクスでは、テーブルの日付から最大 72 時間日次テーブルが更新されます。
-- 引用:https://support.google.com/analytics/answer/9358801?hl=ja

そちらに対応するために、EVENT_DATEカラムが1~4日前のBigQueryのデータを、
毎日S3から削除したうえで入れ直すようにしました。
例えば、2023年9月29日からの3日間だと、以下のように処理を行います。

9月29日からの3日間のイメージ

以下のように、毎日データの削除とデータのロードを繰り返して、
遅延分のログデータも全てS3に入るようにします。

9月29日

  1. S3にあるEVENT_DATEが、20230924~20230928のデータを削除。
  2. BigQueryにあるEVENT_DATEが、20230924~20230928のデータをS3に入れる。

9月30日

  1. S3にあるEVENT_DATEが、20230925~20230929のデータを削除。
  2. BigQueryにあるEVENT_DATEが、20230925~20230929のデータをS3に入れる。

10月1日

  1. S3にあるEVENT_DATEが、20230926~20230930のデータを削除。
  2. BigQueryにあるEVENT_DATEが、20230926~20230930のデータをS3に入れる。

作成したコード

AWS Glueで、以下を毎日実行します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime, timedelta

# ジョブの初期設定を行う
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# 変数を用意
current_date = datetime.now()
s3_path = "s3://yamaguchi-glue-test/"

# Glueのjobを初期化
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# BigQuery Exportの最大72時間のログデータの遅延に対応するために、EVENT_DATEが1~4日前のS3内のオブジェクトを毎日削除する
for day_offset in range(4, 0, -1):
    date = current_date - timedelta(days=day_offset)
    dir_name = date.strftime("%Y%m%d")
    s3_path = f"s3://yamaguchi-glue-test/EVENT_DATE={dir_name}/"
    # ディレクトリ内のファイルを削除
    glueContext.purge_s3_path(s3_path, options={"retentionPeriod": 0}, transformation_ctx="")


# BigQueryのデータを取得する
GoogleBigQueryConnector0242forAWSGlue30_node1695632489831 = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "parentProject": "project-name",
        "query": "SELECT EVENT_DATE,EVENT_TIMESTAMP,EVENT_NAME,EVENT_PARAMS,EVENT_PREVIOUS_TIMESTAMP... FROM `analytics_*********.events_*` WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 4 DAY)) AND FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))",
        "connection_options": '{"path": "$outpath", "partitionKeys": ["type"]}',
        "materializationDataset": "analytics_*********",
        "viewsEnabled": "true",
        "maxparallelism": "8",
        "connectionName": "glue-yamaguchi-test",
    },
    transformation_ctx="GoogleBigQueryConnector0242forAWSGlue30_node1695632489831",
)


# 取得したBigQueryのログデータをS3に抽出する。
AmazonS3_node1695632829164 = glueContext.write_dynamic_frame.from_options(
    frame=GoogleBigQueryConnector0242forAWSGlue30_node1695632489831,
    connection_type="s3",
    format="json",
    connection_options={
        "path": "s3://yamaguchi-glue-test",
        "compression": "gzip",
        "partitionKeys": ["EVENT_DATE"],
    },
    transformation_ctx="AmazonS3_node1695632829164",
)

job.commit()

↓こんな感じでS3にEVENT_DATE毎に日別でディレクトリに出力されます。
image.png

コードについての解説

自分が実装時につまずいた箇所を解説として、書き起こしました。

1)毎日EVENT_DATEが1~4日前のディレクトリを削除

purge_s3_pathメソッドで、指定したディレクトリ内を、中のファイル毎全削除することができます。
options引数のretentionPeriodを削除したい感覚よりも短く指定しないと、
デフォルトだと7日間削除されないようになっているので注意です。

(ドキュメントの意味が少し分かりづらく、消したいディレクトリが消えなくて長時間つまづいた。)

# BigQuery Exportの最大72時間のログデータの遅延に対応するために、EVENT_DATEが1~4日前のS3内のオブジェクトを毎日削除する
for day_offset in range(4, 0, -1):
    date = current_date - timedelta(days=day_offset)
    dir_name = date.strftime("%Y%m%d")
    s3_path = f"s3://yamaguchi-glue-test/EVENT_DATE={dir_name}/"
    # ディレクトリ内のファイルを削除
    glueContext.purge_s3_path(s3_path, options={"retentionPeriod": 0}, transformation_ctx="")

2)EVENT_DATE毎にディレクトリを作成する

この部分でEVENT_DATE毎にディレクトリを作成する処理をしています。
"partitionKeys": ["EVENT_DATE"]の部分で、BigQueryのEVENT_DATE毎にディレクトリが分けられるようになります。

# 取得したBigQueryのログデータをS3に抽出する。
AmazonS3_node1695632829164 = glueContext.write_dynamic_frame.from_options(
    frame=GoogleBigQueryConnector0242forAWSGlue30_node1695632489831,
    connection_type="s3",
    format="json",
    connection_options={
        "path": "s3://yamaguchi-glue-test",
        "compression": "gzip",
        "partitionKeys": ["EVENT_DATE"],
    },
    transformation_ctx="AmazonS3_node1695632829164",
)

まとめ

AWS Glue上のコードだけで、S3のディレクトリを消した上で、BigQuery→S3のデータのエクスポートができたので便利だなと感じた。
また、BigQueryに入れたGAのログデータが大容量にも関わらず、Glueで転送をすると処理が早くて便利だなと感じた。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0