導入
AWS Glue を使えば、S3 に保存された RDS スナップショットのデータを簡単に読み込み、複数のテーブルを JOIN して分析に適した形式へ変換できます。
この記事では、Glue を使って以下のような処理を実現する方法を紹介します。
この記事でできること
- S3 にエクスポートされた RDS のデータを Parquet 形式で読み込み
- ユーザー情報・投稿・閲覧ログ・いいね・タグ情報を結合
- 閲覧時間の算出、いいね日時の抽出など軽い加工処理
- 結果を CSV に変換し S3 に保存
前提条件
- RDS スナップショットを S3 にエクスポート済み(Parquet形式)
- AWS Glue ジョブの IAM ロールに S3 アクセス権限があること
- Glue ジョブは Spark (Python) で作成されていること
コード全体(Glue ジョブ)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import unix_timestamp, col
# Glue ジョブ引数の取得
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Spark / Glue の初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# S3上のRDSスナップショットデータのベースパス
base_path = "s3://my-rds-backup-bucket/rds-snapshots/sample-rds-snapshot-export/sample_db/"
# Parquetファイルの読み込み
users = spark.read.parquet(base_path + "sample_db.users/1/") \
.filter(col("user_id") != "00000000-0000-0000-0000-000000000000") # テストユーザーなど除外したい場合
posts = spark.read.parquet(base_path + "sample_db.posts/1/")
contents_view_logs = spark.read.parquet(base_path + "sample_db.contents_view_logs/1/")
likes = spark.read.parquet(base_path + "sample_db.likes/1/")
user_tags = spark.read.parquet(base_path + "sample_db.user_tags/1/")
# タイムスタンプのキャスト
contents_view_logs = contents_view_logs \
.withColumn("read_at", col("read_at").cast("timestamp")) \
.withColumn("finish_at", col("finish_at").cast("timestamp"))
likes = likes.withColumn("created_at", col("created_at").cast("timestamp"))
# JOIN処理
result = users.alias("u") \
.join(posts.alias("p"), col("u.user_id") == col("p.user_id"), "left") \
.join(contents_view_logs.alias("cl"),
(col("u.user_id") == col("cl.user_id")) & (col("p.post_id") == col("cl.post_id")),
"left") \
.join(likes.alias("l"),
(col("u.user_id") == col("l.user_id")) & (col("p.post_id") == col("l.post_id")),
"left") \
.join(user_tags.alias("ut"), col("u.user_id") == col("ut.user_id"), "left") \
.select(
col("u.user_id"),
col("u.age"),
col("u.gender"),
col("u.profession"),
col("p.post_id"),
col("p.title"),
col("p.category_id"),
(unix_timestamp("cl.finish_at") - unix_timestamp("cl.read_at")).alias("view_duration"),
col("l.created_at").alias("like_at"),
col("ut.tag_id")
)
# 結果の出力先パス(CSV形式)
output_path = "s3://my-rds-backup-bucket/processed/joined_data_csv/"
# データをCSV形式でS3に出力
result.write \
.mode("overwrite") \
.option("header", "true") \
.csv(output_path)
job.commit()
解説:このコードでやっていること
1. Glue ジョブの初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
Spark 環境と GlueContext を初期化し、データ処理の準備をします。
2. データの読み込みと前処理
spark.read.parquet(...)
S3 に保存された RDS スナップショット(Parquet形式)から、各テーブルを読み込みます。日時データは cast("timestamp")
で明示的に変換しています。
3. データの結合(JOIN)
.join(...).select(...)
ユーザー情報に対して、投稿・閲覧ログ・いいね・タグ情報をすべて LEFT JOIN
しています。閲覧時間(view_duration
)の計算もこの中で行っています。
4. 出力(CSV形式でS3に保存)
result.write.option("header", "true").csv(output_path)
加工・統合したデータを CSV として S3 に出力します。mode("overwrite")
によって既存ファイルは上書きされます。
よくある質問
Q. Parquetファイルが sample_db.users/1/
のようなパスにあるのはなぜ?
A. RDSスナップショットをエクスポートすると、テーブルごとにディレクトリが分かれます。その中の数字(例: /1/
)はスナップショットのバージョンのようなもので、最新のデータが格納されています。
Q. user_id != "000..."
のフィルターは必要?
A. 本番環境では不要なデータ(管理者やシステムテスト用ユーザーなど)を除外する用途で使われます。必要なければ削除可能です。
まとめ
AWS Glue と PySpark を活用することで、RDS のスナップショットデータを簡単に加工・分析しやすい形で保存できます。BIツールや機械学習の前処理にも最適な手法です。
定期的に実行すれば、分析基盤や可視化ツールと連携した自動化パイプラインにも発展させられます。