はじめに
本記事は、以下記事の続きです。
今回は、Data Firehoseストリームを設定してS3テーブルにデータを配信します。
以下AWS公式記事の手順を参考にしています。
構成図
本記事では FirehoseからS3 Tablesにデータ配信する 部分を実装します。
実装手順
以下ステップで実装していきます。
ステップ2~5では、権限管理Lake Formationも出てきます。
本記事では 3. Firehoseからデータ送信 のステップを行っていきます。
Firehoseからデータ送信手順
以下ステップで行います。
Firehose用IAMロール ポリシー設定
まず、Firehose用のカスタムポリシーを作成します。
コンソール操作:
「IAMコンソール」 → 「ポリシー」 → 「ポリシーの作成」 → 「ポリシーエディタ: JSON」
以下JSONを貼り付け → 次へ → 任意のポリシー名設定 → ポリシーの作成
※アカウントIDやS3バケット名(Firehoseストリーム作成ステップの「その他設定」で登場)は環境に合わせて修正ください。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3TableAccessViaGlueFederation",
"Effect": "Allow",
"Action": [
"glue:GetTable",
"glue:GetDatabase",
"glue:UpdateTable"
],
"Resource": [
"arn:aws:glue:ap-northeast-1:アカウントID:catalog/s3tablescatalog/*",
"arn:aws:glue:ap-northeast-1:アカウントID:catalog/s3tablescatalog",
"arn:aws:glue:ap-northeast-1:アカウントID:catalog",
"arn:aws:glue:ap-northeast-1:アカウントID:database/*",
"arn:aws:glue:ap-northeast-1:アカウントID:table/*/*"
]
},
{
"Sid": "S3DeliveryErrorBucketPermission",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::FirehoseストリームのS3バックアップバケットで指定するS3バケット名",
"arn:aws:s3:::FirehoseストリームのS3バックアップバケットで指定するS3バケット名/*"
]
},
{
"Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation",
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess"
],
"Resource": "*"
},
{
"Sid": "LoggingInCloudWatch",
"Effect": "Allow",
"Action": [
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:ap-northeast-1:アカウントID:log-group:*"
]
}
]
}
上記設定したら、以下のようなポリシーが許可されていることを確認できます。
次に、Firehose用のサービスロールを作成します。
コンソール操作:
「IAMコンソール」 → 「ロール」 → 「ロールを作成」
今回、「AWSのサービス」 → 「kinesis」を選択してロールを作成しました。
※以前kinesis Data Firehoseという名前だった名残?で、「kinesis」で検索します。
「許可ポリシー」に、先ほど作成したFirehose用ポリシーを選択して「次へ」
任意のロール名を設定 → 「ロールを作成」
Lake Formationからリソースリンク作成
Lake Formationで、S3テーブルバケット名前空間へのリソースリンクを作成します。
※作成したリソースリンクは、Firehoseストリームの宛先として指定することになります。
コンソールやAWS CLIからも作成可能です。
※リソースリンク名は、「大文字、小文字、アンダースコア (_) 」のみ使用可能1のようです。
AWS CLIからリソースリンク作成
aws glue create-database --region ap-northeast-1 --catalog-id "アカウントID" --database-input \
'{
"Name": "リソースリンク名(大文字、小文字、アンダースコア (_) のみ)",
"TargetDatabase": {
"CatalogId": "アカウントID:s3tablescatalog/S3テーブルバケット名",
"DatabaseName": "S3テーブルバケット名前空間名"
},
"CreateTableDefaultPermissions": []
}'
コンソールからリソースリンク作成
コンソールから操作:
Lake Formationコンソール → Databasesタブ → Create database
リソースリンク作成結果
作成完了したら、以下のようにDatabasesの一覧に表示されます。
Lake Formationから権限設定
Data Firehoseは、リソースリンク経由でデータ配信を行います。
Lake Formationにて、Data Firehose用ロールに、リソースリンク経由のデータ配信ができるように権限を付与します。
以下2つの権限を付与します2
- リソースリンクへのDiscribe権限
- テーブルへのALL権限
リソースリンクへのDescribe権限付与
コンソールから操作:
Lake Formationコンソール → Databasesタブ → 作成したリソースリンク選択 → Actions → Grant
以下の通り、Data Firehose用ロールに、リソースリンクへのDescribe権限を付与します。
テーブルへの権限付与
コンソールから操作:
Lake Formationコンソール → Databasesタブ → 作成したリソースリンク選択 → Actions → Grant on target
以下の通り、Data Firehose用ロールに、S3テーブルバケットの指定テーブルへのALL(Super)権限を付与します。
これで、FirehoseストリームがS3テーブルバケットの指定テーブルにデータ配信するための権限設定が完了しました。
Firehoseストリーム作成
ここから、Firehoseストリームを作成していきます。
コンソールから操作:
Data Firehoseコンソール → Firehoseストリームを作成
ソース 送信先の設定
- ソース: 「Direct PUT」を選択します。
今回は、Data Firehoseストリームで処理するデータのソースとして後述するKDGを使うため - 送信先: 「Apache Icebergテーブル」を選択します。
一意のキー設定
一意のキー設定では、以下のようにJSON定義を設定します3
[
{
"DestinationDatabaseName": "リソースリンク名",
"DestinationTableName": "S3テーブルバケットのテーブル名"
}
]
その他設定
-
S3 バックアップバケット:
Firehose用ポリシー記述で、FirehoseストリームのS3バックアップバケットで指定するS3バケット名 で指定したS3バケットを指定します。 -
サービスアクセス:
既存の IAM ロール → 作成したFirehose用ロールを指定します
それ以外は、デフォルト設定のままストリームを作成します。
テストデータ送信
KDGを使って、作成したFirehoseストリームにテストデータを送信します。
以下記事も参考ください。
以下レコードテンプレートを使って、JSONデータを送信しました。
{
"id": {{random.number(1000)}},
"name": "{{random.arrayElement(["S3Tables", "EMR", "DataFirehose", "Athena", "QuickSight"])}}",
"value": {{random.number(
{
"min": 1,
"max": 100
}
)}}
}
※サンプルレコードデータ
{ "id": 893, "name": "QuickSight", "value": 60}
{ "id": 684, "name": "Athena", "value": 69}
{ "id": 107, "name": "EMR", "value": 81}
(次回記事で詳細)Athenaでクエリ
Athenaで、S3テーブルバケットの該当テーブルをクエリすると、データを確認できました。
最後に
ここまでで、S3テーブルバケットにデータを送信するところまで確認しました。
次回記事では、このS3テーブルバケットに、AthenaからSQLクエリを実行するまでの設定を確認します。
参考URL
この記事で参考にしたURLです。
-
AWS記事:Streaming data to tables with Amazon Data Firehose
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-firehose.html ↩ -
AWS記事:Grant Lake Formation permissions on your table resources
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-aws.html#grant-permissions-tables ↩ -
AWS記事:Route incoming records to a single Iceberg table
https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-format-input-record.html ↩