7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

NTTテクノクロスAdvent Calendar 2023

Day 5

AppSyncのSubscriptionをPythonで動作確認

Last updated at Posted at 2023-12-04

この記事はNTTテクノクロスアドベントカレンダー2023 の3日目の記事です。

NTT-TXの定行です。普段はARやVR関連の業務をしたり、たまにデータ解析の業務をしています。

今年もアドベントカレンダーに参加させていただくことになりました。今年は業務絡みで学習したAppSyncについて構築手順や検証方法を備忘録的に残しておきたいと思います。今回の検証環境はaws-sam-cliで一括で構築します。興味がある方はぜひ試してみてください。接続検証にはPythonを使用しています。

1.そもそもAppSyncってなに?

AppSyncはAWSが提供しているサーバーレスの GraphQL および Pub/Sub API 用のサービスです。

GraphQLはクライアント端末がWeb基盤と連携する手段として使用できます。クライアント端末がWeb基盤と連携する方法としてREST APIが有名ですが、GraphQLはREST APIに比べていくつかの利点があります。

  • 柔軟性と効率的なデータ取得:
    GraphQLではクライアントが必要なデータだけをリクエストすることができます。これによりネットワークトラフィックが削減され、パフォーマンスが向上します。

  • リアルタイムデータ更新:
    GraphQLはリアルタイムデータ更新をサポートしています。これにより、クライアントはデータの変更を即座に取得できます。

2.検証環境構築

検証環境はAWS SAMを使って構築します。AWS SAMはCloudFormationを拡張したサービスで、AWSのインフラストラクチャをコードで管理するためのサービスです。

AWS SAMテンプレートは、CloudFormationテンプレートを拡張したものであり、AWS SAM CLI(Command Line Interface)を使用してローカルでの開発やデバッグが容易になります。AWS SAMテンプレートは通常のCloudFormationテンプレートと互換性があり、AWS SAM CLIを使用しなくても通常のCloudFormationスタックとしてデプロイできます。AppSyncに関してはAWS SAMがすべての機能をサポートしているわけではないので、CloudFormationの記述で行っています。

今回はDynamoDBをリゾルバで使用するシンプルな構成で実施します。データの設定(Mutation)、データの取得(Query)、データの通知(Subscription)の確認をします。

samテンプレートの記載はこんな感じです。

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

#テンプレートの説明
Description: "appsync-test"

#共通パラメータ
Globals:
  Function:
    Runtime: python3.9
    MemorySize: 512
    Timeout: 10
    Tracing: Active
  Api:
    TracingEnabled: true

#リソース設定
Resources:

  #appsyncの定義  
  AppSyncApi:
    Type: 'AWS::AppSync::GraphQLApi'
    Properties:
      Name: 'AppSync Sam Template Test'
      AuthenticationType: API_KEY

  #API-KEYの定義
  ApiKeyAppSync:
    Type: AWS::AppSync::ApiKey
    Properties:
      ApiId: !GetAtt AppSyncApi.ApiId

  # DynamoDBのテーブルを作成
  MyDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: MyDynamoDBTable
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  # DynamoDBのテーブルとLambda関数にアクセスするためのロール作成
  AppSyncDynamoDBRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: AppSyncDynamoDBRole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - "appsync.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: AppSyncDynamoDBAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "dynamodb:*" #本来は権限を絞って指定するべき
                Resource:
                  - !GetAtt MyDynamoDBTable.Arn

  MyDataSource:
    Type: 'AWS::AppSync::DataSource'
    Properties:
      ApiId: !GetAtt AppSyncApi.ApiId
      Name: MyDataSource
      Type: AMAZON_DYNAMODB
      ServiceRoleArn: !GetAtt AppSyncDynamoDBRole.Arn
      DynamoDBConfig:
        AwsRegion: ap-northeast-1 
        TableName: MyDynamoDBTable

  # スキーマ定義
  SchemaMyApp:
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt AppSyncApi.ApiId
      Definition: >
        schema {
          query: Query
          mutation: Mutation
          subscription: Subscription
        }

        type Query {
          getItem(id: ID!): Item
        }

        type Mutation {
          putItem(message: String!): Item
        }

        type Subscription {
          putItem: Item
          @aws_subscribe(mutations: ["putItem"])
        }

        type Item {
          id: ID!
          message: String
        }

  MyGetItemResolver:
    Type: 'AWS::AppSync::Resolver'
    Properties:
      ApiId: !GetAtt AppSyncApi.ApiId
      TypeName: Query
      FieldName: getItem
      DataSourceName: !GetAtt MyDataSource.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "operation": "GetItem",
          "key": {
            "id": $util.dynamodb.toDynamoDBJson($context.arguments.id)
          }
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)

  MyPutItemResolver:
    Type: 'AWS::AppSync::Resolver'
    Properties:
      ApiId: !GetAtt AppSyncApi.ApiId
      TypeName: Mutation
      FieldName: putItem
      DataSourceName: !GetAtt MyDataSource.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "operation": "PutItem",
          "key": {
            "id": $util.dynamodb.toDynamoDBJson($util.autoId()),
            "message": $util.dynamodb.toDynamoDBJson($context.arguments.message)
          }
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)

