1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Glue で RDS データを分析用に統合・変換し CSV で S3 に出力する方法

Last updated at Posted at 2025-05-13

導入

AWS Glue を使えば、S3 に保存された RDS スナップショットのデータを簡単に読み込み、複数のテーブルを JOIN して分析に適した形式へ変換できます。

この記事では、Glue を使って以下のような処理を実現する方法を紹介します。

この記事でできること

  • S3 にエクスポートされた RDS のデータを Parquet 形式で読み込み
  • ユーザー情報・投稿・閲覧ログ・いいね・タグ情報を結合
  • 閲覧時間の算出、いいね日時の抽出など軽い加工処理
  • 結果を CSV に変換し S3 に保存

前提条件

  • RDS スナップショットを S3 にエクスポート済み(Parquet形式)
  • AWS Glue ジョブの IAM ロールに S3 アクセス権限があること
  • Glue ジョブは Spark (Python) で作成されていること

コード全体(Glue ジョブ)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import unix_timestamp, col

# Glue ジョブ引数の取得
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Spark / Glue の初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3上のRDSスナップショットデータのベースパス
base_path = "s3://my-rds-backup-bucket/rds-snapshots/sample-rds-snapshot-export/sample_db/"

# Parquetファイルの読み込み
users = spark.read.parquet(base_path + "sample_db.users/1/") \
    .filter(col("user_id") != "00000000-0000-0000-0000-000000000000")  # テストユーザーなど除外したい場合
posts = spark.read.parquet(base_path + "sample_db.posts/1/")
contents_view_logs = spark.read.parquet(base_path + "sample_db.contents_view_logs/1/")
likes = spark.read.parquet(base_path + "sample_db.likes/1/")
user_tags = spark.read.parquet(base_path + "sample_db.user_tags/1/")

# タイムスタンプのキャスト
contents_view_logs = contents_view_logs \
    .withColumn("read_at", col("read_at").cast("timestamp")) \
    .withColumn("finish_at", col("finish_at").cast("timestamp"))

likes = likes.withColumn("created_at", col("created_at").cast("timestamp"))

# JOIN処理
result = users.alias("u") \
    .join(posts.alias("p"), col("u.user_id") == col("p.user_id"), "left") \
    .join(contents_view_logs.alias("cl"),
          (col("u.user_id") == col("cl.user_id")) & (col("p.post_id") == col("cl.post_id")),
          "left") \
    .join(likes.alias("l"),
          (col("u.user_id") == col("l.user_id")) & (col("p.post_id") == col("l.post_id")),
          "left") \
    .join(user_tags.alias("ut"), col("u.user_id") == col("ut.user_id"), "left") \
    .select(
        col("u.user_id"),
        col("u.age"),
        col("u.gender"),
        col("u.profession"),
        col("p.post_id"),
        col("p.title"),
        col("p.category_id"),
        (unix_timestamp("cl.finish_at") - unix_timestamp("cl.read_at")).alias("view_duration"),
        col("l.created_at").alias("like_at"),
        col("ut.tag_id")
    )

# 結果の出力先パス(CSV形式)
output_path = "s3://my-rds-backup-bucket/processed/joined_data_csv/"

# データをCSV形式でS3に出力
result.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)

job.commit()

解説:このコードでやっていること

1. Glue ジョブの初期化

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

Spark 環境と GlueContext を初期化し、データ処理の準備をします。

2. データの読み込みと前処理

spark.read.parquet(...)

S3 に保存された RDS スナップショット(Parquet形式)から、各テーブルを読み込みます。日時データは cast("timestamp") で明示的に変換しています。

3. データの結合(JOIN)

.join(...).select(...)

ユーザー情報に対して、投稿・閲覧ログ・いいね・タグ情報をすべて LEFT JOIN しています。閲覧時間(view_duration)の計算もこの中で行っています。

4. 出力(CSV形式でS3に保存)

result.write.option("header", "true").csv(output_path)

加工・統合したデータを CSV として S3 に出力します。mode("overwrite") によって既存ファイルは上書きされます。

よくある質問

Q. Parquetファイルが sample_db.users/1/ のようなパスにあるのはなぜ?

A. RDSスナップショットをエクスポートすると、テーブルごとにディレクトリが分かれます。その中の数字(例: /1/)はスナップショットのバージョンのようなもので、最新のデータが格納されています。

Q. user_id != "000..." のフィルターは必要?

A. 本番環境では不要なデータ(管理者やシステムテスト用ユーザーなど)を除外する用途で使われます。必要なければ削除可能です。

まとめ

AWS Glue と PySpark を活用することで、RDS のスナップショットデータを簡単に加工・分析しやすい形で保存できます。BIツールや機械学習の前処理にも最適な手法です。

定期的に実行すれば、分析基盤や可視化ツールと連携した自動化パイプラインにも発展させられます。

関連リンク

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?