LoginSignup
1
2

More than 1 year has passed since last update.

運用が楽なQuickSightダッシュボードを作る_5(データセット自動更新編)

Posted at

はじめに

CSV群をQuickSightでダッシュボードにして、その後説明文やEメールレポートを追加してきました。

この環境ですが、CSVを変更したら都度、QuickSightのデータセットの更新を手動で行なう必要があります。そのためには、

  1. データセットを閲覧・更新できる権限を持つユーザとして
  2. 都度QuickSightに飛んでログインし
  3. データセットの画面に遷移して
  4. 対象のデータセットを選択し
  5. 更新処理を実行する

という手間が発生します。
これらを省くため、”対象のフォルダに変更(ファイルの追加、削除)があったら、自動でデータセット更新処理を実行する”、という仕組みを入れてみました。

よいこと

  • CSVを置いたら、データセットの更新指示が自動で作られる。
    • ファイルを更新したり、削除しても同様。
    • 運用ユーザに強い権限を与えずに済む。
  • (今回の仕組みを構成するのには、)1ファイルで完結する。

よくないこと

  • データセット更新が正常に完了したことは保証されない。
    • 更新指示を行うだけで、完了までは監視していない。
  • 完了しても通知なし。
    • エラーになった場合は、所有者にメールが送信される機能があります。
      image.png
      • これを入れてエラーは気づけますが、正常終了時は連絡なしです。
        • 「誤ったファイルを置いてスキップ行が発生する」場合も、通知はされません。
  • 設定値は外部に出しておらず、Lambda関数のコード中に直接記述します。
    • 変更都度、スタックの更新が必要です。

構成

以下のような構成図になります。
image.png

  1. 対象の場所をEventBridgeで監視
  2. 変更があったらLambdaを起動
  3. Lambdaから、定義したデータセットを更新するようQuickSightに指示

CloudFormation

個々のポイントを説明し、最後に全体を載せます。拙い英語はご容赦ください。

パラメータ部
Parameters:
  LambdaFunctionName:
    Type: String
  BucketName:
    Type: String
  TagetFolderPath:
    Type: String
    Description: use for prefix. End char MUST BE slash(/).
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      -
        Label:
          default: User Setting.
        Parameters:
          - LambdaFunctionName
          - BucketName
          - TagetFolderPath
  • Lambda関数名と、監視対象のパスを「バケット」と「フォルダ」で分けました。
  • Metadataはコンソール表示時の並び順を指定しています。
Lambda関数出力先ロググループ
  FunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${LambdaFunctionName}"
      RetentionInDays: 3653   # 未指定時は「失効しない」
  • Lambda関数用ロググループはLambda自身には作らせず、CloudFormationから作ります。
    • Lambdaにロググループ作成の権限を与えて自動で作成させると、スタック削除時に残ってしまいます。
    • ロググループの名前は/aws/lambda/<<関数名>>固定。
Lambda関数用ロール
  FunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "for-lambdafunction-${LambdaFunctionName}"
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/service-role/'
      Policies:
        # CloudWatch
        - PolicyName: write-cloudwatchlogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*"    
        # QuickSight
        - PolicyName: create-quicksight-ingesiton
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "quicksight:CreateIngestion"
                Resource:
                  - !Sub "arn:aws:quicksight:${AWS::Region}:${AWS::AccountId}:dataset/*"
  • ロググループ出力用の権限と、QuickSightのデータセット更新指示を出す権限(CreateIngestion)のみとしています。
