0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

S3にファイルが置かれたらStep Functions+Glueで処理したい(リソースは1つ)

Last updated at Posted at 2024-10-08

はじめに

皆さん、イベント駆動処理していますか?
スケジュールを前もって決められない場合などに便利ですよね。
ただ、イベントが増えすぎると、それに伴い処理するリソースが増えていってしまいます。

例えば、S3にファイルが置かれたら、EventBridgeで発火、Step Functionsを起動してそのステートマシンでGlueを実行する処理を考えます。
以下は、ファイル数が3つの場合です。

image.png

このように、ファイルに応じてリソースを増やしていくと、管理が大変になります。

一方で、このように一本化できると、リソース管理が楽になりますね。

image.png

今回は、この構成で、S3バケットに置かれたファイルのバケット名とプレフィックスをGlueでログに出力する処理を実行してみます。

やってみた

上記の図で左にいるものは右のリソースに依存しているので、一番右のリソースから作成していきます。
また、今回重要なのは、S3 bucketのバケット名とプレフィックスをEventBridgeからStep Functions、Step FunctionsからGlueへとバケツリレーしていくことです。
バケツを受け渡すときに、どのように指定するのかがポイントになります。
その渡し方を確認するために、一度Step Functionsは仮で作成して、お試し実行してみます。そうすることで、EventBridgeからどんなパラメータが渡ってくるかを確認できます。

Glueジョブを作る

今回はパラメータが渡ってくるバケツリレーを確認できれば良いので、ファイルが置かれたS3のバケット名とプレフィックスをログに出力するジョブを作成します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_bucket', 's3_key'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

s3_bucket = args['s3_bucket']
s3_key = args['s3_key']

print("s3_bucket")
print(s3_bucket)
print("s3_key")
print(s3_key)

job.commit()

ジョブパラメータとして--s3_bucket, --s3_keyを定義して、それを使ってStep Functionsから値を受け取ります。

Job detailsでジョブパラメータを定義できますが、動的にジョブパラメータを指定する際はそこで定義しないでもOKです。

また、Job detailsでMaximum concurrencyの値を10くらいに変更しておきます。これは、このジョブをいくつ並行して実行できるかを指定する値です。デフォルトは1ですが、そのままだとS3に複数ファイルが一気に置かれた場合、1つしかジョブが実行できません。

image.png

Step Functionsのステートマシンを仮で作る

Glueを呼び出すステートマシンを作ります。ただし、EventBridgeからどんなイベントが渡ってくるのかよくわからないので、とりあえず仮作成して実行してみます。

image.png

APIパラメータは以下の通りで、先ほど作成したGlueジョブの名前を指定します。

{
  "JobName": "20241007_event_job"
}

これだとただGlueジョブを実行するだけで、ジョブに何も渡しませんが、一旦これで進みます。

EventBridgeを作る

以下のS3パスに、ファイルを更新していくことを考えます。

  • s3://bucket-name/test/input_1
  • s3://bucket-name/test/input_2
  • s3://bucket-name/test/input_3

上記のバケット、プレフィックスをイベントパターンに指定します。

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["bucket-name"]
    },
    "object": {
      "key": [{
        "prefix": "test/"
      }]
    }
  }
}

"prefix": "test/"と指定することで、test配下の全てのフォルダが更新されたことを検知してくれます。

ターゲットには、先ほど作成したステートマシンを選択します。

試しに実行してみる

試しに、S3に適当なファイルを置いてみます。
EventBridgeが発火し、ステートマシン、Glueが実行されますが…

image.png

Glueは失敗しています。ステートマシンで、ジョブパラメータを設定していないためです。

一方、ステートマシンは成功していますが、重要なのはどんなパラメータが入力されているかどうかです。
実行結果の、Glue StartJobRunの入力タブを見てみます。

image.png

このようなパラメータが渡ってきていますね。今回ほしいのは、オレンジで一部の情報を隠した部分になります。

ステートマシンを修正する

入力パラメータと、どの部分が欲しいかが分かったので、それを修正します。

image.png

APIパラメータを以下のように設定することで、EventBridgeから渡ってきたパラメータを、Glueのジョブパラメータとして受け渡します。この指定の方法がこの記事の全てと言っても過言ではありません。

{
  "JobName": "20241007_event_job",
  "Arguments": {
    "--s3_bucket.$": "$.detail.bucket.name",
    "--s3_key.$": "$.detail.object.key"
  }
}

実行してみる

S3に、ファイルを3つおいてみました。

ステートマシンは、3回起動が成功しています。

image.png

同じくGlueも3回起動が成功しています。

image.png

それぞれのログを見てみると、想定通り以下のように出力されていました。

s3_bucket
bucket-name
s3_key
test/input_1/test1.txt
s3_bucket
bucket-name
s3_key
test/input_2/test2.txt
s3_bucket
bucket-name
s3_key
test/input_2/test2.txt

おわりに

S3のイベントの情報がそのままStep Functionsに渡されてくるので、それをGlueのジョブパラメータとして渡してあげるところがポイントでしたね。逆にそれが分かれば結構簡単にできたなという印象です。
EventBridge側でパラメータを変換することもできるので、それができるとステートマシン側の変数指定が少し楽になりそうです。そちらも今度試してみたいですね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?