はじめに
CSV群をQuickSightでダッシュボードにして、その後説明文やEメールレポートを追加してきました。
この環境ですが、CSVを変更したら都度、QuickSightのデータセットの更新を手動で行なう必要があります。そのためには、
- データセットを閲覧・更新できる権限を持つユーザとして
- 都度QuickSightに飛んでログインし
- データセットの画面に遷移して
- 対象のデータセットを選択し
- 更新処理を実行する
という手間が発生します。
これらを省くため、”対象のフォルダに変更(ファイルの追加、削除)があったら、自動でデータセット更新処理を実行する”、という仕組みを入れてみました。
よいこと
- CSVを置いたら、データセットの更新指示が自動で作られる。
- ファイルを更新したり、削除しても同様。
- 運用ユーザに強い権限を与えずに済む。
- (今回の仕組みを構成するのには、)1ファイルで完結する。
よくないこと
- データセット更新が正常に完了したことは保証されない。
- 更新指示を行うだけで、完了までは監視していない。
- 完了しても通知なし。
- 設定値は外部に出しておらず、Lambda関数のコード中に直接記述します。
- 変更都度、スタックの更新が必要です。
構成
- 対象の場所をEventBridgeで監視
- 変更があったらLambdaを起動
- Lambdaから、定義したデータセットを更新するようQuickSightに指示
CloudFormation
個々のポイントを説明し、最後に全体を載せます。拙い英語はご容赦ください。
Parameters:
LambdaFunctionName:
Type: String
BucketName:
Type: String
TagetFolderPath:
Type: String
Description: use for prefix. End char MUST BE slash(/).
Metadata:
AWS::CloudFormation::Interface:
ParameterGroups:
-
Label:
default: User Setting.
Parameters:
- LambdaFunctionName
- BucketName
- TagetFolderPath
- Lambda関数名と、監視対象のパスを「バケット」と「フォルダ」で分けました。
-
Metadata
はコンソール表示時の並び順を指定しています。
FunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${LambdaFunctionName}"
RetentionInDays: 3653 # 未指定時は「失効しない」
- Lambda関数用ロググループはLambda自身には作らせず、CloudFormationから作ります。
- Lambdaにロググループ作成の権限を与えて自動で作成させると、スタック削除時に残ってしまいます。
- ロググループの名前は
/aws/lambda/<<関数名>>
固定。
FunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "for-lambdafunction-${LambdaFunctionName}"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: '/service-role/'
Policies:
# CloudWatch
- PolicyName: write-cloudwatchlogs
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*"
# QuickSight
- PolicyName: create-quicksight-ingesiton
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "quicksight:CreateIngestion"
Resource:
- !Sub "arn:aws:quicksight:${AWS::Region}:${AWS::AccountId}:dataset/*"
- ロググループ出力用の権限と、QuickSightのデータセット更新指示を出す権限(CreateIngestion)のみとしています。
TargetFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Ref LambdaFunctionName
Role: !GetAtt FunctionRole.Arn
Runtime: python3.9
Handler: index.lambda_handler
Timeout: 15
Code:
ZipFile: !Sub |
# 設定定義
str_prefix_dataset = '''
[
{
"file_prefix": "${TagetFolderPath}tokyo_covid19/",
"datasets": [
{"name": "tokyo_covid19", "Id": "xxxxxxxxxxxxxxx-xxxxxxxxxxxx-xxxxxxxx-xxxx"}
]
}
]
'''
import boto3
import json
import datetime
map_prefix_dataset = json.loads(str_prefix_dataset)
def lambda_handler(event, context):
try:
str_targetfile = event['detail']['object']['key']
except Exception:
print("S3のイベント event['detail']['object']['key'] が取得できませんでした")
raise
# 操作されたファイルの先頭から、設定のprefixと一致するものを探して、それを更新対象のデータセットとする。
dic_target_datasets = None
for eachmap in map_prefix_dataset:
if str_targetfile.find(eachmap['file_prefix']) == 0 :
dic_target_datasets = eachmap['datasets']
break
# 見つかったら更新
if dic_target_datasets is None:
print('更新対象フォルダ以外のファイルの操作でした、終了します。')
else:
for eachDataset in dic_target_datasets:
print( "create ingestion of %s"%(eachDataset['name']) )
# 更新指示作成
try:
# IngestionIdは一意の必要あり。
response = boto3.client('quicksight').create_ingestion(
DataSetId=eachDataset['Id'],
IngestionId="Ingestion-%s-%s"%( eachDataset['name'] , datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')),
AwsAccountId='${AWS::AccountId}'
)
except Exception:
print( "データセット%s(Id:%s)が存在しないためエラーになりました。"%(eachDataset['name'], eachDataset['Id']) )
raise
print(json.dumps(response))
if response['Status'] == 201:
print("更新指示を作成できました")
else:
print("ERROR:更新指示の作成に失敗しました")
# end eachDataset
- Lambda関数のコードは別ファイルにせず、CloudFormation内部に直接書いています。
- 先頭の
str_prefix_dataset
で、格納フォルダと更新させたいデータセット群を指定しています。- eventの中に入っているオブジェクトのキー(バケット名を除いた、ファイルまでのフルパス)に対し、設定したprefixが先頭から一致しているかを見て、一致していたらその設定で動かす、という風にしています。
- IngestionIdは一意である必要があるため、データセット名やミリ秒を入れています。
EventRuleDataSets:
Type: AWS::Events::Rule
Properties:
Name: !Sub "for-dataset-ingestion-${LambdaFunctionName}"
Description: !Sub "invoke ${LambdaFunctionName} when file put folder"
EventBusName: default
State: ENABLED
EventPattern:
source:
- "aws.s3"
detail-type:
- "Object Created"
- "Object Deleted"
detail:
bucket:
name:
- !Ref BucketName
object:
key:
- prefix: !Ref TagetFolderPath
Targets:
- Arn: !GetAtt TargetFunction.Arn
Id: !Ref TargetFunction
# Lambdaのトリガーに、EventBridgeのルールを追加
PermissionForEventsToInvokeLambda:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref TargetFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt EventRuleDataSets.Arn
- S3のイベント通知の指定方法は、2021.11のアップデートで追加された方法を使っています。
- イベントルールの
targets.Id
に、何を指定していいのかわからなかったので、適当にLambda関数名を入れています。- どなたかわかる方いらっしゃったら教えてください。
完全なコード
AWSTemplateFormatVersion: 2010-09-09
Parameters:
LambdaFunctionName:
Type: String
BucketName:
Type: String
TagetFolderPath:
Type: String
Description: use for prefix. End char MUST BE slash(/).
Metadata:
AWS::CloudFormation::Interface:
ParameterGroups:
-
Label:
default: User Setting.
Parameters:
- LambdaFunctionName
- BucketName
- TagetFolderPath
Resources:
FunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${LambdaFunctionName}"
RetentionInDays: 3653 # 未指定時は「失効しない」
FunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "for-lambdafunction-${LambdaFunctionName}"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: '/service-role/'
Policies:
# CloudWatch
- PolicyName: write-cloudwatchlogs
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*"
# QuickSight
- PolicyName: create-quicksight-ingesiton
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "quicksight:CreateIngestion"
Resource:
- !Sub "arn:aws:quicksight:${AWS::Region}:${AWS::AccountId}:dataset/*"
TargetFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Ref LambdaFunctionName
Role: !GetAtt FunctionRole.Arn
Runtime: python3.9
Handler: index.lambda_handler
Timeout: 15
Code:
ZipFile: !Sub |
# 設定定義
str_prefix_dataset = '''
[
{
"file_prefix": "${TagetFolderPath}tokyo_covid19/",
"datasets": [
{"name": "tokyo_covid19", "Id": "xxxxxxxxxxxxxxx-xxxxxxxxxxxx-xxxxxxxx-xxxx"}
]
}
]
'''
import boto3
import json
import datetime
map_prefix_dataset = json.loads(str_prefix_dataset)
def lambda_handler(event, context):
try:
str_targetfile = event['detail']['object']['key']
except Exception:
print("S3のイベント event['detail']['object']['key'] が取得できませんでした")
raise
# 操作されたファイルの先頭から、設定のprefixと一致するものを探して、それを更新対象のデータセットとする。
dic_target_datasets = None
for eachmap in map_prefix_dataset:
if str_targetfile.find(eachmap['file_prefix']) == 0 :
dic_target_datasets = eachmap['datasets']
break
# 見つかったら更新
if dic_target_datasets is None:
print('更新対象フォルダ以外のファイルの操作でした、終了します。')
else:
for eachDataset in dic_target_datasets:
print( "create ingestion of %s"%(eachDataset['name']) )
# 更新指示作成
try:
# IngestionIdは一意の必要あり。
response = boto3.client('quicksight').create_ingestion(
DataSetId=eachDataset['Id'],
IngestionId="Ingestion-%s-%s"%( eachDataset['name'] , datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')),
AwsAccountId='${AWS::AccountId}'
)
except Exception:
print( "データセット%s(Id:%s)が存在しないためエラーになりました。"%(eachDataset['name'], eachDataset['Id']) )
raise
print(json.dumps(response))
if response['Status'] == 201:
print("更新指示を作成できました")
else:
print("ERROR:更新指示の作成に失敗しました")
# end eachDataset
# EventBridge用ルール
EventRuleDataSets:
Type: AWS::Events::Rule
Properties:
Name: !Sub "for-dataset-ingestion-${LambdaFunctionName}"
Description: !Sub "invoke ${LambdaFunctionName} when file put folder"
EventBusName: default
State: ENABLED
EventPattern:
source:
- "aws.s3"
detail-type:
- "Object Created"
- "Object Deleted"
detail:
bucket:
name:
- !Ref BucketName
object:
key:
- prefix: !Ref TagetFolderPath
Targets:
- Arn: !GetAtt TargetFunction.Arn
Id: !Ref TargetFunction
# Lambdaのトリガーに、EventBridgeのルールを追加
PermissionForEventsToInvokeLambda:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref TargetFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt EventRuleDataSets.Arn
作り方
監視対象のバケットの設定変更
対象のバケットのイベント通知を有効にしてください。やり方は以下のリンク先にあります。
S3バケットをCloudFormationで作る場合は、以下の設定を加えます。
Resources:
Bucket:
Type: AWS::S3::Bucket
Properties:
# Amazon EventBridge に通知を送信する
NotificationConfiguration:
EventBridgeConfiguration:
EventBridgeEnabled: true
設定値修正
先のCloudFormation上のLambda関数内部のstr_prefix_dataset
の設定値を修正します。
例として今回のS3のフォルダ構成を以下とします。
dashboard
├── data <-監視するフォルダ
│ ├── dataset01
│ │ ├── aaaa.csv
│ │ └── bbbb.csv
│ └── tokyo_covid19 <-データセットと対応させるフォルダ
│ ├── 2021.csv
│ └── 2022.csv
└── manifest
├── dataset01.json
└── tokyo_covid19.json
str_prefix_dataset
には、上記のうち「データセットと対応させるフォルダ」を"file_prefix"に指定します。
そして、対応するデータセット群の情報をdatasets
に記載します。
# 設定定義
str_prefix_dataset = '''
[
{
"file_prefix": "dashboard/data/tokyo_covid19/",
"datasets": [
{"name": "tokyo_covid19", "Id": "xxxxxxxxxxxxxxx-xxxxxxxxxxxx-xxxxxxxx-xxxx"}
]
}
]
先のコード内では"file_prefix"の値に、${TagetFolderPath}
で変数を埋め込んでいますが、直書きしたほうがわかりやすいかもしれません。
スタック作成
修正したCloudFormationのコードで、スタックを作成します。
パラメータ
- LambdaFunctionName
- 既存の関数名と重複しない名前で
- BucketName
- バケット名だけを指定
-
s3://
は不要。末尾のスラッシュも不要
-
- バケット名だけを指定
- TagetFolderPath
- 先のフォルダ構成の中の、監視するフォルダのパスを入れてください。
- データ格納フォルダでもよいですし、それより上部のフォルダでもOKです。
- 今回は
dashboard/data/
を指定しました。- 上位フォルダを監視対象にしておけば、他のデータセット用フォルダにファイルが置かれても発火し、(設定値に定義されていれば)設定値に従い更新指示が作成される、という構成です。
- 末尾はスラッシュ(/)で終わらせてください。
- 先のフォルダ構成の中の、監視するフォルダのパスを入れてください。
使い方
対象のバケットにファイルを置く/更新する/削除することで、データセットが更新されます。
おわりに
今回はQuickSightのデータセットの更新を自動化する仕組みを構築しました。
これで普段の運用ユーザはファイルを置くだけでデータ更新が出来て、「データセット更新権限」を割り当てる必要がなくなります。
次回はファイルを置いて更新指示を作成した後、更新が終わっているかチェックし、通知するまでの仕組みを記事にしようと思います。