はじめに
Step Functions、皆さん使っているでしょうか。GUIでフローが組めて便利ですよね。
そんな便利なStep Functionsですが、実際に使おうとすると日付をどうやって取得するか問題に見舞われると思います。今回は、その解決方法の一例を試してみたので記事にします。
やりたいこと
- Step Functionsのフロー内で今日の日付(
yyyymmdd
形式)を取得して、Glueジョブ内の処理で使う- 日付は日本時間(JST) とする
- ただし、フローを手動実行した時に日付を指定すると、その日付がGlueジョブ内の処理で使われる
- 例えば日次で実行していた処理が土日で失敗していて、月曜日にリカバリーを実施する場合などを想定
実際に運用し始めると、後から別日で実行した時ってよくありますよね。それをうまく実現してみます。
日付を取得する方法の検討
Step Functions内で日付を取得する方法はいくつか存在します。その方法を紹介している素晴らしい記事があるので、そこから紹介します。
① EventBridge Schedulerから受け取ったタイムスタンプを加工する
こちらは、EventBridgeからタイムスタンプをStep Functionsで受け取る方法です。この方法を使うと、コーディングなしで日付が使えます。
② Lambda関数を使って現在の日付を取得する
一方で、こちらはコーディングが必要となりますが、柔軟に対応ができるLambdaを使う方法です。
③ Step Functions内のContextオブジェクトから日付を取得する
こちらは、Step FunctionsのPassステートなどでContextオブジェクトから日付を取得し、組み込み関数で加工します。①と同じくコーディングなしで、かつStep Functions内の機能だけで完結します。
今回はどれを使う?
今回は、② Lambda関数を使って現在の日付を取得する を選択します。理由は以下の2つです。
- JSTでタイムスタンプを取得したいから
- 他の方法では、UTCでタイムスタンプを取得します。今回はJSTで取得したかったため、Lambdaでのコーディングを選びました
- Step Functionsの組み込み関数でのタイムスタンプ処理が(個人的に)難しいから
- String 演算の組み込み関数を利用して、かつそれに応じたInput/Outputの形式を考えるのが大変そうでした
実際にやってみる
以下のようなステートマシンを作成しました。それぞれの部分について説明していきます。
説明がしやすいように、Lambda、Glueの部分から説明して、最後にChoice Stateの部分を説明します。
Lambda
まずは、メインである日付を取得するLambdaです。
Step Functionsから呼び出すところは、特別なことはしていません。既に作成しているLambda関数を指定しているだけです。
呼び出しているLambda関数を見てみます。
import json
from datetime import datetime, timedelta, timezone
def lambda_handler(event, context):
# 現在の日付を取得する
jst = timezone(timedelta(hours=9))
dt_now = datetime.now(jst)
today_date = dt_now.strftime('%Y%m%d')
return {
'Date': today_date
}
JSTで現在のタイムスタンプを取得して、フォーマットをyyyymmdd
という8桁の数字になるように整形しています。それを返すようにしているので、例えばこの関数の返り値は以下のようになります。
{
"Date": "20240710"
}
1つ目のGlue
Lambdaで受け取った日付を利用して、データ処理を行うGlueです。まずは1つ目のGlueの設定です。
ここでは、Glueに渡すパラメータとして、以下のようにジョブ名とジョブパラメータを指定しています。
{
"JobName": "20240709-date-job",
"Arguments": {
"--date.$": "$.Date",
"--s3_path_input": "s3://bucket-name/xxxxxxxx_sfn_date_glue/input/",
"--s3_path_output": "s3://bucket-name/xxxxxxxx_sfn_date_glue/output/"
}
}
ここで指定している"--date.$": "$.Date"
の部分が、Lambdaから返される8桁の日付となります。
右の値の方の$
は、入力のルート(最上位)を表します。LambdaからわたってくるJsonは"Date"しか持っていないので、このような指定方法になります。左のキーの$
は、パラメータの値が動的に決まる($
を使う場合)は決まりとして最後を.$
で終わらせなければならないそうです(ドキュメントより)。
また、1つ目のGlueは「出力」タブも少し変更する必要があります。
ResultPathを有効化することで、「Lambdaから受け取った日付のパラメータを後ろに受け渡す」ことができます。これをしないと、2つ目のGlueでDateの変数を受け取れません。
Step Functionsでは、変数を別のジョブなどで使いまわしたい場合、グローバル変数のように入力の変数を扱うのではなくバケツリレーのように変数を受け渡していかないといけません。
このあたりの入力/出力の話は少しややこしいのですが、以下の記事がとても分かりやすく解説してくれているので是非読んでみてください。感動するくらい分かりやすいです。
ここからは、実際のGlueジョブを見ていきます。まずはジョブパラメータです。
先ほど指定したジョブパラメータを設定しています。ただし、値は全てNone
としています。Step Functionsから実行した場合のみ、このジョブパラメータが上書きされて実行されます。
スクリプトはあまり重要ではないので、興味ある方はどうぞ。渡されてきた日付を、列として追加するジョブになっています。
Glueのスクリプト
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'date', 's3_path_input', 's3_path_output'])
date = args['date']
s3_path_input = args['s3_path_input']
s3_path_output = args['s3_path_output']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# S3からcsvファイルを読み込む
df = spark.read.csv(s3_path_input, header=True, inferSchema=True)
# 日付カラムを追加
from pyspark.sql.functions import lit
df = df.withColumn("date", lit(date))
# csvファイルとして出力
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(s3_path_output)
job.commit()
2つ目のGlue
1つ目と同じスクリプトを実行します。そのため、日付の変数を受け取る必要があります。
指定するパラメータは同じような形式なので割愛します。一応、出力するS3パスを変えてあります。
ここでのポイントは、同じジョブ名を指定しているということです。同じジョブ名を指定して利用する変数の中身を変えることで、Glueジョブ自体は1つで、複数の処理をStep Functionsで実行できます。
複数ジョブを実行する場合は、Glueジョブの同時実行数制限に気を付けてください。今回のように直列に実施するときは大丈夫ですが、Mapステートなどで並列実行するときは気を付ける必要があります。
Choice state
ここまでは、Lambdaで取得した日付を後続のGlueで利用していましたが、元々の要件に合った 「ただし、フローを手動実行した時に日付を指定すると、その日付がGlueジョブ内の処理で使われる」 を実現していません。それを実現したのがこちらになります。
ルール部分の詳細はこちらです。
これを実施することで、ステートマシンを実行した時に、"Date"
の値が指定されていたかどうかを判断します。指定されていればそれをそのままGlueで利用し、指定されていなければLambdaで日付を取得します。
実際、手動実行するように以下のようにDateを指定しておくことで、その日付で後続の処理を実行できます。
(一応、JSON部分だけ抽出)
{
"Date": "20240701"
}
この入力、よく見るとLambdaの返り値と全く同じJSONになっていますよね。そのため、これがそのままGlueの入力として使えるのです。
一応、今回作成したステートマシンの定義(JSON)を張っておきます。
ステートマシン定義
{
"Comment": "A description of my state machine",
"StartAt": "input Date check",
"States": {
"input Date check": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Date",
"IsPresent": true,
"Next": "ETL1"
}
],
"Default": "get date"
},
"ETL1": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "20240709-date-job",
"Arguments": {
"--date.$": "$.Date",
"--s3_path_input": "s3://bucket-name/xxxxxxxx_sfn_date_glue/input/",
"--s3_path_output": "s3://bucket-name/xxxxxxxx_sfn_date_glue/output/"
}
},
"Next": "ETL2",
"ResultPath": null
},
"ETL2": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "20240709-date-job",
"Arguments": {
"--date.$": "$.Date",
"--s3_path_input": "s3://bucket-name/xxxxxxxx_sfn_date_glue/input/",
"--s3_path_output": "s3://bucket-name/xxxxxxxx_sfn_date_glue/output2/"
}
},
"End": true
},
"get date": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:999999999999:function:get-date:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Next": "ETL1"
}
}
}
おわりに
Lambdaでコーディングは必要になりますが、全体的には簡単にやりたいことが実現できたと思います。全てをノーコードでやろうとすると逆に大変になる部分はあるので、上手くLambdaなどを使って効率よく開発していきたいですね。
久しぶりにStep Functionsを使ってみて、やはりポイントは入力と出力の指定方法だなと感じました。結構癖があるので、何度か試して慣れていきたいところです。一応もう一度、おすすめの記事を張って終わりにします。