0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Stripe WebhookイベントのデータをKinesisのCosumer(Lambda)で受け取り、何らかのデータ更新をする

Posted at

はじめに

本記事では、以下の図のようにStripeのWebhookをAPI Gateway→Lambda→Kinesisという構成で、API GatewayでWebhookを受け取り、それをKinesis Data Streamに流す、というイベント駆動の処理で、Kinesisに流れたデータを受け取り、何らかのデータ更新を行うサーバレスアプリケーション、具体的にはStripeのデータを更新するConsumer(Lambda)を開発してみたいと思う。
image.png

※上記の構成のメリットとして、KinesisのConsumer(データを受け取る側)を複数配置する事ができるので、マイクロサービスが複数あればそれぞれのマイクロサービスに関連する処理を別のサーバーレスアプリケーションを開発可能である、というのがあるだろう(図中のLambda→RDS Proxy→RDSの部分などがそれ)。

おまけでは、API Gateway→Lambda→Kinesis Data Streamの部分について、serverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするでやっているように、ローカルの開発環境で検証・テストできるようする、ということもやってみてた。

※ソースコード全体は以下。

Kinesis Data StreamのConsumer(Lambda)を実装してみる

はじめにで触れたように、今回はStripeのWebhookイベントを受けて、Stripeのデータを更新するようなLambdaを実装してみたいと思う。

serverless.yamlの設定

まずは、serverless.yamlを設定していく。

serverless.yaml
service: aws-node-serverless-kinesis-consumer-lambda

frameworkVersion: '3'

provider:
  # 省略
  iam:
    role:
      name: aws-node-serverless-kinesis-consumer-lambda-role
      statements:
        - Effect: Allow
          Action:
            - ssm:GetParameter
            - ssm:GetParameters
          Resource:
            - !Sub arn:aws:ssm:ap-northeast-1:${AWS::AccountId}:parameter${file(./env/${self:provider.stage}.json):SSM_PARAMETER_NAME_STRIPE_SECRET_KEY}

plugins:
  - serverless-webpack
  - serverless-plugin-ifelse
  - serverless-offline-kinesis
  - serverless-offline

custom:
  defaultStage: local
  webpack:
    includeModules: true
    packager: 'yarn'
  serverless-offline-kinesis:
    endpoint: http://localhost:4566
    region: ${self:provider.region}
    accessKeyId: local
    secretAccessKey: local
  serverlessIfElse:
    - If: '"${self:provider.stage}" == "local"'
      Set:
        functions.kinesisConsumer.events.0.stream.arn: arn:aws:kinesis:${self:provider.region}:000000000000:stream/${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}

