はじめに
本記事では、以下の図のようにStripeのWebhookをAPI Gateway→Lambda→Kinesisという構成で、API GatewayでWebhookを受け取り、それをKinesis Data Streamに流す、というイベント駆動の処理で、Kinesisに流れたデータを受け取り、何らかのデータ更新を行うサーバレスアプリケーション、具体的にはStripeのデータを更新するConsumer(Lambda)を開発してみたいと思う。
※上記の構成のメリットとして、KinesisのConsumer(データを受け取る側)を複数配置する事ができるので、マイクロサービスが複数あればそれぞれのマイクロサービスに関連する処理を別のサーバーレスアプリケーションを開発可能である、というのがあるだろう(図中のLambda→RDS Proxy→RDS
の部分などがそれ)。
※おまけでは、API Gateway→Lambda→Kinesis Data Streamの部分について、serverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするでやっているように、ローカルの開発環境で検証・テストできるようする、ということもやってみてた。
※ソースコード全体は以下。
Kinesis Data StreamのConsumer(Lambda)を実装してみる
はじめにで触れたように、今回はStripeのWebhookイベントを受けて、Stripeのデータを更新するようなLambdaを実装してみたいと思う。
serverless.yamlの設定
まずは、serverless.yamlを設定していく。
service: aws-node-serverless-kinesis-consumer-lambda
frameworkVersion: '3'
provider:
# 省略
iam:
role:
name: aws-node-serverless-kinesis-consumer-lambda-role
statements:
- Effect: Allow
Action:
- ssm:GetParameter
- ssm:GetParameters
Resource:
- !Sub arn:aws:ssm:ap-northeast-1:${AWS::AccountId}:parameter${file(./env/${self:provider.stage}.json):SSM_PARAMETER_NAME_STRIPE_SECRET_KEY}
plugins:
- serverless-webpack
- serverless-plugin-ifelse
- serverless-offline-kinesis
- serverless-offline
custom:
defaultStage: local
webpack:
includeModules: true
packager: 'yarn'
serverless-offline-kinesis:
endpoint: http://localhost:4566
region: ${self:provider.region}
accessKeyId: local
secretAccessKey: local
serverlessIfElse:
- If: '"${self:provider.stage}" == "local"'
Set:
functions.kinesisConsumer.events.0.stream.arn: arn:aws:kinesis:${self:provider.region}:000000000000:stream/${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
functions:
kinesis2stripe:
name: kinesis2stripe-${sls:stage}
handler: src/kinesis2stripe.handler
timeout: 5
environment:
STAGE: ${self:provider.stage}
REGION: ${self:provider.region}
SSM_PARAMETER_NAME_STRIPE_SECRET_KEY: ${file(./env/${self:provider.stage}.json):SSM_PARAMETER_NAME_STRIPE_SECRET_KEY}
events:
- stream:
type: kinesis
arn:
Fn::Join:
- ':'
- - arn:aws:kinesis:ap-northeast-1
- !Sub ${AWS::AccountId}
- stream/${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
batchSize: 100
batchWindow: 5
startingPosition: LATEST
{
"ENV": "local",
"SSM_PARAMETER_NAME_STRIPE_SECRET_KEY": "/stripe_webhook_local/stripe_secret_key",
"KINESIS_STREAM_NAME": "kinesis-internal-events-local"
}
上記の設定でいくつか補足をする。
- provider.iam
今回、Stripeのデータを更新するためにstripe-nodeを利用するが、その際にsecret key
が必要になる。そのkeyの値はセキュアな情報なのでAWS SSMのパラメータストアから取得するように実装するため、IAMロール(Lambda実行ロール)にSSMパラメータを取得できる権限を設定してる(secret key
はStripeのダッシュボード上で以下のように確認できる)。
-
serverless-plugin-ifelse
ローカルの開発環境で検証・テストを行う上で、Lambda関数のイベントソースマッピングの設定を上書きするために利用しているプラグイン -
serverless-offline-kinesis
こちらも、ローカルの開発環境で検証・テストを行うためのプラグインで、Lambda関数のイベントソースマッピングに指定しているKinesisにデータが流れた際に、Lambdaを起動するという部分をエミュレーションしてくれる -
${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
こちらの表記に関してはRecursively reference propertiesを参照
ここまででserverless.yamlの設定は完了になる。
※実行ロールのアクセス許可に書かれている通り、Kinesisのデータを受け取るにもIAMロールが必要になるが、これについてはserverlessFW側でいい感じに自動で設定してくれるので、明示的に設定せずで問題ない(ただし、同一 serverless に複数の Lambda 関数がある場合の IAM Permissions についてに書かれているようなカスタムロールを設定する場合には、自分で全てのIAMを設定する必要があるので注意)。
kinesis2stripe(Lambda関数)を実装する
続いて、Kinesisから流れてきたデータを受け取り、Stripeのデータを更新するLambda関数を実装していく。
Amazon Kinesis で AWS Lambda を使用するを見ればわかるが、Kinesisから流れた来たデータをLambdaで受け取る場合、event
オブジェクトの中身は以下のようになっている。つまり、event.Records[].kinesis.data
を取り出し、Base64でデコードする事で元のデータが取り出せる(Node.js 12.xなどを参照)。
// 省略
import { createLogger, format, transports } from 'winston';
import Ssm from '@/ssm';
import EventExecutor from '@/event-executor';
const logger = createLogger({
// 省略
});
// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
try {
const {
STAGE: stage,
REGION: region,
SSM_PARAMETER_NAME_STRIPE_SECRET_KEY: ssmParamNameStripeSecretKey,
KINESIS_STREAM_NAME: kinesisStreamName
} = process.env;
// 省略
const ssm = new Ssm({ stage, region });
const { stripeSecretKey } = await ssm.getParameters({
ssmParamNameStripeSecretKey
});
const eventExecutor = new EventExecutor({ stripeSecretKey });
const errors = [];
await Promise.all(
event.Records.map(async (record) => {
try {
const { data: eventData } = record.kinesis;
const { category, type, payload } = JSON.parse(Buffer.from(eventData, 'base64'));
if (!payload) throw new Error(`payload must be required`);
const { data } = payload;
await eventExecutor.execute({ category, type, data, logger });
} catch (e) {
errors.push({ message: e.message, stack: e.stack, record });
}
})
);
if (errors.length) throw new Error(JSON.stringify(errors));
return { statusCode: 200 };
} catch (e) {
logger.error({ message: e.message, stack: e.stack });
return { statusCode: 500 };
}
};
// 省略
const events = {
stripe: {
'payment_method.attached': async (options = {}) => {
// 省略
const { data, stripe } = options;
const { id: paymentMethodId, customer: customerId } = data;
await stripe.customers.update(customerId, {
invoice_settings: { default_payment_method: paymentMethodId }
});
}
}
};
export default class EventExecutor {
constructor(options = {}) {
assert.ok(options.stripeSecretKey, 'stripeSecretKey must be required');
const { stripeSecretKey } = options;
this.stripe = new Stripe(stripeSecretKey);
}
async execute(options = {}) {
// 省略
const { category, type, data, logger } = options;
if (!Object.keys(events).includes(category) && !Object.keys(events[category]).includes(type)) {
logger.info({ message: `${category}.${type} event from kinesis skip` });
return;
}
await events[category][type]({ data, stripe: this.stripe });
logger.info({ message: `${category}.${type} event from kinesis is executed` });
}
}
上記のコードでいくつか補足をする。
-
await ssm.getParameters({...})
上記のserverless.yamlの設定で補足した通り、今回はStripeのsecret key
はAWS SSMのパラメータストアに保存している想定なので、SSMからそのsecret key
を取得している(クラスの実装はここを参照) -
errors.push({ message: e.message, stack: e.stack, record })
MDNに記述があるように、Promise.allでエラーが発生すると、最初のエラーはのみしか捕捉されないので全てのエラーを記録するために、errors
という配列にエラーメッセージを入れるようにしている(Promise.allについてはmap()で await をした場合のパフォーマンスを参照) - await eventExecutor.execute({ category, type, data })
将来的に色々なイベントをトリガーにデータ更新処理を行いたい場合に、拡張しやすくするためにKinesisに流れてきたデータを処理する本体はクラスに隠蔽する形をとっている。./src/lib/event-executor.js
では、処理したいイベントの種別をObjectに定義していけば処理内容を追加できるような実装をしてみた。
ここまでできた所で、以下のようにAWS CLIでaws kinesis put-record
を実行し、Kinesisにデータを流してみると、意図した通りに動作している事が確認できる(Stripeのcustomerのpayment_methodデータはあらかじめ手動で作成しておいたもの)。
"scripts": {
...
"aws.kinesis.put-record": "aws kinesis put-record --stream-name kinesis-internal-events-local --partition-key 123 --data '{"category":"stripe","type":"payment_method.attached","service":"aws-node-serverless-stripe-webhook","payload":{"version":1,"data":{"id":"card_1MI5SxKcqKFZLnGn4uAD9hX7","customer":"cus_MzVc08uHaCvDqX"}}}' --cli-binary-format raw-in-base64-out --endpoint=http://localhost:4566",
...
},
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *+)
$ yarn aws.kinesis.put-record
yarn run v1.22.19
$ aws kinesis put-record --stream-name kinesis-internal-events-local --partition-key 123 --data '{"category":"stripe","type":"payment_method.attached","service":"aws-node-serverless-stripe-webhook","payload":{"version":1,"data":{"id":"card_1MI5SxKcqKFZLnGn4uAD9hX7","customer":"cus_MzVc08uHaCvDqX"}}}' --cli-binary-format raw-in-base64-out --endpoint=http://localhost:4566
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49636367443686329849318835280279325352841694306935767042",
"EncryptionType": "NONE"
}
Done in 1.45s.
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *+)
$ yarn dev
yarn run v1.22.19
$ sls offline start
serverless-plugin-ifelse - Value Changed for : functions.kinesis2stripe.events.0.stream.arn to: arn:aws:kinesis:ap-northeast-1:000000000000:stream/kinesis-internal-events-local
Starting Offline Kinesis at stage local (ap-northeast-1)
Starting Offline at stage local (ap-northeast-1)
Offline [http for lambda] listening on http://localhost:3002
Function names exposed for local invocation by aws-sdk:
* kinesis2stripe: kinesis2stripe-local
{"level":"info","message":"stripe.payment_method.attached event from kinesis is executed","timestamp":"2022-12-23T07:15:35.365Z"}
(λ: kinesis2stripe) RequestId: 372b1ce7-576f-4928-aaad-3c1b8c044c1c Duration: 395.38 ms Billed Duration: 396 ms
ここまで確認ができれば、あとは実際にAWS上にDeployしてStripe→ ・・・ →API Gateway→Lambda→Kinesis→ ・・・ →Lambda→ ・・・ →Stripeが通しで動く事は確認できるだろう(ローカルの開発環境で検証・テストしているので、AWSにDeployしてエラーが出るとすればIAMの権限周りくらい)。
実際にAWS環境にDeployして検証してみる
※ちなみに、KinesisにアクセスするためのIAMロールは、serverlessの方でイベントソースマッピングを設定したリソースに合わせて以下のように自動で設定してくれる(この辺り、serverlessFWを利用するメリットの1つになるだろう)。
まとめとして
今回はStripeからのWebhookイベントをKinesisに流した後、Consumerでそのデータを受け取り何らかのデータ更新を行うというのをやってみた。Kinesisを利用する事で、簡単にイベント駆動の実現ができるので、今後もこうした構成を利用する機会はありそうだなと思う。
おまけ
ローカル環境でAPI Gateway→Lambda→Kinesis→・・・→Lambda→Stripeのデータ更新
を検証できるようにしてみる
やり方としては全然難しくなく、以下のようにserverless.yaml
を設定し、LambdaのイベントソースマッピングにAPI Gatewayを持つようにすればいい。
functions:
kinesis2stripe:
# 省略
# ローカルの開発環境での検証用
stripe2kinesisLocalOnly:
name: stripe2kinesisLocalOnly
handler: support/stripe2kinesisLocalOnly.handler
timeout: 5
environment:
STRIPE_SECRET_KEY: sk_test_**************************************************
STRIPE_ENDPOINT_SECRET: whsec_**************************************************
KINESIS_STREAM_NAME: ${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
events:
- http:
method: post
path: /webhook
上記で2点補足する。
- STRIPE_SECRET_KEY
Stripeのダッシュボードで確認できるシークレットキー
を設定する(Webhook の署名を確認するに書かれている処理で利用するので)
- STRIPE_ENDPOINT_SECRET
以下の画像のように、Stripe-CLIを利用してWebhookの検証をローカル環境でもできるが、その際に払い出されるsigning secret
を設定する(Stripe CLI で Webhook の組み込みをテストするも合わせて参照)
続いて、Lambda関数を実装していくが、実装としては以下のようになる。
// 省略
const logger = createLogger({
// 省略
});
// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
try {
const sig = event.headers[`Stripe-Signature`];
const {
STRIPE_SECRET_KEY: stripeSecretKey,
STRIPE_ENDPOINT_SECRET: stripeEndpointSecret,
KINESIS_STREAM_NAME: kinesisStreamName
} = process.env;
// 省略
const stripe = new Stripe(stripeSecretKey);
const kinesisClient = new KinesisClient({
region: 'ap-northeast-1',
endpoint: 'http://localhost:4566'
});
const {
type,
data: { object: payload }
} = stripe.webhooks.constructEvent(event.body, sig, stripeEndpointSecret);
await kinesisClient.send(
new PutRecordCommand({
Data: new TextEncoder().encode(
JSON.stringify({
category: 'stripe',
type,
service: 'aws-node-serverless-stripe-webhook',
payload: { version: 1, data: payload }
})
),
PartitionKey: '123',
StreamName: kinesisStreamName
})
);
logger.info({ message: `sucess putRecord 'stripe' ${type} event data` });
return { statusCode: 200 };
} catch (e) {
logger.error({ message: e.message, stack: e.stack });
return { statusCode: 500 };
}
};
上記のコードについて少し補足する。
- stripe.webhooks.constructEvent
Webhook の署名を確認するに書かれている方法で、StripeのWebhookイベントの署名確認を行っている
ここまで実装が完了すると、以下のように通しで確認できるようになる。
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *)
$ yarn dev
yarn run v1.22.19
$ sls offline start
serverless-plugin-ifelse - Value Changed for : functions.kinesis2stripe.events.0.stream.arn to: arn:aws:kinesis:ap-northeast-1:000000000000:stream/kinesis-internal-events-local
Starting Offline Kinesis at stage local (ap-northeast-1)
Starting Offline at stage local (ap-northeast-1)
Offline [http for lambda] listening on http://localhost:3002
Function names exposed for local invocation by aws-sdk:
* kinesis2stripe: kinesis2stripe-local
* stripe2kinesisLocalOnly: stripe2kinesisLocalOnly
┌───────────────────────────────────────────────────────────────────────────────┐
│ │
│ POST | http://localhost:3000/local/webhook │
│ POST | http://localhost:3000/2015-03-31/functions/stripe2kinesisLocalOnly │
│ /invocations │
│ │
└───────────────────────────────────────────────────────────────────────────────┘
Server ready: http://localhost:3000 🚀
POST /local/webhook (λ: stripe2kinesisLocalOnly)
POST /local/webhook (λ: stripe2kinesisLocalOnly)
{"level":"info","message":"sucess putRecord 'stripe' payment_method.attached event data","timestamp":"2022-12-26T02:10:05.715Z"}
(λ: stripe2kinesisLocalOnly) RequestId: 6dd97efc-ec34-4b31-9572-734f9d60961a Duration: 814.15 ms Billed Duration: 815 ms
...
{"level":"info","message":"stripe.payment_method.attached event from kinesis is executed","timestamp":"2022-12-26T02:10:08.908Z"}
(λ: kinesis2stripe) RequestId: b4ea89b8-8050-452d-b5a7-e2e6e8e9c836 Duration: 2161.44 ms Billed Duration: 2162 ms
...
study@localhost:~/workspace/learn-serverless (kinesis-consumer-lambda *)
$ stripe listen -f http://localhost:3000/local/webhook
> Ready! You are using Stripe API Version [2022-11-15]. Your webhook signing secret is whsec_***************************************** (^C to quit)
2022-12-26 11:10:04 --> payment_method.attached [evt_1MJ68eKcqKFZLnGnl5SwFPk7]
2022-12-26 11:10:05 --> customer.source.created [evt_1MJ68eKcqKFZLnGnLZvYbRVi]
2022-12-26 11:10:05 --> setup_intent.created [evt_1MJ68fKcqKFZLnGnuG70rV86]
2022-12-26 11:10:05 <-- [200] POST http://localhost:3000/local/webhook [evt_1MJ68eKcqKFZLnGnl5SwFPk7]
2022-12-26 11:10:05 --> setup_intent.succeeded [evt_1MJ68fKcqKFZLnGnpHHPYZEx]
2022-12-26 11:10:05 <-- [200] POST http://localhost:3000/local/webhook [evt_1MJ68fKcqKFZLnGnuG70rV86]
2022-12-26 11:10:07 <-- [200] POST http://localhost:3000/local/webhook [evt_1MJ68eKcqKFZLnGnLZvYbRVi]
2022-12-26 11:10:07 <-- [200] POST http://localhost:3000/local/webhook [evt_1MJ68fKcqKFZLnGnpHHPYZEx]
2022-12-26 11:10:09 --> customer.updated [evt_1MJ68iKcqKFZLnGnjhjSbADH]
2022-12-26 11:10:09 <-- [200] POST http://localhost:3000/local/webhook [evt_1MJ68iKcqKFZLnGnjhjSbADH]