0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

運用が楽なQuickSightダッシュボードを作る_6(データセット自動更新+α編)

Posted at

はじめに

前回、ファイルが更新されたらQuickSightデータソースを自動更新する仕組みを記事にしました。

ですが完了を監視したり通知したりはできていなかったので、それらの仕組みを含んだ構成を作成しました。

構成

image.png

大まかな流れは以下。

  1. S3のファイル更新をEventBridgeで検知し、Step Functionsを実行
  2. LambdaでQuickSight更新処理の前準備
  3. QuickSightにデータセットの更新指示
  4. 終わったかを検知
  5. 終了内容を見て、SNSに通知

コード

「共通リソース部」と「自動更新部」で分けています。

  • 共通リソース部
    • 他の仕組みでも使いそうなリソース
    • CloudFormationで定義
      • 運用メール通知用SNSトピック
      • 設定値格納用DynamoDB
  • 自動更新部
    • 自動更新にのみ使うリソース
    • AWS SAMで定義
      • EventBridge
      • Step Functions
      • Lambda

共通リソース部

プロジェクト単位の設定値格納場所とSNSを定義します。

AWSTemplateFormatVersion: 2010-09-09
Description: Create Common Resource On low Burden Operation Dashboard.
Parameters:
  ProjectId:
    Type: String
    Default: "sampleproject"

Resources: 
  # SNS
  OparationTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Sub "${ProjectId}-operation-topic"
      Tags:
          - Key: "usefor"
            Value: !Sub "send e-mail to ${ProjectId} operator"
  
  # DynamoDB
  ParameterTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub "${ProjectId}-parameters"
      AttributeDefinitions:
        - AttributeName: S3uri
          AttributeType: S
      KeySchema:
        - AttributeName: S3uri
          KeyType: HASH # パーティションキー
      # option. If not specified, the default is PROVISIONED.
      BillingMode: PAY_PER_REQUEST
      Tags:
          - Key: "usefor"
            Value: !Sub "store using on ${ProjectId}"
  
Outputs:
  OparationTopic:
    Value: !Ref OparationTopic
    Export:
      Name: !Sub "${ProjectId}-operation-topic-arn"
  ParameterTable:
    Value: !Ref ParameterTable
    Export:
      Name: !Sub "${ProjectId}-parameters-tablename"
  ParameterTableArn:
    Value: !GetAtt ParameterTable.Arn
    Export:
      Name: !Sub "${ProjectId}-parameters-tablename-arn"

DynaomoDBはBillingMode: PAY_PER_REQUESTを指定し、従量課金にしています。
Outputは、自動更新部作成時に参照するために使います。

DynamoDBには

  • キーとして、ファイル格納フォルダパス
  • datasetsに、更新するデータセット群の情報を格納
  • (任意)useforに、用途をメモ書き

というデータを登録します。
AWS CLIから登録する際のサンプルファイルは以下になります。

SampleIngestionParameter.yaml
# aws --version が v2 で実行可能(Cloudshellで可能、Cloud9は不可)
# aws ddb put <tablename> file://SampleIngestionParameter.yaml
- 
  S3uri: "s3://<<BUCKET_NAME>>/dashboard/data/tokyo_covid19/"
  usefor: "Create and Monitor QuickSight Ingestion."
  datasets:
    - 
      datasetid : "hogehoge"
      datasetname : "moemoe"
    - 
      datasetid : "togetoge"
      datasetname : "ageage"

自動更新部

AWS SAMで作っています。
フォルダ階層は以下のようにしています。

ingestion-sam
├── functions # Lambdaの場所
│   ├── check_ingestion_status
│   │   └── app.py
│   └── product_ingestion_parameters
│       └── app.py
├── statemachine # Step Functionsの場所
│   └── sfn.asl.json
└── template.yaml

Step Functions

Step Functionsのワークフローは以下になります。
Lambdaは2か所で、他はStep Functionsを組み合わせて作成しました。

image.png

  1. 前処理Lambdaを実行
    1. eventのファイル情報を、後処理で使う形に成型して格納
    2. 更新処理に使うID用の数値列を発番して格納
  2. DynaomoDBに検索
  3. 検索結果の配列部分を並列にして、以下の処理を実行
    1. QuickSightのデータ処理に更新指示
      1. IDは、"{データセット名}-{発番した数値列}"
    2. 数秒待機
    3. 更新指示の完了をチェック
      1. Statusが継続中であれば、”数秒待機”に戻る
      2. Statusが終了していれば、後続処理へ
    4. Passで何もせず、並列処理部分を終了
      1. Choiceで並列処理は終了できなかったため置いたもの、加工等は無し
  4. 各並列処理結果が配列で帰ってくるため、中身をチェック
  5. 結果をSNSに通知
  6. チェック結果によって、正常終了か異常終了かに分別

