0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Aurora→BigQuery DWH構築(前編)

0
Last updated at Posted at 2026-02-19

はじめに

株式会社トラストバンクで地域通貨プラットフォーム chiica(チーカ)の開発責任者をしております湊(みなと)(@karura618)です。

今回は、AWS Aurora(MySQL)のデータをGoogle BigQueryにデータウェアハウス(DWH)として転送する基盤の設計と実装を紹介します。

本記事は2部構成の前編です

  • 前編(本記事): AWS Glue Jobによるデータ転送層の設計と実装
  • 後編: BigQueryでの変換・マスキング層の設計と実装

技術的成果

  • 200+テーブルの自動同期基盤を構築
  • テーブルサイズに応じた動的リソース配分による安定性の確保
  • 冪等性リトライ機構による高可用性の実現
  • 変化に強いデータ基盤の構築(レイクハウス思想)

本記事でわかること

  • 200+テーブルのAuroraをBigQueryへ日次同期する設計
  • テーブルサイズに応じた動的なリソース配分の実装
  • modifiedカラムを利用した差分取得(CDC)の実装
  • 変化耐性を優先したアーキテクチャ設計思想
  • AWS Glue Jobの実装詳細と運用戦略

実装規模

  • Aurora: 200+テーブル
  • 総データ量: 数十GB規模
  • 同期方式: 日次差分(modifiedカラムベース)
  • 運用期間: 継続中

解決したい課題

  • 200以上のテーブルを持つAurora(MySQL)のデータをBigQueryに日次で同期したい
  • テーブルサイズに応じた柔軟なリソース制御が必要
  • 仕様変更に強いデータ基盤を構築したい
  • 運用負荷を最小化したい(シンプルな構成)

設計判断の背景

本セクションでは、技術選定と設計方針の判断根拠を説明します。

1. なぜCDCではなく差分+スナップショット型にしたか

比較表:

観点 CDC方式 差分バッチ方式(採用)
リアルタイム性 秒〜分単位 日次(最大24時間遅延)
運用コスト 高(Kafka等の監視必要) 低(Glueのみ)
障害点 多い(3コンポーネント以上) 少ない(1コンポーネント)
Auroraへの負荷 中(Binlog読み取り) 低(差分クエリのみ)
実装難易度
適用ケース リアルタイム分析 日次レポート

判断理由:

  • CDCはリアルタイム性が高いが運用コストが高い(障害点が増える)
  • 今回の要件は日次集計用途のためリアルタイム性は不要
  • シンプルな構成を優先し、障害点を減らすことで可用性を向上

トレードオフ:

  • データ鮮度: 最大24時間のラグ → 日次レポート用途では許容範囲
  • 運用負荷: CDC監視 > バッチ監視(バッチの方が運用しやすい)

実装の決め手:

  • modifiedカラムを基準にした差分取得でAuroraへの負荷を最小化
  • スナップショット型DWHで履歴管理が不要なマスタ系テーブルに最適

2. なぜ"全テーブル転送型"を採用したか

本基盤の根本思想: ETLは「加工処理」ではなく「運搬」に徹する

本基盤では、ETLを「加工処理」として扱いません。

ETLはあくまで**"運搬"に徹し**、データは可能な限り生の状態で保持します。

この設計を採用した理由:

  1. 仕様変更への耐性を持たせるため

    • ビジネスロジックが変わってもETL改修不要
    • 過去データの再加工が可能
  2. 分析要件の変化に柔軟に対応するため

    • 新しいカラムが必要になっても即座に対応可能
    • ETLのデプロイサイクルに依存しない
  3. 上流変更の影響範囲を限定するため

    • Aurora側のテーブル定義変更がETLに波及しない
    • 加工ロジックはBigQuery側のScheduled Queryで完結

レイヤー分離の明確化:

本基盤は厳密にはDWH構築ではなく、まずは 「データレイク層」を構築する思想 で設計しています。

AWS Glueから BigQueryのraw_layerへの転送は、分析用途に最適化されたDWHではなく、 「将来の仕様変更に耐えられる生データ保持層」 として位置付けています。

