1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Glue Studio を Script 化し Job Parameter を与えて、Step Functions から Glue Job を実行してみた

Posted at

はじめに

Glue を使ったデータ加工を 1 日 1 回 行うときに、出力先の S3 に日付ごとにディレクトリを分けたいときがあります。この時に、Glue の Job Parameter を使うことで、出力先ディレクトリなどのパラメーターを渡す機能があり、こちらで柔軟に指定が可能です。

ただ、2023 年 8 月現在、Glue Studio の GUI を使ってデータ加工を行っている場合は、Job Parameter を使って S3 のディレクトリを指定することが出来ません。GUI を使ったデータ加工フローを Script 化すると、柔軟に指定が出来るようになります。今回はその手順を紹介していきます。

また、Glue を含めた全体のワークフローとして Step Functions を利用頂くことも多くあります。今回は、Step Functions から Glue を呼びだすときのパラメーターを指定する方法も紹介していきます。

前提

Glue Studio を使って、データ加工のフローを持っている前提で記事を記載します。なお、このフローは 前回の記事 で作成しました。

image-20230812221556376.png

Glue : Copy して Script 化

Glue Studio で作成したフローを Script 化していきます。注意点があります。GUI ベースのものを Script 化にしてしまうと、GUI に戻すことはできません。そのため、Script 化する前に、そのフローを Clone してバックアップを持っておきましょう。

Script 化するフローを選択して、Clone job を選びます。

image-20230812221633553.png

Clone した Job を使って、Edit Script を押します。

image-20230812222025390.png

Confirm を押します。

image-20230812222050431.png

すると、Script が生成されました。全体は以下のようなコードになっています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node transformcode
transformcode_node1691764083220 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://glue-studio-etltest01/codetransform.csv"],
        "recurse": True,
    },
    transformation_ctx="transformcode_node1691764083220",
)

# Script generated for node buhin
buhin_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "multiline": False,
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://glue-studio-etltest01/buhin.csv"],
        "recurse": True,
    },
    transformation_ctx="buhin_node1",
)

# Script generated for node Join
buhin_node1DF = buhin_node1.toDF()
transformcode_node1691764083220DF = transformcode_node1691764083220.toDF()
Join_node1691764237797 = DynamicFrame.fromDF(
    buhin_node1DF.join(
        transformcode_node1691764083220DF,
        (buhin_node1DF["buhin"] == transformcode_node1691764083220DF["before"]),
        "outer",
    ),
    glueContext,
    "Join_node1691764237797",
)

# Script generated for node Drop Fields
DropFields_node1691764648382 = DropFields.apply(
    frame=Join_node1691764237797,
    paths=["before"],
    transformation_ctx="DropFields_node1691764648382",
)

# Script generated for node Rename Field
RenameField_node1691764779832 = RenameField.apply(
    frame=DropFields_node1691764648382,
    old_name="after",
    new_name="bufin_converted",
    transformation_ctx="RenameField_node1691764779832",
)

# Script generated for node Amazon S3
AmazonS3_node1691846031658 = glueContext.write_dynamic_frame.from_options(
    frame=RenameField_node1691764779832,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-studio-etltest01/output/",
        "partitionKeys": [],
    },
    transformation_ctx="AmazonS3_node1691846031658",
)

job.commit()

Glue : Code を修正して、Job Parameter を受け取る

まず、修正後のコードを全て記載します。このあとに、差分を説明します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME","output_dir","test_param"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

print("Debug!!!!!!!")
output_dir=args["output_dir"]
print(output_dir)
test_param=args["test_param"]
print(args["test_param"])

# Script generated for node transformcode
transformcode_node1691764083220 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://glue-studio-etltest01/codetransform.csv"],
        "recurse": True,
    },
    transformation_ctx="transformcode_node1691764083220",
)

# Script generated for node buhin
buhin_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "multiline": False,
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://glue-studio-etltest01/buhin.csv"],
        "recurse": True,
    },
    transformation_ctx="buhin_node1",
)

# Script generated for node Join
buhin_node1DF = buhin_node1.toDF()
transformcode_node1691764083220DF = transformcode_node1691764083220.toDF()
Join_node1691764237797 = DynamicFrame.fromDF(
    buhin_node1DF.join(
        transformcode_node1691764083220DF,
        (buhin_node1DF["buhin"] == transformcode_node1691764083220DF["before"]),
        "outer",
    ),
    glueContext,
    "Join_node1691764237797",
)

# Script generated for node Drop Fields
DropFields_node1691764648382 = DropFields.apply(
    frame=Join_node1691764237797,
    paths=["before"],
    transformation_ctx="DropFields_node1691764648382",
)

