2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Step Functionsで日付を取得して後続のGlueジョブで使いまわす(手動で別日実行もしたい)

Last updated at Posted at 2024-07-12

はじめに

Step Functions、皆さん使っているでしょうか。GUIでフローが組めて便利ですよね。
そんな便利なStep Functionsですが、実際に使おうとすると日付をどうやって取得するか問題に見舞われると思います。今回は、その解決方法の一例を試してみたので記事にします。

やりたいこと

  • Step Functionsのフロー内で今日の日付yyyymmdd形式)を取得して、Glueジョブ内の処理で使う
    • 日付は日本時間(JST) とする
  • ただし、フローを手動実行した時に日付を指定すると、その日付がGlueジョブ内の処理で使われる
    • 例えば日次で実行していた処理が土日で失敗していて、月曜日にリカバリーを実施する場合などを想定

実際に運用し始めると、後から別日で実行した時ってよくありますよね。それをうまく実現してみます。

日付を取得する方法の検討

Step Functions内で日付を取得する方法はいくつか存在します。その方法を紹介している素晴らしい記事があるので、そこから紹介します。

① EventBridge Schedulerから受け取ったタイムスタンプを加工する
こちらは、EventBridgeからタイムスタンプをStep Functionsで受け取る方法です。この方法を使うと、コーディングなしで日付が使えます。

② Lambda関数を使って現在の日付を取得する
一方で、こちらはコーディングが必要となりますが、柔軟に対応ができるLambdaを使う方法です。

③ Step Functions内のContextオブジェクトから日付を取得する
こちらは、Step FunctionsのPassステートなどでContextオブジェクトから日付を取得し、組み込み関数で加工します。①と同じくコーディングなしで、かつStep Functions内の機能だけで完結します。

今回はどれを使う?

今回は、② Lambda関数を使って現在の日付を取得する を選択します。理由は以下の2つです。

  • JSTでタイムスタンプを取得したいから
    • 他の方法では、UTCでタイムスタンプを取得します。今回はJSTで取得したかったため、Lambdaでのコーディングを選びました
  • Step Functionsの組み込み関数でのタイムスタンプ処理が(個人的に)難しいから

実際にやってみる

以下のようなステートマシンを作成しました。それぞれの部分について説明していきます。
説明がしやすいように、Lambda、Glueの部分から説明して、最後にChoice Stateの部分を説明します。

image.png

Lambda

まずは、メインである日付を取得するLambdaです。

image.png

Step Functionsから呼び出すところは、特別なことはしていません。既に作成しているLambda関数を指定しているだけです。

呼び出しているLambda関数を見てみます。

import json
from datetime import datetime, timedelta, timezone

def lambda_handler(event, context):
    # 現在の日付を取得する
    jst = timezone(timedelta(hours=9))
    dt_now = datetime.now(jst)
    today_date = dt_now.strftime('%Y%m%d')

    return {
        'Date': today_date
    }

JSTで現在のタイムスタンプを取得して、フォーマットをyyyymmddという8桁の数字になるように整形しています。それを返すようにしているので、例えばこの関数の返り値は以下のようになります。

{
  "Date": "20240710"
}

1つ目のGlue

Lambdaで受け取った日付を利用して、データ処理を行うGlueです。まずは1つ目のGlueの設定です。

image.png

ここでは、Glueに渡すパラメータとして、以下のようにジョブ名とジョブパラメータを指定しています。

{
  "JobName": "20240709-date-job",
  "Arguments": {
    "--date.$": "$.Date",
    "--s3_path_input": "s3://bucket-name/xxxxxxxx_sfn_date_glue/input/",
    "--s3_path_output": "s3://bucket-name/xxxxxxxx_sfn_date_glue/output/"
  }
}

ここで指定している"--date.$": "$.Date"の部分が、Lambdaから返される8桁の日付となります。
右の値の方の$は、入力のルート(最上位)を表します。LambdaからわたってくるJsonは"Date"しか持っていないので、このような指定方法になります。左のキーの$は、パラメータの値が動的に決まる($を使う場合)は決まりとして最後を.$で終わらせなければならないそうです(ドキュメントより)。

また、1つ目のGlueは「出力」タブも少し変更する必要があります。

image.png

