0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWS Glueジョブ作成時の備忘録

Last updated at Posted at 2021-04-05

はじめに#

エキサイトの坂本です。久しぶり投稿します。

Privateサブネットに配置しているOracleからPublicサブネットに配置している分析用のRedshiftにデータ転送の際に、AWS Glueを使えば非常に便利です。基本的にAWSコンソールからGlueジョブが自動で発行されますが、ニーズに応じて諸々カスタマイズが必要です。

事例#

ジョブのパラメータ受け取る##

現場でDev環境とLive環境のDB名が違うので、Devで発行したGlueスクリプトはDev環境のDB名が入っていますので、Live環境でGlueジョブ再登録すると、DBを書き換えしたくないですね。その場合は、Glueジョブのパラメータを使えば、解決できます。

例)

...
## @params: [TempDir, JOB_NAME, DATASOURCE_DB_NAME]
args = getResolvedOptions(
    sys.argv, ['TempDir', 'JOB_NAME', 'DATASOURCE_DB_NAME'])

# テーブル名
datasource_table_name = f"{args['DATASOURCE_DB_NAME']}_excite_hc_users"
...

RedshiftにInsert前にTruncateしたい##

datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=resolvechoice4,
    catalog_connection="excite-glue-connection-redshift",
    connection_options={
        "preactions": "truncate table users",
        "dbtable": "users",
        "database": "excite",
    },
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink5"
)

*注意:connection_optionsはfrom_jdbc_confのみ使えます。詳細はGlueのドキュメントをご確認ください

電話番号のマスキング##

Publicサブネットに配置しているRedshiftに個人情報を転送してはいけないですね

import hashlib
...
# 電話番号のマスキング処理
def maskPhoneNumber(dynamicRecord):
    dynamicRecord["number_hash_hex"] = hashlib.md5(
        dynamicRecord["number_hash_hex"].encode("utf8")).hexdigest()
    return dynamicRecord
...
# 電話番号をマスキング
masked_dynamic_frame = Map.apply(
    frame=applymapping1,
    f=maskPhoneNumber
)

1行ずつデータをチェックしたい##

def checkifmailaddr(val): return hashlib.md5(
    val.encode("utf8")).hexdigest() if val else "hoge"
...
# Dataframeに変換
df_list = resolvechoice4.toDF().collect()

result_list = []
for row in df_list:
    result_list.append(
        Row(
            userid=row['userid'],
            mailaddr_hash_hex=checkifmailaddr(row['mailaddr_hash_hex'])
        )
    )

df = spark.createDataFrame(result_list)
result_data_frame = DynamicFrame.fromDF(df, glueContext, 'result_data_frame')

# これでRedshiftに書き込み
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=result_data_frame,
    ...
)

転送対象データを絞り込み##

膨大なテーブルで全データを入れ直すと、時間やコストがかかりますので、2日の差分だけ転送したい場面もありますね

# 直近x日だけ取得
JST = datetime.timezone(datetime.timedelta(hours=+9))
today = datetime.datetime.today().astimezone(JST)
n_days_ago = today.date() - datetime.timedelta(days=7)
n_days_ago_formatted = n_days_ago.strftime('%Y-%m-%d')
target_df = datasource0.toDF().where(f"ap_regdate >= '{n_days_ago_formatted}'")
datasource0 = DynamicFrame.fromDF(target_df, glueContext, "datasource")

# Redshift側に直近x日のデータ削除
preaction_query = f"delete from users where regdate >= TO_DATE('{n_days_ago_formatted}', 'YYYY-MM-DD HH24:MI:SS')"

# 次は普通に使えます
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
       ...
    ],
    transformation_ctx="applymapping1"
)

# Redshiftにデータ書き込み
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=resolvechoice4,
    catalog_connection="excite-glue-connection-redshift",
    connection_options={
        "preactions": preaction_query,
        "dbtable": "users",
        "database": "excite",
    },
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink5"
)

以上です

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?