はじめに
Glue を使ったデータ加工を 1 日 1 回 行うときに、出力先の S3 に日付ごとにディレクトリを分けたいときがあります。この時に、Glue の Job Parameter を使うことで、出力先ディレクトリなどのパラメーターを渡す機能があり、こちらで柔軟に指定が可能です。
ただ、2023 年 8 月現在、Glue Studio の GUI を使ってデータ加工を行っている場合は、Job Parameter を使って S3 のディレクトリを指定することが出来ません。GUI を使ったデータ加工フローを Script 化すると、柔軟に指定が出来るようになります。今回はその手順を紹介していきます。
また、Glue を含めた全体のワークフローとして Step Functions を利用頂くことも多くあります。今回は、Step Functions から Glue を呼びだすときのパラメーターを指定する方法も紹介していきます。
前提
Glue Studio を使って、データ加工のフローを持っている前提で記事を記載します。なお、このフローは 前回の記事 で作成しました。
Glue : Copy して Script 化
Glue Studio で作成したフローを Script 化していきます。注意点があります。GUI ベースのものを Script 化にしてしまうと、GUI に戻すことはできません。そのため、Script 化する前に、そのフローを Clone してバックアップを持っておきましょう。
Script 化するフローを選択して、Clone job を選びます。
Clone した Job を使って、Edit Script を押します。
Confirm を押します。
すると、Script が生成されました。全体は以下のようなコードになっています。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node transformcode
transformcode_node1691764083220 = glueContext.create_dynamic_frame.from_options(
format_options={
"quoteChar": '"',
"withHeader": True,
"separator": ",",
"optimizePerformance": False,
},
connection_type="s3",
format="csv",
connection_options={
"paths": ["s3://glue-studio-etltest01/codetransform.csv"],
"recurse": True,
},
transformation_ctx="transformcode_node1691764083220",
)
# Script generated for node buhin
buhin_node1 = glueContext.create_dynamic_frame.from_options(
format_options={
"quoteChar": '"',
"withHeader": True,
"separator": ",",
"multiline": False,
"optimizePerformance": False,
},
connection_type="s3",
format="csv",
connection_options={
"paths": ["s3://glue-studio-etltest01/buhin.csv"],
"recurse": True,
},
transformation_ctx="buhin_node1",
)
# Script generated for node Join
buhin_node1DF = buhin_node1.toDF()
transformcode_node1691764083220DF = transformcode_node1691764083220.toDF()
Join_node1691764237797 = DynamicFrame.fromDF(
buhin_node1DF.join(
transformcode_node1691764083220DF,
(buhin_node1DF["buhin"] == transformcode_node1691764083220DF["before"]),
"outer",
),
glueContext,
"Join_node1691764237797",
)
# Script generated for node Drop Fields
DropFields_node1691764648382 = DropFields.apply(
frame=Join_node1691764237797,
paths=["before"],
transformation_ctx="DropFields_node1691764648382",
)
# Script generated for node Rename Field
RenameField_node1691764779832 = RenameField.apply(
frame=DropFields_node1691764648382,
old_name="after",
new_name="bufin_converted",
transformation_ctx="RenameField_node1691764779832",
)
# Script generated for node Amazon S3
AmazonS3_node1691846031658 = glueContext.write_dynamic_frame.from_options(
frame=RenameField_node1691764779832,
connection_type="s3",
format="csv",
connection_options={
"path": "s3://glue-studio-etltest01/output/",
"partitionKeys": [],
},
transformation_ctx="AmazonS3_node1691846031658",
)
job.commit()
Glue : Code を修正して、Job Parameter を受け取る
まず、修正後のコードを全て記載します。このあとに、差分を説明します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME","output_dir","test_param"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
print("Debug!!!!!!!")
output_dir=args["output_dir"]
print(output_dir)
test_param=args["test_param"]
print(args["test_param"])
# Script generated for node transformcode
transformcode_node1691764083220 = glueContext.create_dynamic_frame.from_options(
format_options={
"quoteChar": '"',
"withHeader": True,
"separator": ",",
"optimizePerformance": False,
},
connection_type="s3",
format="csv",
connection_options={
"paths": ["s3://glue-studio-etltest01/codetransform.csv"],
"recurse": True,
},
transformation_ctx="transformcode_node1691764083220",
)
# Script generated for node buhin
buhin_node1 = glueContext.create_dynamic_frame.from_options(
format_options={
"quoteChar": '"',
"withHeader": True,
"separator": ",",
"multiline": False,
"optimizePerformance": False,
},
connection_type="s3",
format="csv",
connection_options={
"paths": ["s3://glue-studio-etltest01/buhin.csv"],
"recurse": True,
},
transformation_ctx="buhin_node1",
)
# Script generated for node Join
buhin_node1DF = buhin_node1.toDF()
transformcode_node1691764083220DF = transformcode_node1691764083220.toDF()
Join_node1691764237797 = DynamicFrame.fromDF(
buhin_node1DF.join(
transformcode_node1691764083220DF,
(buhin_node1DF["buhin"] == transformcode_node1691764083220DF["before"]),
"outer",
),
glueContext,
"Join_node1691764237797",
)
# Script generated for node Drop Fields
DropFields_node1691764648382 = DropFields.apply(
frame=Join_node1691764237797,
paths=["before"],
transformation_ctx="DropFields_node1691764648382",
)
# Script generated for node Rename Field
RenameField_node1691764779832 = RenameField.apply(
frame=DropFields_node1691764648382,
old_name="after",
new_name="bufin_converted",
transformation_ctx="RenameField_node1691764779832",
)
# Script generated for node Amazon S3
AmazonS3_node1691846031658 = glueContext.write_dynamic_frame.from_options(
frame=RenameField_node1691764779832,
connection_type="s3",
format="csv",
connection_options={
"path": "s3://glue-studio-etltest01/output/" + output_dir + "/",
"partitionKeys": [],
},
transformation_ctx="AmazonS3_node1691846031658",
)
job.commit()
引数を受けとる部分です。getResolvedOptions
の関数で、output_dir と test_param の引数を新たに受け取って args に格納しています。その後、print 分などでデバッグ的に標準出力に出しています。
args = getResolvedOptions(sys.argv, ["JOB_NAME","output_dir","test_param"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
print("Debug!!!!!!!")
output_dir=args["output_dir"]
print(output_dir)
test_param=args["test_param"]
print(args["test_param"])
その後、S3 バケットに出力する部分です。path
に output_dir
を指定することで、出力するパラメータ―を指定できるようにしています。
# Script generated for node Amazon S3
AmazonS3_node1691846031658 = glueContext.write_dynamic_frame.from_options(
frame=RenameField_node1691764779832,
connection_type="s3",
format="csv",
connection_options={
"path": "s3://glue-studio-etltest01/output/" + output_dir + "/",
"partitionKeys": [],
},
transformation_ctx="AmazonS3_node1691846031658",
)
Save を押して保存します。
Glue : 単体で動作確認
AWS CLI から Glue Job を実行する際に、パラメーターを渡して動作確認をしてみます。--arguments=
の指定で、2 つのパラメーターを渡しています。output_dir
が S3 バケットで指定されるパラメーターになります。
aws glue start-job-run \
--job-name "Transform Code Scripting" \
--arguments='--output_dir="20230812", --test_param="this is test"'
実行した結果、Console 上でジョブの状態が確認できます。この詳細画面に、標準出力を確認するためのリンクがあります。Cloudwatch Logs で Output_logs
を選択します。
Python コードに記載したデバッグ文字列が確認できます。パラメーターで指定した 20230812
や this is test
が確認できました。
S3 バケット上で、パラメータで指定した名前でディレクトリが自動作成され、データ加工の結果が格納されました。
Step Functions : Glue を呼びだすステートマシンを作成
Step Functions から Glue Job を呼びだしてパラメーターの指定をしてみます。
Design your workflow visually で Next を押します。
画面左側にある StartJobRun を検索して、真ん中に配置します。
この Start Job Run の API Parameter を確認するために AWS Document を見ながら作り上げてもよいのですが、AWS CLI のデバッグからパラメーターを確認する方法もあります。AWS CLI の実行に --debug
を付けて実行します。
aws glue start-job-run \
--job-name "Transform Code Scripting" \
--arguments='--output_dir="20230812", --test_param="this is test"' --debug
デバッグメッセージが表示されるなかで、以下の文に注目します。右にスクロールすると body
のパラメータが見えます。これを Step Functions で同様に指定すれば OK です。
2023-08-12 23:58:57,959 - MainThread - botocore.endpoint - DEBUG - Making request for OperationModel(name=StartJobRun) with params: ...省略... 'body': b'{"JobName": "Transform Code Scripting", "Arguments": {"--output_dir": "20230812", "--test_param": "this is test"}}' 省略
Step Functions の API Parameter に次を指定します。
{
"JobName": "Transform Code Scripting",
"Arguments": {
"--output_dir": "20230813",
"--test_param": "this is test"
}
}
Glue の実行完了を待つために、Wait for task to complete にチェックを入れます。
通常時は Success、エラー時は Fail につなげる設定を入れた後に、Next を押します。
Next
適当に State Machine の名前などを入れて、Create を押します。
StepFunction : Glue を呼びだす実行確認
作成された State machine 上で Start execution を押します。
Start execution を押します。
実行の様子
- Glue の Job 実行が終わるまで、待機している様子が見えます。
実行が成功しました。
S3 Bucket を確認すると、渡したパラメータが利用され、S3 の出力先ディレクトリ名として利用されています。
わかったこと
- Step Functions が Glue Job を実行するときに、実行が終わるまで待機することが簡単にできる
- Step Functions から、Glue Job へ Job Parameter を渡すことが可能
参考 URL
- Step Function を実行したときの日本日付を取得したい場合は、次の記事が参考になる
- [Step Functions] ステートマシンで「日付」を取り扱う方法
- https://dev.classmethod.jp/articles/step-functions-how-to-use-date-in-state-machine/