functions:
  kinesis2stripe:
    name: kinesis2stripe-${sls:stage}
    handler: src/kinesis2stripe.handler
    timeout: 5
    environment:
      STAGE: ${self:provider.stage}
      REGION: ${self:provider.region}
      SSM_PARAMETER_NAME_STRIPE_SECRET_KEY: ${file(./env/${self:provider.stage}.json):SSM_PARAMETER_NAME_STRIPE_SECRET_KEY}
    events:
      - stream:
          type: kinesis
          arn:
            Fn::Join:
              - ':'
              - - arn:aws:kinesis:ap-northeast-1
                - !Sub ${AWS::AccountId}
                - stream/${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
          batchSize: 100
          batchWindow: 5
          startingPosition: LATEST
./env/local.json
{
	"ENV": "local",
	"SSM_PARAMETER_NAME_STRIPE_SECRET_KEY": "/stripe_webhook_local/stripe_secret_key",
	"KINESIS_STREAM_NAME": "kinesis-internal-events-local"
}

上記の設定でいくつか補足をする。

  • provider.iam
    今回、Stripeのデータを更新するためにstripe-nodeを利用するが、その際にsecret keyが必要になる。そのkeyの値はセキュアな情報なのでAWS SSMのパラメータストアから取得するように実装するため、IAMロール(Lambda実行ロール)にSSMパラメータを取得できる権限を設定してる(secret keyはStripeのダッシュボード上で以下のように確認できる)。
    image.png
  • serverless-plugin-ifelse
    ローカルの開発環境で検証・テストを行う上で、Lambda関数のイベントソースマッピングの設定を上書きするために利用しているプラグイン
  • serverless-offline-kinesis
    こちらも、ローカルの開発環境で検証・テストを行うためのプラグインで、Lambda関数のイベントソースマッピングに指定しているKinesisにデータが流れた際に、Lambdaを起動するという部分をエミュレーションしてくれる
  • ${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
    こちらの表記に関してはRecursively reference propertiesを参照

ここまででserverless.yamlの設定は完了になる。

実行ロールのアクセス許可に書かれている通り、Kinesisのデータを受け取るにもIAMロールが必要になるが、これについてはserverlessFW側でいい感じに自動で設定してくれるので、明示的に設定せずで問題ない(ただし、同一 serverless に複数の Lambda 関数がある場合の IAM Permissions についてに書かれているようなカスタムロールを設定する場合には、自分で全てのIAMを設定する必要があるので注意)。

kinesis2stripe(Lambda関数)を実装する

続いて、Kinesisから流れてきたデータを受け取り、Stripeのデータを更新するLambda関数を実装していく。

Amazon Kinesis で AWS Lambda を使用するを見ればわかるが、Kinesisから流れた来たデータをLambdaで受け取る場合、eventオブジェクトの中身は以下のようになっている。つまり、event.Records[].kinesis.dataを取り出し、Base64でデコードする事で元のデータが取り出せる(Node.js 12.xなどを参照)。
image.png

./src/kinesis2stripe.js
// 省略
import { createLogger, format, transports } from 'winston';
import Ssm from '@/ssm';
import EventExecutor from '@/event-executor';

const logger = createLogger({
	// 省略
});

// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
	try {
		const {
			STAGE: stage,
			REGION: region,
			SSM_PARAMETER_NAME_STRIPE_SECRET_KEY: ssmParamNameStripeSecretKey,
			KINESIS_STREAM_NAME: kinesisStreamName
		} = process.env;
		// 省略

		const ssm = new Ssm({ stage, region });
		const { stripeSecretKey } = await ssm.getParameters({
			ssmParamNameStripeSecretKey
		});
		const eventExecutor = new EventExecutor({ stripeSecretKey });

		const errors = [];
		await Promise.all(
			event.Records.map(async (record) => {
				try {
					const { data: eventData } = record.kinesis;
					const { category, type, payload } = JSON.parse(Buffer.from(eventData, 'base64'));
					if (!payload) throw new Error(`payload must be required`);

					const { data } = payload;
					await eventExecutor.execute({ category, type, data, logger });
				} catch (e) {
					errors.push({ message: e.message, stack: e.stack, record });
				}
			})
		);

		if (errors.length) throw new Error(JSON.stringify(errors));
		return { statusCode: 200 };
	} catch (e) {
		logger.error({ message: e.message, stack: e.stack });
		return { statusCode: 500 };
	}
};
./src/lib/event-executor.js
// 省略
const events = {
	stripe: {
		'payment_method.attached': async (options = {}) => {
			// 省略

			const { data, stripe } = options;
			const { id: paymentMethodId, customer: customerId } = data;

			await stripe.customers.update(customerId, {
				invoice_settings: { default_payment_method: paymentMethodId }
			});
		}
	}
};

export default class EventExecutor {
	constructor(options = {}) {
		assert.ok(options.stripeSecretKey, 'stripeSecretKey must be required');

		const { stripeSecretKey } = options;
		this.stripe = new Stripe(stripeSecretKey);
	}

	async execute(options = {}) {
		// 省略

		const { category, type, data, logger } = options;
		if (!Object.keys(events).includes(category) && !Object.keys(events[category]).includes(type)) {
			logger.info({ message: `${category}.${type} event from kinesis skip` });
			return;
		}

		await events[category][type]({ data, stripe: this.stripe });
		logger.info({ message: `${category}.${type} event from kinesis is executed` });
	}
}

上記のコードでいくつか補足をする。

  • await ssm.getParameters({...})
    上記のserverless.yamlの設定で補足した通り、今回はStripeのsecret keyはAWS SSMのパラメータストアに保存している想定なので、SSMからそのsecret keyを取得している(クラスの実装はここを参照)
  • errors.push({ message: e.message, stack: e.stack, record })
    MDNに記述があるように、Promise.allでエラーが発生すると、最初のエラーはのみしか捕捉されないので全てのエラーを記録するために、errorsという配列にエラーメッセージを入れるようにしている(Promise.allについてはmap()で await をした場合のパフォーマンスを参照)
  • await eventExecutor.execute({ category, type, data })
    将来的に色々なイベントをトリガーにデータ更新処理を行いたい場合に、拡張しやすくするためにKinesisに流れてきたデータを処理する本体はクラスに隠蔽する形をとっている。./src/lib/event-executor.jsでは、処理したいイベントの種別をObjectに定義していけば処理内容を追加できるような実装をしてみた。

ここまでできた所で、以下のようにAWS CLIでaws kinesis put-recordを実行し、Kinesisにデータを流してみると、意図した通りに動作している事が確認できる(Stripeのcustomerのpayment_methodデータはあらかじめ手動で作成しておいたもの)。

package.json
	"scripts": {
		...
		"aws.kinesis.put-record": "aws kinesis put-record --stream-name kinesis-internal-events-local --partition-key 123 --data '{"category":"stripe","type":"payment_method.attached","service":"aws-node-serverless-stripe-webhook","payload":{"version":1,"data":{"id":"card_1MI5SxKcqKFZLnGn4uAD9hX7","customer":"cus_MzVc08uHaCvDqX"}}}' --cli-binary-format raw-in-base64-out --endpoint=http://localhost:4566",
		...
	},
terminal A
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *+)
$ yarn aws.kinesis.put-record
yarn run v1.22.19
$ aws kinesis put-record --stream-name kinesis-internal-events-local --partition-key 123 --data '{"category":"stripe","type":"payment_method.attached","service":"aws-node-serverless-stripe-webhook","payload":{"version":1,"data":{"id":"card_1MI5SxKcqKFZLnGn4uAD9hX7","customer":"cus_MzVc08uHaCvDqX"}}}' --cli-binary-format raw-in-base64-out --endpoint=http://localhost:4566
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49636367443686329849318835280279325352841694306935767042",
    "EncryptionType": "NONE"
}
Done in 1.45s.
terminal B
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *+)
$ yarn dev
yarn run v1.22.19
$ sls offline start
serverless-plugin-ifelse - Value Changed for : functions.kinesis2stripe.events.0.stream.arn to: arn:aws:kinesis:ap-northeast-1:000000000000:stream/kinesis-internal-events-local
Starting Offline Kinesis at stage local (ap-northeast-1)

