この記事はBeeX Advent Calendar 2024の7日目の記事です。
※本記事には一部AWSのプレビュー機能も含まれており、一般公開された際には記事の内容から変更される可能性がありますので、認識の上ご覧ください。
はじめに
前にAmazon S3 Tablesにデータを投入して、AthenaとRedshiftからクエリする記事を書きました。
S3 Tablesと分析サービスとの統合機能はプレビュー機能となっており、実は他のサービスも対象として含まれています。
このうちQuickSightはAthenaとRedshiftの仕組みを流用してクエリするため、検証していないサービスとしてEMRとData Firehoseの2つがあります。
そのうち、EMRは自環境だとクラスター立ち上げが上手くいかない+普段ほとんど使わない、ということで、今回の記事ではAmazon Data Firehoseを利用したS3 Tablesリソースへのデータ投入を試してみたいと思います。
・Amazon Athena
・Amazon Redshift
・Amazon EMR
・Amazon QuickSight
・Amazon Data Firehose
実作業
今回は次の流れで作業を行います。
このうち、1~4、9は前回の記事で既に説明済みなので、最小限の部分だけ記載します。
- テーブルバケット作成
- 名前空間、テーブル作成、サンプルデータ投入、クエリ確認
- LakeFormationリソースリンク作成
- Data Firehose用IAMロール作成
- リソースリンクへのアクセス権限設定
- Data Firehose作成
- データ投入
- Redshiftからクエリ
1. テーブルバケット作成
S3 Tablesのテーブルバケットはバケット内に複数のテーブルを持つことができます。
そのため、今回は以前作成したテーブルバケットを利用することとし、作成手順は省略します。
2. 名前空間、テーブル作成、サンプルデータ投入、クエリ確認
名前空間も前回作成したものを流用するので、今回はテーブルのみ作成します。
まずローカル環境で以下コマンドを実行してSpark SQLを実行する環境を構築します。(詳細は前記事参照)
spark-shell \
--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1,\
software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3,\
org.apache.hadoop:hadoop-aws:3.3.4,\
software.amazon.awssdk:s3:2.20.68,\
software.amazon.awssdk:sts:2.20.68,\
software.amazon.awssdk:dynamodb:2.20.68,\
software.amazon.awssdk:glue:2.20.68,\
software.amazon.awssdk:kms:2.20.68,\
software.amazon.awssdk:url-connection-client:2.20.68 \
--conf spark.sql.catalog.qiitatablebucket=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.qiitatablebucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
--conf spark.sql.catalog.qiitatablebucket.warehouse=arn:aws:s3tables:us-east-1:123456789123:bucket/qiitatablebucket \
--conf spark.sql.catalog.qiitatablebucket.region=us-east-1 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
その後に以下のコマンドを実行してS3 Tablesにテーブルを作成し、サンプルデータの投入とクエリを行います。
spark.sql("""CREATE TABLE IF NOT EXISTS qiitatablebucket.test_ns.test_table ( ticker_symbol STRING, sector STRING, change DOUBLE , price DOUBLE ) USING iceberg """)
spark.sql("""INSERT INTO qiitatablebucket.test_ns.test_table VALUES ('QXZ', 'HEALTHCARE', -0.05, 84.51)""")
spark.sql(""" SELECT * FROM qiitatablebucket.test_ns.test_table """).show()
次のように表示されれば成功です。
scala> spark.sql("""CREATE TABLE IF NOT EXISTS qiitatablebucket.test_ns.test_table ( ticker_symbol STRING, sector STRING, change DOUBLE , price DOUBLE ) USING iceberg """)
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("""INSERT INTO qiitatablebucket.test_ns.test_table VALUES ('QXZ', 'HEALTHCARE', -0.05, 84.51)""")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql(""" SELECT * FROM qiitatablebucket.test_ns.test_table """).show()
+-------------+----------+------+-----+
|ticker_symbol| sector|change|price|
+-------------+----------+------+-----+
| QXZ|HEALTHCARE| -0.05|84.51|
+-------------+----------+------+-----+
サンプルデータはData Firehoseが提供しているものを参考にしました。
元々はそのサンプルデータを利用しようと思っていましたが、ターゲットにIcebergテーブルを指定すると利用できないようなので、データ構成だけ参考にしました。
実際のデータ投入はスクリプトで行います。
3. LakeFormationリソースリンク作成
Athena以外の分析サービスからアクセスさせたい場合、LakeFormationのリソースリンクを作成する必要があります。
通常はLakeFormationコンソールか以下のコマンドで作成する必要がありますが、今回は名前空間リソースを流用するので既に作成したものがあるので新規作成は行いません。
aws glue create-database --region us-east-1 --catalog-id "123456789123" --database-input \
'{
"Name": "example_db_link",
"TargetDatabase": {
"CatalogId": "123456789123:s3tablescatalog/qiitatablebucket",
"DatabaseName": "test_ns"
},
"CreateTableDefaultPermissions": []
}'
4. Data Firehose用IAMロール作成
ドキュメントを元にData Firehoseが利用するIAMロールを作成します。
ドキュメントに記載のポリシーは不要なものも含まれていたので、今回は以下の最小限のポリシーで作成しました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"glue:GetTable",
"glue:GetDatabase",
"glue:UpdateTable"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::test-logs-123456789123",
"arn:aws:s3:::test-logs-123456789123/*"
]
},
{
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"logs:PutLogEvents",
"logs:CreateLogGroup",
"logs:CreateLogStream"
],
"Resource": [
"*"
]
}
]
}
5. リソースリンクへのアクセス権限設定
Data Firehose用のIAMロールができたら、そのロールからS3 Tablesリソースにアクセスさせるためにリソースリンクへのアクセス設定をします。
前回記事の内容を参考にデータベースレベルとテーブルレベルの許可を行います。
6. Data Firehose作成
以下のドキュメントを参考にData Firehoseを作成します。
source and destination
ソースには「Direct PUT」、ディスティネーションには「Apache Iceberg Tables」を設定します。
name , transform
ストリーム名はデフォルト、レコード変換はなしにします。
destination settings
ディスティネーションの設定を行います。
基本的にはデフォルトで問題ないですが、「Unique key configuration」と「Retry duration」だけ変更します。
「Unique key configuration」ではデータ送信先のS3 Tablesリソースの指定を行います。
以下はデフォルトの設定で、このうち"DestinationDatabaseName"と"DestinationTableName"が必須パラメータとなってます。
[
{
"DestinationDatabaseName": "DATABASE_PLACEHOLDER",
"DestinationTableName": "TABLE_PLACEHOLDER",
"UniqueKeys": [
"COLUMN_PLACEHOLDER"
],
"S3ErrorOutputPrefix": "OPTIONAL_PREFIX_PLACEHOLDER"
}
]
"DestinationDatabaseName"にはLakeFormationのリソースリンク名、"DestinationTableName"にはデータ投入先のテーブル名を指定します。
今回だと以下のようになります。
[
{
"DestinationDatabaseName": "test_ns_link",
"DestinationTableName": "test_table"
}
]
Data Firehoseでのデータ保持時間を短くするために、「Retry duration」はデフォルトの300から30に変更しておきます。
backup
データのバックアップ先は適当に既存の汎用S3バケットを指定しておきます。
advanced
最後のAdvanced settingsで作成したIAMロールを指定します。
それ以外の設定は全てデフォルトのままにしておきます。
ここで次のようなエラーが出てうまくいかない場合、IAMロールやLakeFormationの権限設定がうまくできていない可能性があります。
(恐らくLakeFormationが原因のケースが多いかもしれません)
私の環境だと、LakeFormationで以下の権限設定ができていれば正常に作成できました。
7. データ投入
Data Firehoseのサンプルデータが利用できないので、投入用のスクリプトを作成します。
STREAM_NAME
とREGION_NAME
は環境に合わせて修正してください。
#!/bin/bash
# Delivery stream name
STREAM_NAME="PUT-ICE-JoaEq"
# Target region
REGION_NAME="us-east-1"
# Arrays for random data generation
TICKER_SYMBOLS=("AAPL" "GOOGL" "MSFT" "AMZN" "META" "NFLX" "TSLA" "IBM" "ORCL" "INTC")
SECTORS=("TECHNOLOGY" "HEALTHCARE" "FINANCE" "CONSUMER" "ENERGY" "MATERIALS" "INDUSTRIAL" "TELECOM" "UTILITIES" "REAL_ESTATE")
# Function to generate random decimal number
random_decimal() {
local min=$1
local max=$2
local scale=2
local range=$(($max - $min))
local random_int=$((RANDOM % (range * 100) + min * 100))
printf "%.2f" $(echo "$random_int / 100" | awk '{printf "%.2f", $1}')
}
# Function to generate random stock data
generate_stock_data() {
local ticker=${TICKER_SYMBOLS[$((RANDOM % ${#TICKER_SYMBOLS[@]}))]}
local sector=${SECTORS[$((RANDOM % ${#SECTORS[@]}))]}
local change=$(random_decimal -5 5)
local price=$(random_decimal 10 1000)
# Generate the record data
local json_data="{\\\"ticker_symbol\\\":\\\"$ticker\\\",\\\"sector\\\":\\\"$sector\\\",\\\"change\\\":$change,\\\"price\\\":$price}"
# Wrap the data in the required Data parameter
echo "{\"Data\":\"$json_data\"}"
}
echo "Starting data generation for stream: $STREAM_NAME"
echo "Press Ctrl+C to stop"
# Continuous data generation and sending
while true; do
data=$(generate_stock_data)
aws firehose put-record \
--delivery-stream-name "$STREAM_NAME" \
--record "$data" \
--cli-binary-format raw-in-base64-out \
--region "$REGION_NAME"
echo "Sent: $data"
# Wait for 1 second before sending next record
sleep 1
done
作成できたらスクリプトを以下のコマンドで実行します。
※AWS CLIの認証情報は既に設定済みの想定
chmod +x generate_stock_data.sh
./generate_stock_data.sh
スクリプトを実行するとランダムで生成したデータを1秒ごとにストリームに送信します。
止める時はCtrl+Cで止められるので、頃合いを見て止めます。
$ ./generate_stock_data.sh
Starting data generation for stream: PUT-ICE-JoaEq
Press Ctrl+C to stop
{
"RecordId": "X425NyYyHKvkDHcmhtzM8zwcSFP9ZZqF9rZnWYpnzB56foek3o8/u0aoCAwF3ezgN7g3yGPaSismGXSEQHwB/hQBR9Qe8sUgPL9fKJp1UBHB6DGLbhBQFJaC82TqRM4bTHWCNlsj6YkBVKzFROQC05ueB9b3hFftgO+uQyWZ9VodEVod0WRl+awQighUZoojP4tg+32c50DyhULHYrp1/q6Fze0P3Q2Z",
"Encrypted": false
}
Sent: {"Data":"{\"ticker_symbol\":\"META\",\"sector\":\"MATERIALS\",\"change\":317.00,\"price\":16711.00}"}
{
"RecordId": "JHQJYS6hDxEwn5EOkYwkbBZ4Ypb9rGQyQr60z4C088qjg3MIwEiARf7+gYY+lAkNrGdxkwg+DMd+ytUfwL559eroySiZvcGbeAlFtYqSQDADgHh80PTeyYYHYlKPk2veePvPLQUE2h3wDsI1wF3YuTP+vomEC7pzuGEXpSe7CmjRCenhcMIMGoXatTUAKL93iowWkJYfffXmJyAAWMcnwCbmrtw5qaxr",
"Encrypted": false
}
Sent: {"Data":"{\"ticker_symbol\":\"INTC\",\"sector\":\"TECHNOLOGY\",\"change\":498.00,\"price\":15003.00}"}
{
"RecordId": "qlSjBBQMKvAvJcy9r4gBb6z8Gi/RLpPl69HRAkzz0sKokOFbePkvuKzmEngQ5hW0VqkvL35r3gBuOGR0AqWFnjLc9d1qkfzkTjMDS3F6E8pcYA4lYU4oKNbpiesL/0BysPqeyX8dIo1JZ3rYVN/S2M/KAnrJCqsyPXKrSN4wejqZarW2lChiaX4G91ztpKItj+hOim9VyQUqewrdI/jhRDCKNsxYG6pu",
"Encrypted": false
}
Sent: {"Data":"{\"ticker_symbol\":\"INTC\",\"sector\":\"MATERIALS\",\"change\":-358.00,\"price\":14392.00}"}
{
"RecordId": "Pc4N5unOJAY/LQxYR64kc3XOlswKXJotiq+V2rWtfypsKoXJP6sioKWqW7P4T6WHYkQNihEvSCv6NVT3wH3b8VtL1Yea5E5JwstRG6KsxhcZlti/9MrrMdFkPMcyX8IyRlun0l/jDZdFXq5MEC/c2wyfv6T/P/bwlwZ6aEh4u2uKDOVtEPLZDR2T9eYc6H8j+swgE2YsDpkmIemggTh+F4eQCCT0WGBu",
"Encrypted": false
}
Sent: {"Data":"{\"ticker_symbol\":\"TSLA\",\"sector\":\"TELECOM\",\"change\":289.00,\"price\":30951.00}"}
{
"RecordId": "yq0uUF5nQI7dJs/qPPBltIwYCWi+6JkpjktnGufTCmzRVhgzIlaLUTmH7spmRluD/0PLEELIge0TcKFJfUZ9YydY71iMSxcs4Dh4JK8rO6daAMiWW9zNy5lIQclJ7NSGJ3fWxp3AiTTZPj/Lnf5/DUxf8jRUezT/KOWXrOf3bmnu82roWYGFwYpBQBpUi1uab6s1qreOeU5ujoB8QdUOlki7wOYNiyto",
"Encrypted": false
}
...
正常に遅れていればData FirehoseコンソールのMonitoring画面でメトリクスが取られ始めます。
しばらく待っているとS3 Tablesのテーブルにデータが入ってきます。
Redshiftで確認する前にローカルのSpark SQLで確認すると投入したデータが取得できることが確認できます。
scala> spark.sql(""" SELECT * FROM qiitatablebucket.test_ns.test_table """).show()
+-------------+-----------+------+-------+
|ticker_symbol| sector|change| price|
+-------------+-----------+------+-------+
| QXZ| HEALTHCARE| -0.05| 84.51|
| META| MATERIALS| 317.0|16711.0|
| INTC| TECHNOLOGY| 498.0|15003.0|
| INTC| MATERIALS|-358.0|14392.0|
| TSLA| TELECOM| 289.0|30951.0|
| MSFT| FINANCE| 54.0|22112.0|
| META| TECHNOLOGY| -26.0|23838.0|
| NFLX| TECHNOLOGY| 417.0|24234.0|
| AMZN| TELECOM| 472.0|26870.0|
| IBM| MATERIALS|-491.0|27581.0|
| INTC| MATERIALS| 101.0|33479.0|
| GOOGL| UTILITIES|-207.0|16666.0|
| GOOGL| CONSUMER| 438.0|27433.0|
| AMZN| TECHNOLOGY|-244.0|28140.0|
| MSFT| MATERIALS| -38.0|19089.0|
| NFLX| MATERIALS|-294.0| 7602.0|
| AAPL| MATERIALS| 50.0|14238.0|
| META| MATERIALS|-361.0|11616.0|
| MSFT|REAL_ESTATE|-386.0|17390.0|
| NFLX| TELECOM| 3.0|17777.0|
+-------------+-----------+------+-------+
only showing top 20 rows
8. Redshiftからクエリ
また前回の記事を参考に、Redshiftのクエリエディターで利用するIAMユーザーに対してLakeFormationの権限を付与します。
私の環境だと次の権限でうまくいきました。
恐らくチェックを付けている2つの設定があればクエリできると思います。
あとはRedshift query editor v2から以下のSQLを実行すると、Data Firehoseから投入したデータが確認することができました。
SELECT * FROM awsdatacatalog.test_ns_link.test_table;
おわりに
前回一度検証していたので、思ったよりも簡単に構築することができました。
個人的にData FirehoseからS3 Tablesへのアクセス権限が一番のハマりどころだったので、LakeFormation周りの理解が少し進んだ気がします。
前回、今回とS3 Tablesの検証をしてみて、確かにIcebergについての詳しい知識がなくても(比較的)簡単に利用できる良いサービスだなと思いました。
実際の環境で利用する際はS3テーブルやテーブルバケットのパフォーマンスを向上させるためのメンテナンスとして、スナップショット管理や非参照ファイルの削除等の考慮も必要になってくるので、このあたりは引き続き理解していきたいと思います。
あとはS3 Tablesを利用したAmazon S3 Metadata機能がプレビューリリースされたので、時間があればそっちも触ってみたいと思ってます。
まだ触っていないので推測ですが、S3 Tablesの制限にParquetファイルのみというのがあったので、何となくS3 MetadataではParquetファイルしか取得できないんじゃないかと思いました。
・Compaction is only supported on Apache Parquet file types.
・Compaction doesn’t support data type: Fixed.
・Compaction doesn’t support compression types: brotli, lz4.
この記事がどなたかの参考になれば幸いです。