LoginSignup
3
5

More than 1 year has passed since last update.

他で管理されているDynamoDBへの更新をAmplifyアプリでサブスクライブする

Last updated at Posted at 2021-08-11

概要

他で(例えば他のAmplifyプロジェクトで)作成したDynamoDBのテーブルへのデータ挿入や変更などを、Amplifyアプリで検知してリアルタイムで利用する仕組みを構築するメモ。

同じAmplifyプロジェクト内で@modelで作成したテーブルであればsubscriptionを定義してアプリから呼び出すだけで簡単にリアルタイムでデータを取得できるが、Amplifyプロジェクトの外で作成されたテーブルのデータ更新を検知することはできない(2021/08/11現在)。

なので、対象のDynamoDBテーブルのStreamをLambdaで受け取って、Lambdaに実装中のAmplifyプロジェクトのmutationを呼び出させて、そのmutationをsubscribeすることでAmplifyアプリで変更を受け取る、ということをする。

Untitled Diagram (1).png

通知を受け取るだけでなく、データを取りに行く方法も既存DynamoDBをAmplifyで使うメモにまとめている。もしかしたら、こちらを先にやっておく必要があるかもしれないが、未確認。

前提

  • AmplifyでGraphQLで開発中
  • 開発中のAmplifyのGraphQLエンドポイントはAPI KEYでアクセス可能

手順

以下は記述がない限り全て開発中のAmplify側での作業とする。

準備

下記の情報が必要になるので控えておく。

  • 通知を受け取りたい対象のDynamoDBテーブルのStreamのARN
    DynamoDBコンソールのテーブルのOverviewタブの「DynamoDB stream details」に記載されている。
  • 開発中のAmplifyのAppSync API URL
    AppSyncコンソールから本アプリのAppSyncを開いてSettingsページの「API Details」に記載されている。
  • API key
    同じくSettingsページの「API Details」に記載されている。

ダミーデータソースの定義

上の図左側のラムダからAmplifyへのmutationではデータは書き換える必要が無いため、通常はDynamoDBなどであるデータソースを、Noneタイプを使用して実態のないダミーを指定する。

amplify/backend/api/myapi/stacks/CustomResources.json
"Resources": {
  ...
  "NoneDataSource": {
    "Type": "AWS::AppSync::DataSource",
    "Properties": {
      "ApiId": { "Ref": "AppSyncApiId" },
      "Name": "MyNoneDataSource",
      "Type": "NONE",
      "ServiceRoleArn": { "Fn::GetAtt": [ "DynamoDBDataSourceRole", "Arn" ] }
    }
  },
  ...
}

スキーマの作成

Lambdaから呼び出すMutationを定義する。Mutationの定義に必要な要素も定義する。

amplify/backend/api/myapi/schema.graphql
type Mutation {
  notifyNewTodo(input: TodoInput!): Todo
}

type TodoInput {
  id: ID
  description: String!
  categoryId: ID
}

type Todo {
  id: ID!
  description: String!
  categoryId: ID
}

このmutationの呼び出しを検知するsubscriptionを定義する

amplify/backend/api/myapi/schema.graphql
type Subscription {
  onNotifyNewTodo: Todo @aws_subscribe(mutations: ["notifyNewTodo"])
}

リゾルバの作成

Mutationのリゾルバを作成する。ここでは、Lambdaから notifyNewTodo mutationを呼び出すときに渡される引数をそのままSubscriberに渡したいので、notifyNewTodo mutationの引数の input$ctx.args.input で取得して payload に設定している。

amplify/backend/api/myapi/resolvers/Mutation.notifyNewTodo.req.vtl
{
    "version": "2018-05-29",
    "payload": $util.toJson($ctx.args.input)
}

レスポンスのリゾルバはそのまま返す。

amplify/backend/api/myapi/resolvers/Mutation.notifyNewTodo.res.vtl
$util.toJson($ctx.result)

リゾルバの登録

CustomResource.jsonでこれらのリゾルバを登録する。DataSourceNameでダミーデータソースの名前を指定して、mutationが何もデータを変更しないようにしている。

amplify/backend/api/myapi/stacks/CustomResources.json
"Resources": {
  ...
  "MutationNotifyNewTodoResolver": {
    "Type": "AWS::AppSync::Resolver",
    "Properties": {
      "ApiId": { "Ref": "AppSyncApiId" },
      "DataSourceName": { "Fn::GetAtt": [ "NoneDataSource", "Name" ] },
      "TypeName": "Mutation",
      "FieldName": "notifyNewTodo",
      "RequestMappingTemplateS3Location": {
        "Fn::Sub": [
          "s3://${S3DeploymentBucket}/${S3DeploymentRootKey}/resolvers/Mutation.notifyNewTodo.req.vtl",
          {
            "S3DeploymentBucket": { "Ref": "S3DeploymentBucket"},
            "S3DeploymentRootKey": { "Ref": "S3DeploymentRootKey" }
          }
        ]
      },
      "ResponseMappingTemplateS3Location": {
        "Fn::Sub": [
          "s3://${S3DeploymentBucket}/${S3DeploymentRootKey}/resolvers/Mutation.notifyNewTodo.res.vtl",
          {
            "S3DeploymentBucket": { "Ref": "S3DeploymentBucket" },
            "S3DeploymentRootKey": { "Ref": "S3DeploymentRootKey" }
          }
        ]
      }
    }
  },
  ...
}

Lambdaの作成

今回はLambdaにJavaScriptを例として使用する。
で、JSの場合依存ファイルが多すぎてLambda layerで依存ライブラリをLambda関数から切り離さないとLambdaコンソールのエディタが使用できなくなるので、まずはLayerを作成する。作成方法はこちらの記事に記載している。必要になるパッケージは下記の通り3つ。