加工はBigQuery側のScheduled Queryで実施し、レイヤー分離を明確にしています。

一般的なETLの課題

多くのETLでは、以下のようなアプローチを取ります:

  • 利用するテーブルのみ抽出
  • 不要カラムは事前に削除
  • 仕様に合わせて事前加工

しかしこの方式は、仕様変更のたびにETL修正が発生し、スクリプトのメンテナンスコストが増大します。

具体例:

# 従来型ETL: 仕様に合わせて事前加工
SELECT
  id,
  name,
  email,  -- 仕様変更で不要になったら
  CASE
    WHEN status = 1 THEN 'active'  -- 仕様変更で分岐が増えたら
    ELSE 'inactive'
  END as status_label
FROM members
WHERE member_type IN (1, 2, 3);  -- 新しいタイプが増えたら

このアプローチでは:

  • カラム追加・削除のたびにETL修正
  • ビジネスロジック変更のたびにスクリプト改修
  • 過去データの再加工が困難

本設計の思想

本設計では以下の階層構造を採用しました:

各層の責任:

役割 実装 変更頻度
ETL層 データ運搬 AWS Glue Job 低(安定)
データレイク層 生データ保持 raw_layer なし
DWH層 加工・整形 analytics_layer(Scheduled Query) 高(柔軟)
分析層 データ活用 BIツール 高(柔軟)

実装方針:

  • modifiedカラムが存在するテーブルは原則すべて転送
  • 加工・絞り込みは後段(BigQuery Scheduled Query)で実施
  • ETL層は極力シンプルに保つ

これにより得られるメリット:

  • ✅ 新しい分析要件が発生してもETL改修不要
  • ✅ カラム追加にも自動追従
  • ✅ 仕様変更時の影響範囲を最小化
  • ✅ 過去に遡った再集計が可能(生データが残っているため)

設計原則

  1. ETLは"運搬"に徹する

    • データの形を変えない
    • ビジネスロジックを含めない
    • シンプルで壊れにくい構造
  2. 加工はDWH層で行う

    • BigQuery Scheduled Queryで加工
    • SQLで変更が容易
    • バージョン管理が簡単
  3. 変化に強い構造を優先する

    • 今日の最適化より、明日の変更耐性
    • スクリプト保守コストの最小化
    • 属人化の排除

レイクハウス思想との親和性

本設計はレイクハウスアーキテクチャの思想に近いアプローチです:

データレイク(生データ)+ データウェアハウス(加工済)
= レイクハウス(柔軟性と性能の両立)
  • データレイク的: 全テーブル転送、生データ保持
  • ウェアハウス的: BigQueryでの高速分析、スキーマ適用

トレードオフ

デメリット:

  • ストレージコストが増加(全テーブル転送のため)
  • raw_layerには生データが残る(アクセス制限必須)

メリット:

  • 長期的なメンテナンスコストが激減
  • 仕様変更への追従が容易
  • 新しい分析要件への対応が迅速

設計思想の宣言

本基盤は「最適化されたDWH」ではなく、「変化に強いデータ基盤」を優先して設計しています。

データは時間とともに変化します。ビジネス要件も変化します。その変化に追従できる柔軟性こそが、長期運用において最も重要な価値だと考えています。


3. 可用性と再実行性の設計

冪等性の保証:

  • CREATE TABLE IF NOT EXISTSで既存テーブルを保護
  • パーティション単位の書き込みで重複挿入を防止
  • リトライ処理による自動復旧

リトライ戦略:

# 失敗時: リソースを縮小して自動リトライ
if error:
    partition_count = "1"
    fetch_size = "50"
    retry()

パーティション分離:

  • 日次パーティションで過去データを保護
  • 失敗時は当日パーティションのみ再実行
  • 部分的な障害でも全体への影響を最小化

システムアーキテクチャ

全体像(前編の範囲)

前編で扱う範囲:

  • Aurora → raw_layer への転送(本記事)

後編で扱う範囲:

  • raw_layer → analytics_layer の変換(後編記事で解説)

