はじめに
本記事では、Glue JobでRedshift Serverlessのテーブルを定義する方法を整理します。
前回までの記事では、データインサート先のテーブルがない場合でもデータインサートができましたが、
データ型は推測された型で格納されていました。
今回はデータインサート先となるテーブルを作ってから(もし作成済みであれば処理スキップ)、
データインサートするまでの方法を整理します。
本記事は ↓ の続きです。
前回までの記事で、GlueJobでRedshift Serverlessのテーブルにデータ書き込みする流れを整理しました。
(前回記事で紹介したリソースは構築済み前提で進めていきます)
https://qiita.com/tamabe/items/ba8c0ec56f377aeeb908
この記事のまとめ
- Redshiftへのデータインサート前後にSQLクエリを実行できる
- SparkのDynamicFrameWriterクラスでpreactionsとpostactionsパラメータを利用する
- Redshiftではステージングテーブルに一時的にデータを保持してからターゲットテーブルに書き込む
この記事の構成
構成図
Glue JobでRedshift Serverlessのテーブルを定義するための構成を図にしています

preactionsとpostactions
RedshiftやRedshift Serverlessにデータを読み書きする際、Sparkコネクタが提供されていて、様々なオプションが用意されています1。
GlueではPySparkを使ってコード記述が可能です2
Glueから、Redshiftにデータ書き込みを行う際には、DynamicFrameWriterクラスが用意されています34。
preactionsとpostactionsパラメータを使うことで、
データインサート前後に、指定のSQLクエリを実行することができます。
AWS Repost記事56によると、preactions, postactionsパラメータは以下のように使用します:
- 改行文字を含まない
- セミコロンで区切られたSQLコマンド
コードの中身
Redshift Serverlessのテーブルを定義するコードの中身を整理します。
DynamicFrameWriterクラスを使用し、
preactionsとpostactionsオプションを含むconnection_optionsパラメータを指定します。
connection_options の使用方法を整理します。
ここでの処理イメージとして、
preactionsで、まずステージングテーブル、ターゲットテーブルを新規作成し、
postactionsで、ステージングテーブルに書き込まれた内容をターゲットテーブルに書き込みます。
kRedshiftにデータを書き込みする際は、ステージングデーブルに一時的にデータを保持します7。
使ってみての注意点は以下です。
- preactionsとpostactionsは、セットで使う(どちらかだけ指定しても動かない)
- dbtableには、staging_tableを指定する(target_tableではない)
サンプルコードは以下です。
# preactionsとpostactionsの使用例
glueContext.write_dynamic_frame.from_options(
frame = <書き込み対象DynamicFrame>,
connection_type = "redshift",
connection_options = {
"redshiftTmpDir": <書き込みデータの一時的な保持先のS3パス>,
"useConnectionProperties": "true",
"connectionName": <GlueJobがRedshiftとの接続に利用するコネクション>,
"dbtable": staging_table,
"preactions": pre_query,
"postactions": post_query,
}
)
connection_optionsでは以下の変数を定義しています。
target_table
インサートするデータの目的地となるテーブル
※postactionsにて、このテーブルにデータを書き込む
target_table = "<Redshiftのスキーマ名>.<Redshiftのターゲットテーブル名>"
staging_table
ターゲットテーブルに書き込まれるデータを、一時的に書き込むためのテーブル
※DynamicFrameWriterクラスは、このテーブルにデータを書き込む
staging_table = "<Redshiftのスキーマ名>.<Redshiftのステージングテーブル名>"
definitions
ターゲットテーブル(およびステージングテーブル)の定義
※カラム名、データ型などを定義する
definitions = [
"id bigint",
"entity varchar(255)",
...
]
prequery
preactionsパラメータに設定するSQLクエリの内容
※target_tableの作成、staging_tableの作成を行うSQLを定義する
pre_query = f"""
CREATE TABLE IF NOT EXISTS {target_table} (
{', '.join(definitions)}
);
DROP TABLE IF EXISTS {staging_table};
CREATE TABLE {staging_table} (LIKE {target_table});
"""
postquery
postactionsパラメータに設定するSQLクエリの内容
※staging_tableのデータをtarget_tableにインサートし、その後staging_tableを削除するSQLを定義する
post_query = f"""
BEGIN;
INSERT INTO {target_table} (
{', '.join(columns)}
)
SELECT
{', '.join(columns)}
FROM {staging_table};
DROP TABLE {staging_table};
END;
"""
まとめ
本記事では、Glue Jobを使って、Redshift Serverlessにテーブル定義を行ってからデータを書き込みするための記述を整理しました。
参考URL
この記事で参考にしたURLです。
-
AWSドキュメント:Amazon Redshift integration for Apache Spark Other configuration options
https://docs.aws.amazon.com/redshift/latest/mgmt/spark-redshift-connector-other-config.html ↩ -
AWS 記事:PySpark で AWS Glue ETL スクリプトをプログラムする
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python.html ↩ -
AWSドキュメント:DynamicFrameWriter class
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer.html ↩ -
AWS 記事:Redshift 接続
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-connect-redshift-home.html#aws-glue-programming-etl-connect-redshift-write ↩ -
AWS Repost:AWS Glue S3 to Redshift ETL job - ignored
preactionsandpostactions
https://repost.aws/ja/questions/QUR4mvZ1NUQvKof115XSOxPg/aws-glue-s3-to-redshift-etl-job-ignored-preactions-and-postactions ↩ -
AWS Repost:How do I run SQL commands on an Amazon Redshift table before or after writing data in an AWS Glue job?
https://repost.aws/knowledge-center/sql-commands-redshift-glue-job ↩ -
AWS公式:一時的に使用するステージングテーブルを作成する
https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/merge-create-staging-table.html ↩