この記事で紹介していること
以前、投稿したLambda×Stepfunctions の繰り返し処理を環境へ展開した矢先、
re:invent2022でStep Functions Distributed Mapが発表されました。(あと少し早ければ、、、)
この記事では、噂の新機能に関して紹介します!
- Distributed Mapとは?
- どんな処理が向いている?
- やってみた体験レポート
Distributed Mapとは?
Mapフローは以前からStep Functions にありました。
これまでのMapとの違いについてはAWS公式ドキュメントがわかりやすくまとめてくれています。
引用元:Step Functions 分散マップ – 大規模な並列データ処理のためのサーバーレスソリューション
元の Map ステートフロー 新しい分散マップフロー サブワークフロー 配列内の各項目のサブワークフローを実行します。配列は前のステートから渡される必要があります。サブワークフローの各イテレーションはマップイテレーションと呼ばれ、そのイベントはステートマシンの実行履歴に追加されます。 配列または Amazon S3 データセット内の各項目のサブワークフローを実行します。各サブワークフローは、独自のイベント履歴を持つ、まったく別の子実行として実行されます。 並列ブランチ マップイテレーションは並列実行され、一度に有効な最大同時実行数は約 40 です。 何百万もの項目を複数の子実行に渡すことができ、一度に最大 10,000 の同時実行が可能です 入力ソース JSON 配列のみを入力として受け入れます。 Amazon S3 オブジェクトリスト、JSON 配列またはファイル、csv ファイル、または Amazon S3 インベントリとして入力を受け入れます。 ペイロード 256 KB 各イテレーションは、ファイルへの参照 (Amazon S3) またはファイルから 1 つのレコード (ステート入力) を受け取ります。実際のファイル処理能力は、Lambda のストレージとメモリによって制限されます。 実行履歴 25,000 イベント Map ステートの各イテレーションは子実行で、それぞれの最大イベント数は 25,000 です (エクスプレスモードでは実行履歴に制限はありません)。
僕の言葉で完結に表すと、
- これまで子タスクの同時実行数は上限が40だったが、最大10,000まで可能。(ただし各サービスの上限に従う)
- Lambdaに渡す入力値がJSON配列以外にも、S3の操作(list_objectとか)をStep Functions側で実行できる。
が大きな違いかなと思います。
どんな処理が向いているの?
AWSは大量のデータをLambda処理させるシチュエーションを想定しているようです。
以前の僕のように大量のデータを処理しようとすると、実行時間や各AWS APIの制限により、
コンポーネント間の連携が複雑になっていましたが、このアプデで大規模なワークフローであっても、
超高速かつ完結な処理を実現できると思います。
同時実行数が最大10,000なのでLambdaを1万個並列に起動させるワークフローも実現可能です、、、(やばすぎ)
やってみた体験レポート
既存構成のおさらい
以前、投稿した記事では、Step FunctionsでLambdaを繰り返し処理させるというワークフローを作成していました。
Lambda側でlist_object_v2を実行させているので、1つのLambdaの完了まで約3分、全体処理終了まで50分近くかかります。
バッチ処理なので時間がかかっても特に問題はないですが、これを新機能で高速化します。
やったこと
- Step Functions Distributed Mapでlist_object_v2を実行する
- LambdaではDistributed Mapから渡されるPayloadを処理させる(1件ずつ)
ワークフロー
{
"Comment": "A description of my state machine",
"StartAt": "S3_list",
"States": {
"S3_list": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "通常処理",
"States": {
"通常処理": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:[Account ID]:function:[Function name]:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "処理結果分岐"
},
"処理結果分岐": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.notice",
"IsPresent": true,
"Next": "コピーエラー通知"
}
],
"Default": "OK!!"
},
"コピーエラー通知": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:ap-northeast-1:[Account ID]:[SNS name]",
"Message.$": "$"
},
"End": true
},
"OK!!": {
"Type": "Succeed"
}
}
},
"Label": "S3_list",
"MaxConcurrency": 10,
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "[Bucket name]",
"Prefix": "[Prefix]"
},
"ReaderConfig": {}
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Mapエラー通知"
}
],
"ToleratedFailurePercentage": 100,
"End": true
},
"Mapエラー通知": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message.$": "$",
"TopicArn": "arn:aws:sns:ap-northeast-1:[Account ID]:[SNS name]"
},
"End": true
}
}
}
Lambdaのコード
- Distributed Mapから渡されるPayloadの中から"Key"を1件ずつ処理する
{
"Etag": ""xxxxxxxxxxxxxx"",
"Key": "target_path",
"LastModified": 1672103604,
"Size": 1000,
"StorageClass": "STANDARD"
}
import os
import datetime
import logging
import boto3
s3 = boto3.client('s3', region_name='ap-northeast-1')
sns = boto3.client('sns')
bucket_name = os.environ['bucket_name']
src_prefix = os.environ['src_prefix']
LOG_LEVEL = os.environ['LOG_LEVEL']
PREFIX_FORMAT = '%Y/%m/%d'
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
def lambda_handler(event, context):
error_obj = []
src_path = event['Key']
logger.info('event is [%s].', event)
input_day = datetime.date.today() - datetime.timedelta(days=1)
year_month_day = input_day.strftime(PREFIX_FORMAT)
dst_prefix = 'S3/' + year_month_day
logger.debug('key:%s', src_path)
# Keyからオブジェクト名を生成(YYYY-MM-DD-xxxxxxx)
dst_file_name = os.path.basename(src_path)
# dstとオブジェクト名をjoinしてコピー先パスを生成
dst_path = os.path.join(
dst_prefix,
dst_file_name
)
if str(input_day) in dst_file_name:
try:
s3.copy_object(
Bucket=bucket_name,
Key=dst_path,
CopySource={
'Bucket': bucket_name,
'Key': src_path
}
)
# コピーできなかったsrcオブジェクトをリストへ追加
except:
logger.error(
"Copy_Error:[%s] can't copy.", str(src_path)
)
error_obj.append(src_path)
# srcオブジェクト削除
else:
try:
s3.delete_object(
Bucket=bucket_name,
Key=src_path,
)
# 削除できなかったsrcオブジェクトをリストへ追加
except:
logger.error(
"Delete Error:[%s] can't delete.", str(src_path)
)
error_obj.append(src_path)
else:
logger.info('This request [%s] is not existed.', str(input_day))
return {'result': 'No target'}
# コピー削除に失敗したコンテンツがある場合
if error_obj:
logger.error(
'Some contents remain:%s', error_obj
)
return {'notice': error_obj}
else:
logger.info("Task is complete.")
logger.debug("Target objects is [%s]", dst_file_name)
return {'result': 'Copy done'}
実行結果
上記の合計値はDistributed Map側でlistされたオブジェクト数です。
今回は最大実効数を10としましたが、それでも1000件のオブジェクト処理にかかっていた時間にはほぼ半分になりました。
BEFORE | AFTER |
---|---|
約3分 | 約1.5分 |
なによりLambda側でlistする必要がなくなったので、コード量が減ったのが嬉しい。
つまずいた点
- Step Fuctions自体がAWSステートメント言語で成り立っているため、配列を渡す処理に手間取った。
- 実行上限に余裕があるにも関わらず、実行上限エラーが吐き出された
- エラーを確認する際に、Lambda側なのかStep Functions側なのか双方のログを見ながら見極める必要があった。
States.ExceedToleratedFailureThreshold
- エラーを確認する際に、Lambda側なのかStep Functions側なのか双方のログを見ながら見極める必要があった。
最後に
この記事では、Step Functions Distributed Mapについて紹介しました。
新機能を先陣切って利用すると、テック系ブログや公式ドキュメントにも文献が少なく試行錯誤しながらの検証でした。ただそれにより技術力が身に染み付いていく感覚がして、個人的にはこれからも新機能は率先して使ってみたいと感じました。