定義のコードは以下。DynamoDBの取得結果が{"key":{"データ型":"value"}}のように癖がありますので、その点注意。

statemachine/sfn.asl.json
{
  "Comment": "A description of my state machine",
  "StartAt": "Lambda Invoke",
  "States": {
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "${ProdParamsFunction}"
      },
      "Next": "DynamoDB GetItem"
    },
    "DynamoDB GetItem": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "faci-parameters",
        "Key": {
          "S3uri": {
            "S.$": "$.targetPrefix"
          }
        }
      },
      "Next": "Map",
      "ResultSelector": {
        "datasets.$": "$.Item.datasets.L"
      },
      "ResultPath": "$.item"
    },
    "Map": {
      "Type": "Map",
      "InputPath": "$",
      "ItemsPath": "$.item.datasets",
      "MaxConcurrency": 0,
      "Parameters": {
        "dataset.$": "$$.Map.Item.Value",
        "ingestionidSuffix.$": "$.ingestionidSuffix"
      },
      "Iterator": {
        "StartAt": "CreateIngestion",
        "States": {
          "CreateIngestion": {
            "Type": "Task",
            "Next": "Wait",
            "Parameters": {
              "AwsAccountId": "${AwsAccountId}",
              "DataSetId.$": "$.dataset.M.datasetid.S",
              "IngestionId.$": "States.Format('{}-{}', $.dataset.M.datasetname.S,$.ingestionidSuffix)"
            },
            "Resource": "arn:aws:states:::aws-sdk:quicksight:createIngestion",
            "ResultPath": null
          },
          "Wait": {
            "Type": "Wait",
            "Next": "DescribeIngestion",
            "Seconds": 10
          },
          "DescribeIngestion": {
            "Type": "Task",
            "Parameters": {
              "AwsAccountId": "${AwsAccountId}",
              "DataSetId.$": "$.dataset.M.datasetid.S",
              "IngestionId.$": "States.Format('{}-{}', $.dataset.M.datasetname.S,$.ingestionidSuffix)"
            },
            "Resource": "arn:aws:states:::aws-sdk:quicksight:describeIngestion",
            "ResultPath": "$.ReturnDescribeIngestion",
            "Next": "Check Status"
          },
          "Check Status": {
            "Type": "Choice",
            "Choices": [
              {
                "Or": [
                  {
                    "Variable": "$.ReturnDescribeIngestion.Ingestion.IngestionStatus",
                    "StringEquals": "INITIALIZED"
                  },
                  {
                    "Variable": "$.ReturnDescribeIngestion.Ingestion.IngestionStatus",
                    "StringEquals": "QUEUED"
                  },
                  {
                    "Variable": "$.ReturnDescribeIngestion.Ingestion.IngestionStatus",
                    "StringEquals": "RUNNING"
                  }
                ],
                "Next": "Wait"
              }
            ],
            "Default": "Pass"
          },
          "Pass": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "Next": "Check ingestion status"
    },
    "Check ingestion status": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${ChkIngestionStatusFunction}",
        "Payload.$": "$"
      },
      "Next": "SNS Publish"
    },
    "SNS Publish": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "${TopicArn}",
        "Subject.$": "$.Payload.Subject",
        "Message.$": "$.Payload.Message"
      },
      "ResultPath": null,
      "Next": "Choice"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Payload.Status",
          "BooleanEquals": true,
          "Next": "Success"
        }
      ],
      "Default": "Fail"
    },
    "Success": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail"
    }
  }
}

"IngestionId.$": "States.Format('{}-{}', $.dataset.M.datasetname.S,$.ingestionidSuffix)"のところでは、組み込み関数を使ってIdを生成しています。使い方は公式に。

前処理Lambda

以下を行っています。

  • eventから抽出したバケット名とファイルのkeyから、データ格納フォルダのS3URIを生成
  • 更新処理用のIDとして、現在時刻を文字列化
functions\product_ingestion_parameters\app.py
import re
import datetime

