0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Athenaのテーブル結合について

0
Posted at

前提情報など

AWS Athenaを利用したテーブル結合に関する自分向けの忘備録です。

サンプルとして準備したテーブル情報は以下になります。

  • posts(親テーブル): ブログ記事本体。ステータス(status)や公開日時を持つ
  • post_comments(子テーブル): 記事へのコメント。post_id で posts に紐づく(1対多)
  • post_tags(子テーブル): 記事に付与されたタグ。post_id で posts に紐づく(1対多)

2026-02-25_20-54-18.PNG

テーブル結合について

データは複数テーブルに存在していますが、これを結合して以下のようなデータを作成したいとします。

{
  "id": 1,
  "title": "リレーショナルDBの正規化入門",
  "body": "データベース設計において正規化は非常に重要です。第1正規形から第3正規形までを具体例とともに解説します。",
  "status": "published",
  "published_at": "2025-01-10T09:00:00+00:00",
  "comments": [
    { "author": "田中太郎", "body": "正規化の説明がとてもわかりやすかったです!", "createdAt": "2025-01-11T10:00:00+00:00" },
    ...
  ],
  "tags": ["database", "mysql", "design"]
}

その場合のSQLは以下になります。

UNLOAD (
    WITH
    comments_agg AS (
        SELECT
            post_id,
            ARRAY_AGG(
                CAST(
                    ROW(author_name, body, created_at)
                    AS ROW(author VARCHAR, body VARCHAR, "createdAt" VARCHAR)
                )
            ) AS comments
        FROM "athena-test"."post_comments"
        GROUP BY post_id
    ),
    tags_agg AS (
        SELECT
            post_id,
            ARRAY_AGG(tag) AS tags
        FROM "athena-test"."post_tags"
        GROUP BY post_id
    )
    SELECT
        p.id,
        p.title,
        p.body,
        p.status,
        p.published_at,
        p.created_at,
        p.updated_at,
        COALESCE(ca.comments, ARRAY[])  AS comments,
        COALESCE(ta.tags, ARRAY[])      AS tags
    FROM
        "athena-test"."posts" p
    LEFT JOIN comments_agg ca ON p.id = ca.post_id
    LEFT JOIN tags_agg     ta ON p.id = ta.post_id
    ORDER BY p.id
)
TO 's3://athena-sample-12345/output/posts_joined/'
WITH (format = 'JSON', compression = 'NONE');

UNLOAD関数

順番に内容を解説します。

UNLOAD (
    ...
)
TO 's3://athena-sample-12345/output/posts_joined/'
WITH (format = 'JSON', compression = 'NONE');

まず、一番外側のUNLOADについてですが、これはAthenaの関数であり、SELECT の結果を、テーブルを作らずに、直接 S3 に好きなフォーマットで書き出すことができます。
今回はjson形式でS3に出力したいのでこの関数を利用します。

UNLOAD

WITH句(CTE)で子テーブルを事前集約

WITH
comments_agg AS (
    SELECT
        post_id,
        ARRAY_AGG(
            CAST(
                ROW(author_name, body, created_at)
                AS ROW(author VARCHAR, body VARCHAR, "createdAt" VARCHAR)
            )
        ) AS comments
    FROM "athena-test"."post_comments"
    GROUP BY post_id
),
tags_agg AS (
    SELECT
        post_id,
        ARRAY_AGG(tag) AS tags
    FROM "athena-test"."post_tags"
    GROUP BY post_id
)

CTE(共通テーブル式)を利用して事前に子テーブルのデータを集約しています。
単純にテーブルを結合すると、親テーブルx子テーブルで複数行になるため、事前に集約してからテーブルを結合する方式をとっています。

comments_agg の解説

ARRAY_AGG(
    CAST(
        ROW(author_name, body, created_at)
        AS ROW(author VARCHAR, body VARCHAR, "createdAt" VARCHAR)
    )
) AS comments

まず、外側のARRAY_AGG関数は集約の関数であり値を一つの配列に変換することができます。
Aggregate functions - array_agg

次にCASTの部分ですが、ここでJSONの構造体を作成しています。
この部分の処理イメージとしては、以下の構造体を作成するための処理です。

{
  "author": "taro",
  "body": "hello",
  "createdAt": "2026-02-25"
}

まず、CASTの基本構文は以下になります。

CAST( AS 変換後の型)

次に、以下の部分の処理についてですが、ROWに対して明示的に型情報を定義しています。
明示的に型情報を定義しなくても動作しますが、型推論が発生して想定外の挙動になる可能性があるため、明示的に型変換を実施しています。
また、Athenaにおいてキャメルケースは自動的に小文字に変換されるため、ダブルクォーテーションでくくっています