ResultPathを有効化することで、「Lambdaから受け取った日付のパラメータを後ろに受け渡す」ことができます。これをしないと、2つ目のGlueでDateの変数を受け取れません。
Step Functionsでは、変数を別のジョブなどで使いまわしたい場合、グローバル変数のように入力の変数を扱うのではなくバケツリレーのように変数を受け渡していかないといけません。

このあたりの入力/出力の話は少しややこしいのですが、以下の記事がとても分かりやすく解説してくれているので是非読んでみてください。感動するくらい分かりやすいです。

ここからは、実際のGlueジョブを見ていきます。まずはジョブパラメータです。

image.png

先ほど指定したジョブパラメータを設定しています。ただし、値は全てNoneとしています。Step Functionsから実行した場合のみ、このジョブパラメータが上書きされて実行されます。

スクリプトはあまり重要ではないので、興味ある方はどうぞ。渡されてきた日付を、列として追加するジョブになっています。

Glueのスクリプト
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'date', 's3_path_input', 's3_path_output'])
date = args['date']
s3_path_input = args['s3_path_input']
s3_path_output = args['s3_path_output']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3からcsvファイルを読み込む
df = spark.read.csv(s3_path_input, header=True, inferSchema=True)

# 日付カラムを追加
from pyspark.sql.functions import lit
df = df.withColumn("date", lit(date))

# csvファイルとして出力
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(s3_path_output)

job.commit()

2つ目のGlue

1つ目と同じスクリプトを実行します。そのため、日付の変数を受け取る必要があります。

image.png

指定するパラメータは同じような形式なので割愛します。一応、出力するS3パスを変えてあります。
ここでのポイントは、同じジョブ名を指定しているということです。同じジョブ名を指定して利用する変数の中身を変えることで、Glueジョブ自体は1つで、複数の処理をStep Functionsで実行できます。

複数ジョブを実行する場合は、Glueジョブの同時実行数制限に気を付けてください。今回のように直列に実施するときは大丈夫ですが、Mapステートなどで並列実行するときは気を付ける必要があります。

Choice state

ここまでは、Lambdaで取得した日付を後続のGlueで利用していましたが、元々の要件に合った 「ただし、フローを手動実行した時に日付を指定すると、その日付がGlueジョブ内の処理で使われる」 を実現していません。それを実現したのがこちらになります。

image.png

ルール部分の詳細はこちらです。

image.png

これを実施することで、ステートマシンを実行した時に、"Date"の値が指定されていたかどうかを判断します。指定されていればそれをそのままGlueで利用し、指定されていなければLambdaで日付を取得します。

実際、手動実行するように以下のようにDateを指定しておくことで、その日付で後続の処理を実行できます。

image.png

(一応、JSON部分だけ抽出)

{
  "Date": "20240701"
}

この入力、よく見るとLambdaの返り値と全く同じJSONになっていますよね。そのため、これがそのままGlueの入力として使えるのです。

一応、今回作成したステートマシンの定義(JSON)を張っておきます。

ステートマシン定義
{
  "Comment": "A description of my state machine",
  "StartAt": "input Date check",
  "States": {
    "input Date check": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Date",
          "IsPresent": true,
          "Next": "ETL1"
        }
      ],
      "Default": "get date"
    },
    "ETL1": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "20240709-date-job",
        "Arguments": {
          "--date.$": "$.Date",
          "--s3_path_input": "s3://bucket-name/xxxxxxxx_sfn_date_glue/input/",
          "--s3_path_output": "s3://bucket-name/xxxxxxxx_sfn_date_glue/output/"
        }
      },
      "Next": "ETL2",
      "ResultPath": null
    },
    "ETL2": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "20240709-date-job",
        "Arguments": {
          "--date.$": "$.Date",
          "--s3_path_input": "s3://bucket-name/xxxxxxxx_sfn_date_glue/input/",
          "--s3_path_output": "s3://bucket-name/xxxxxxxx_sfn_date_glue/output2/"
        }
      },
      "End": true
    },
    "get date": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:999999999999:function:get-date:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Next": "ETL1"
    }
  }
}

おわりに

Lambdaでコーディングは必要になりますが、全体的には簡単にやりたいことが実現できたと思います。全てをノーコードでやろうとすると逆に大変になる部分はあるので、上手くLambdaなどを使って効率よく開発していきたいですね。
久しぶりにStep Functionsを使ってみて、やはりポイントは入力と出力の指定方法だなと感じました。結構癖があるので、何度か試して慣れていきたいところです。一応もう一度、おすすめの記事を張って終わりにします。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?