はじめに
この記事は、ミロゴス Advent Calendar 2022 2日目の記事です。
re:Inventはオンラインでの参加のため、KeynoteやLeaders Sessionしか見れない側の人間です。
ちょうどアドカレの担当日と、Werner VogelsのKeynoteの日が被っていたのもあり、気になったものをピックアップし、触ってみました。
Keynote冒頭ではレイテンシやスループットの話から非同期の話に繋がり、イベント駆動アーキテクチャに関する話題が前半のメインになりました。
イベント駆動における主役、EventBridgeやStep Functions周りで新サービスが発表され、個人としては非常に興味深いアップデートが多かったです。
- Amazon EventBridge Pipes
- Step Functions Distributed Map
- 現時点ではプレビューなサービス
- Amazon CodeCatalyst
- AWS Application Composer。
- What's New: Introducing AWS Application Composer (Preview)
- AWS News Blog: Visualize and create your serverless workloads with AWS Application Composer
- AWSアーキテクチャをGUIで設計でき、SAMを使ってデプロイができるようです。
ここではこれらの中からEventBridge Pipesについて書いていこうと思います。
EventBridge Pipes
EventBridgeと言えばサービス間の橋渡し役で、プロデューサーサービスから渡されたイベントをRuleに記載したイベントパターンでフィルタリングし、イベントコンシューマーに渡していました。
EventBridge Pipesでは同じようにプロデューサーとコンシューマー間の橋渡しをしますが、間にフィルタリングやEnrichmentを挟むことでコンシューマーへ渡すイベントを変換できるようです。
それぞれのコンポーネントとして下記のサービスがサポートされているようです。(発表当日の2022/12/2のAWSコンソール上で確認できたものをベースにしています。)
- イベントソース
- Kinesis
- SQS
- DynamoDB
- MQ
- MSK
- self-managed Kafka
- Enrichment
- Lambda
- Step Functions
- API Gateway
- API Destinations
- ターゲットサービス
- API Gateway
- API Destinations
- Lambda
- Cloudwatch Logs
- ECS
- EventBridge
- Kinesis Data Firehose
- Inspector
- Kinesis Data Streams
- Redshift
- SageMaker Pipeline
- SNS
- SQS
- Step Functions
- Batch
Pipesを作ってみる
試しにAWSコンソール上で触ってみます。
流れとしてはDynamoDBでのイベントを元に、Filteringを使ってNewImageにidという属性がありeventNameがDelete以外のイベントに限定します。
その後、Enrichmentとターゲットソースには同じLambdaを指定し、それぞれのログを確認できるようにしています。
呼ばれるLambdaは下記のプログラムです。
Enrichmentが受け取るevent
変数の確認と、EnrichmentからターゲットソースのLambdaに渡した値をevent
変数で確認します。
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info(event)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda through ignore DELETE !')
}
イベントソース
イベントソースにはDynamoDBを指定します。
なお、DynamoDBはDynamoDB Streamである必要があります。
(選択した後はテーブル名が見えないので、選択ミスにご注意ください。)
今回、DynamoDBにはPartition keyとしてString型のidのみを設定しています。
フィルタリング
フィルタリングはオプションになります。
コンソール上ではEventBridge Pipesの全体なUIとして、イベントソースのサンプルを確認できます。
そのため、想定しているイベントソースのサンプルを基にフィルタリングの設定ができます。
下記のようなFilteringを設定しています。DELETE以外にしているので、Itemを削除した場合には後ろのLambdaは実行されません。
{
"dynamodb": {
"NewImage": {
"id": {
"S": [{
"exists": true
}]
}
}
},
"eventName": [{
"anything-but": "DELETE"
}]
}
なお、フィルタリングの例は下記に記載があります。
Enrichment
フィルタリングと同じく、オプション機能です。
Filteringで制限されたイベントを受け取り、そのイベントのJSONをログに出して、固定文字を返すようにしています。
ターゲットサービス
Enrichmentから渡されるイベントを確認してみます。
pipesに流してみる
- DynamoDBにItemを追加してみます。
idはFirst_hoge
としてみました。
2.Lambdaの実行ログを確認します。
Enrichmentとターゲットサービスを同じLambdaにしているため、CloudWatch Logsのログも同じログストリームに出ています。
青枠がEnrichmentのログ、赤枠がターゲットサービスのログです。
Enrichmentのログは下記のような形でした。(JSONの形にフォーマットしています。)
DynamoDB Streamと同じようです。
[
{
"eventID": "e6575fc2c02eb96495d6c3d04a941d8f",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1669946402,
"Keys": { "id": { "S": "First_hoge" } },
"NewImage": { "id": { "S": "First_hoge" } },
"SequenceNumber": "13700000000003743678089",
"SizeBytes": 24,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:0000000000000:table/Pipe-test-table/stream/2022-12-02T01:16:40.686"
}
]
ターゲットサービスのログはEnrichmentで返しているJSONをそのまま受け取っているようです。
[{'statusCode': 200, 'body': '"Hello from Lambda through ignore DELETE !"'}]
まとめ
re:Inventで発表されたEventBridgeやStep Functions周りの新サービスのうち、EventBridge Pipesを触ってみました。
現在稼働させているプロダクトにて、DynamoDB StreamからLambdaにイベントを流し、Lambdaで条件分岐させるような実装をしているのですが、pipesを使えばコード数がかなり削減できそうでした。