CAST(
    ROW(author_name, body, created_at)
    AS ROW(author VARCHAR, body VARCHAR, "createdAt" VARCHAR)
)

CAST を使用して配列内のフィールド名を変更する

tags_agg の解説

ARRAY_AGG(tag) AS tags

タグは単純に文字列の配列にしています。

出力例:

"tags": ["aws", "athena", "sql"]

メインのSQLの解説

SELECT
    p.id,
    p.title,
    p.body,
    p.status,
    p.published_at,
    p.created_at,
    p.updated_at,
    COALESCE(ca.comments, ARRAY[])  AS comments,
    COALESCE(ta.tags, ARRAY[])      AS tags
FROM
    "athena-test"."posts" p
LEFT JOIN comments_agg ca ON p.id = ca.post_id
LEFT JOIN tags_agg     ta ON p.id = ta.post_id
ORDER BY p.id

ここで重要なのはCOALESCEです。
LEFT JOIN を使っているため、

  • コメントが0件
  • タグが0件

の場合、NULL になります。
しかし"comments": []のようにするために、COALESCE(ca.comments, ARRAY[])としてNULLを空配列に置き換えています。

最後に

Athenaを利用すると非常に柔軟にデータを加工して出力することが可能です。
また、UNLOADを利用した場合、データ処理が並列で実行される点も魅力的であると感じています。
皆さんもよきデータ処理ライフを(๑•̀ㅂ•́)و✧

付録

Athenaテーブル作成DDL
-- ==========================================================================
-- Athena DDL: テストデータ用テーブル定義
-- ==========================================================================
-- 使い方:
--   1. <DATABASE> を実際のデータベース名に置換
--   2. <BUCKET> を S3 バケット名に置換
--   3. Athena クエリエディタで実行
--
-- S3 の想定ディレクトリ構造:
--   s3://<BUCKET>/posts/posts.jsonl
--   s3://<BUCKET>/post_comments/post_comments.jsonl
--   s3://<BUCKET>/post_tags/post_tags.jsonl
-- ==========================================================================

-- --------------------------------------------------------------------------
-- posts(親テーブル)
-- --------------------------------------------------------------------------
CREATE EXTERNAL TABLE IF NOT EXISTS `<DATABASE>`.`posts` (
    `id`           INT,
    `title`        STRING,
    `body`         STRING,
    `status`       STRING,
    `published_at` STRING,
    `created_at`   STRING,
    `updated_at`   STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'ignore.malformed.json' = 'FALSE',
    'dots.in.keys'          = 'FALSE',
    'case.insensitive'      = 'TRUE'
)
STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<BUCKET>/posts/'
TBLPROPERTIES ('has_encrypted_data' = 'false');

-- --------------------------------------------------------------------------
-- post_comments(子テーブル 1)
-- --------------------------------------------------------------------------
CREATE EXTERNAL TABLE IF NOT EXISTS `<DATABASE>`.`post_comments` (
    `id`          INT,
    `post_id`     INT,
    `author_name` STRING,
    `body`        STRING,
    `created_at`  STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'ignore.malformed.json' = 'FALSE',
    'dots.in.keys'          = 'FALSE',
    'case.insensitive'      = 'TRUE'
)
STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<BUCKET>/post_comments/'
TBLPROPERTIES ('has_encrypted_data' = 'false');

-- --------------------------------------------------------------------------
-- post_tags(子テーブル 2)
-- --------------------------------------------------------------------------
CREATE EXTERNAL TABLE IF NOT EXISTS `<DATABASE>`.`post_tags` (
    `id`         INT,
    `post_id`    INT,
    `tag`        STRING,
    `created_at` STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'ignore.malformed.json' = 'FALSE',
    'dots.in.keys'          = 'FALSE',
    'case.insensitive'      = 'TRUE'
)
STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<BUCKET>/post_tags/'
TBLPROPERTIES ('has_encrypted_data' = 'false');
検証用データ作成用Pythonスクリプト
"""
Athena検証用テストデータ生成スクリプト

3テーブル(posts / post_comments / post_tags)の JSON Lines ファイルを
output/ 配下にテーブルごとのサブディレクトリで出力する。
S3 にアップロード後、Athena の LOCATION として利用可能。
"""

import json
import os
from datetime import datetime, timedelta, timezone

# ---------------------------------------------------------------------------
# 定数
# ---------------------------------------------------------------------------
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "output")
JST = timezone(timedelta(hours=9))