def lambda_handler(event, context):

  # eventから必要な情報を変数に格納
  targetBucket = event['detail']['bucket']['name']
  targetKey = event['detail']['object']['key']
  targetPrefix = re.sub('^(.+/)[^/]+$', r'\1', targetKey)
  targetS3uriPrefix = "s3://%s/%s"%(targetBucket,targetPrefix)
  
  # 現在日時でIngestionIdの末尾を作成
  rtnIngestionidSuffix = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')

  return {
    'targetPrefix':targetS3uriPrefix,
    'ingestionidSuffix':rtnIngestionidSuffix
  }

更新中身チェックLambda

  • 以下の状態をチェックして、正常終了したかと、メール用メッセージを作成します。
    • ingestionStatusがCOMPLETEDで、スキップ行がなければ正常終了
      • スキップ行があればエラーにして、その旨メール用メッセージを生成
    • ingestionStatusがCOMPLETEDでなければエラー
functions\check_ingestion_status\app.py
def lambda_handler(event, context):

  rtnStatus = True
  rtnSubject = ''
  rtnMessage = ''

  for eachDescribe in event:
    datasetName = eachDescribe['dataset']['M']['datasetname']['S']
    ingestionStatus = eachDescribe['ReturnDescribeIngestion']['Ingestion']['IngestionStatus']
    ingestionRowsDropped = eachDescribe['ReturnDescribeIngestion']['Ingestion']['RowInfo']['RowsDropped']
    
    if ingestionStatus == 'COMPLETED':
      if ingestionRowsDropped == 0:
        rtnMessage += "\n\t・データセット%sの更新は成功しました。"%(datasetName)
      else:
        rtnStatus = False
        rtnMessage += "\n\t・データセット%sは、%d行をスキップして更新しました。"%(datasetName,ingestionRowsDropped)
        
    else:
      rtnStatus = False
      rtnMessage += "\n\t・データセット%sの更新は失敗し、ステータス%sを戻しました。"%(datasetName,ingestionStatus)

  # end for

  if rtnStatus:
    rtnSubject = '[SUCCESS]QuickSight更新処理正常終了'
    rtnMessage = 'QuickSightの更新は正常終了しました。' + rtnMessage
  else:
    rtnSubject = '[ERROR]QuickSight更新処理異常終了'
    rtnMessage = 'QuickSightの更新処理に失敗しました。' + rtnMessage

  return {
    'Status': rtnStatus,
    'Subject': rtnSubject,
    'Message': rtnMessage
  }

構成テンプレート

  • Step Functionsに、DynamoDBやSNSの操作権限をアタッチ
  • EventBridgeの部分は、前回とほぼ同じです。
template.yaml
AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: SAM Template for Lambda Function

Parameters:
  ProjectId:
    Type: String
    Default: "sampleproject"
  BucketName:
    Type: String
    Default: '<<BUCKET_NAME>>'
  TagetFolderPath:
    Type: String
    Description: use for prefix. End char MUST BE slash(/).
    Default: 'dashboard/data/'