# Script generated for node Rename Field
RenameField_node1691764779832 = RenameField.apply(
    frame=DropFields_node1691764648382,
    old_name="after",
    new_name="bufin_converted",
    transformation_ctx="RenameField_node1691764779832",
)

# Script generated for node Amazon S3
AmazonS3_node1691846031658 = glueContext.write_dynamic_frame.from_options(
    frame=RenameField_node1691764779832,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-studio-etltest01/output/" + output_dir + "/",
        "partitionKeys": [],
    },
    transformation_ctx="AmazonS3_node1691846031658",
)

job.commit()

引数を受けとる部分です。getResolvedOptions の関数で、output_dir と test_param の引数を新たに受け取って args に格納しています。その後、print 分などでデバッグ的に標準出力に出しています。

args = getResolvedOptions(sys.argv, ["JOB_NAME","output_dir","test_param"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

print("Debug!!!!!!!")
output_dir=args["output_dir"]
print(output_dir)
test_param=args["test_param"]
print(args["test_param"])

その後、S3 バケットに出力する部分です。pathoutput_dir を指定することで、出力するパラメータ―を指定できるようにしています。

# Script generated for node Amazon S3
AmazonS3_node1691846031658 = glueContext.write_dynamic_frame.from_options(
    frame=RenameField_node1691764779832,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-studio-etltest01/output/" + output_dir + "/",
        "partitionKeys": [],
    },
    transformation_ctx="AmazonS3_node1691846031658",
)

Save を押して保存します。

image-20230812234203121.png

Glue : 単体で動作確認

AWS CLI から Glue Job を実行する際に、パラメーターを渡して動作確認をしてみます。--arguments= の指定で、2 つのパラメーターを渡しています。output_dir が S3 バケットで指定されるパラメーターになります。

aws glue start-job-run \
--job-name "Transform Code Scripting" \
--arguments='--output_dir="20230812", --test_param="this is test"'

実行した結果、Console 上でジョブの状態が確認できます。この詳細画面に、標準出力を確認するためのリンクがあります。Cloudwatch Logs で Output_logs を選択します。

image-20230812225557333.png

Python コードに記載したデバッグ文字列が確認できます。パラメーターで指定した 20230812this is test が確認できました。

image-20230812225641126.png

S3 バケット上で、パラメータで指定した名前でディレクトリが自動作成され、データ加工の結果が格納されました。

image-20230813003744111.png

Step Functions : Glue を呼びだすステートマシンを作成

Step Functions から Glue Job を呼びだしてパラメーターの指定をしてみます。

image-20230812234631852.png

Design your workflow visually で Next を押します。

image-20230812235302634.png

画面左側にある StartJobRun を検索して、真ん中に配置します。

image-20230812235621910.png

この Start Job Run の API Parameter を確認するために AWS Document を見ながら作り上げてもよいのですが、AWS CLI のデバッグからパラメーターを確認する方法もあります。AWS CLI の実行に --debug を付けて実行します。

aws glue start-job-run \
      --job-name "Transform Code Scripting" \
      --arguments='--output_dir="20230812", --test_param="this is test"' --debug

デバッグメッセージが表示されるなかで、以下の文に注目します。右にスクロールすると body のパラメータが見えます。これを Step Functions で同様に指定すれば OK です。

2023-08-12 23:58:57,959 - MainThread - botocore.endpoint - DEBUG - Making request for OperationModel(name=StartJobRun) with params: ...省略... 'body': b'{"JobName": "Transform Code Scripting", "Arguments": {"--output_dir": "20230812", "--test_param": "this is test"}}' 省略

Step Functions の API Parameter に次を指定します。

{
  "JobName": "Transform Code Scripting",
  "Arguments": {
    "--output_dir": "20230813",
    "--test_param": "this is test"
  }
}

image-20230813000645770.png

Glue の実行完了を待つために、Wait for task to complete にチェックを入れます。

image-20230813002118002.png

通常時は Success、エラー時は Fail につなげる設定を入れた後に、Next を押します。

image-20230813001010786.png

Next

image-20230813001140094.png

適当に State Machine の名前などを入れて、Create を押します。

image-20230813001248664.png

StepFunction : Glue を呼びだす実行確認

作成された State machine 上で Start execution を押します。

image-20230813001518492.png

Start execution を押します。

image-20230813001533985.png

実行の様子

  • Glue の Job 実行が終わるまで、待機している様子が見えます。

image-20230813001656281.png

実行が成功しました。

image-20230813001818792.png

S3 Bucket を確認すると、渡したパラメータが利用され、S3 の出力先ディレクトリ名として利用されています。

image-20230813001854114.png

わかったこと

  • Step Functions が Glue Job を実行するときに、実行が終わるまで待機することが簡単にできる
  • Step Functions から、Glue Job へ Job Parameter を渡すことが可能

参考 URL

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?