Lambda関数部分
  TargetFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref LambdaFunctionName
      Role: !GetAtt FunctionRole.Arn
      Runtime: python3.9
      Handler: index.lambda_handler
      Timeout: 15
      Code:
        ZipFile: !Sub |
          # 設定定義
          str_prefix_dataset = '''
          [
              {
                "file_prefix": "${TagetFolderPath}tokyo_covid19/",
                "datasets": [
                  {"name": "tokyo_covid19", "Id": "xxxxxxxxxxxxxxx-xxxxxxxxxxxx-xxxxxxxx-xxxx"}
                ]
              }              
          ]

          '''

          import boto3
          import json
          import datetime

          map_prefix_dataset = json.loads(str_prefix_dataset)

          def lambda_handler(event, context):

            try:
              str_targetfile = event['detail']['object']['key']
            except Exception:
              print("S3のイベント event['detail']['object']['key'] が取得できませんでした")
              raise

            # 操作されたファイルの先頭から、設定のprefixと一致するものを探して、それを更新対象のデータセットとする。
            dic_target_datasets = None
            for eachmap in map_prefix_dataset:
              if str_targetfile.find(eachmap['file_prefix']) == 0 :
                dic_target_datasets = eachmap['datasets']
                break

            # 見つかったら更新 
            if dic_target_datasets is None:
              print('更新対象フォルダ以外のファイルの操作でした、終了します。')
            else:
              for eachDataset in dic_target_datasets:
                print( "create ingestion of %s"%(eachDataset['name']) )

                # 更新指示作成
                try:
                  # IngestionIdは一意の必要あり。
                  response = boto3.client('quicksight').create_ingestion(
                    DataSetId=eachDataset['Id'],
                    IngestionId="Ingestion-%s-%s"%( eachDataset['name'] , datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')),
                    AwsAccountId='${AWS::AccountId}'
                  )
                except Exception:
                  print( "データセット%s(Id:%s)が存在しないためエラーになりました。"%(eachDataset['name'], eachDataset['Id']) )
                  raise
                
                print(json.dumps(response))
                if response['Status'] == 201:
                  print("更新指示を作成できました")
                else:
                  print("ERROR:更新指示の作成に失敗しました")
              # end eachDataset

  • Lambda関数のコードは別ファイルにせず、CloudFormation内部に直接書いています。
  • 先頭のstr_prefix_datasetで、格納フォルダと更新させたいデータセット群を指定しています。
    • eventの中に入っているオブジェクトのキー(バケット名を除いた、ファイルまでのフルパス)に対し、設定したprefixが先頭から一致しているかを見て、一致していたらその設定で動かす、という風にしています。
  • IngestionIdは一意である必要があるため、データセット名やミリ秒を入れています。
EventBridge
  EventRuleDataSets:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub "for-dataset-ingestion-${LambdaFunctionName}"
      Description: !Sub "invoke ${LambdaFunctionName} when file put folder"
      EventBusName: default
      State: ENABLED
      EventPattern:
        source:
          - "aws.s3"
        detail-type:
          - "Object Created"
          - "Object Deleted"
        detail:
          bucket:
            name:
              - !Ref BucketName
          object:
            key:
              - prefix: !Ref TagetFolderPath
      Targets:
        - Arn: !GetAtt TargetFunction.Arn
          Id: !Ref TargetFunction

  # Lambdaのトリガーに、EventBridgeのルールを追加
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref TargetFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt EventRuleDataSets.Arn
  • S3のイベント通知の指定方法は、2021.11のアップデートで追加された方法を使っています。

  • イベントルールのtargets.Idに、何を指定していいのかわからなかったので、適当にLambda関数名を入れています。
    • どなたかわかる方いらっしゃったら教えてください。

完全なコード

AWSTemplateFormatVersion: 2010-09-09

Parameters:
  LambdaFunctionName:
    Type: String
  BucketName:
    Type: String
  TagetFolderPath:
    Type: String
    Description: use for prefix. End char MUST BE slash(/).
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      -
        Label:
          default: User Setting.
        Parameters:
          - LambdaFunctionName
          - BucketName
          - TagetFolderPath
 
Resources:
  FunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${LambdaFunctionName}"
      RetentionInDays: 3653   # 未指定時は「失効しない」

  FunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "for-lambdafunction-${LambdaFunctionName}"
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/service-role/'
      Policies:
        # CloudWatch
        - PolicyName: write-cloudwatchlogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*"    
        # QuickSight
        - PolicyName: create-quicksight-ingesiton
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "quicksight:CreateIngestion"
                Resource:
                  - !Sub "arn:aws:quicksight:${AWS::Region}:${AWS::AccountId}:dataset/*"

  TargetFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref LambdaFunctionName
      Role: !GetAtt FunctionRole.Arn
      Runtime: python3.9
      Handler: index.lambda_handler
      Timeout: 15
      Code:
        ZipFile: !Sub |
          # 設定定義
          str_prefix_dataset = '''
          [
              {
                "file_prefix": "${TagetFolderPath}tokyo_covid19/",
                "datasets": [
                  {"name": "tokyo_covid19", "Id": "xxxxxxxxxxxxxxx-xxxxxxxxxxxx-xxxxxxxx-xxxx"}
                ]
              }              
          ]

          '''

          import boto3
          import json
          import datetime

          map_prefix_dataset = json.loads(str_prefix_dataset)

          def lambda_handler(event, context):

            try:
              str_targetfile = event['detail']['object']['key']
            except Exception:
              print("S3のイベント event['detail']['object']['key'] が取得できませんでした")
              raise

            # 操作されたファイルの先頭から、設定のprefixと一致するものを探して、それを更新対象のデータセットとする。
            dic_target_datasets = None
            for eachmap in map_prefix_dataset:
              if str_targetfile.find(eachmap['file_prefix']) == 0 :
                dic_target_datasets = eachmap['datasets']
                break

            # 見つかったら更新 
            if dic_target_datasets is None:
              print('更新対象フォルダ以外のファイルの操作でした、終了します。')
            else:
              for eachDataset in dic_target_datasets:
                print( "create ingestion of %s"%(eachDataset['name']) )

                # 更新指示作成
                try:
                  # IngestionIdは一意の必要あり。
                  response = boto3.client('quicksight').create_ingestion(
                    DataSetId=eachDataset['Id'],
                    IngestionId="Ingestion-%s-%s"%( eachDataset['name'] , datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')),
                    AwsAccountId='${AWS::AccountId}'
                  )
                except Exception:
                  print( "データセット%s(Id:%s)が存在しないためエラーになりました。"%(eachDataset['name'], eachDataset['Id']) )
                  raise
                
                print(json.dumps(response))
                if response['Status'] == 201:
                  print("更新指示を作成できました")
                else:
                  print("ERROR:更新指示の作成に失敗しました")
              # end eachDataset

  # EventBridge用ルール
  EventRuleDataSets:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub "for-dataset-ingestion-${LambdaFunctionName}"
      Description: !Sub "invoke ${LambdaFunctionName} when file put folder"
      EventBusName: default
      State: ENABLED
      EventPattern:
        source:
          - "aws.s3"
        detail-type:
          - "Object Created"
          - "Object Deleted"
        detail:
          bucket:
            name:
              - !Ref BucketName
          object:
            key:
              - prefix: !Ref TagetFolderPath
      Targets:
        - Arn: !GetAtt TargetFunction.Arn
          Id: !Ref TargetFunction

  # Lambdaのトリガーに、EventBridgeのルールを追加
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref TargetFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt EventRuleDataSets.Arn

作り方

監視対象のバケットの設定変更

対象のバケットのイベント通知を有効にしてください。やり方は以下のリンク先にあります。

S3バケットをCloudFormationで作る場合は、以下の設定を加えます。

Resources:
  Bucket:
    Type: AWS::S3::Bucket
    Properties:
      # Amazon EventBridge に通知を送信する
      NotificationConfiguration:
        EventBridgeConfiguration:
          EventBridgeEnabled: true

設定値修正

先のCloudFormation上のLambda関数内部のstr_prefix_datasetの設定値を修正します。
例として今回のS3のフォルダ構成を以下とします。

dashboard
├── data              <-監視するフォルダ
│   ├── dataset01    
│   │   ├── aaaa.csv
│   │   └── bbbb.csv
│   └── tokyo_covid19 <-データセットと対応させるフォルダ
│       ├── 2021.csv
│       └── 2022.csv
└── manifest
    ├── dataset01.json
    └── tokyo_covid19.json

str_prefix_datasetには、上記のうち「データセットと対応させるフォルダ」を"file_prefix"に指定します。
そして、対応するデータセット群の情報をdatasetsに記載します。

          # 設定定義
          str_prefix_dataset = '''
          [
              {
                "file_prefix": "dashboard/data/tokyo_covid19/",
                "datasets": [
                  {"name": "tokyo_covid19", "Id": "xxxxxxxxxxxxxxx-xxxxxxxxxxxx-xxxxxxxx-xxxx"}
                ]
              }              
          ]

先のコード内では"file_prefix"の値に、${TagetFolderPath}で変数を埋め込んでいますが、直書きしたほうがわかりやすいかもしれません。

スタック作成

修正したCloudFormationのコードで、スタックを作成します。

パラメータ

  • LambdaFunctionName
    • 既存の関数名と重複しない名前で
  • BucketName
    • バケット名だけを指定
      • s3://は不要。末尾のスラッシュも不要
  • TagetFolderPath
    • 先のフォルダ構成の中の、監視するフォルダのパスを入れてください。
      • データ格納フォルダでもよいですし、それより上部のフォルダでもOKです。
    • 今回はdashboard/data/を指定しました。
      • 上位フォルダを監視対象にしておけば、他のデータセット用フォルダにファイルが置かれても発火し、(設定値に定義されていれば)設定値に従い更新指示が作成される、という構成です。
    • 末尾はスラッシュ(/)で終わらせてください。

使い方

対象のバケットにファイルを置く/更新する/削除することで、データセットが更新されます。

おわりに

今回はQuickSightのデータセットの更新を自動化する仕組みを構築しました。
これで普段の運用ユーザはファイルを置くだけでデータ更新が出来て、「データセット更新権限」を割り当てる必要がなくなります。

次回はファイルを置いて更新指示を作成した後、更新が終わっているかチェックし、通知するまでの仕組みを記事にしようと思います。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2