# AWS Glue job: copy a DynamoDB table into a new table while rewriting `ymd` (DynamoDBの変換でGlueを使う)
import sys
import boto3
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from botocore.exceptions import ClientError
from pyspark.sql.functions import lit
# ===== Job arguments and Glue initialization =====
# JOB_NAME is the only job parameter this script requires.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job_name = args['JOB_NAME']

# Spark -> Glue context chain; `spark` is exposed for any DataFrame-level work.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Register the job run; it is finalized by job.commit() at the end of the script.
job = Job(glueContext)
job.init(job_name, args)
# ===== Settings =====
source_table = 'mydb'
region = 'ap-northeast-1'
# Value written into the `ymd` column of every copied item
# (looks like a YYYYMMDD date stored as an integer — confirm with data owners).
ymd_target_value = 20250612
# Derived from the two settings above so the table suffix and the stamped
# ymd value can never drift apart when the date is changed.
target_table = f'{source_table}_{ymd_target_value}'
# ===== DynamoDB: create the target table (an existing table is tolerated) =====
# NOTE(review): the key schema below (kid: String HASH, km: Number RANGE) must
# match the key attributes of the items copied from source_table — confirm.
dynamodb = boto3.client('dynamodb', region_name=region)
try:
    dynamodb.create_table(
        TableName=target_table,
        KeySchema=[
            {'AttributeName': 'kid', 'KeyType': 'HASH'},
            {'AttributeName': 'km', 'KeyType': 'RANGE'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'kid', 'AttributeType': 'S'},
            {'AttributeName': 'km', 'AttributeType': 'N'},
        ],
        BillingMode='PAY_PER_REQUEST',
    )
    table_created = True
except ClientError as e:
    # ResourceInUseException: a table with this name already exists (possibly
    # still being created by an earlier run). Any other error is fatal.
    if e.response['Error']['Code'] != 'ResourceInUseException':
        raise
    table_created = False

# Wait until the table is ACTIVE on *both* paths: a pre-existing table may
# still be CREATING, and the Glue write further down needs an ACTIVE table.
waiter = dynamodb.get_waiter('table_exists')
waiter.wait(TableName=target_table)

if table_created:
    print(f"✅ Table '{target_table}' created.")
else:
    print(f"⚠️ Table '{target_table}' already exists.")
# ===== Read the entire source table from DynamoDB (no filtering here) =====
# NOTE: "push_down_predicate" is not a DynamoDB connection option. In
# from_options it is a separate keyword argument meant for partition pruning,
# so placing it in connection_options has no effect. To copy only a subset,
# filter the DataFrame after toDF() (e.g. df.filter(...)) instead.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": source_table,
        # Use only ~20% of the source table's read capacity to keep the job
        # stable (the throttle the original author chose).
        "dynamodb.throughput.read.percent": "0.2",
    },
)
# ===== DynamicFrame -> Spark DataFrame, then stamp ymd on every row =====
df = dyf.toDF()
# A single constant column: withColumn replaces `ymd` if the source items
# already have it, and appends it otherwise.
ymd_column = lit(ymd_target_value)
df_transformed = df.withColumn("ymd", ymd_column)
# ===== Back to a DynamicFrame, write into the target table, finish the job =====
write_options = {"dynamodb.output.tableName": target_table}
dyf_transformed = DynamicFrame.fromDF(df_transformed, glueContext, "dyf_transformed")
glueContext.write_dynamic_frame.from_options(
    frame=dyf_transformed,
    connection_type="dynamodb",
    connection_options=write_options,
)

# Mark the Glue job run as finished.
job.commit()
print("✅ Glue job completed successfully.")