構築に必要になるのはこのテンプレートファイル一つです。あとはAWS SAMのコマンドでビルドとデプロイを実行します。今回はCloud9上で実施しました。テンプレートファイルを作成した階層で以降を実施します。

ビルド
sam build

ビルドに成功したら後はデプロイのみです。テンプレート内でRoleを作成しているので"--capabilities CAPABILITY_NAMED_IAM"を付与しています。"--guided"は2回目以降は省略可です。

デプロイ
sam deploy --capabilities CAPABILITY_NAMED_IAM --guidead

以上で環境構築は終了です。

3.データ設定(Mutation)とデータ取得(Query)の確認

MutationとQueryの動作確認を行います。Pythonを使って動作確認をします。AppSyncはクライアント側である程度データの取得を自由に行えます。今回はMutationで"message"を登録して、割り振られた"id"を元にQueryで"message"を検索します。Pythonのコードは以下の通りです。

mutation&query
from graphqlclient import GraphQLClient
import json

# AppSyncエンドポイントのURL(作成したものを記載)
api_url = "https://xxxxxxxxx.appsync-api.ap-northeast-1.amazonaws.com/graphql"

# AppSyncの(作成したものを記載)
api_key = 'xxx-xxxxxxxxxxxxxxxxxxxxxxx'

if __name__ == '__main__':

    # AppSyncのエンドポイントとAPIキーを使ってGraphQLクライアントを作成
    c = GraphQLClient(api_url)
    c.inject_token(api_key, 'X-Api-Key')

    ################################################
    # mutationでmessageを登録する
    ################################################

    # mutation用のクエリを作成
    mutation = """
        mutation TestMutation ( $set_message: String!){
            putItem(message: $set_message) {
            id
            message
            }
        }
    """
    #設定したい値を設定
    variables = {
        "set_message": "hello world",
    }
        
    #クエリを送信
    result = c.execute(mutation,variables=variables)

    #stringをdict型に変換
    result_dict = json.loads(result)
    print("mutation result=",result_dict)

    search_id = result_dict["data"]["putItem"]["id"]

    ################################################
    # queryでidからmessageを取得する
    ################################################

    # mutation用のクエリを作成
    query = """
        query TestQuery($search_id: ID!) {
        getItem(id: $search_id) {
            id
            message
            }
        }
    """

    #設定したい値を設定
    variables = {
        "search_id": search_id,
    }

    #クエリを送信
    result = c.execute(query,variables=variables)
    print("query result=",result)

実行すると"message"の登録に成功して、idを元に登録した"message"を取得できている事がわかります。

デプロイ
mutation result= {'data': {'putItem': {'id': '8e59fa44-1b54-40f3-a9d6-4e5096c157b7', 'message': 'hello world'}}}
query result= {"data":{"getItem":{"id":"8e59fa44-1b54-40f3-a9d6-4e5096c157b7","message":"hello world"}}}

4.データ通知(Subscription)の確認

最後にsubscriptionの確認です。こちらもpythonで動作確認を行います。実装は以下のURLの情報を参考にしています。今回の構成ではMutationのputItem実行されたら通知されるように構築してあります。

実装コードはこんな感じです。MutationやQueryと違ってAppSyncをWebsocketのセッションを張る事になります。ここで使用しているweboscket用のモジュールは"webscoket-client"なので注意していください。

subscription
import websocket
import time,uuid
import base64
import json


