6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【re:Invent 2022】EventBridge PipesでDDB→Lambdaをしてみた

Last updated at Posted at 2022-12-02

はじめに

この記事は、ミロゴス Advent Calendar 2022 2日目の記事です。

re:Inventはオンラインでの参加のため、KeynoteやLeaders Sessionしか見れない側の人間です。
ちょうどアドカレの担当日と、Werner VogelsのKeynoteの日が被っていたのもあり、気になったものをピックアップし、触ってみました。

Keynote冒頭ではレイテンシやスループットの話から非同期の話に繋がり、イベント駆動アーキテクチャに関する話題が前半のメインになりました。
イベント駆動における主役、EventBridgeやStep Functions周りで新サービスが発表され、個人としては非常に興味深いアップデートが多かったです。

image.png

ここではこれらの中からEventBridge Pipesについて書いていこうと思います。

EventBridge Pipes

EventBridgeと言えばサービス間の橋渡し役で、プロデューサーサービスから渡されたイベントをRuleに記載したイベントパターンでフィルタリングし、イベントコンシューマーに渡していました。
EventBridge Pipesでは同じようにプロデューサーとコンシューマー間の橋渡しをしますが、間にフィルタリングやEnrichmentを挟むことでコンシューマーへ渡すイベントを変換できるようです。

それぞれのコンポーネントとして下記のサービスがサポートされているようです。(発表当日の2022/12/2のAWSコンソール上で確認できたものをベースにしています。)

  • イベントソース
    • Kinesis
    • SQS
    • DynamoDB
    • MQ
    • MSK
    • self-managed Kafka
  • Enrichment
    • Lambda
    • Step Functions
    • API Gateway
    • API Destinations
  • ターゲットサービス
    • API Gateway
    • API Destinations
    • Lambda
    • Cloudwatch Logs
    • ECS
    • EventBridge
    • Kinesis Data Firehose
    • Inspector
    • Kinesis Data Streams
    • Redshift
    • SageMaker Pipeline
    • SNS
    • SQS
    • Step Functions
    • Batch

Pipesを作ってみる

試しにAWSコンソール上で触ってみます。

流れとしてはDynamoDBでのイベントを元に、Filteringを使ってNewImageにidという属性がありeventNameがDelete以外のイベントに限定します。
その後、Enrichmentとターゲットソースには同じLambdaを指定し、それぞれのログを確認できるようにしています。

image.png

呼ばれるLambdaは下記のプログラムです。
Enrichmentが受け取るevent変数の確認と、EnrichmentからターゲットソースのLambdaに渡した値をevent変数で確認します。

lambda_function.py
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(event)
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda through ignore DELETE !')
    }

イベントソース

イベントソースにはDynamoDBを指定します。
なお、DynamoDBはDynamoDB Streamである必要があります。
(選択した後はテーブル名が見えないので、選択ミスにご注意ください。)

image.png

今回、DynamoDBにはPartition keyとしてString型のidのみを設定しています。

フィルタリング

フィルタリングはオプションになります。

コンソール上ではEventBridge Pipesの全体なUIとして、イベントソースのサンプルを確認できます。
そのため、想定しているイベントソースのサンプルを基にフィルタリングの設定ができます。

image.png

下記のようなFilteringを設定しています。DELETE以外にしているので、Itemを削除した場合には後ろのLambdaは実行されません。

{
  "dynamodb": {
    "NewImage": {
      "id": {
        "S": [{
          "exists": true
        }]
      }
    }
  },
  "eventName": [{
    "anything-but": "DELETE"
  }]
}

なお、フィルタリングの例は下記に記載があります。

Enrichment

フィルタリングと同じく、オプション機能です。

Filteringで制限されたイベントを受け取り、そのイベントのJSONをログに出して、固定文字を返すようにしています。

image.png

ターゲットサービス

Enrichmentから渡されるイベントを確認してみます。

image.png

pipesに流してみる

  1. DynamoDBにItemを追加してみます。

idはFirst_hogeとしてみました。

image.png

2.Lambdaの実行ログを確認します。

image.png

Enrichmentとターゲットサービスを同じLambdaにしているため、CloudWatch Logsのログも同じログストリームに出ています。
青枠がEnrichmentのログ、赤枠がターゲットサービスのログです。

Enrichmentのログは下記のような形でした。(JSONの形にフォーマットしています。)
DynamoDB Streamと同じようです。

Enrichmenのevent
[
  {
    "eventID": "e6575fc2c02eb96495d6c3d04a941d8f",
    "eventName": "INSERT",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "ap-northeast-1",
    "dynamodb": {
      "ApproximateCreationDateTime": 1669946402,
      "Keys": { "id": { "S": "First_hoge" } },
      "NewImage": { "id": { "S": "First_hoge" } },
      "SequenceNumber": "13700000000003743678089",
      "SizeBytes": 24,
      "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:0000000000000:table/Pipe-test-table/stream/2022-12-02T01:16:40.686"
  }
]

ターゲットサービスのログはEnrichmentで返しているJSONをそのまま受け取っているようです。

ターゲットサービスのイベント
[{'statusCode': 200, 'body': '"Hello from Lambda through ignore DELETE !"'}]

まとめ

re:Inventで発表されたEventBridgeやStep Functions周りの新サービスのうち、EventBridge Pipesを触ってみました。
現在稼働させているプロダクトにて、DynamoDB StreamからLambdaにイベントを流し、Lambdaで条件分岐させるような実装をしているのですが、pipesを使えばコード数がかなり削減できそうでした。

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?