はじめに
皆さんは、あるLambda関数から他のLambda関数を呼び出したいときはどうしますでしょうか。本記事ではアンチパターンとその解決策を紹介します。
結論
- 一般的にはLambda関数内で他Lambda関数を呼ぶ(invoke)のはアンチパターンとされている
- Lambdaから他のLambda関数を呼び出したい場合は「SQS」または「Step Functions」等を利用する
- アンチパターンであることを理解した上であれば、直接Lambda関数を呼ぶことを否定するものではない
- 単純な処理であれば有用なケースもあり得る
LambdaからLambdaを呼ぶことの問題点
ここが話の本題になります。まずは雑に2つのLambda関数を用意しました。Hello Worldを出力するlambda_callee.py
と、そのLambda関数を呼び出すlambda_caller.py
という関数です。lambda_caller.pyにはLambda関数をinvokeできる権限を付与しています。
import json
import boto3
def lambda_handler(event, context):
# Lambda クライアントを作成
lambda_client = boto3.client('lambda')
# Lambda関数Bを呼び出し
response = lambda_client.invoke(FunctionName='lambda_callee')
# レスポンスを読み取り
payload = json.loads(response['Payload'].read())
print(f"Lambda callee response: {payload}")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully called Lambda callee',
'lambda_callee_response': payload
})
}
import json
def lambda_handler(event, context):
message = "Hello World"
print(message)
return {
'statusCode': 200,
'body': json.dumps({
'message': message,
})
}
lambda_caller.pyをテスト実行すると、何の問題もなくlambda_callee.pyも実行され、応答が返されます。ただ動けば良いものであれば、問題にはならないかもしれません。
呼び出された側のLambdaも実行されていることが確認できます。
ただし、このような実行方法では下記のような問題点があります。
同時実行数の観点
AWSアカウントのクォータの一つに、Lambdaの同時実行数があります。デフォルトではその数は1000となっています。この同時実行数は引き上げることが可能ですが、先ほどのサンプルLambda関数のように、一つのリクエストに対して2つのLambda関数が実行されるケースでは余計にこの同時実行数を消費してしまいます。lambda_callerに対して500リクエストが来た場合、合計で1000の関数が実行されることになり、以降のリクエストはスロットリングが発生することになります。
また、このクォータはAWSアカウントレベルの制限となっており、これらのLambda関数以外にもLambda関数があればそちらの動作にも影響する可能性があります。AWSアカウントの同時実行数を引き上げ続ける方法もありますが、可能であれば1リクエストで即時2同時実行となるような構成は避けるべきでしょう。
呼び出し元Lambdaの待ち時間によるコスト観点
サンプル関数のように、処理が単純で実行時間もさほどかからないケースではあまり問題にならないかもしれません。しかし、呼び出し先の関数で生成AIの回答を待つ時間が発生するなど、何十秒も関数が実行され続ける場合はどうでしょうか。呼び出し元のLambda関数はただ呼び出し先のLambda関数の完了を待っているだけの状態となります。そのような状態が起こると問題になるのが、コストの問題です。
先ほどLambda関数の内容によっては待ち時間が発生すると記載しましたが、この待ち時間中でもLambda関数としては実行中と扱われます。つまり、何も処理をしていなくてもLambda実行時間による課金が発生するということになります。処理時間だけであればともかく、待機時間に対して課金が発生するような設計は極力避けるべきでしょう。
タイムアウト値の設計観点
Lambda関数はタイムアウトの時間を設定することができます(デフォルト3秒)。一般論として、タイムアウト値はクライアント側から遠ざかるにつれて短くすることが原則とされています。
便宜上、「関数」という書き方をしていますが、これは「システム」や「(内部/外部の)API」と置き換えても同様です。システム間連携をする際もタイムアウトについてはよく考える必要があります。
下記の図ではクライアントに近い方からタイムアウト値がA>B>Cの長さになるよう設定されています。
ところが、下記のようにタイムアウト値がA>C>Bとなっているケースではどのようなことが起こり得るでしょうか。
関数C自体は正常に処理できているにも関わらず、関数Bはタイムアウトでエラー応答を返します。結果関数Aもクライアント側にエラー応答となります。クライアントからリトライ処理があるかもしれませんが、関数Cとしては処理済みのリクエストが飛んでくることになり、ややこしいことになります(この辺りは冪等性の話にも繋がる)。
可能な限りタイムアウト値についてもよく考える必要があります。
コードの保守性/リトライ観点
Lambdaに限らない、タイムアウト値の話をしたのは保守性の話につながるからです。関数間が直接つながっているからこそ、関数のコードを修正時、常に他の関数のこともよく考える必要が出てきます。リトライ処理を考える時も同様です。これらの問題点はいわゆる密結合状態になっているからこそ発生する問題といえます。
非同期呼び出しにすれば解決する?
Lambda関数はinvoke時にInvocationTypeというパラメータを設定することができます。デフォルトではRequestResponseになっており、これは同期処理、つまり呼び出し先の処理完了を待つことになります。
一方でInvocationTypeをEventにすることで、非同期処理となるため上記で挙げた一部問題は解決するかもしれません。しかし、結局のところエラーハンドリングができていない点や、呼び出し先の関数が正しく処理されているか確かめる術がない点など、いくつかの問題点は残り続けるため、根本的な対処とは言えません。
対策編
それではLambda関数から他のLambda関数を呼び出したい時の方法です。
解決策①〜SQS利用〜
個人的にはこれが最もシンプルだと感じます。
呼び出し元のLambdaはSQSにメッセージを送るように変更します。呼び出し元のLambdaにはSQSにメッセージを送信できる権限が必要になります。
import json
import boto3
QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/(AWSアカウントID)/lambda_queue'
def lambda_handler(event, context):
# SQS クライアントを作成
sqs_client = boto3.client('sqs')
message_body = {
'message': 'Hello from Lambda caller!'
}
# SQSキューにメッセージを送信
response = sqs_client.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(message_body),
)
# レスポンスを読み取り
payload = response['MessageId']
print(f"SQS response: {payload}")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully SQS Request',
'sqs_response': payload
})
}
続いて呼び出し先も修正します。
import json
def lambda_handler(event, context):
# SQSからのメッセージを処理
for record in event['Records']:
# メッセージの内容を取得
message_body = json.loads(record['body'])
print(f"Processing message: {message_body['message']}")
return {
'statusCode': 200,
}
呼び出し先のLambdaにはSQSのメッセージを取得できるようにトリガーを設定します。この時、Lambdaの権限にSQSからメッセージを取得できる権限が必要になります。
この状態でcaller側のLambdaをテストすると、成功することが確認できました。
呼び出し先のLambdaも実行されていることが確認できます。
SQSではDLQ(デッドレターキュー)を利用することで、エラーハンドリングも可能です。
SQSを挟むことで、Lambda間を疎結合にすることができました。コスト面で言えばSQSの利用料はかかりますが、月100万リクエストまでは無料なので、規模によりますが気にならないレベルではないでしょうか。
解決策②~Step Functions利用
Step Functions方式では単にステータスコード200を返す呼び出し元Lambdaと、呼び出し先Lambdaを用意しました。
import json
def lambda_handler(event, context):
print('Caller Lambda')
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully SF Request',
})
}
import json
def lambda_handler(event, context):
print('Callee Lambda')
return {
'statusCode': 200,
}
その後、StepFunctionsのGUIから、Lambda Invokeを選択し、配置します。
その後、ワークフローを実行すると、Lambda関数が2つ実行されることを確認できます。
Step FunctionsはGUIで簡単に設定できるかつ、Lambda関数で他のLambdaを呼び出すコードを書く必要性もなくなり、可読性が高くなる点も良い点だなと思いました。
その他の解決策
ここでは試しませんが、EventBridgeを利用したやり方もありそうです。
感想
Lambda関数のinvokeは気軽に使える一方で、実際に使う際にはエラーハンドリングなどの点を考慮する必要があります。Lambdaに限らず、システム(サービス)間は疎結合にするという考え方を抑える必要があります。
今回、Step Functionsを軽く使ってみましたが、想像よりも簡単に構築が可能でした。より複雑なシステム間連携が必要な際には使用も一考かなと思いました。
参考