Starting Offline at stage local (ap-northeast-1)

Offline [http for lambda] listening on http://localhost:3002
Function names exposed for local invocation by aws-sdk:
           * kinesis2stripe: kinesis2stripe-local

{"level":"info","message":"stripe.payment_method.attached event from kinesis is executed","timestamp":"2022-12-23T07:15:35.365Z"}
(λ: kinesis2stripe) RequestId: 372b1ce7-576f-4928-aaad-3c1b8c044c1c  Duration: 395.38 ms  Billed Duration: 396 ms

image.png

ここまで確認ができれば、あとは実際にAWS上にDeployしてStripe→ ・・・ →API Gateway→Lambda→Kinesis→ ・・・ →Lambda→ ・・・ →Stripeが通しで動く事は確認できるだろう(ローカルの開発環境で検証・テストしているので、AWSにDeployしてエラーが出るとすればIAMの権限周りくらい)。

実際にAWS環境にDeployして検証してみる

うまく動いている事が以下のように確認できた。
image.png

※ちなみに、KinesisにアクセスするためのIAMロールは、serverlessの方でイベントソースマッピングを設定したリソースに合わせて以下のように自動で設定してくれる(この辺り、serverlessFWを利用するメリットの1つになるだろう)。
image.png

まとめとして

今回はStripeからのWebhookイベントをKinesisに流した後、Consumerでそのデータを受け取り何らかのデータ更新を行うというのをやってみた。Kinesisを利用する事で、簡単にイベント駆動の実現ができるので、今後もこうした構成を利用する機会はありそうだなと思う。

おまけ

ローカル環境でAPI Gateway→Lambda→Kinesis→・・・→Lambda→Stripeのデータ更新を検証できるようにしてみる

やり方としては全然難しくなく、以下のようにserverless.yamlを設定し、LambdaのイベントソースマッピングにAPI Gatewayを持つようにすればいい。

serverless.yaml
functions:
  kinesis2stripe:
    # 省略
  # ローカルの開発環境での検証用
  stripe2kinesisLocalOnly:
    name: stripe2kinesisLocalOnly
    handler: support/stripe2kinesisLocalOnly.handler
    timeout: 5
    environment:
      STRIPE_SECRET_KEY: sk_test_**************************************************
      STRIPE_ENDPOINT_SECRET: whsec_**************************************************
      KINESIS_STREAM_NAME: ${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
    events:
      - http:
          method: post
          path: /webhook

上記で2点補足する。

  • STRIPE_SECRET_KEY
    Stripeのダッシュボードで確認できるシークレットキーを設定する(Webhook の署名を確認するに書かれている処理で利用するので)
    image.png
  • STRIPE_ENDPOINT_SECRET
    以下の画像のように、Stripe-CLIを利用してWebhookの検証をローカル環境でもできるが、その際に払い出されるsigning secretを設定する(Stripe CLI で Webhook の組み込みをテストするも合わせて参照)
    image.png

続いて、Lambda関数を実装していくが、実装としては以下のようになる。

./support/stripe2kinesisLocalOnly.js
// 省略

const logger = createLogger({
	// 省略
});

// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
	try {
		const sig = event.headers[`Stripe-Signature`];
		const {
			STRIPE_SECRET_KEY: stripeSecretKey,
			STRIPE_ENDPOINT_SECRET: stripeEndpointSecret,
			KINESIS_STREAM_NAME: kinesisStreamName
		} = process.env;
		// 省略

		const stripe = new Stripe(stripeSecretKey);
		const kinesisClient = new KinesisClient({
			region: 'ap-northeast-1',
			endpoint: 'http://localhost:4566'
		});

		const {
			type,
			data: { object: payload }
		} = stripe.webhooks.constructEvent(event.body, sig, stripeEndpointSecret);

		await kinesisClient.send(
			new PutRecordCommand({
				Data: new TextEncoder().encode(
					JSON.stringify({
						category: 'stripe',
						type,
						service: 'aws-node-serverless-stripe-webhook',
						payload: { version: 1, data: payload }
					})
				),
				PartitionKey: '123',
				StreamName: kinesisStreamName
			})
		);

		logger.info({ message: `sucess putRecord 'stripe' ${type} event data` });
		return { statusCode: 200 };
	} catch (e) {
		logger.error({ message: e.message, stack: e.stack });
		return { statusCode: 500 };
	}
};