データフロー詳細(前編)

処理の要点:

  1. 差分取得: modifiedカラムを基準に前日更新分のみ取得
  2. 動的リソース配分: テーブルサイズに応じて並列度を調整
  3. 一時保管: 生データをraw_layerに保管

AWS Glue Jobの実装

Auroraへの負荷を抑えるため、テーブルサイズに応じて動的にリソース制御しています。

主要な実装ポイント

1. テーブルサイズに応じた動的なリソース配分

テーブルサイズに応じて、パーティション数とフェッチサイズを動的に調整します。

# テーブルサイズの閾値
TABLE_SIZE_THRESHOLD = 5 * 1024 * 1024 * 1024  # 5GB
VERY_LARGE_TABLE_THRESHOLD = 15 * 1024 * 1024 * 1024  # 15GB

# テーブルサイズに基づく設定の決定
if table_size_bytes > VERY_LARGE_TABLE_THRESHOLD:
    # 超大容量テーブル: パーティション1, フェッチサイズ50
    partition_count = "1"
    fetch_size = "50"
elif table_size_bytes > TABLE_SIZE_THRESHOLD:
    # 大容量テーブル: パーティション1, フェッチサイズ500
    partition_count = "1"
    fetch_size = "500"
else:
    # 通常テーブル: パーティション4, フェッチサイズ1000
    partition_count = "4"
    fetch_size = "1000"

設計のポイント:

  • Auroraインスタンスのメモリ制約を考慮
  • 大容量テーブルは並列度を下げてメモリ不足を回避
  • MySQLのテンポラリテーブルサイズに注意

2. MySQLセッションパラメータの最適化

conn_args = {
    "url": f"{jdbc_url}/{schema}",
    "dbtable": table_name,
    "numPartitions": partition_count,
    "fetchsize": fetch_size,
    "sessionInitStatement": """
        SET SESSION tmp_table_size=1610612736;
        SET SESSION max_heap_table_size=1610612736;
        SET SESSION sort_buffer_size=16777216;
        SET SESSION read_buffer_size=8388608;
        SET SESSION sql_mode='NO_ENGINE_SUBSTITUTION';
        SET SESSION SQL_BIG_SELECTS=1;
    """,
    "pushDownPredicate": False
}

各パラメータの意味:

  • tmp_table_size: テンポラリテーブルの最大サイズ(1.5GB)
  • max_heap_table_size: メモリテーブルの最大サイズ(1.5GB)
  • SQL_BIG_SELECTS: 大容量クエリの実行を許可
  • pushDownPredicate: 述語プッシュダウンを無効化(Glue側でフィルタ)

3. 差分取得モードと全件取得モード

# 差分モード(昨日のデータのみ)
if IS_DIFF_MODE != 'false':
    conn_args["sampleQuery"] = (
        f"SELECT * FROM {schema}.{table_name} "
        f"WHERE modified BETWEEN '{yesterday} 00:00:00' "
        f"AND '{yesterday} 23:59:59'"
    )

運用の考え方:

  • 日次運用: 差分モードで昨日更新分のみ取得
  • 初期ロード: 全件モードで全データを取得
  • 特定期間の再取得: QUERY_START_DATE/QUERY_END_DATEで範囲指定

4. リトライ処理

try:
    # 通常のパーティション数で実行
    source_dyf = glueContext.create_dynamic_frame_from_options(
        'mysql', connection_options=conn_args
    )
except Exception as e:
    # 失敗時: パーティション1、フェッチサイズ50に縮小してリトライ
    conn_args["numPartitions"] = "1"
    conn_args["fetchsize"] = "50"
    source_dyf = glueContext.create_dynamic_frame_from_options(
        'mysql', connection_options=conn_args
    )

5. modifiedカラムによるフィルタリング

# modifiedカラムを持つテーブルのセットを事前作成
tables_with_modified = set(
    columns_df.toDF()
    .filter(f"TABLE_SCHEMA = '{schema}' AND COLUMN_NAME = 'modified'")
    .select("TABLE_NAME")
    .rdd.flatMap(lambda x: x)
    .collect()
)

