はじめに
AWS Glue は、サーバーレスでデータ処理を行うための強力なツールです。本記事では、AWS Glue を使用して S3 に保存された JSON データを Parquet に変換する方法 を紹介します。
DynamoDB からエクスポートしたデータは JSON 形式で保存されることが多く、そのままでは分析しにくい場合があります。Parquet 形式に変換することで、データの圧縮率が向上し、Athena や Redshift Spectrum でのクエリ性能が向上します。
- データの形式: JSON → Parquet
- 保存場所: S3
- IAM 設定: 必要な権限を付与
- ファイル名の管理: JSON の元ファイル名を Parquet に引き継ぐ
1. Glue Job の概要
AWS Glue の ETL (Extract, Transform, Load) ジョブを使用し、以下の処理を行います。
- S3 にある JSON ファイルをリストアップ
- 各 JSON ファイルを Spark DataFrame に変換
- Parquet 形式に変換し、S3 に保存
- 元の JSON ファイル名と一致する Parquet ファイル名にする
2. IAM ロールの設定
Glue ジョブを実行するために、適切な IAM ロールを設定する必要があります。
必要なポリシー
以下の権限を Glue の実行ロールに付与します。
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::your-bucket-name",
"arn:aws:s3:::your-bucket-name/*"
]
}
-
s3:ListBucket
→ バケット内のファイル一覧を取得 -
s3:GetObject
→ JSON ファイルを取得 -
s3:PutObject
→ Parquet ファイルを S3 に保存
3. Glue Job のコード
以下の Python スクリプトを AWS Glue のスクリプトエディタに保存し、実行します。
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
from urllib.parse import urlparse
# Glue Job の初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# S3のJSONデータが格納されている入力ディレクトリ
input_path = "s3://your-bucket-name/sales-data/"
# Parquetデータを保存する新しいディレクトリ
output_path = "s3://your-bucket-name/output/parquet/"
# S3 クライアント
s3 = boto3.client("s3")
# バケット名とプレフィックスを取得
parsed_url = urlparse(input_path)
bucket_name = parsed_url.netloc
prefix = parsed_url.path.lstrip("/")
# S3 の JSON ファイルをリストアップ
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if "Contents" not in response:
raise ValueError("Error: No JSON files found in input_path!")
json_files = [obj["Key"] for obj in response["Contents"] if obj["Key"].endswith(".json")]
if not json_files:
raise ValueError("Error: No JSON files found in input_path!")
print(f"Found {len(json_files)} JSON files.")
# 各 JSON ファイルを個別に変換
for json_file in json_files:
json_file_path = f"s3://{bucket_name}/{json_file}"
print(f"Processing file: {json_file_path}")
# Glue DynamicFrame を作成
AmazonS3_node = glueContext.create_dynamic_frame.from_options(
format_options={"multiLine": "false"},
connection_type="s3",
format="json",
connection_options={"paths": [json_file_path]},
transformation_ctx="AmazonS3_node"
)
# DynamicFrame を Spark DataFrame に変換
df = AmazonS3_node.toDF()
# データが空ならスキップ
if df.count() == 0:
print(f"Skipping {json_file} as it contains no data.")
continue
# ファイル名を取得(拡張子を .parquet に変更)
file_name = json_file.split("/")[-1].replace(".json", ".parquet")
output_file_path = f"{output_path}{file_name}"
# Parquet形式で保存
df.write.mode("overwrite").parquet(output_file_path)
print(f"Saved {output_file_path}")
print(f"All JSON files under {input_path} have been converted to Parquet and saved in {output_path}.")
# Jobの終了
job.commit()
4. 実行方法
5. まとめ
AWS Glue を使うことで、S3 に保存された JSON データを簡単に Parquet 形式に変換できます。特に、DynamoDB からエクスポートされた JSON データを分析しやすくするのに有効です。IAM の適切な権限を設定し、Glue Job を実行することで、効率的なデータ処理が可能になります。本記事では、元の JSON ファイル名を維持しながら Parquet に変換する方法を紹介しました。AWS Glue を活用し、データ分析や ETL の効率化を進めましょう!