はじめに#
エキサイトの坂本です。久しぶり投稿します。
Privateサブネットに配置しているOracleからPublicサブネットに配置している分析用のRedshiftにデータ転送の際に、AWS Glueを使えば非常に便利です。基本的にAWSコンソールからGlueジョブが自動で発行されますが、ニーズに応じて諸々カスタマイズが必要です。
事例#
ジョブのパラメータ受け取る##
現場でDev環境とLive環境のDB名が違うので、Devで発行したGlueスクリプトはDev環境のDB名が入っていますので、Live環境でGlueジョブ再登録すると、DBを書き換えしたくないですね。その場合は、Glueジョブのパラメータを使えば、解決できます。
例)
...
## @params: [TempDir, JOB_NAME, DATASOURCE_DB_NAME]
args = getResolvedOptions(
sys.argv, ['TempDir', 'JOB_NAME', 'DATASOURCE_DB_NAME'])
# テーブル名
datasource_table_name = f"{args['DATASOURCE_DB_NAME']}_excite_hc_users"
...
RedshiftにInsert前にTruncateしたい##
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=resolvechoice4,
catalog_connection="excite-glue-connection-redshift",
connection_options={
"preactions": "truncate table users",
"dbtable": "users",
"database": "excite",
},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="datasink5"
)
*注意:connection_optionsはfrom_jdbc_confのみ使えます。詳細はGlueのドキュメントをご確認ください
電話番号のマスキング##
Publicサブネットに配置しているRedshiftに個人情報を転送してはいけないですね
import hashlib
...
# 電話番号のマスキング処理
def maskPhoneNumber(dynamicRecord):
dynamicRecord["number_hash_hex"] = hashlib.md5(
dynamicRecord["number_hash_hex"].encode("utf8")).hexdigest()
return dynamicRecord
...
# 電話番号をマスキング
masked_dynamic_frame = Map.apply(
frame=applymapping1,
f=maskPhoneNumber
)
1行ずつデータをチェックしたい##
def checkifmailaddr(val): return hashlib.md5(
val.encode("utf8")).hexdigest() if val else "hoge"
...
# Dataframeに変換
df_list = resolvechoice4.toDF().collect()
result_list = []
for row in df_list:
result_list.append(
Row(
userid=row['userid'],
mailaddr_hash_hex=checkifmailaddr(row['mailaddr_hash_hex'])
)
)
df = spark.createDataFrame(result_list)
result_data_frame = DynamicFrame.fromDF(df, glueContext, 'result_data_frame')
# これでRedshiftに書き込み
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=result_data_frame,
...
)
転送対象データを絞り込み##
膨大なテーブルで全データを入れ直すと、時間やコストがかかりますので、2日の差分だけ転送したい場面もありますね
# 直近x日だけ取得
JST = datetime.timezone(datetime.timedelta(hours=+9))
today = datetime.datetime.today().astimezone(JST)
n_days_ago = today.date() - datetime.timedelta(days=7)
n_days_ago_formatted = n_days_ago.strftime('%Y-%m-%d')
target_df = datasource0.toDF().where(f"ap_regdate >= '{n_days_ago_formatted}'")
datasource0 = DynamicFrame.fromDF(target_df, glueContext, "datasource")
# Redshift側に直近x日のデータ削除
preaction_query = f"delete from users where regdate >= TO_DATE('{n_days_ago_formatted}', 'YYYY-MM-DD HH24:MI:SS')"
# 次は普通に使えます
applymapping1 = ApplyMapping.apply(
frame=datasource0,
mappings=[
...
],
transformation_ctx="applymapping1"
)
# Redshiftにデータ書き込み
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=resolvechoice4,
catalog_connection="excite-glue-connection-redshift",
connection_options={
"preactions": preaction_query,
"dbtable": "users",
"database": "excite",
},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="datasink5"
)
以上です