def ts(year: int, month: int, day: int, hour: int = 0, minute: int = 0) -> str:
    """ISO 8601 形式のタイムスタンプ文字列を返す (UTC)"""
    return datetime(year, month, day, hour, minute, tzinfo=timezone.utc).isoformat()


# ---------------------------------------------------------------------------
# posts(親テーブル): 5 件
# ---------------------------------------------------------------------------
posts = [
    {
        "id": 1,
        "title": "リレーショナルDBの正規化入門",
        "body": "データベース設計において正規化は非常に重要です。第1正規形から第3正規形までを具体例とともに解説します。",
        "status": "published",
        "published_at": ts(2025, 1, 10, 9, 0),
        "created_at": ts(2025, 1, 8, 14, 30),
        "updated_at": ts(2025, 1, 10, 8, 45),
    },
    {
        "id": 2,
        "title": "AWS Athenaで始めるサーバーレス分析",
        "body": "S3上のデータをSQLで手軽に分析できるAthenaの基本的な使い方を紹介します。パーティション設定やコスト最適化のコツも解説。",
        "status": "published",
        "published_at": ts(2025, 2, 5, 10, 0),
        "created_at": ts(2025, 2, 1, 11, 0),
        "updated_at": ts(2025, 2, 5, 9, 30),
    },
    {
        "id": 3,
        "title": "JSON Linesフォーマットの利点と使い方",
        "body": "JSON Linesは1行1JSONオブジェクトの形式で、ストリーム処理やログ分析に適しています。従来のJSON配列との違いを比較します。",
        "status": "published",
        "published_at": ts(2025, 3, 12, 8, 0),
        "created_at": ts(2025, 3, 10, 16, 0),
        "updated_at": ts(2025, 3, 12, 7, 50),
    },
    {
        "id": 4,
        "title": "Prestoクエリエンジンの内部アーキテクチャ",
        "body": "AthenaのベースとなっているPrestoの分散クエリ実行の仕組みを深掘りします。コーディネータとワーカーの役割分担について。",
        "status": "draft",
        "published_at": None,
        "created_at": ts(2025, 4, 1, 13, 0),
        "updated_at": ts(2025, 4, 3, 10, 20),
    },
    {
        "id": 5,
        "title": "S3をデータレイクとして活用するベストプラクティス",
        "body": "パーティショニング、圧縮形式の選択、ライフサイクルポリシーなど、S3をデータレイクとして運用する際の実践的なノウハウをまとめます。",
        "status": "published",
        "published_at": ts(2025, 5, 20, 7, 0),
        "created_at": ts(2025, 5, 15, 9, 0),
        "updated_at": ts(2025, 5, 20, 6, 45),
    },
]

# ---------------------------------------------------------------------------
# post_comments(子テーブル1): 各 post に 2〜4 件 → 合計 15 件
# ---------------------------------------------------------------------------
post_comments = [
    # post_id=1 (3件)
    {"id": 1, "post_id": 1, "author_name": "田中太郎", "body": "正規化の説明がとてもわかりやすかったです!", "created_at": ts(2025, 1, 11, 10, 0)},
    {"id": 2, "post_id": 1, "author_name": "佐藤花子", "body": "第4正規形についても解説してほしいです。", "created_at": ts(2025, 1, 12, 14, 30)},
    {"id": 3, "post_id": 1, "author_name": "鈴木一郎", "body": "実務では非正規化とのバランスが大事ですよね。", "created_at": ts(2025, 1, 15, 9, 0)},
    # post_id=2 (4件)
    {"id": 4, "post_id": 2, "author_name": "山田次郎", "body": "Athena便利ですね!コスト管理の部分が参考になりました。", "created_at": ts(2025, 2, 6, 11, 0)},
    {"id": 5, "post_id": 2, "author_name": "高橋美咲", "body": "パーティション設定でクエリ料金がかなり下がりました。", "created_at": ts(2025, 2, 7, 16, 0)},
    {"id": 6, "post_id": 2, "author_name": "田中太郎", "body": "GlueクローラーとAthenaの連携についても知りたいです。", "created_at": ts(2025, 2, 8, 8, 30)},
    {"id": 7, "post_id": 2, "author_name": "伊藤健太", "body": "Athena v3 (Trino) への移行情報もあると嬉しいです。", "created_at": ts(2025, 2, 10, 12, 0)},
    # post_id=3 (2件)
    {"id": 8, "post_id": 3, "author_name": "佐藤花子", "body": "JSON Lines初めて知りました。ログ基盤で使ってみます。", "created_at": ts(2025, 3, 13, 10, 0)},
    {"id": 9, "post_id": 3, "author_name": "中村大輔", "body": "ndjsonとの違いはありますか?", "created_at": ts(2025, 3, 14, 15, 0)},
    # post_id=4 (3件)
    {"id": 10, "post_id": 4, "author_name": "鈴木一郎", "body": "下書き段階ですがとても興味深いテーマです。", "created_at": ts(2025, 4, 2, 9, 0)},
    {"id": 11, "post_id": 4, "author_name": "山田次郎", "body": "PrestoとTrinoの違いについても触れてほしいです。", "created_at": ts(2025, 4, 3, 14, 0)},
    {"id": 12, "post_id": 4, "author_name": "高橋美咲", "body": "メモリ管理の仕組みが気になります。", "created_at": ts(2025, 4, 4, 11, 30)},
    # post_id=5 (3件)
    {"id": 13, "post_id": 5, "author_name": "伊藤健太", "body": "パーティショニング戦略の解説が実践的で助かります。", "created_at": ts(2025, 5, 21, 8, 0)},
    {"id": 14, "post_id": 5, "author_name": "中村大輔", "body": "Parquet vs ORC の比較も入れてほしいです。", "created_at": ts(2025, 5, 22, 13, 0)},
    {"id": 15, "post_id": 5, "author_name": "田中太郎", "body": "ライフサイクルポリシーの設定例がわかりやすかったです。", "created_at": ts(2025, 5, 23, 10, 30)},
]

