はじめに
皆さん、イベント駆動処理していますか?
スケジュールを前もって決められない場合などに便利ですよね。
ただ、イベントが増えすぎると、それに伴い処理するリソースが増えていってしまいます。
例えば、S3にファイルが置かれたら、EventBridgeで発火、Step Functionsを起動してそのステートマシンでGlueを実行する処理を考えます。
以下は、ファイル数が3つの場合です。
このように、ファイルに応じてリソースを増やしていくと、管理が大変になります。
一方で、このように一本化できると、リソース管理が楽になりますね。
今回は、この構成で、S3バケットに置かれたファイルのバケット名とプレフィックスをGlueでログに出力する処理を実行してみます。
やってみた
上記の図で左にいるものは右のリソースに依存しているので、一番右のリソースから作成していきます。
また、今回重要なのは、S3 bucketのバケット名とプレフィックスをEventBridgeからStep Functions、Step FunctionsからGlueへとバケツリレーしていくことです。
バケツを受け渡すときに、どのように指定するのかがポイントになります。
その渡し方を確認するために、一度Step Functionsは仮で作成して、お試し実行してみます。そうすることで、EventBridgeからどんなパラメータが渡ってくるかを確認できます。
Glueジョブを作る
今回はパラメータが渡ってくるバケツリレーを確認できれば良いので、ファイルが置かれたS3のバケット名とプレフィックスをログに出力するジョブを作成します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_bucket', 's3_key'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
s3_bucket = args['s3_bucket']
s3_key = args['s3_key']
print("s3_bucket")
print(s3_bucket)
print("s3_key")
print(s3_key)
job.commit()
ジョブパラメータとして--s3_bucket
, --s3_key
を定義して、それを使ってStep Functionsから値を受け取ります。
Job detailsでジョブパラメータを定義できますが、動的にジョブパラメータを指定する際はそこで定義しないでもOKです。
また、Job detailsでMaximum concurrency
の値を10くらいに変更しておきます。これは、このジョブをいくつ並行して実行できるかを指定する値です。デフォルトは1ですが、そのままだとS3に複数ファイルが一気に置かれた場合、1つしかジョブが実行できません。
Step Functionsのステートマシンを仮で作る
Glueを呼び出すステートマシンを作ります。ただし、EventBridgeからどんなイベントが渡ってくるのかよくわからないので、とりあえず仮作成して実行してみます。
APIパラメータは以下の通りで、先ほど作成したGlueジョブの名前を指定します。
{
"JobName": "20241007_event_job"
}
これだとただGlueジョブを実行するだけで、ジョブに何も渡しませんが、一旦これで進みます。
EventBridgeを作る
以下のS3パスに、ファイルを更新していくことを考えます。
s3://bucket-name/test/input_1
s3://bucket-name/test/input_2
s3://bucket-name/test/input_3
上記のバケット、プレフィックスをイベントパターンに指定します。
{
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail": {
"bucket": {
"name": ["bucket-name"]
},
"object": {
"key": [{
"prefix": "test/"
}]
}
}
}
"prefix": "test/"
と指定することで、test配下の全てのフォルダが更新されたことを検知してくれます。
ターゲットには、先ほど作成したステートマシンを選択します。
試しに実行してみる
試しに、S3に適当なファイルを置いてみます。
EventBridgeが発火し、ステートマシン、Glueが実行されますが…
Glueは失敗しています。ステートマシンで、ジョブパラメータを設定していないためです。
一方、ステートマシンは成功していますが、重要なのはどんなパラメータが入力されているかどうかです。
実行結果の、Glue StartJobRunの入力タブを見てみます。
このようなパラメータが渡ってきていますね。今回ほしいのは、オレンジで一部の情報を隠した部分になります。
ステートマシンを修正する
入力パラメータと、どの部分が欲しいかが分かったので、それを修正します。
APIパラメータを以下のように設定することで、EventBridgeから渡ってきたパラメータを、Glueのジョブパラメータとして受け渡します。この指定の方法がこの記事の全てと言っても過言ではありません。
{
"JobName": "20241007_event_job",
"Arguments": {
"--s3_bucket.$": "$.detail.bucket.name",
"--s3_key.$": "$.detail.object.key"
}
}
実行してみる
S3に、ファイルを3つおいてみました。
ステートマシンは、3回起動が成功しています。
同じくGlueも3回起動が成功しています。
それぞれのログを見てみると、想定通り以下のように出力されていました。
s3_bucket
bucket-name
s3_key
test/input_1/test1.txt
s3_bucket
bucket-name
s3_key
test/input_2/test2.txt
s3_bucket
bucket-name
s3_key
test/input_2/test2.txt
おわりに
S3のイベントの情報がそのままStep Functionsに渡されてくるので、それをGlueのジョブパラメータとして渡してあげるところがポイントでしたね。逆にそれが分かれば結構簡単にできたなという印象です。
EventBridge側でパラメータを変換することもできるので、それができるとステートマシン側の変数指定が少し楽になりそうです。そちらも今度試してみたいですね。