# modifiedカラムがないテーブルはスキップ
if table_name not in tables_with_modified:
    print(f"SKIP - No 'modified' column")
    continue

設計の理由:

  • modifiedカラムがないテーブルは差分取得できない
  • 全件転送は時間がかかるため、意図的に除外
  • 必要に応じてSPECIFIC_TABLESパラメータで個別指定

本記事の実装方針:

本記事ではシンプルな実装例としてmodifiedカラムを利用していますが、これは唯一の方法ではありません。

実運用での代替案:

  1. 更新ログテーブル方式

    -- 変更履歴テーブルを用意
    CREATE TABLE change_logs (
      table_name VARCHAR(255),
      record_id BIGINT,
      operation ENUM('INSERT', 'UPDATE', 'DELETE'),
      changed_at TIMESTAMP
    );
    
  2. トリガーベースCDC

    • アプリケーション側でトリガーを設定
    • 変更イベントを別テーブルに記録
    • 物理削除にも対応可能
  3. レプリケーションログ利用

    • Debezium + Kafkaによるbinlog監視
    • リアルタイム性が必要な場合に有効
    • インフラコストは増加
  4. スナップショット比較方式

    • 前回取得時のハッシュ値と比較
    • modifiedカラムがなくても差分検出可能
    • 計算コストが高い

本基盤でmodifiedを採用した理由:

  • 既存テーブルにmodifiedカラムが標準化されていた
  • インフラ追加なしで実装可能
  • シンプルで理解しやすい

将来的に要件が変化した場合は、上記の代替案への移行も可能な設計としています。

6. BigQueryへの書き込み(日次パーティション)

glueContext.write_dynamic_frame.from_options(
    frame=source_dyf,
    connection_type="bigquery",
    connection_options={
        "temporaryGcsBucket": GCS_BUCKET_NAME,
        "parentProject": BQ_PROJECT,
        "table": f"{BQ_DATASET}.{table_name}",
        "connectionName": BQ_CONNECTION_NAME,
        "datePartition": today_date.strftime("%Y%m%d"),  # YYYYMMDD
        "partitionType": "DAY"
    }
)

オプショナルパラメータ

AWS Glue Jobには以下のオプショナルパラメータを用意しています。

パラメータ デフォルト 説明
NUM_PARTITIONS 4 通常テーブルのパーティション数
FETCH_SIZE 1000 通常テーブルのフェッチサイズ
TABLE_SIZE_THRESHOLD 5GB 大容量テーブルの閾値
LARGE_TABLE_NUM_PARTITIONS 1 大容量テーブルのパーティション数
LARGE_TABLE_FETCH_SIZE 500 大容量テーブルのフェッチサイズ
VERY_LARGE_TABLE_THRESHOLD 15GB 超大容量テーブルの閾値
SKIP_VERY_LARGE_TABLES_IN_FULL_MODE true 全件モード時に超大容量テーブルをスキップ
SPECIFIC_TABLES - 特定テーブルのみ処理(カンマ区切り)
QUERY_START_DATE - 取得開始日(YYYY-MM-DD)
QUERY_END_DATE - 取得終了日(YYYY-MM-DD)
完全な実装例(抽象化版)

公開にあたっての方針

以下のコードは、実環境依存の具体的な数値(セッションチューニング値、閾値等)を抽象化し、
設計思想とアーキテクチャを示すことに焦点を当てています。

  • ✅ 設計思想は完全に残す
  • ✅ オプション機能(差分・日付指定)は残す
  • ❌ 実戦ノウハウ数値は削除
  • ❌ 実環境依存パラメータは抽象化
"""
Aurora(MySQL) → BigQuery データレイク転送テンプレート

設計思想:
- ETLは「運搬」に徹する
- 加工は後段(DWH層)で実施
- 変化耐性を優先
- modifiedカラムを利用した差分取り込み対応
- 大容量テーブル制御機構あり
"""

import sys
import datetime
import pytz
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import col

# =============================
# パラメータ
# =============================