# AppSyncエンドポイントのURL(作成したものを記載)
api_url = "https://xxxxxxxxx.appsync-api.ap-northeast-1.amazonaws.com/graphql"

# AppSyncの(作成したものを記載)
api_key = 'xxx-xxxxxxxxxxxxxxxxxxxxxxx'

wss_url = api_url.replace("https","wss").replace("appsync-api","appsync-realtime-api")

#指定した文字列を除去するコード
def remove_str(text, remove_str):
    return text.replace(remove_str, '')


#ヘッダー情報作成
header_dict = dict()
header_dict['host'] = appsync_host
header_dict["x-api-key"] = api_key

#dict型を文字列に変換
header_str = json.dumps(header_dict)

#Base64エンコード
header_bytes = base64.b64encode(header_str.encode("utf-8"))
header_str = header_bytes.decode("utf-8")
connection_url = wss_url + '?header=' + header_str + '&payload=e30='

print("connection_url:", connection_url)
print("")

# GraphQL subscription Registration object
GQL_SUBSCRIPTION = json.dumps({
        'query': 'subscription MySubscription {putItem{id,message}}',
        'variables': {}
})

def make_subscription_query(id):

    register = {
        'id': id,
        'payload': {
            'data': GQL_SUBSCRIPTION,
            'extensions': {
                'authorization': {
                    'host': appsync_host,
                    'x-api-key': api_key,
                }
            }
        },
        'type': 'start'
    }

    #dict型を文字列に変換
    return json.dumps(register)


def create_on_open(message):
    def on_open(ws):
        print("WebSocket connection opened")
        print("message:", message)
        hogehoge = message 
        ws.send(hogehoge)
    return on_open

def on_message(ws,message):
    print("on message:", message)

def on_error(ws, message):
    print("on error message:", message)

def on_close(ws, message):
    print("on close message:", message)

if __name__ == "__main__":

    #ランダムなハッシュ値を作成
    sub_id = str(uuid.uuid4())
    query = make_subscription_query(sub_id)

    #websocket作成
    ws = websocket.WebSocketApp(connection_url,subprotocols=["graphql-ws"])
    ws.on_open = create_on_open(query)
    ws.on_close = on_close
    ws.on_message = on_message

    ws.run_forever()

Pythonを実行後に、前の節で作成したMutationとQuery確認用のPythonコードを実行すると、Mutationした結果がSubscriptionとして通知されます。Subscriptionを使用することで、リアルタイムで通知を受けることができます。

subscription
appsync_host: 5zq5irujtnffvjkuta4ils5rpi.appsync-api.ap-northeast-1.amazonaws.com
connection_url: wss://5zq5irujtnffvjkuta4ils5rpi.appsync-realtime-api.ap-northeast-1.amazonaws.com/graphql?header=eyJob3N0IjogIjV6cTVpcnVqdG5mZnZqa3V0YTRpbHM1cnBpLmFwcHN5bmMtYXBpLmFwLW5vcnRoZWFzdC0xLmFtYXpvbmF3cy5jb20iLCAieC1hcGkta2V5IjogImRhMi02YmYzNTd5a3BuZmJsZXlzMm94cGNraDR6cSJ9&payload=e30=

WebSocket connection opened
message: {"id": "8f9e73fe-d915-43d5-9281-688cef5523d5", "payload": {"data": "{\"query\": \"subscription MySubscription {putItem{id,message}}\", \"variables\": {}}", "extensions": {"authorization": {"host": "5zq5irujtnffvjkuta4ils5rpi.appsync-api.ap-northeast-1.amazonaws.com", "x-api-key": "da2-6bf357ykpnfbleys2oxpckh4zq"}}}, "type": "start"}
on message: {"id":"8f9e73fe-d915-43d5-9281-688cef5523d5","type":"start_ack"}
on message: {"type":"ka"}
on message: {"id":"8f9e73fe-d915-43d5-9281-688cef5523d5","type":"data","payload":{"data":{"putItem":{"id":"54bccb72-fa90-444f-83ac-42731b788c07","message":"hello world"}}}}

5.最後に

AWS SAMを使ったAppSyncの検証環境構築とPythonを使った動作確認方法について書きました。今後、AppSyncを使ったWebアプリケーションは増えていくと思います。興味を持った方はぜひこの記事を参考にトライしてみてください。

明日移行も記事が公開予定です。

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?