概要
他で(例えば他のAmplifyプロジェクトで)作成したDynamoDBのテーブルへのデータ挿入や変更などを、Amplifyアプリで検知してリアルタイムで利用する仕組みを構築するメモ。
同じAmplifyプロジェクト内で@modelで作成したテーブルであればsubscriptionを定義してアプリから呼び出すだけで簡単にリアルタイムでデータを取得できるが、Amplifyプロジェクトの外で作成されたテーブルのデータ更新を検知することはできない(2021/08/11現在)。
なので、対象のDynamoDBテーブルのStreamをLambdaで受け取って、Lambdaに実装中のAmplifyプロジェクトのmutationを呼び出させて、そのmutationをsubscribeすることでAmplifyアプリで変更を受け取る、ということをする。
通知を受け取るだけでなく、データを取りに行く方法も既存DynamoDBをAmplifyで使うメモにまとめている。もしかしたら、こちらを先にやっておく必要があるかもしれないが、未確認。
前提
- AmplifyでGraphQLで開発中
- 開発中のAmplifyのGraphQLエンドポイントはAPI KEYでアクセス可能
手順
以下は記述がない限り全て開発中のAmplify側での作業とする。
準備
下記の情報が必要になるので控えておく。
- 
通知を受け取りたい対象のDynamoDBテーブルのStreamのARN
 DynamoDBコンソールのテーブルのOverviewタブの「DynamoDB stream details」に記載されている。
- 
開発中のAmplifyのAppSync API URL
 AppSyncコンソールから本アプリのAppSyncを開いてSettingsページの「API Details」に記載されている。
- 
API key
 同じくSettingsページの「API Details」に記載されている。
ダミーデータソースの定義
上の図左側のラムダからAmplifyへのmutationではデータは書き換える必要が無いため、通常はDynamoDBなどであるデータソースを、Noneタイプを使用して実態のないダミーを指定する。
"Resources": {
  ...
  "NoneDataSource": {
    "Type": "AWS::AppSync::DataSource",
    "Properties": {
      "ApiId": { "Ref": "AppSyncApiId" },
      "Name": "MyNoneDataSource",
      "Type": "NONE",
      "ServiceRoleArn": { "Fn::GetAtt": [ "DynamoDBDataSourceRole", "Arn" ] }
    }
  },
  ...
}
スキーマの作成
Lambdaから呼び出すMutationを定義する。Mutationの定義に必要な要素も定義する。
type Mutation {
  notifyNewTodo(input: TodoInput!): Todo
}
type TodoInput {
  id: ID
  description: String!
  categoryId: ID
}
type Todo {
  id: ID!
  description: String!
  categoryId: ID
}
このmutationの呼び出しを検知するsubscriptionを定義する
type Subscription {
  onNotifyNewTodo: Todo @aws_subscribe(mutations: ["notifyNewTodo"])
}
リゾルバの作成
Mutationのリゾルバを作成する。ここでは、Lambdaから notifyNewTodo mutationを呼び出すときに渡される引数をそのままSubscriberに渡したいので、notifyNewTodo mutationの引数の input を $ctx.args.input で取得して payload に設定している。
{
    "version": "2018-05-29",
    "payload": $util.toJson($ctx.args.input)
}
レスポンスのリゾルバはそのまま返す。
$util.toJson($ctx.result)
リゾルバの登録
CustomResource.jsonでこれらのリゾルバを登録する。DataSourceNameでダミーデータソースの名前を指定して、mutationが何もデータを変更しないようにしている。
"Resources": {
  ...
  "MutationNotifyNewTodoResolver": {
    "Type": "AWS::AppSync::Resolver",
    "Properties": {
      "ApiId": { "Ref": "AppSyncApiId" },
      "DataSourceName": { "Fn::GetAtt": [ "NoneDataSource", "Name" ] },
      "TypeName": "Mutation",
      "FieldName": "notifyNewTodo",
      "RequestMappingTemplateS3Location": {
        "Fn::Sub": [
          "s3://${S3DeploymentBucket}/${S3DeploymentRootKey}/resolvers/Mutation.notifyNewTodo.req.vtl",
          {
            "S3DeploymentBucket": { "Ref": "S3DeploymentBucket"},
            "S3DeploymentRootKey": { "Ref": "S3DeploymentRootKey" }
          }
        ]
      },
      "ResponseMappingTemplateS3Location": {
        "Fn::Sub": [
          "s3://${S3DeploymentBucket}/${S3DeploymentRootKey}/resolvers/Mutation.notifyNewTodo.res.vtl",
          {
            "S3DeploymentBucket": { "Ref": "S3DeploymentBucket" },
            "S3DeploymentRootKey": { "Ref": "S3DeploymentRootKey" }
          }
        ]
      }
    }
  },
  ...
}
Lambdaの作成
今回はLambdaにJavaScriptを例として使用する。
で、JSの場合依存ファイルが多すぎてLambda layerで依存ライブラリをLambda関数から切り離さないとLambdaコンソールのエディタが使用できなくなるので、まずはLayerを作成する。作成方法はこちらの記事に記載している。必要になるパッケージは下記の通り3つ。
{
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "dependencies": {
    "aws-appsync": "^4.1.1",
    "graphql-tag": "^2.12.5",
    "node-fetch": "^2.6.1"
  },
  "devDependencies": {}
}
次に、Lambda関数を作成。amplify add functionコマンドでLambda functionを選んでLambda関数の設定ファイルを作成する。
- function templateにはLambda triggerを選択
- トリガにはAmazon DynamoDB Streamを選択
- 「Provide the ARN of DynamoDB stream directly」を選択
- 準備で控えた通知を受け取りたい対象のDynamoDBテーブルのStreamのARNを入力
 