ARG_KEYS = [
    "JOB_NAME",
    "BQ_PROJECT",
    "BQ_DATASET",
    "GCS_BUCKET",
    "DB_SCHEMA",
    "DB_CONNECTION",
    "BQ_CONNECTION",
    "IS_DIFF_MODE"
]

args = getResolvedOptions(sys.argv, ARG_KEYS)

BQ_PROJECT = args["BQ_PROJECT"]
BQ_DATASET = args["BQ_DATASET"]
DB_SCHEMA = args["DB_SCHEMA"]
IS_DIFF_MODE = args["IS_DIFF_MODE"]

# =============================
# 日付制御(JST基準)
# =============================

jst = pytz.timezone("Asia/Tokyo")
today = datetime.datetime.now(jst).date()
yesterday = today - datetime.timedelta(days=1)

# =============================
# Glue / Spark 初期化
# =============================

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# =============================
# テーブル一覧取得
# =============================

jdbc_conf = glueContext.extract_jdbc_conf(
    connection_name=args["DB_CONNECTION"]
)

tables_df = glueContext.create_dynamic_frame_from_options(
    "mysql",
    connection_options={
        "url": jdbc_conf["url"] + "/information_schema",
        "user": jdbc_conf["user"],
        "password": jdbc_conf["password"],
        "dbtable": "tables",
    }
)

target_tables = (
    tables_df.toDF()
    .filter(f"TABLE_SCHEMA = '{DB_SCHEMA}'")
    .select("TABLE_NAME")
    .collect()
)

# =============================
# modifiedカラム存在確認
# =============================

columns_df = glueContext.create_dynamic_frame_from_options(
    "mysql",
    connection_options={
        "url": jdbc_conf["url"] + "/information_schema",
        "user": jdbc_conf["user"],
        "password": jdbc_conf["password"],
        "dbtable": "columns",
    }
)

tables_with_modified = set(
    columns_df.toDF()
    .filter(f"TABLE_SCHEMA = '{DB_SCHEMA}' AND COLUMN_NAME = 'modified'")
    .select("TABLE_NAME")
    .rdd.flatMap(lambda x: x)
    .collect()
)

# =============================
# メイン処理
# =============================

for row in target_tables:

    table_name = row["TABLE_NAME"]

    if table_name not in tables_with_modified:
        print(f"{table_name} SKIP - no modified column")
        continue

    print(f"Processing table: {table_name}")

    conn_options = {
        "url": jdbc_conf["url"] + f"/{DB_SCHEMA}",
        "user": jdbc_conf["user"],
        "password": jdbc_conf["password"],
        "dbtable": table_name,
    }

    # =============================
    # 差分モード(CDC)
    # =============================

    if IS_DIFF_MODE == "true":
        conn_options["sampleQuery"] = (
            f"SELECT * FROM {DB_SCHEMA}.{table_name} "
            f"WHERE modified BETWEEN "
            f"'{yesterday} 00:00:00' AND '{yesterday} 23:59:59'"
        )

    try:
        source_dyf = glueContext.create_dynamic_frame_from_options(
            "mysql",
            connection_options=conn_options
        )

        if source_dyf.count() == 0:
            print(f"{table_name} - No data")
            continue

        glueContext.write_dynamic_frame.from_options(
            frame=source_dyf,
            connection_type="bigquery",
            connection_options={
                "parentProject": BQ_PROJECT,
                "table": f"{BQ_DATASET}.{table_name}",
                "connectionName": args["BQ_CONNECTION"],
                "temporaryGcsBucket": args["GCS_BUCKET"],
            }
        )

        print(f"{table_name} SUCCESS")

    except Exception as e:
        print(f"{table_name} FAILED: {str(e)}")

job.commit()

差分取り込み(CDC)の実装

本基盤では modified カラムを利用した差分同期を実装しています。

現在の実装:

  • IS_DIFF_MODE=true の場合、前日分のみ抽出
  • フルロードとの切替可能
  • modifiedカラムがないテーブルは自動的にスキップ

将来的な拡張可能性:

  • BigQuery側の最大modified取得による水位管理
  • 水位管理テーブルによる完全CDC
  • リアルタイム性が必要な場合はDebezium + Kafkaへの移行