{
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "dependencies": {
    "aws-appsync": "^4.1.1",
    "graphql-tag": "^2.12.5",
    "node-fetch": "^2.6.1"
  },
  "devDependencies": {}
}

次に、Lambda関数を作成。amplify add functionコマンドでLambda functionを選んでLambda関数の設定ファイルを作成する。

  • function templateにはLambda triggerを選択
    • トリガにはAmazon DynamoDB Streamを選択
    • 「Provide the ARN of DynamoDB stream directly」を選択
    • 準備で控えた通知を受け取りたい対象のDynamoDBテーブルのStreamのARNを入力
  • Advancedの設定で先程設定したLambda layerを指定する。Spaceキーで選択なので注意。(後でamplify update functionからも可能)
  • 準備で控えた情報を環境変数として登録
    • APP_SYNC_API_URL: 開発中のAmplifyのAppSync API URL
    • API_KEY: 開発中のAmplifyのAPI
% amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: notifynewtodo
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Lambda trigger
? What event source do you want to associate with Lambda trigger? Amazon DynamoDB Stream
? Choose a DynamoDB event source option Provide the ARN of DynamoDB stream directly
? Provide the ARN of Amazon DynamoDB stream arn:aws:dynamodb:ap-northeast-1:803093504115:table/Todo-xxxxxxxxxxxxxxxxx-dev/stream/2021-08-04T10:26:56.947
...

Lambdaのロジックを実装

このLambda関数がDynamoDBのStreamで、DBデータに変化があった場合に呼び出される。このLambdaから上記で定義したmutationを呼んでやる。

const AWS = require('aws-sdk')
const appsync = require('aws-appsync')
const gql = require('graphql-tag')
const fetch = require('node-fetch')

if (!globalThis.fetch) globalThis.fetch = fetch

const graphqlClient = new appsync.AWSAppSyncClient({
  url: process.env.APP_SYNC_API_URL,
  region: process.env.AWS_REGION, // AmplifyのリージョンとLambdaのリージョンは同じ前提
  auth: {
    type: 'API_KEY',
    apiKey: process.env.API_KEY,
  },
  disableOffline: true
})

const mutation = gql`
  mutation NotifyNewTodo($input: TodoInput!) {
    notifyNewTodo(input: $input) {
      id
      description
      categoryId
    }
  }
`;

exports.handler = async event => {
  // 複数来る可能性もあるのでイテレートして処理する
  for (const record of event.Records) {
    if (record.eventName !== 'INSERT') return // この例では作成時のみmutationを呼んでいる

    const item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage)

    await graphqlClient.mutate({
      mutation,
      variables: {
        // `input: item` にするとitemに下記以外が入っているため動作しない
        input: {
          id: item.id,
          description: item.description,
          categoryId: item.categoryId,
        }
      },
    })
  }
};

amplify pushで環境を更新する。

これで動作確認ができるので、AppSyncコンソールのQuery画面でsubscriptionを実行して待機状態にする。
DynamoDBのコンソールなどから対象のテーブルにデータを入れて、このQuery画面でデータを受け取れたら成功。

もし、エラーになったり、データが受け取れない場合は、下記含め、フィールドの指定や定義が正しいか、気をつけてチェックしてみると良い。
* 上記JSコード内のmutation文字列で指定している引数に無いフィールドをSubscriptionでリクエストしていないか
* スキーマ定義にないフィールドをこのmutation文字列で指定していないか

また、デバッグは毎回 amplify push するのではなく、Lambdaコンソールであれこれ試してみると効率が良い。

サブスクライブするデータを絞り込む

現状だと、テーブルへの変更は全て通知されるが、subscriptionに引数を渡して受け取る通知を絞り込むことができる。

onNotifyNewTodo(categoryId: ID): Todo @aws_subscribe(mutations: ["notifyNewTodo"])

この場合、categoryIdにパーティションキーが張られている必要があるかもしれない。テーブルを定義しているAmplifyプロジェクトの方でこのフィールドを引数にしたsubscriptionを作成するとそれをパーティションキーにしたインデックスが自動的に作成されるのかもしれない。が、未確認。

StreamのARNをパラメータ化する

Streamを環境毎に切り替える場合、パラメータ化して対応できる。
環境設定ファイルの中の、StreamでトリガしているLambda (function) の設定にARNを示すパラメータを追加する。

amplify/team-provider-info.json
"function": {
  ...
  "myfunction": {
    ...
    "todoStreamArn":  "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxxx:table/Todo-wwwwwwwwwwwww-dev/stream/2021-08-04T10:26:56.947"
  }
}

amplify/backend/function/myfunction/myfunction-cloudformation-template.json の Parameters にこのパラメータを定義する。そして、それをLambdaTriggerPolicyから参照してやる。

myfunction-cloudformation-template.json
"Parameters": {
  ...
  "todoStreamArn": {  // <-- これ
    "Type": "String"
  }
},
...
"Resources": {
  ...
  "LambdaTriggerPolicy": {
      ...
      "PolicyDocument": {
        ...
        "Statement": [
          {
            ...
            "Resource": {
              "Ref": "todoStreamArn"  // <-- これ
            }
          }
        ]
      }
    }
  },
  "LambdaEventSourceMapping": {
    ...
    "Properties": {
      ...
      "EventSourceArn": {
        "Ref": "todoStreamArn"  // <-- これ
      },
      ...
    }
  }  
}

以上。

参考

AWS AppSync Subscriptions with DynamoDB Streams, Lambda and Serverless

3
5
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5