- Advancedの設定で先程設定したLambda layerを指定する。Spaceキーで選択なので注意。(後でamplify update functionからも可能)
- 準備で控えた情報を環境変数として登録
- APP_SYNC_API_URL: 開発中のAmplifyのAppSync API URL
- API_KEY: 開発中のAmplifyのAPI
 
% amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: notifynewtodo
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Lambda trigger
? What event source do you want to associate with Lambda trigger? Amazon DynamoDB Stream
? Choose a DynamoDB event source option Provide the ARN of DynamoDB stream directly
? Provide the ARN of Amazon DynamoDB stream arn:aws:dynamodb:ap-northeast-1:803093504115:table/Todo-xxxxxxxxxxxxxxxxx-dev/stream/2021-08-04T10:26:56.947
...
Lambdaのロジックを実装
このLambda関数がDynamoDBのStreamで、DBデータに変化があった場合に呼び出される。このLambdaから上記で定義したmutationを呼んでやる。
const AWS = require('aws-sdk')
const appsync = require('aws-appsync')
const gql = require('graphql-tag')
const fetch = require('node-fetch')
if (!globalThis.fetch) globalThis.fetch = fetch
const graphqlClient = new appsync.AWSAppSyncClient({
  url: process.env.APP_SYNC_API_URL,
  region: process.env.AWS_REGION, // AmplifyのリージョンとLambdaのリージョンは同じ前提
  auth: {
    type: 'API_KEY',
    apiKey: process.env.API_KEY,
  },
  disableOffline: true
})
const mutation = gql`
  mutation NotifyNewTodo($input: TodoInput!) {
    notifyNewTodo(input: $input) {
      id
      description
      categoryId
    }
  }
`;
exports.handler = async event => {
  // 複数来る可能性もあるのでイテレートして処理する
  for (const record of event.Records) {
    if (record.eventName !== 'INSERT') return // この例では作成時のみmutationを呼んでいる
    const item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage)
    await graphqlClient.mutate({
      mutation,
      variables: {
        // `input: item` にするとitemに下記以外が入っているため動作しない
        input: {
          id: item.id,
          description: item.description,
          categoryId: item.categoryId,
        }
      },
    })
  }
};
amplify pushで環境を更新する。
これで動作確認ができるので、AppSyncコンソールのQuery画面でsubscriptionを実行して待機状態にする。
DynamoDBのコンソールなどから対象のテーブルにデータを入れて、このQuery画面でデータを受け取れたら成功。
もし、エラーになったり、データが受け取れない場合は、下記含め、フィールドの指定や定義が正しいか、気をつけてチェックしてみると良い。
- 上記JSコード内のmutation文字列で指定している引数に無いフィールドをSubscriptionでリクエストしていないか
- スキーマ定義にないフィールドをこのmutation文字列で指定していないか
また、デバッグは毎回 amplify push するのではなく、Lambdaコンソールであれこれ試してみると効率が良い。
サブスクライブするデータを絞り込む
現状だと、テーブルへの変更は全て通知されるが、subscriptionに引数を渡して受け取る通知を絞り込むことができる。
onNotifyNewTodo(categoryId: ID): Todo @aws_subscribe(mutations: ["notifyNewTodo"])
この場合、categoryIdにパーティションキーが張られている必要があるかもしれない。テーブルを定義しているAmplifyプロジェクトの方でこのフィールドを引数にしたsubscriptionを作成するとそれをパーティションキーにしたインデックスが自動的に作成されるのかもしれない。が、未確認。
StreamのARNをパラメータ化する
Streamを環境毎に切り替える場合、パラメータ化して対応できる。
環境設定ファイルの中の、StreamでトリガしているLambda (function) の設定にARNを示すパラメータを追加する。
"function": {
  ...
  "myfunction": {
    ...
    "todoStreamArn":  "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxxx:table/Todo-wwwwwwwwwwwww-dev/stream/2021-08-04T10:26:56.947"
  }
}
amplify/backend/function/myfunction/myfunction-cloudformation-template.json の Parameters にこのパラメータを定義する。そして、それをLambdaTriggerPolicyから参照してやる。
"Parameters": {
  ...
  "todoStreamArn": {  // <-- これ
    "Type": "String"
  }
},
...
"Resources": {
  ...
  "LambdaTriggerPolicy": {
      ...
      "PolicyDocument": {
        ...
        "Statement": [
          {
            ...
            "Resource": {
              "Ref": "todoStreamArn"  // <-- これ
            }
          }
        ]
      }
    }
  },
  "LambdaEventSourceMapping": {
    ...
    "Properties": {
      ...
      "EventSourceArn": {
        "Ref": "todoStreamArn"  // <-- これ
      },
      ...
    }
  }  
}
以上。
参考
AWS AppSync Subscriptions with DynamoDB Streams, Lambda and Serverless