# 将来の拡張例: 水位管理による差分取得
last_modified = get_last_watermark(table_name)  # BigQuery側から取得
conn_options["sampleQuery"] = (
    f"SELECT * FROM {DB_SCHEMA}.{table_name} "
    f"WHERE modified > '{last_modified}'"
)

大容量テーブル対策(設計思想)

実運用では、テーブルサイズに応じた以下の制御を実装していますが、本記事では抽象化しています:

実装している機能:

  1. テーブルサイズに応じたパーティション制御

    • 小容量テーブル: 並列度高
    • 大容量テーブル: 並列度低(メモリ不足回避)
  2. フルロード抑制

    • 超大容量テーブル(閾値以上)は全件モードでスキップ
    • 差分モードのみ許可
  3. 再試行戦略

    • 失敗時はリソースを縮小して自動リトライ
    • パーティション数とフェッチサイズを段階的に削減
  4. MySQLセッションチューニング

    • テンポラリテーブルサイズ調整
    • バッファサイズ最適化
    • タイムアウト制御

設計のポイント:

  • Auroraへの負荷を最小化
  • メモリ不足によるジョブ失敗を回避
  • 大容量テーブルでも安定して処理可能

抽象化した理由:

  • 具体的な閾値は環境依存(インスタンスサイズ、データ特性による)
  • チューニング値は試行錯誤の結果(ノウハウの一部)
  • 設計思想と拡張性を示すことが本記事の目的

運用フロー

日次運用(前編の範囲)

前編で扱う処理内容:

時刻 処理 詳細
01:00 AWS Glue Job実行 差分モード: 昨日のmodifiedデータを取得
01:30 データ書き込み raw_layer に書き込み

後編で扱う処理内容:

  • 02:00〜: BigQuery Scheduled Queryによる変換・マスキング処理

初期ロード(全件取得)

# 全件モードで実行(IS_DIFF_MODE=false)
# 注意: 超大容量テーブル(15GB超)はスキップされる
aws glue start-job-run \
  --job-name aurora-to-bigquery \
  --arguments '{"--IS_DIFF_MODE":"false"}'

特定テーブルのみ処理

# 特定テーブルのみ処理
aws glue start-job-run \
  --job-name aurora-to-bigquery \
  --arguments '{"--SPECIFIC_TABLES":"table1,table2,table3"}'

過去データの再取得

# 特定期間のデータを再取得
aws glue start-job-run \
  --job-name aurora-to-bigquery \
  --arguments '{
    "--QUERY_START_DATE":"2024-01-01",
    "--QUERY_END_DATE":"2024-01-31"
  }'

運用戦略

障害検知と復旧

監視項目

監視項目 閾値 アラート条件
Glue Job実行時間 2時間以上 タイムアウト検知
テーブル処理失敗数 3テーブル以上 システム障害の可能性
データ量の異常値 前日比±50% データ欠損/重複の可能性

障害発生時の復旧手順

復旧コマンド:

# 1. 障害調査: Glue Jobのログを確認
aws glue get-job-runs --job-name aurora-to-bigquery --max-results 5

# 2. 特定テーブルの再実行
aws glue start-job-run \
  --job-name aurora-to-bigquery \
  --arguments '{"--SPECIFIC_TABLES":"failed_table1,failed_table2"}'

# 3. BigQueryパーティションの削除(重複防止)
bq rm -f -p your-project-id:raw_layer.table_name\$20260218

冪等性の保証

パーティション単位の再実行

# BigQuery書き込み時にパーティションを指定
"datePartition": today_date.strftime("%Y%m%d")  # 20260218

メリット:

  • 同じパーティションに再度書き込みしても上書きされる
  • 過去データは影響を受けない
  • 失敗した日付のみ再実行可能

スケーラビリティ

テーブル数の増加に対応

現状の200テーブルから500テーブルに増加した場合:

対応不要な項目:

  • AWS Glue Job: 自動的に全テーブルを処理