上記のコードについて少し補足する。

ここまで実装が完了すると、以下のように通しで確認できるようになる。

serverlessのログ
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *)
$ yarn dev
yarn run v1.22.19
$ sls offline start
serverless-plugin-ifelse - Value Changed for : functions.kinesis2stripe.events.0.stream.arn to: arn:aws:kinesis:ap-northeast-1:000000000000:stream/kinesis-internal-events-local
Starting Offline Kinesis at stage local (ap-northeast-1)

Starting Offline at stage local (ap-northeast-1)

Offline [http for lambda] listening on http://localhost:3002
Function names exposed for local invocation by aws-sdk:
           * kinesis2stripe: kinesis2stripe-local
           * stripe2kinesisLocalOnly: stripe2kinesisLocalOnly

┌───────────────────────────────────────────────────────────────────────────────┐
│                                                                               │
│   POST | http://localhost:3000/local/webhook                                  │
│   POST | http://localhost:3000/2015-03-31/functions/stripe2kinesisLocalOnly   │
│   /invocations                                                                │
│                                                                               │
└───────────────────────────────────────────────────────────────────────────────┘

Server ready: http://localhost:3000 🚀

POST /local/webhook (λ: stripe2kinesisLocalOnly)

POST /local/webhook (λ: stripe2kinesisLocalOnly)
{"level":"info","message":"sucess putRecord 'stripe' payment_method.attached event data","timestamp":"2022-12-26T02:10:05.715Z"}
(λ: stripe2kinesisLocalOnly) RequestId: 6dd97efc-ec34-4b31-9572-734f9d60961a  Duration: 814.15 ms  Billed Duration: 815 ms

...

{"level":"info","message":"stripe.payment_method.attached event from kinesis is executed","timestamp":"2022-12-26T02:10:08.908Z"}
(λ: kinesis2stripe) RequestId: b4ea89b8-8050-452d-b5a7-e2e6e8e9c836  Duration: 2161.44 ms  Billed Duration: 2162 ms

...
Stripe CLIのログ
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *)
$ stripe listen -f http://localhost:3000/local/webhook
> Ready! You are using Stripe API Version [2022-11-15]. Your webhook signing secret is whsec_***************************************** (^C to quit)
2022-12-26 11:10:04   --> payment_method.attached [evt_1MJ68eKcqKFZLnGnl5SwFPk7]
2022-12-26 11:10:05   --> customer.source.created [evt_1MJ68eKcqKFZLnGnLZvYbRVi]
2022-12-26 11:10:05   --> setup_intent.created [evt_1MJ68fKcqKFZLnGnuG70rV86]
2022-12-26 11:10:05  <--  [200] POST http://localhost:3000/local/webhook [evt_1MJ68eKcqKFZLnGnl5SwFPk7]
2022-12-26 11:10:05   --> setup_intent.succeeded [evt_1MJ68fKcqKFZLnGnpHHPYZEx]
2022-12-26 11:10:05  <--  [200] POST http://localhost:3000/local/webhook [evt_1MJ68fKcqKFZLnGnuG70rV86]
2022-12-26 11:10:07  <--  [200] POST http://localhost:3000/local/webhook [evt_1MJ68eKcqKFZLnGnLZvYbRVi]
2022-12-26 11:10:07  <--  [200] POST http://localhost:3000/local/webhook [evt_1MJ68fKcqKFZLnGnpHHPYZEx]
2022-12-26 11:10:09   --> customer.updated [evt_1MJ68iKcqKFZLnGnjhjSbADH]
2022-12-26 11:10:09  <--  [200] POST http://localhost:3000/local/webhook [evt_1MJ68iKcqKFZLnGnjhjSbADH]

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?