0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

メモ

Posted at

DynamoDBの変換でGlueを使う

import sys
import boto3
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from botocore.exceptions import ClientError
from pyspark.sql.functions import lit

# ===== 引数とGlue初期化 =====

# Resolve the arguments handed to the script; JOB_NAME is required and
# getResolvedOptions fails if it is missing from sys.argv.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Reuse an already-active SparkContext when there is one (interactive sessions,
# notebooks, repeated runs in one interpreter). A bare SparkContext() raises
# in that situation because only one context may exist per process.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialise the Glue job wrapper; the matching job.commit() is at the end of
# the script.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ===== 設定 =====

# Table the job reads from.
source_table = 'mydb'

# AWS region used for the DynamoDB client that creates the target table.
region = 'ap-northeast-1'

# Value written into the "ymd" column of every row.
# NOTE(review): looks like a YYYYMMDD date stamp stored as an int - confirm.
ymd_target_value = 20250612

# Destination table name: the source name suffixed with the same stamp, so the
# date only has to be changed in one place (evaluates to 'mydb_20250612').
target_table = f"{source_table}_{ymd_target_value}"

# ===== DynamoDB: 新テーブルを作成(存在チェック含む)=====

# Create the target table if it does not exist yet, then wait until it is
# usable before the job writes to it.
dynamodb = boto3.client('dynamodb', region_name=region)

try:
    # Only the create call is guarded, so errors from later steps are never
    # mistaken for "table already exists".
    dynamodb.create_table(
        TableName=target_table,
        KeySchema=[
            {'AttributeName': 'kid', 'KeyType': 'HASH'},
            {'AttributeName': 'km', 'KeyType': 'RANGE'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'kid', 'AttributeType': 'S'},
            {'AttributeName': 'km', 'AttributeType': 'N'},
        ],
        BillingMode='PAY_PER_REQUEST',
    )
except ClientError as e:
    # ResourceInUseException is tolerated as "already exists"; every other
    # client error is re-raised unchanged.
    if e.response['Error']['Code'] != 'ResourceInUseException':
        raise
    _table_created = False
else:
    _table_created = True

# Wait in both cases: a table reported as "in use" may still be in the middle
# of being created (for example by a previous, interrupted run), and writing
# to it before it is ready would fail.
waiter = dynamodb.get_waiter('table_exists')
waiter.wait(TableName=target_table)

if _table_created:
    print(f"✅ Table '{target_table}' created.")
else:
    print(f"⚠️ Table '{target_table}' already exists.")

# ===== DynamoDBから分割条件付きで読み込み(例: 全件)=====

# Connection options for reading the source table.
read_options = {
    "dynamodb.input.tableName": source_table,
    # Throttle the read rate to keep the job stable.
    "dynamodb.throughput.read.percent": "0.2",
    # Optional filter, left disabled:
    #   "push_down_predicate": "ymd BETWEEN 20250101 AND 20250131"
    # NOTE(review): confirm the DynamoDB connector honours this option before
    # enabling it.
}

# Load the source table into a Glue DynamicFrame.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options=read_options,
)

# ===== DynamicFrame → DataFrameでメモリ効率よく変換処理 =====

# Convert the DynamicFrame to a Spark DataFrame so column expressions can be
# used for the transformation.
df = dyf.toDF()

# Set "ymd" to the configured constant on every row. withColumn replaces the
# column when it already exists and adds it otherwise.
df_transformed = df.withColumn("ymd", lit(ymd_target_value))

# ===== 再びDynamicFrameに戻してDynamoDBへ書き込み =====

# Wrap the transformed DataFrame back into a DynamicFrame, which is what the
# Glue writer expects.
dyf_transformed = DynamicFrame.fromDF(
    df_transformed,
    glueContext,
    "dyf_transformed",
)

# Connection options for the destination table.
write_options = {"dynamodb.output.tableName": target_table}

# Write every row of the transformed frame to the target table.
glueContext.write_dynamic_frame.from_options(
    frame=dyf_transformed,
    connection_type="dynamodb",
    connection_options=write_options,
)

# Finalise the Glue job started by job.init(); this runs only after the write
# above has returned, so the success message is printed last.
job.commit()
print("✅ Glue job completed successfully.")

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?