はじめに
前回、ファイルが更新されたらQuickSightデータソースを自動更新する仕組みを記事にしました。
ですが完了を監視したり通知したりはできていなかったので、それらの仕組みを含んだ構成を作成しました。
構成
大まかな流れは以下。
- S3のファイル更新をEventBridgeで検知し、Step Functionsを実行
- LambdaでQuickSight更新処理の前準備
- QuickSightにデータセットの更新指示
- 終わったかを検知
- 終了内容を見て、SNSに通知
コード
「共通リソース部」と「自動更新部」で分けています。
- 共通リソース部
- 他の仕組みでも使いそうなリソース
- CloudFormationで定義
- 運用メール通知用SNSトピック
- 設定値格納用DynamoDB
- 自動更新部
- 自動更新にのみ使うリソース
- AWS SAMで定義
- EventBridge
- Step Functions
- Lambda
共通リソース部
プロジェクト単位の設定値格納場所とSNSを定義します。
AWSTemplateFormatVersion: 2010-09-09
Description: Create Common Resource On low Burden Operation Dashboard.
Parameters:
ProjectId:
Type: String
Default: "sampleproject"
Resources:
# SNS
OparationTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub "${ProjectId}-operation-topic"
Tags:
- Key: "usefor"
Value: !Sub "send e-mail to ${ProjectId} operator"
# DynamoDB
ParameterTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub "${ProjectId}-parameters"
AttributeDefinitions:
- AttributeName: S3uri
AttributeType: S
KeySchema:
- AttributeName: S3uri
KeyType: HASH # パーティションキー
# option. If not specified, the default is PROVISIONED.
BillingMode: PAY_PER_REQUEST
Tags:
- Key: "usefor"
Value: !Sub "store using on ${ProjectId}"
Outputs:
OparationTopic:
Value: !Ref OparationTopic
Export:
Name: !Sub "${ProjectId}-operation-topic-arn"
ParameterTable:
Value: !Ref ParameterTable
Export:
Name: !Sub "${ProjectId}-parameters-tablename"
ParameterTableArn:
Value: !GetAtt ParameterTable.Arn
Export:
Name: !Sub "${ProjectId}-parameters-tablename-arn"
DynaomoDBはBillingMode: PAY_PER_REQUEST
を指定し、従量課金にしています。
Outputは、自動更新部作成時に参照するために使います。
DynamoDBには
- キーとして、ファイル格納フォルダパス
- datasetsに、更新するデータセット群の情報を格納
- (任意)useforに、用途をメモ書き
というデータを登録します。
AWS CLIから登録する際のサンプルファイルは以下になります。
# aws --version が v2 で実行可能(Cloudshellで可能、Cloud9は不可)
# aws ddb put <tablename> file://SampleIngestionParameter.yaml
-
S3uri: "s3://<<BUCKET_NAME>>/dashboard/data/tokyo_covid19/"
usefor: "Create and Monitor QuickSight Ingestion."
datasets:
-
datasetid : "hogehoge"
datasetname : "moemoe"
-
datasetid : "togetoge"
datasetname : "ageage"
自動更新部
AWS SAMで作っています。
フォルダ階層は以下のようにしています。
ingestion-sam
├── functions # Lambdaの場所
│ ├── check_ingestion_status
│ │ └── app.py
│ └── product_ingestion_parameters
│ └── app.py
├── statemachine # Step Functionsの場所
│ └── sfn.asl.json
└── template.yaml
Step Functions
Step Functionsのワークフローは以下になります。
Lambdaは2か所で、他はStep Functionsを組み合わせて作成しました。
- 前処理Lambdaを実行
- eventのファイル情報を、後処理で使う形に成型して格納
- 更新処理に使うID用の数値列を発番して格納
- DynaomoDBに検索
- 検索結果の配列部分を並列にして、以下の処理を実行
- QuickSightのデータ処理に更新指示
- IDは、"{データセット名}-{発番した数値列}"
- 数秒待機
- 更新指示の完了をチェック
- Statusが継続中であれば、”数秒待機”に戻る
- Statusが終了していれば、後続処理へ
- Passで何もせず、並列処理部分を終了
- Choiceで並列処理は終了できなかったため置いたもの、加工等は無し
- QuickSightのデータ処理に更新指示
- 各並列処理結果が配列で帰ってくるため、中身をチェック
- 結果をSNSに通知
- チェック結果によって、正常終了か異常終了かに分別
定義のコードは以下。DynamoDBの取得結果が{"key":{"データ型":"value"}}
のように癖がありますので、その点注意。
{
"Comment": "A description of my state machine",
"StartAt": "Lambda Invoke",
"States": {
"Lambda Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${ProdParamsFunction}"
},
"Next": "DynamoDB GetItem"
},
"DynamoDB GetItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "faci-parameters",
"Key": {
"S3uri": {
"S.$": "$.targetPrefix"
}
}
},
"Next": "Map",
"ResultSelector": {
"datasets.$": "$.Item.datasets.L"
},
"ResultPath": "$.item"
},
"Map": {
"Type": "Map",
"InputPath": "$",
"ItemsPath": "$.item.datasets",
"MaxConcurrency": 0,
"Parameters": {
"dataset.$": "$$.Map.Item.Value",
"ingestionidSuffix.$": "$.ingestionidSuffix"
},
"Iterator": {
"StartAt": "CreateIngestion",
"States": {
"CreateIngestion": {
"Type": "Task",
"Next": "Wait",
"Parameters": {
"AwsAccountId": "${AwsAccountId}",
"DataSetId.$": "$.dataset.M.datasetid.S",
"IngestionId.$": "States.Format('{}-{}', $.dataset.M.datasetname.S,$.ingestionidSuffix)"
},
"Resource": "arn:aws:states:::aws-sdk:quicksight:createIngestion",
"ResultPath": null
},
"Wait": {
"Type": "Wait",
"Next": "DescribeIngestion",
"Seconds": 10
},
"DescribeIngestion": {
"Type": "Task",
"Parameters": {
"AwsAccountId": "${AwsAccountId}",
"DataSetId.$": "$.dataset.M.datasetid.S",
"IngestionId.$": "States.Format('{}-{}', $.dataset.M.datasetname.S,$.ingestionidSuffix)"
},
"Resource": "arn:aws:states:::aws-sdk:quicksight:describeIngestion",
"ResultPath": "$.ReturnDescribeIngestion",
"Next": "Check Status"
},
"Check Status": {
"Type": "Choice",
"Choices": [
{
"Or": [
{
"Variable": "$.ReturnDescribeIngestion.Ingestion.IngestionStatus",
"StringEquals": "INITIALIZED"
},
{
"Variable": "$.ReturnDescribeIngestion.Ingestion.IngestionStatus",
"StringEquals": "QUEUED"
},
{
"Variable": "$.ReturnDescribeIngestion.Ingestion.IngestionStatus",
"StringEquals": "RUNNING"
}
],
"Next": "Wait"
}
],
"Default": "Pass"
},
"Pass": {
"Type": "Pass",
"End": true
}
}
},
"Next": "Check ingestion status"
},
"Check ingestion status": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${ChkIngestionStatusFunction}",
"Payload.$": "$"
},
"Next": "SNS Publish"
},
"SNS Publish": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${TopicArn}",
"Subject.$": "$.Payload.Subject",
"Message.$": "$.Payload.Message"
},
"ResultPath": null,
"Next": "Choice"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Payload.Status",
"BooleanEquals": true,
"Next": "Success"
}
],
"Default": "Fail"
},
"Success": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail"
}
}
}
"IngestionId.$": "States.Format('{}-{}', $.dataset.M.datasetname.S,$.ingestionidSuffix)"
のところでは、組み込み関数を使ってIdを生成しています。使い方は公式に。
前処理Lambda
以下を行っています。
- eventから抽出したバケット名とファイルのkeyから、データ格納フォルダのS3URIを生成
- 更新処理用のIDとして、現在時刻を文字列化
import re
import datetime
def lambda_handler(event, context):
# eventから必要な情報を変数に格納
targetBucket = event['detail']['bucket']['name']
targetKey = event['detail']['object']['key']
targetPrefix = re.sub('^(.+/)[^/]+$', r'\1', targetKey)
targetS3uriPrefix = "s3://%s/%s"%(targetBucket,targetPrefix)
# 現在日時でIngestionIdの末尾を作成
rtnIngestionidSuffix = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')
return {
'targetPrefix':targetS3uriPrefix,
'ingestionidSuffix':rtnIngestionidSuffix
}
更新中身チェックLambda
- 以下の状態をチェックして、正常終了したかと、メール用メッセージを作成します。
- ingestionStatusがCOMPLETEDで、スキップ行がなければ正常終了
- スキップ行があればエラーにして、その旨メール用メッセージを生成
- ingestionStatusがCOMPLETEDでなければエラー
- ingestionStatusがCOMPLETEDで、スキップ行がなければ正常終了
def lambda_handler(event, context):
rtnStatus = True
rtnSubject = ''
rtnMessage = ''
for eachDescribe in event:
datasetName = eachDescribe['dataset']['M']['datasetname']['S']
ingestionStatus = eachDescribe['ReturnDescribeIngestion']['Ingestion']['IngestionStatus']
ingestionRowsDropped = eachDescribe['ReturnDescribeIngestion']['Ingestion']['RowInfo']['RowsDropped']
if ingestionStatus == 'COMPLETED':
if ingestionRowsDropped == 0:
rtnMessage += "\n\t・データセット%sの更新は成功しました。"%(datasetName)
else:
rtnStatus = False
rtnMessage += "\n\t・データセット%sは、%d行をスキップして更新しました。"%(datasetName,ingestionRowsDropped)
else:
rtnStatus = False
rtnMessage += "\n\t・データセット%sの更新は失敗し、ステータス%sを戻しました。"%(datasetName,ingestionStatus)
# end for
if rtnStatus:
rtnSubject = '[SUCCESS]QuickSight更新処理正常終了'
rtnMessage = 'QuickSightの更新は正常終了しました。' + rtnMessage
else:
rtnSubject = '[ERROR]QuickSight更新処理異常終了'
rtnMessage = 'QuickSightの更新処理に失敗しました。' + rtnMessage
return {
'Status': rtnStatus,
'Subject': rtnSubject,
'Message': rtnMessage
}
構成テンプレート
- Step Functionsに、DynamoDBやSNSの操作権限をアタッチ
- EventBridgeの部分は、前回とほぼ同じです。
AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: SAM Template for Lambda Function
Parameters:
ProjectId:
Type: String
Default: "sampleproject"
BucketName:
Type: String
Default: '<<BUCKET_NAME>>'
TagetFolderPath:
Type: String
Description: use for prefix. End char MUST BE slash(/).
Default: 'dashboard/data/'
Resources:
##################################################
# Product Ingestion Parameters Function
##################################################
ProdParamsFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${ProjectId}-product-ingestion-parameters"
RetentionInDays: 3653
ProdParamsFunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "for-lambda-${ProjectId}-product-ingestion-parameters"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: '/service-role/'
Policies:
# CloudWatch
- PolicyName: write-cloudwatchlogs
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: !GetAtt ProdParamsFunctionLogGroup.Arn
ProdParamsFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub "${ProjectId}-product-ingestion-parameters"
Role: !GetAtt ProdParamsFunctionRole.Arn
CodeUri: functions/product_ingestion_parameters/
Handler: app.lambda_handler
Runtime: python3.9
##################################################
# check ingestion status Function
##################################################
ChkIngestionStatusFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${ProjectId}-check-ingestion-status"
RetentionInDays: 3653
ChkIngestionStatusFunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "for-lambda-${ProjectId}-check-ingestion-status"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: '/service-role/'
Policies:
# CloudWatch
- PolicyName: write-cloudwatchlogs
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: !GetAtt ChkIngestionStatusFunctionLogGroup.Arn
ChkIngestionStatusFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub "${ProjectId}-check-ingestion-status"
Role: !GetAtt ChkIngestionStatusFunctionRole.Arn
CodeUri: functions/check_ingestion_status/
Handler: app.lambda_handler
Runtime: python3.9
Timeout: 900
##################################################
# Step Functions
##################################################
StateMachineLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName : !Sub "/aws/states/${ProjectId}-create-and-check-ingestion"
StateMachine:
Type: AWS::Serverless::StateMachine
Properties:
Name: !Sub "${ProjectId}-create-and-check-ingestion"
DefinitionUri: statemachine/sfn.asl.json
DefinitionSubstitutions:
AwsAccountId: !Ref AWS::AccountId
ProdParamsFunction: !Ref ProdParamsFunction
ChkIngestionStatusFunction: !Ref ChkIngestionStatusFunction
TopicArn:
Fn::ImportValue:
!Sub "${ProjectId}-operation-topic-arn"
Role: !GetAtt StateMachineRole.Arn
Logging:
Level: ALL
IncludeExecutionData: True
Destinations:
- CloudWatchLogsLogGroup:
LogGroupArn: !GetAtt StateMachineLogGroup.Arn
StateMachineRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "for-statemachine-${ProjectId}-create-and-check-ingestion"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- states.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
# CloudWatch
- PolicyName: write-cloudwatchlogs
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogDelivery'
- 'logs:GetLogDelivery'
- 'logs:UpdateLogDelivery'
- 'logs:DeleteLogDelivery'
- 'logs:ListLogDeliveries'
- 'logs:PutResourcePolicy'
- 'logs:DescribeResourcePolicies'
- 'logs:DescribeLogGroups'
Resource: '*'
# Lambda
- PolicyName: invoke-lambdafunction
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'lambda:InvokeFunction'
Resource:
- !GetAtt ProdParamsFunction.Arn
- !GetAtt ChkIngestionStatusFunction.Arn
# DynamoDB
- PolicyName: getitem-dynamodb
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'dynamodb:GetItem'
Resource:
Fn::ImportValue:
!Sub "${ProjectId}-parameters-tablename-arn"
# QuickSight describe ingestion
- PolicyName: describe-ingestion
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'quicksight:CreateIngestion'
- 'quicksight:DescribeIngestion'
Resource:
- !Sub "arn:aws:quicksight:*:${AWS::AccountId}:dataset/*/ingestion/*"
# SNS
- PolicyName: publish-snstopic
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'sns:Publish'
Resource:
Fn::ImportValue:
!Sub "${ProjectId}-operation-topic-arn"
##################################################
# EventBridge(event Rule)
##################################################
## Role for invoke step functions
EventBridgeRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub
- "for-eventrule-${StateMachineName}"
- { StateMachineName: !GetAtt StateMachine.Name }
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- events.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: '/service-role/'
Policies:
- PolicyName: exec-stepfunctions
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'states:StartExecution'
Resource: !Ref StateMachine
EventRulePutOnReceive:
Type: AWS::Events::Rule
Properties:
Name: !Sub
- "for-invoke-statemachine-${StateMachineName}"
- { StateMachineName: !GetAtt StateMachine.Name }
Description: !Sub
- "invoke ${StateMachineName} when file put or delete"
- { StateMachineName: !GetAtt StateMachine.Name }
EventBusName: default
State: ENABLED
EventPattern:
source:
- "aws.s3"
detail-type:
- "Object Created"
- "Object Deleted"
detail:
bucket:
name:
- !Ref BucketName
object:
key:
- prefix: !Ref TagetFolderPath
Targets:
- Arn: !Ref StateMachine
Id: !GetAtt StateMachine.Name
RoleArn: !GetAtt EventBridgeRole.Arn
作り方
- 共通リソース部をCloudFormationのソースを使って作成
-
SampleIngestionParameter.yaml
を参考に、設定値をDynamoDBに登録- DataSetIdの確認方法は、以前の記事で紹介しています。
- SNSで、送信したいメールアドレスを設定
- AWS SAMでデプロイ
- その際、共通リソース部で指定したProjectIdと同じ文字列を指定
- 他パラメータは、監視するバケットとフォルダを指定
使い方
対象のフォルダにファイルを置いたり削除したりするだけ。データセット群が更新され、結果のメールが届くはずです。
おわりに
前回できなかった、完了チェックとメール送信を、Step Functionsを使って実装しました。
"ファイルを置いただけで、ダッシュボードが更新"という運用であれば、だいぶ使い勝手は良いのではないでしょうか。