結論
- 入力
- Lambda 関数の第一引数
event
は、各サービスからのpayloadが "配列" で引き渡される
- Lambda 関数の第一引数
- 出力
-
return ...
で返した内容が、EventBridge Pipes のターゲットに引き渡される
-
解説
何もしない = ソースもしくはフィルタから event
を受け取って、何もせずにターゲットへ event
を引き渡す Lambda 関数を基に解説します。
def lambda_handler(event, context):
print(event)
return event
export const handler = async (event, context) => {
console.log(event);
return event;
};
入力
Lambda 関数の第一引数 event
には、ソースもしくはフィルタからのpayloadが引き渡されます。第二引数 context
も使用可能です。
例えば Amazon SQS をソースに設定し、MessageBody を {"v":1}
としたデータをSQSキューに入れると print(event)
の結果は以下のようになります。
[
{
'messageId': '8c8b72a2-85f0-47be-bf9f-81edff5197e9',
'receiptHandle': '.....',
'body': '{"v":1}',
'attributes': {
'ApproximateReceiveCount': '1',
'SentTimestamp': '1672477371472',
'SenderId': 'AID................KM',
'ApproximateFirstReceiveTimestamp': '1672477371473'
},
'messageAttributes': {},
'md5OfMessageAttributes': None,
'md5OfBody': '455c0c10d425837001a72a84239ac362',
'eventSource': 'aws:sqs',
'eventSourceARN': 'arn:aws:sqs:REGION:............:SQS_NAME',
'awsRegion': 'REGION'
}
]
payloadはソースのサービスで異なりますが、EventBridge Pipes のターゲットにおける「サンプルイベント/イベントペイロード」で確認できるものと同じです。
"ほぼ" と表現した理由は event
には配列として渡されるからです。よって Lambda 関数内での参照は、例えば messageId であれば event[0]['messageId']
として参照することになります。
def lambda_handler(event, context):
print(event)
print(event[0]['messageId'])
return event
event
は配列なので、 (a)複数の payload が入ることを考慮する (b)パイプ設定の「バッチサイズ」を 1 にするといった実装・設定を行う必要があります。
出力
return ...
で返した内容が、EventBridge Pipes のターゲットに引き渡されます。コード内で JSON 文字列化は不要で、dict や hash で返してあげれば OK です。
この際、 "空" を return すると、後続の "ターゲット" は実行されません。空とは {}
[]
""
になります。
以下のコードは、ターゲットを一切実行しない Lambda 関数です。
def lambda_handler(event, context):
return {}
以下のコードは、どのような入力があったとしても、ターゲットに {"foo": "bar"}
を引き渡す Lambda 関数です。
def lambda_handler(event, context):
return {'foo': 'bar'}
Enrichment に指定する Lambda 関数実装の考察
入力トランスフォーマーに極力依存しない Lambda 関数の出力
入力トランスフォーマーは、関数も無く構文にも慣れが必要なため、できる限り指定しないようにするのが良いと考えています。そうなると、Enrichment に指定する Lambda 関数内で、ターゲット向けの出力フォーマットを確定させる実装になります。
例えば、Slack の chat.postMessage
への送信フォーマットは channel
と text
の二つを送ることになりますが、以下のような形です。
def lambda_handler(event, context):
payload = {
'channel': 'XXX',
'text': event[0]['body']
}
return payload
# => {
# 'channel': 'XXX',
# 'text': "STRING...."
#}
ターゲットの入力トランスフォーマーへの指定は不要です。出力自体が、Slack の chat.postMessage に合っているためです。
入力元が Lambda トリガーなのか、EventBridge Pipes の Enrichment(強化)なのかを判別する方法
結論、Python であれば event
が dict
なら Lambda トリガー、list
なら EventBridge Pipes の Enrichment(強化)として判定できます。 isinstance()
が使えるでしょう。
Lambda トリガーとして呼ばれた時には実行しないガード条件を実装:
def lambda_handler(event, context):
if isinstance(event, dict):
raise RuntimeError('This function is not for triggers.')
...
どちらから呼ばれても処理できるようにする実装:
def lambda_handler(event, context):
records = event['Records'] if isinstance(event, dict) else event
print(records[0]['eventSource'])
...
これは、以下の違いを吸収する実装です。
他のサービスから直接実行された場合の event (例: トリガーで指定された時)
{"Records":[{"messageId": "..."}]}
EventBridge Pipes 内 Enrichment(強化)で呼ばれた際の event
[{"messageId": "..."}]
違いについては、これらをご覧ください
SQS
- Lambda : https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#example-standard-queue-message-event
- EventBridge Pipes : https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#:~:text=Example%20events
Kinesis
- Lambda : https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-event-example
- EventBridge Pipes : https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html#:~:text=Example%20event
まとめ
Amazon EventBridge Pipes は、ターゲットにも AWS Lambda を指定できるため、わざわざ Enrichment で Lambda 関数で処理する理由が見当たらないという意見もありそうです。
私の好きな本「UNIX という考え方」には "一つのプログラムには一つの事をうまくやらせる" という定理が紹介されています。
ターゲットには AWS サービスだけでなく外部 API をコールする機能が備わっています。その実装を Lambda 関数内から追い出し「payload変換に集中する」という意味では、Enrichment に指定する Lambda 関数はシンプルにできるメリットがあるでしょう。
EoT