対応が必要な項目:

  • Glue Jobの実行時間が増加 → DPU数の増強

大容量テーブルへの対応

テーブルサイズが100GB超になった場合:

# 超大容量テーブル用の閾値を追加
HUGE_TABLE_THRESHOLD = 100 * 1024 * 1024 * 1024  # 100GB

if table_size_bytes > HUGE_TABLE_THRESHOLD:
    # 特別な処理: 差分モード必須、リトライなし
    if IS_DIFF_MODE == 'false':
        raise Exception("Huge table requires DIFF_MODE")

データ品質の確保

異常値検知

-- 日次データ量の確認クエリ
SELECT
  _PARTITIONDATE as partition_date,
  COUNT(*) as record_count,
  LAG(COUNT(*), 1) OVER (ORDER BY _PARTITIONDATE) as prev_count,
  (COUNT(*) - LAG(COUNT(*), 1) OVER (ORDER BY _PARTITIONDATE)) /
    LAG(COUNT(*), 1) OVER (ORDER BY _PARTITIONDATE) * 100 as change_rate
FROM `project.raw_layer.members`
GROUP BY _PARTITIONDATE
ORDER BY _PARTITIONDATE DESC
LIMIT 7;

アラート基準:

  • change_rateが±50%を超える場合は異常値として通知

データ整合性チェック

-- Aurora vs BigQueryのレコード数比較
-- Aurora側
SELECT COUNT(*) FROM members WHERE DATE(modified) = '2026-02-18';

-- BigQuery側
SELECT COUNT(*) FROM `project.raw_layer.members`
WHERE DATE(_PARTITIONTIME) = '2026-02-18';

コスト最適化

AWS Glueコストの削減

# 小さいテーブルのみ処理する場合はDPU数を削減
if total_table_size < 10GB:
    glue_dpu = 2  # デフォルト10から削減
else:
    glue_dpu = 10

まとめ

前編で実装した内容

  1. AWS Glue Job

    • テーブルサイズに応じた動的なリソース配分
    • 差分モード/全件モードの切り替え
    • リトライ処理による安定性向上
    • modifiedカラムベースのCDC実装
  2. 設計思想

    • ETLは「運搬」に徹する
    • 変化に強いデータ基盤(レイクハウス思想)
    • シンプルな構成による高可用性
  3. 運用戦略

    • 冪等性の保証(パーティション単位の再実行)
    • 障害検知と復旧フロー
    • データ品質の確保

達成した改善点

  1. 保守性の向上

    • 全テーブル転送により仕様変更に強い
    • ETL改修が不要な構造
  2. 可用性の向上

    • シンプルな構成(障害点が少ない)
    • 自動リトライ機構
    • パーティション分離
  3. スケーラビリティ

    • 動的リソース配分により大容量テーブルにも対応
    • テーブル数の増加に自動追従

後編で解説する内容

後編記事のリンク: (後編記事公開後にリンクを追加)

後編では、以下の内容を解説します:

  1. BigQueryクエリ自動生成バッチ

    • MySQL定義ファイルの自動解析
    • 型変換の自動化
    • 工数削減(40時間 → 2分)
  2. BigQuery Scheduled Query

    • スナップショット型DWHの設計思想
    • 最新レコード抽出(QUALIFY句)
    • メタデータカラムの設計
  3. 個人情報マスキング

    • データ分類ポリシー(P0/P1/P2/P3)
    • SHA256による不可逆ハッシュ化
    • データセット分離によるアクセス制御
  4. IAM設計

    • Google Groupsによる権限管理
    • 最小権限の原則
    • 監査ログの活用

おわりに

今回、AWS Glueを使用してBigQueryとの連携を試しましたが自前でバッチファイルを作成するよりもサービス影響やメンテナンスの負荷がかからず、シンプルに作成できました。
また機会があればAWS Glueでいろいろ試してみたいですね。

後編では、BigQuery側の変換・マスキング処理を詳しく解説します。

エンジニア募集

弊社では絶賛エンジニア募集中です!
気になった方、是非お気軽に Wantedly からご連絡ください!

参考リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?