概要
他で(例えば他のAmplifyプロジェクトで)作成したDynamoDBのテーブルへのデータ挿入や変更などを、Amplifyアプリで検知してリアルタイムで利用する仕組みを構築するメモ。
同じAmplifyプロジェクト内で@model
で作成したテーブルであればsubscriptionを定義してアプリから呼び出すだけで簡単にリアルタイムでデータを取得できるが、Amplifyプロジェクトの外で作成されたテーブルのデータ更新を検知することはできない(2021/08/11現在)。
なので、対象のDynamoDBテーブルのStreamをLambdaで受け取って、Lambdaに実装中のAmplifyプロジェクトのmutationを呼び出させて、そのmutationをsubscribeすることでAmplifyアプリで変更を受け取る、ということをする。
通知を受け取るだけでなく、データを取りに行く方法も既存DynamoDBをAmplifyで使うメモにまとめている。もしかしたら、こちらを先にやっておく必要があるかもしれないが、未確認。
前提
- AmplifyでGraphQLで開発中
- 開発中のAmplifyのGraphQLエンドポイントはAPI KEYでアクセス可能
手順
以下は記述がない限り全て開発中のAmplify側での作業とする。
準備
下記の情報が必要になるので控えておく。
-
通知を受け取りたい対象のDynamoDBテーブルのStreamのARN
DynamoDBコンソールのテーブルのOverviewタブの「DynamoDB stream details」に記載されている。 -
開発中のAmplifyのAppSync API URL
AppSyncコンソールから本アプリのAppSyncを開いてSettingsページの「API Details」に記載されている。 -
API key
同じくSettingsページの「API Details」に記載されている。
ダミーデータソースの定義
上の図左側のラムダからAmplifyへのmutationではデータは書き換える必要が無いため、通常はDynamoDBなどであるデータソースを、Noneタイプを使用して実態のないダミーを指定する。
"Resources": {
...
"NoneDataSource": {
"Type": "AWS::AppSync::DataSource",
"Properties": {
"ApiId": { "Ref": "AppSyncApiId" },
"Name": "MyNoneDataSource",
"Type": "NONE",
"ServiceRoleArn": { "Fn::GetAtt": [ "DynamoDBDataSourceRole", "Arn" ] }
}
},
...
}
スキーマの作成
Lambdaから呼び出すMutationを定義する。Mutationの定義に必要な要素も定義する。
type Mutation {
notifyNewTodo(input: TodoInput!): Todo
}
type TodoInput {
id: ID
description: String!
categoryId: ID
}
type Todo {
id: ID!
description: String!
categoryId: ID
}
このmutationの呼び出しを検知するsubscriptionを定義する
type Subscription {
onNotifyNewTodo: Todo @aws_subscribe(mutations: ["notifyNewTodo"])
}
リゾルバの作成
Mutationのリゾルバを作成する。ここでは、Lambdaから notifyNewTodo
mutationを呼び出すときに渡される引数をそのままSubscriberに渡したいので、notifyNewTodo
mutationの引数の input
を $ctx.args.input
で取得して payload
に設定している。
{
"version": "2018-05-29",
"payload": $util.toJson($ctx.args.input)
}
レスポンスのリゾルバはそのまま返す。
$util.toJson($ctx.result)
リゾルバの登録
CustomResource.jsonでこれらのリゾルバを登録する。DataSourceName
でダミーデータソースの名前を指定して、mutationが何もデータを変更しないようにしている。
"Resources": {
...
"MutationNotifyNewTodoResolver": {
"Type": "AWS::AppSync::Resolver",
"Properties": {
"ApiId": { "Ref": "AppSyncApiId" },
"DataSourceName": { "Fn::GetAtt": [ "NoneDataSource", "Name" ] },
"TypeName": "Mutation",
"FieldName": "notifyNewTodo",
"RequestMappingTemplateS3Location": {
"Fn::Sub": [
"s3://${S3DeploymentBucket}/${S3DeploymentRootKey}/resolvers/Mutation.notifyNewTodo.req.vtl",
{
"S3DeploymentBucket": { "Ref": "S3DeploymentBucket"},
"S3DeploymentRootKey": { "Ref": "S3DeploymentRootKey" }
}
]
},
"ResponseMappingTemplateS3Location": {
"Fn::Sub": [
"s3://${S3DeploymentBucket}/${S3DeploymentRootKey}/resolvers/Mutation.notifyNewTodo.res.vtl",
{
"S3DeploymentBucket": { "Ref": "S3DeploymentBucket" },
"S3DeploymentRootKey": { "Ref": "S3DeploymentRootKey" }
}
]
}
}
},
...
}
Lambdaの作成
今回はLambdaにJavaScriptを例として使用する。
で、JSの場合依存ファイルが多すぎてLambda layerで依存ライブラリをLambda関数から切り離さないとLambdaコンソールのエディタが使用できなくなるので、まずはLayerを作成する。作成方法はこちらの記事に記載している。必要になるパッケージは下記の通り3つ。
{
"version": "1.0.0",
"description": "",
"main": "index.js",
"dependencies": {
"aws-appsync": "^4.1.1",
"graphql-tag": "^2.12.5",
"node-fetch": "^2.6.1"
},
"devDependencies": {}
}
次に、Lambda関数を作成。amplify add function
コマンドでLambda functionを選んでLambda関数の設定ファイルを作成する。
- function templateにはLambda triggerを選択
- トリガにはAmazon DynamoDB Streamを選択
- 「Provide the ARN of DynamoDB stream directly」を選択
- 準備で控えた通知を受け取りたい対象のDynamoDBテーブルのStreamのARNを入力
- Advancedの設定で先程設定したLambda layerを指定する。Spaceキーで選択なので注意。(後で
amplify update function
からも可能) - 準備で控えた情報を環境変数として登録
- APP_SYNC_API_URL: 開発中のAmplifyのAppSync API URL
- API_KEY: 開発中のAmplifyのAPI
% amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: notifynewtodo
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Lambda trigger
? What event source do you want to associate with Lambda trigger? Amazon DynamoDB Stream
? Choose a DynamoDB event source option Provide the ARN of DynamoDB stream directly
? Provide the ARN of Amazon DynamoDB stream arn:aws:dynamodb:ap-northeast-1:803093504115:table/Todo-xxxxxxxxxxxxxxxxx-dev/stream/2021-08-04T10:26:56.947
...
Lambdaのロジックを実装
このLambda関数がDynamoDBのStreamで、DBデータに変化があった場合に呼び出される。このLambdaから上記で定義したmutationを呼んでやる。
const AWS = require('aws-sdk')
const appsync = require('aws-appsync')
const gql = require('graphql-tag')
const fetch = require('node-fetch')
if (!globalThis.fetch) globalThis.fetch = fetch
const graphqlClient = new appsync.AWSAppSyncClient({
url: process.env.APP_SYNC_API_URL,
region: process.env.AWS_REGION, // AmplifyのリージョンとLambdaのリージョンは同じ前提
auth: {
type: 'API_KEY',
apiKey: process.env.API_KEY,
},
disableOffline: true
})
const mutation = gql`
mutation NotifyNewTodo($input: TodoInput!) {
notifyNewTodo(input: $input) {
id
description
categoryId
}
}
`;
exports.handler = async event => {
// 複数来る可能性もあるのでイテレートして処理する
for (const record of event.Records) {
if (record.eventName !== 'INSERT') return // この例では作成時のみmutationを呼んでいる
const item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage)
await graphqlClient.mutate({
mutation,
variables: {
// `input: item` にするとitemに下記以外が入っているため動作しない
input: {
id: item.id,
description: item.description,
categoryId: item.categoryId,
}
},
})
}
};
amplify push
で環境を更新する。
これで動作確認ができるので、AppSyncコンソールのQuery画面でsubscriptionを実行して待機状態にする。
DynamoDBのコンソールなどから対象のテーブルにデータを入れて、このQuery画面でデータを受け取れたら成功。
もし、エラーになったり、データが受け取れない場合は、下記含め、フィールドの指定や定義が正しいか、気をつけてチェックしてみると良い。
- 上記JSコード内のmutation文字列で指定している引数に無いフィールドをSubscriptionでリクエストしていないか
- スキーマ定義にないフィールドをこのmutation文字列で指定していないか
また、デバッグは毎回 amplify push
するのではなく、Lambdaコンソールであれこれ試してみると効率が良い。
サブスクライブするデータを絞り込む
現状だと、テーブルへの変更は全て通知されるが、subscriptionに引数を渡して受け取る通知を絞り込むことができる。
onNotifyNewTodo(categoryId: ID): Todo @aws_subscribe(mutations: ["notifyNewTodo"])
この場合、categoryIdにパーティションキーが張られている必要があるかもしれない。テーブルを定義しているAmplifyプロジェクトの方でこのフィールドを引数にしたsubscriptionを作成するとそれをパーティションキーにしたインデックスが自動的に作成されるのかもしれない。が、未確認。
StreamのARNをパラメータ化する
Streamを環境毎に切り替える場合、パラメータ化して対応できる。
環境設定ファイルの中の、StreamでトリガしているLambda (function) の設定にARNを示すパラメータを追加する。
"function": {
...
"myfunction": {
...
"todoStreamArn": "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxxx:table/Todo-wwwwwwwwwwwww-dev/stream/2021-08-04T10:26:56.947"
}
}
amplify/backend/function/myfunction/myfunction-cloudformation-template.json の Parameters
にこのパラメータを定義する。そして、それをLambdaTriggerPolicyから参照してやる。
"Parameters": {
...
"todoStreamArn": { // <-- これ
"Type": "String"
}
},
...
"Resources": {
...
"LambdaTriggerPolicy": {
...
"PolicyDocument": {
...
"Statement": [
{
...
"Resource": {
"Ref": "todoStreamArn" // <-- これ
}
}
]
}
}
},
"LambdaEventSourceMapping": {
...
"Properties": {
...
"EventSourceArn": {
"Ref": "todoStreamArn" // <-- これ
},
...
}
}
}
以上。
参考
AWS AppSync Subscriptions with DynamoDB Streams, Lambda and Serverless