こんにちは、YUKITOです。
業務でメール送信共通機能を作っていたときに、「最大1000件を一気に送りたい」という要件が出てきました。
最初は「Lambdaのループで回せばいいかな」と思っていたのですが、Lambdaは最大15分しか動けないので、1000件を1件ずつ送っていたらタイムアウトしてしまうのではないかと思い、Step Functions の Mapステート(並列処理) を使ってみることにしました。
実際に100件で試してみたら想像以上に速かったので、紹介したいと思います!!
やりたかったこと
ざっくり言うと、こういう処理を作りたかったです。
- DBから「送るメールの宛先・件名・本文」を取得する
- 1件ずつ SES でメールを送る(成功 / 失敗を記録)
- 全部送り終わったあとに、DBの送信ステータスを一括で更新する
これを 1000件分、できるだけ早く 終わらせたい、というのがゴールです。
扱うデータ(DBテーブル構成)
今回、前提にしているDBテーブルはこんなイメージです。(仮のカラム名です)
| カラム名 | 型 | 説明 |
|---|---|---|
mail_id |
varchar (PK) | メールごとに一意なID |
to_address |
varchar | 送信先メールアドレス |
subject |
varchar | 件名 |
body |
text | 本文 |
status |
varchar | 送信ステータス(SENT / FAILED) |
created_at |
timestamp | 新規作成日時 |
updated_at |
timestamp | 最終更新日時 |
例えばこんなデータが入っているイメージです。
| mail_id | to_address | subject | body | status |
|---|---|---|---|---|
| mail-0001 | user1@example.com | テスト件名1 | 本文1... | NULL |
| mail-0002 | user2@example.com | テスト件名2 | 本文2... | NULL |
| mail-0003 | user3@example.com | テスト件名3 | 本文3... | NULL |
見やすさのため、
created_at/updated_atの日時系カラムは省略しています。
ポイントは以下の2つです。
- 送信用Lambda(
SendMail)は、SQSから渡されたmail_idをキーに DBから件名・本文・宛先を取得してメールを送る - 送信後、
RecordStatusLambda がstatusをSENT/FAILEDに一括更新する
このため、SQSに流すメッセージには「メールの中身」ではなく mail_id の配列 だけ載せています。中身はDB側にあるので、SQSメッセージは軽く保てます。
なぜ Mapステートを選んだのか
最初に検討した方法と比べると、こんな感じです。
| 項目 | Lambdaでループ | Step Functions Mapステート |
|---|---|---|
| 並列実行 | できない(1件ずつ) | できる(同時実行数を指定) |
| 実行時間の上限 | 最大15分 | 実質ほぼ気にしなくてよい |
| エラー時のリトライ | Lambda全体が落ちる | 失敗した1件だけリトライできる |
| 進捗の可視化 | CloudWatch Logsを読む | コンソールで見える |
特に「失敗した1件だけリトライできる」のが大きくて、メール送信のような「一部だけコケる可能性がある処理」とすごく相性が良いと感じました。
Mapステートとは...
ざっくり言うと、配列を渡すと、各要素に対して同じ処理を繰り返してくれるステートです。
Pythonでいうところの for ループの Step Functions 版、というイメージが近いです。
ただ普通のforと違うのは、
-
同時に何個並列で動かすかを指定できる(
MaxConcurrency) - 各反復の結果をまとめて受け取れる
という点で、今回のような「大量だけどレート制限を意識したい処理」と相性が良いです。
実行モードは2種類あります。
- INLINE モード : 親ステートマシンの中で繰り返す。数百〜数千件くらいまで向け。
- DISTRIBUTED モード : 子ステートマシンとして実行される。万単位の大量データ向け。
今回は最大1000件ということで、まずは INLINEモード で実装しました。
全体の構成
まず全体の流れはこんな感じです。
SQS
↓
EventBridge Pipes
↓
Step Functions
├─ ParseInput(JSONに整形する)
├─ Mapステート(並列でメール送信 + 失敗ハンドリング)
└─ RecordStatus(送信結果をDBに一括反映)
↓
SES / RDS
ここで1つハマったポイントがあって、EventBridge Pipes は JSON を分割してくれないので、Step Functions に入ってきた直後に文字列をパースするステートを1つ挟んでいます。
EventBridge Pipes の仕様や設定項目について詳しく知りたい方は、公式ドキュメントを参照してください。 Amazon EventBridge Pipes - AWS 公式ドキュメント
そして、Step Functions の中身は実際にコンソール上で作るとこんな形になっています。
上から順に処理を追っていくと、
- ParseInput : 入ってきたJSON文字列をオブジェクトに直す
-
SendMails(Mapステート) : 配列の各要素について並列でメール送信
-
SendMail(Lambda) :
mail_idをキーにDBからメール送信情報を取得し、SES で1件分のメールを送信 -
MarkFailed(Pass state) : SendMail が失敗したときに
Catch #1で拾って、その要素を「失敗」としてマークする
-
SendMail(Lambda) :
- RecordStatus(Lambda) : Mapステートを抜けたあとに走り、全件分の送信結果(成功・失敗)を DBに一括反映 する
という流れになっています。
ポイントは、DBの更新を各Lambdaの中ではなく、Mapを抜けたあとにまとめて1回でやっているところです。こうしておくと、
- Mapの中のLambdaは「SESで送る」ことだけに集中できる
- DBへの書き込みが1000回 → 1回(一括)になるので、RDS側の負荷も下げられる
- Mapステートの出力(各反復の結果)をそのまま
RecordStatusの入力に渡せる
というメリットがあります。
SQSにはどんなJSONを入れる?
実際の動きをイメージしやすいように、まず「SQSに入れるメッセージ」がどんな形かを見てみましょう。
今回の構成では、1リクエストにつき "送りたい mail_id の配列" を1つのリクエストIDでまとめて投げるスタイルにしました。
{
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"mail_id": [
"mail-0001",
"mail-0002",
"mail-0003"
]
}
mail_idは、さっきのDBテーブルの主キーと同じものです。
request_idはリクエスト1回ごとに振っているIDで、UUIDを使っています。
ログにも同じIDを書いておくと、何か起きたときに「このリクエストの動き」をまとめて追えて便利かなと思って入れています。
Step Functions まで届くとどう見える?
このメッセージが SQS → EventBridge Pipes を経由して Step Functions に渡されると、こんな形で入ってきます。
[
{
"messageId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"body": "{\"request_id\":\"550e8400-e29b-41d4-a716-446655440000\",\"mail_id\":[\"mail-0001\",\"mail-0002\",\"mail-0003\"]}"
}
]
ポイントは2つです。
-
全体が配列で来る(そのため後続で
$[0]で先頭を指す) -
bodyの中身が文字列になっている(JSONではなく "文字列化されたJSON")
この「文字列のままだとループできない」という事情があるので、最初のステート(ParseInput)で States.StringToJson を使って 文字列 → JSON に戻す必要があります。
実装:ステートごとに見ていく
ステートマシンの定義はネストが深くて全文だと読みにくいので、ステートごとに切り出して説明していきます。
1. ParseInput(JSONを整形する)
SQSから流れてくる時点では body が文字列なので、States.StringToJson でオブジェクトに直します。
"ParseInput": {
"Type": "Pass",
"Parameters": {
"data.$": "States.StringToJson($[0].body)"
},
"OutputPath": "$.data",
"Next": "SendMails"
}
ポイントは OutputPath: "$.data" を指定していることです。
こうすることで、後続のステートでは $.request_id や $.mail_id をそのまま参照できるようになり、毎回 $.data.xxx と書かなくて済みます。
2. SendMails(Mapステートで並列送信)
ここが今回の主役!!!
"SendMails": {
"Type": "Map",
"ItemsPath": "$.mail_id",
"ItemSelector": {
"request_id.$": "$.request_id",
"mail_id.$": "$$.Map.Item.Value"
},
"MaxConcurrency": 10
}
-
ItemsPath: ループ対象の配列 (mail_id) を指定 -
ItemSelector: 各ループで Lambda に渡すペイロードを組み立てる-
request_idは親側の値 -
mail_idは そのループで処理している配列の要素 ($$.Map.Item.Value) をそれぞれ渡す
-
-
MaxConcurrency: 10: 同時実行数。SESの送信レートや下流の負荷を見て調整
Map内のLambda呼び出しでは ResultSelector を使って、Lambdaが返すレスポンスの中から 必要なフィールドだけ 取り出しています。
"ResultSelector": {
"request_id.$": "$.Payload.request_id",
"mail_id.$": "$.Payload.mail_id",
"message_id.$": "$.Payload.message_id",
"result.$": "$.Payload.result"
}
こうすると、mapResults に入る各要素を「成功時に欲しいフィールド」に絞ることができ、すっきりした形になります。
3. MarkFailed(Catchで失敗を拾う)
SendMailは Lambda呼び出しタスク(Type: "Task") ですが、ここでは失敗ハンドリング部分(Catch)だけを抜き出しています。
SendMail が失敗したときの分岐がこちら。
"SendMail": {
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "MarkFailed"
}
]
},
"MarkFailed": {
"Type": "Pass",
"Parameters": {
"request_id.$": "$.request_id",
"mail_id.$": "$.mail_id",
"error.$": "$.error.Cause",
"result": "failed"
},
"End": true
}
Catch を入れているおかげで、SendMail が失敗しても Map 全体は止まりません。
失敗した要素だけ MarkFailed に流れて、result: "failed" の形で mapResults に積まれます。
成功も失敗も、同じ「配列の1要素」として RecordStatus に渡せるようになるのがポイントです。
4. RecordStatus(DBに一括反映)
Mapを抜けたあと、$.mapResults には各メールの結果(成功 / 失敗)が配列で入っています。
たとえば3件中2件が成功・1件が失敗した場合、こんな形になります。
[
{
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"mail_id": "mail-0001",
"message_id": "<SESから返されたメッセージID>",
"result": "sent"
},
{
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"mail_id": "mail-0002",
"message_id": "<SESから返されたメッセージID>",
"result": "sent"
},
{
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"mail_id": "mail-0003",
"error": "MessageRejected: Email address is not verified.",
"result": "failed"
}
]
ポイントは、成功した要素と失敗した要素が同じ配列の中に並んで入ってくるところです。
- 成功した要素 :
ResultSelectorで組み立てた形(message_idあり、result: "sent") - 失敗した要素 :
MarkFailedで組み立てた形(errorあり、result: "failed")
result の値で「成功 / 失敗」を判別できるので、RecordStatus Lambda側はこの配列を1回ループするだけで両方の更新が組み立てられます。
この配列をまとめて受け取り、DBの status を一括更新するのが RecordStatus Lambda です。
"RecordStatus": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:<リージョン>:<アカウントID>:function:<関数名>",
"Payload": {
"results.$": "$.mapResults"
}
},
"End": true
}
<リージョン><アカウントID><関数名>の部分は、ご自身の環境に合わせて差し替えてください。
RecordStatus Lambda の中では、受け取った配列をループして、
-
result: "sent"のmail_id→statusをSENTに更新 -
result: "failed"のmail_id→statusをFAILEDに更新
というように、DBに対してまとめて UPDATE を発行します。
Mapの中で1件ずつDB更新する設計でも動きますが、その場合「並列度ぶんDB接続が同時に開く」のでRDSへの負荷が読みづらくなります。
今回のように 最後にまとめてやる ほうが、シンプルで予測しやすかったです。
動かしてみる
実際に 100件のテストデータ で動かしてみました。
- 同時実行数:10
- 結果:1〜2分以内に全件送信+ステータス更新まで完了
このペースなら、1000件でも 15分以内には十分終わりそうです。
今回、SESのサンドボックスを解除していないため実際に1000件のメール送信のテストはできていないが、100件のテストで1〜2分以内に送信できているため1000件の場合は15分以内に送信できるんじゃないかと考えています。
最初はLambda単体で「15分でほんとうに終わるのか…?」と不安だったので、これはかなり安心しました。
やってみて感じたこと
実際に使ってみて、Mapステートには以下のような良さがあると感じました。
- 同時実行数をパラメータで一発で変えられるので、レート制限の調整がしやすい
- どの要素が成功して、どれが失敗したかがコンソールで一目で分かる
-
Catchで失敗だけ別ルートに流せるので、エラーハンドリングが書きやすい - 後段のLambda(RecordStatus)にMapの結果をそのまま渡せるので、「並列処理 → 集約処理」がきれいに書ける
Mapステートもそうですが、Step Functions を使いこなせるといろいろな場面で使えそうなので、今後もいろいろなパターンを試して使っていきたいと思いました。
ぜひ試してみてください!