# ---------------------------------------------------------------------------
# post_tags(子テーブル2): 各 post に 2〜4 タグ → 合計 16 件
# ---------------------------------------------------------------------------
post_tags = [
    # post_id=1 (3タグ)
    {"id": 1, "post_id": 1, "tag": "database", "created_at": ts(2025, 1, 8, 14, 30)},
    {"id": 2, "post_id": 1, "tag": "mysql", "created_at": ts(2025, 1, 8, 14, 30)},
    {"id": 3, "post_id": 1, "tag": "design", "created_at": ts(2025, 1, 8, 14, 30)},
    # post_id=2 (4タグ)
    {"id": 4, "post_id": 2, "tag": "aws", "created_at": ts(2025, 2, 1, 11, 0)},
    {"id": 5, "post_id": 2, "tag": "athena", "created_at": ts(2025, 2, 1, 11, 0)},
    {"id": 6, "post_id": 2, "tag": "serverless", "created_at": ts(2025, 2, 1, 11, 0)},
    {"id": 7, "post_id": 2, "tag": "analytics", "created_at": ts(2025, 2, 1, 11, 0)},
    # post_id=3 (3タグ)
    {"id": 8, "post_id": 3, "tag": "json", "created_at": ts(2025, 3, 10, 16, 0)},
    {"id": 9, "post_id": 3, "tag": "data-format", "created_at": ts(2025, 3, 10, 16, 0)},
    {"id": 10, "post_id": 3, "tag": "streaming", "created_at": ts(2025, 3, 10, 16, 0)},
    # post_id=4 (3タグ)
    {"id": 11, "post_id": 4, "tag": "presto", "created_at": ts(2025, 4, 1, 13, 0)},
    {"id": 12, "post_id": 4, "tag": "distributed-system", "created_at": ts(2025, 4, 1, 13, 0)},
    {"id": 13, "post_id": 4, "tag": "architecture", "created_at": ts(2025, 4, 1, 13, 0)},
    # post_id=5 (3タグ)
    {"id": 14, "post_id": 5, "tag": "aws", "created_at": ts(2025, 5, 15, 9, 0)},
    {"id": 15, "post_id": 5, "tag": "s3", "created_at": ts(2025, 5, 15, 9, 0)},
    {"id": 16, "post_id": 5, "tag": "data-lake", "created_at": ts(2025, 5, 15, 9, 0)},
]


# ---------------------------------------------------------------------------
# ファイル出力
# ---------------------------------------------------------------------------
def write_jsonl(records: list[dict], subdir: str, filename: str) -> str:
    """レコードを JSON Lines 形式でファイルに書き出す"""
    dir_path = os.path.join(OUTPUT_DIR, subdir)
    os.makedirs(dir_path, exist_ok=True)
    file_path = os.path.join(dir_path, filename)
    with open(file_path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return file_path


def main() -> None:
    targets = [
        (posts, "posts", "posts.jsonl"),
        (post_comments, "post_comments", "post_comments.jsonl"),
        (post_tags, "post_tags", "post_tags.jsonl"),
    ]
    for records, subdir, filename in targets:
        path = write_jsonl(records, subdir, filename)
        print(f"✅ {path}  ({len(records)} records)")


if __name__ == "__main__":
    main()

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?