Using AWS Glue to transform DynamoDB data
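The script below reads DynamoDB-export-style JSON from S3 with a Glue job, overwrites the nested Item.ymd.S attribute with a fixed date, and writes the result back to S3. It assumes each input line is a JSON object shaped roughly like the following (attribute names other than ymd are placeholders, not taken from the original):

{"Item": {"id": {"S": "example-id"}, "ymd": {"S": "20240101"}}}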
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, lit
# Initialize the Glue job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Source S3 path
input_path = "s3://your-bucket/input/"
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="json",
    connection_options={"paths": [input_path]}
)
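# Optional sanity check (not in the original script): inspect the inferred schema
# to confirm that Item.ymd.S is present before transforming.
# dyf.printSchema()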
# Convert to a Spark DataFrame
df = dyf.toDF()
# Overwrite every Item.ymd.S value with "20250611"
df_updated = df.withColumn(
    "Item",
    col("Item").withField(
        "ymd",
        col("Item.ymd").withField(
            "S",
            lit("20250611")  # replace whatever value is there with this fixed value
        )
    )
)
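# Optional check (not in the original script): confirm the rewrite before writing out.
# df_updated.select("Item.ymd.S").show(5, truncate=False)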
# Convert back to a DynamicFrame
dyf_updated = DynamicFrame.fromDF(df_updated, glueContext, "dyf_updated")
# Destination S3 path
output_path = "s3://your-bucket/output/"
glueContext.write_dynamic_frame.from_options(
    frame=dyf_updated,
    connection_type="s3",
    format="json",
    connection_options={"path": output_path}
)
job.commit()
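Once the job is created in Glue, it can be started from Python with boto3 as a minimal sketch; the job name below is a placeholder, not taken from the original.

import boto3

glue = boto3.client("glue")

# "ddb-ymd-rewrite" is a hypothetical job name; replace it with your deployed Glue job.
response = glue.start_job_run(JobName="ddb-ymd-rewrite")
print(response["JobRunId"])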