Resources:
  ##################################################
  # Product Ingestion Parameters Function
  ##################################################
  ProdParamsFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ProjectId}-product-ingestion-parameters"
      RetentionInDays: 3653

  ProdParamsFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "for-lambda-${ProjectId}-product-ingestion-parameters"
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/service-role/'
      Policies:
        # CloudWatch
        - PolicyName: write-cloudwatchlogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: !GetAtt ProdParamsFunctionLogGroup.Arn

  ProdParamsFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${ProjectId}-product-ingestion-parameters"
      Role: !GetAtt ProdParamsFunctionRole.Arn
      CodeUri: functions/product_ingestion_parameters/
      Handler: app.lambda_handler
      Runtime: python3.9

  ##################################################
  # check ingestion status Function
  ##################################################
  ChkIngestionStatusFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ProjectId}-check-ingestion-status"
      RetentionInDays: 3653

  ChkIngestionStatusFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "for-lambda-${ProjectId}-check-ingestion-status"
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/service-role/'
      Policies:
        # CloudWatch
        - PolicyName: write-cloudwatchlogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: !GetAtt ChkIngestionStatusFunctionLogGroup.Arn

  ChkIngestionStatusFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${ProjectId}-check-ingestion-status"
      Role: !GetAtt ChkIngestionStatusFunctionRole.Arn
      CodeUri: functions/check_ingestion_status/
      Handler: app.lambda_handler
      Runtime: python3.9
      Timeout: 900

  ##################################################
  # Step Functions
  ##################################################
  StateMachineLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName : !Sub "/aws/states/${ProjectId}-create-and-check-ingestion"

  StateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      Name: !Sub "${ProjectId}-create-and-check-ingestion"
      DefinitionUri: statemachine/sfn.asl.json
      DefinitionSubstitutions:
        AwsAccountId: !Ref AWS::AccountId
        ProdParamsFunction: !Ref ProdParamsFunction
        ChkIngestionStatusFunction: !Ref ChkIngestionStatusFunction
        TopicArn:
          Fn::ImportValue:
            !Sub "${ProjectId}-operation-topic-arn"
      Role: !GetAtt StateMachineRole.Arn
      Logging:
        Level: ALL
        IncludeExecutionData: True
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt StateMachineLogGroup.Arn

  StateMachineRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "for-statemachine-${ProjectId}-create-and-check-ingestion"
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        # CloudWatch
        - PolicyName: write-cloudwatchlogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogDelivery'
                  - 'logs:GetLogDelivery'
                  - 'logs:UpdateLogDelivery'
                  - 'logs:DeleteLogDelivery'
                  - 'logs:ListLogDeliveries'
                  - 'logs:PutResourcePolicy'
                  - 'logs:DescribeResourcePolicies'
                  - 'logs:DescribeLogGroups'
                Resource: '*'
        # Lambda
        - PolicyName: invoke-lambdafunction
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'lambda:InvokeFunction'
                Resource:
                  - !GetAtt ProdParamsFunction.Arn
                  - !GetAtt ChkIngestionStatusFunction.Arn
        # DynamoDB
        - PolicyName: getitem-dynamodb
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'dynamodb:GetItem'
                Resource:
                  Fn::ImportValue:
                    !Sub "${ProjectId}-parameters-tablename-arn"
        # QuickSight describe ingestion
        - PolicyName: describe-ingestion
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'quicksight:CreateIngestion'
                  - 'quicksight:DescribeIngestion'
                Resource:
                  - !Sub "arn:aws:quicksight:*:${AWS::AccountId}:dataset/*/ingestion/*"
        # SNS
        - PolicyName: publish-snstopic
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'sns:Publish'
                Resource:
                  Fn::ImportValue:
                    !Sub "${ProjectId}-operation-topic-arn"

  ##################################################
  # EventBridge(event Rule)
  ##################################################
  ## Role for invoke step functions
  EventBridgeRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub 
        - "for-eventrule-${StateMachineName}"
        - { StateMachineName:  !GetAtt StateMachine.Name }
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - events.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/service-role/'
      Policies:
        - PolicyName: exec-stepfunctions
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'states:StartExecution'
                Resource: !Ref StateMachine
  EventRulePutOnReceive:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub 
        - "for-invoke-statemachine-${StateMachineName}"
        - { StateMachineName:  !GetAtt StateMachine.Name }
      Description: !Sub 
        - "invoke ${StateMachineName} when file put or delete"
        - { StateMachineName:  !GetAtt StateMachine.Name }
      EventBusName: default
      State: ENABLED
      EventPattern:
        source:
          - "aws.s3"
        detail-type:
          - "Object Created"
          - "Object Deleted"
        detail:
          bucket:
            name:
              - !Ref BucketName
          object:
            key:
              - prefix: !Ref TagetFolderPath
      Targets:
        - Arn: !Ref StateMachine
          Id: !GetAtt StateMachine.Name
          RoleArn: !GetAtt EventBridgeRole.Arn

作り方

  1. 共通リソース部をCloudFormationのソースを使って作成
  2. SampleIngestionParameter.yamlを参考に、設定値をDynamoDBに登録
    1. DataSetIdの確認方法は、以前の記事で紹介しています。
  3. SNSで、送信したいメールアドレスを設定
  4. AWS SAMでデプロイ
    1. その際、共通リソース部で指定したProjectIdと同じ文字列を指定
    2. 他パラメータは、監視するバケットとフォルダを指定

使い方

対象のフォルダにファイルを置いたり削除したりするだけ。データセット群が更新され、結果のメールが届くはずです。

おわりに

前回できなかった、完了チェックとメール送信を、Step Functionsを使って実装しました。
"ファイルを置いただけで、ダッシュボードが更新"という運用であれば、だいぶ使い勝手は良いのではないでしょうか。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?