目的
目的は「1日1回実行のバッチ処理において、Athenaへ100を超えるパーティションを持つテーブルを作り直す形で更新する」です。
背景
- パフォーマンス向上の為、データ利用者がよく使う列に対してパーティションを切りたい
- そのテーブルは巨大なファクトテーブルAと、パーティションに指定したい対象の列を持つディメンションテーブルBをJOINし毎日作り直す必要がある
Amazon Athenaでは、1クエリで作成できるパーティションの数が100までに制限されています。
パーティションの最大数 – CREATE TABLE AS SELECT (CTAS) ステートメントで作成できるパーティションの最大数は 100 です。
よって、1から作り直すようなテーブルに対して100よりも多いパーティションを作りたい場合に作ることができません。
上記ページにはINSERT INTOで処理を分ける方法が記載されていますが、処理が煩雑になりますし、同じデータが入ってしまう懸念もあるので増分更新用のテーブルでなければ一度の処理で作りたいです。
Glue Job上でSparkを使い、IcebergフォーマットでS3にオブジェクトを配置し、Athenaへテーブルを作ることでこれについて対処できるので、一連の流れを書きました。
Glue Job
Glue JobではAWSのAthenaにあるテーブルやS3データを読み込んで加工→S3とAthenaにロードするようなETL処理ができます。
Glue Jobの中にも種類がありますが、ビッグデータも処理できる最低2DPUから使用できるSparkを使います。
この他スモールデータバッチ処理用のPython ShellやSpark Streaming,Rayがあります。
ここにスクリプトを書き、
- データをロード
- 加工
- 100を超えるパーティションを作りAthenaへテーブル作成
のETL処理を行い、目的を達成します。
Glueにはバージョンがあり、今回は5.0を使っています。
処理スクリプト
この記事ではテスト用データを使っているため、目的の流れとは違います。
ですが、少しのコード改変で達成できます。これについては後述しています。
テスト用データの流れは
- データを生成
- Athenaへテーブル作成とS3へデータ書き込み
- Icebergテーブルの最適化と不要なオブジェクトの削除
です。
import string
import random
import sys
from datetime import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
target_database = 'default'
target_table = 'iceberg_compaction_test_1'
# 生成行数
num_rows = 1000000000
# パーティション分割に使うカテゴリ
categories = [f"category_{i}" for i in range(200)]
# ランダム文字列生成
def random_string(length=50):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# RDDでデータ生成
rdd = spark.sparkContext.parallelize(range(num_rows)).map(lambda i: (
random.choice(categories) ,
random_string(),
random.uniform(0, 1000)
))
schema = StructType([
StructField("category", StringType(), nullable=False, metadata=None),
StructField("random_text", StringType(), nullable=False, metadata=None),
StructField("value", DoubleType(), nullable=False, metadata=None)
])
df = spark.createDataFrame(rdd, schema)
# テーブルへデータ書き込み
df.writeTo(f"glue_catalog.{target_database}.{target_table}").tableProperty("format-version","2").partitionedBy("category").createOrReplace()
# AthenaでいうOPTIMIZE
spark.sql(f"CALL glue_catalog.system.rewrite_data_files(table=>'{target_database}.{target_table}')")
# AthenaでいうVACUUM
expire_time = (datetime.utcnow()).strftime('%Y-%m-%d %H:%M:%S')
spark.sql(f"CALL glue_catalog.system.expire_snapshots('{target_database}.{target_table}', TIMESTAMP '{expire_time}')")
spark.sql(f"CALL glue_catalog.system.remove_orphan_files(table=>'{target_database}.{target_table}')")
job.commit()
データ読み込み
「データを読み込み」ですが、テスト用データではGlue Job上でデータを生成し作っています。
パーティション対象とする列のユニークな要素数が200になるようにデータを作っています。
# 生成行数
num_rows = 1000000000
# パーティション分割に使うカテゴリ
categories = [f"category_{i}" for i in range(200)]
# ランダム文字列生成
def random_string(length=50):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# RDDでデータ生成
rdd = spark.sparkContext.parallelize(range(num_rows)).map(lambda i: (
random.choice(categories) ,
random_string(),
random.uniform(0, 1000)
))
schema = StructType([
StructField("category", StringType(), nullable=False, metadata=None),
StructField("random_text", StringType(), nullable=False, metadata=None),
StructField("value", DoubleType(), nullable=False, metadata=None)
])
df = spark.createDataFrame(rdd, schema)
データを生成ではなく、S3からデータをロードしたい場合は以下のように書けます。(他にもやり方はあります)
df = spark.read.parquet("<s3_uri>")
Athenaからデータをロードしたい場合は以下のように書けます。
# Hive式テーブルの場合
dyf = glueContext.create_dynamic_frame.from_catalog(
database="default",
table_name="sample_tsv",
transformation_ctx="DataCatalogtable_node1",
)
df = dyf.toDF()
# Iceberg式テーブルの場合
# 参考 : https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html#aws-glue-programming-etl-format-iceberg-read
df = glueContext.create_data_frame.from_catalog(
database="<your_database_name>",
table_name="<your_table_name>"
)
GlueにはDynamicFrameクラスというスキーマの定義が不要なGlueに最適化されたSparkやPandasのDataFrameに似たクラスが存在します。これは上記コード例ではdyf
と表記しています。
SparkのDataFrameはdf
と表記しています。GlueではSparkContextをラップしたGlueContextクラスがあり、これによりSpark DataFrameを返すメソッドをAWSサービスと連携しやすくしています。
加工やロードについては、Spark DataFrameで行うので、DynamicFrameで読み込んだデータもtoDF()でSpark DataFrameにしています。
データ加工
テストコードではデータの加工は行っていませんが、SparkのDataFrameにした後はDataFrameに対するメソッドが使えるので、以下ドキュメントを参考に加工します。
filter,groupBy,join,where,union
等Pandasに使い慣れていればすぐ分かるような内容になっています。
join
については実際は巨大なファクトテーブルとディメンションテーブルをJOINするので、Broadcast JOINを使いました。
from pyspark.sql.functions import broadcast
merged_df = df_big.join(
broadcast(df_small),
[df_big.user_id == df_small.user_id],
"inner"
)
ただ、Sparkは複数インスタンスから成る並列処理が行えるためそれ用のメソッドも存在します。
cache,checkpoint,coalesce,persist,repartition,etc...
パフォーマンスの観点も含めると上記メソッドも重要だと思います。私は以下の本をある程度読みました。
また、上記本の訳者である一人の @taka_yayoi さんもQiitaにSparkに関する記事を沢山上げておられます。
データ書き込み
コードでは以下の部分に当たります。シンプルです。
df.writeTo(f"glue_catalog.{target_database}.{target_table}").tableProperty("format-version","2").partitionedBy("category").createOrReplace()
partitionedBy()
でパーティション対象の列を選択します。createOrReplace()
でテーブルを作り直します。
後述しますが、createOrReplace()
実行時に既にテーブルが存在する場合、内部的にはAppendを実行しているため、不要になったオブジェクトを削除する処理を後に加えています。
Icebergフォーマットでデータを書き込みテーブルをつくっているのですが、この場合はGlueのJob detailsの項目で設定が必要です。
画像赤枠の通りですが、
AWS Glue で Iceberg を有効化するには、以下のタスクを実行します。
iceberg を --datalake-formats のジョブパラメータの値として指定します。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。
AWS Glue ジョブ用に、--conf という名前でキーを作成し、それに次の値を設定します。または、スクリプトで SparkConf を使用して、次の構成を設定することもできます。これらの設定は、Apache Spark が Iceberg テーブルを適切に処理する際に役立ちます。
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://<your-warehouse-dir>/
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
--write-shuffle-files-to-s3
KeyはシャッフルデータをS3に保存するための設定で、必須ではないです。
シャッフル時の中間データをS3に書き出すことで、DPUメモリの利用を抑えることができます。
spark.sql.catalog.glue_catalog.warehouse
は設定したバケット下にdatabase名.db
でicebergデータ用の階層が作られます。既にDWH用にバケットがある場合はそこを指定したほうが良いと思います。
Iceberg最適化
icebergテーブルですが、使い続けるとオブジェクト数が多くなりパフォーマンスに影響します。
その為、「オブジェクトの統合」「不要オブジェクトの削除」を定期的にしてあげたほうが良いです。
何故そのようになるのかは以下ページが参考になります。
# AthenaでいうOPTIMIZE
spark.sql(f"CALL glue_catalog.system.rewrite_data_files(table=>'{target_database}.{target_table}')")
# AthenaでいうVACUUM
expire_time = (datetime.utcnow()).strftime('%Y-%m-%d %H:%M:%S')
spark.sql(f"CALL glue_catalog.system.expire_snapshots('{target_database}.{target_table}', TIMESTAMP '{expire_time}')")
spark.sql(f"CALL glue_catalog.system.remove_orphan_files(table=>'{target_database}.{target_table}')")
AthenaにはIcebergテーブルを最適化&不要オブジェクト削除するコマンドがあり、OPTIMIZE,VACUUMという名前で用意されています。
ですが、こちらもAthenaの1クエリ100パーティション制限の影響か、100パーティションを超えるテーブルに対して実行するとエラーになります。
なので、spark.sql()
メソッドを使ってSpark上でIcebergテーブルを最適化・不要オブジェクトを削除するクエリを作成したテーブルに対して実行します。
Icebergには時点を遡るスナップショット機能がありますが、作り直しのテーブルの場合不要なのでexpire_snapshots
で現時点の時間を指定し、期限切れになるように設定しremove_orphan_files
で削除します。
実行結果
処理データ量やタスク数などを調整してWorker Type=G 2X
を10個立ち上げて11分程度で処理が終了しました。
実行すると、Athenaに対して100を超えるパーティションを持ったテーブルを作ることができます。
Glue Job Spark UIでパフォーマンスの見直し
Glue JobのSpark UIから詳細のパフォーマンスが見られます。
処理によっては、OPTIMIZE(rewrite_data_files)
に時間が掛かる場合がありました。
OPTIMIZE(SparkBinPackDataRewriter)
のJobが大量に出現
今回の実行は「ビッグデータの加工処理→テーブルを作る」ためのDPUスペックとDPU数で設定していて、
OPTIMIZEの場合それよりも処理の小さいタスクが大量に降ってくる=「DPUスペックよりもDPUの数が必要」ということだと思うので、Glue Job内でOPTIMIZEするべきかどうかはチューニングに必要な時間、最適化することによる効果で判断したほうが良さげだと思いました。
Athenaではparquetの場合1オブジェクト128MBより大きいと良い
ようです。
Parquet と ORC は、さまざまなデータセットに合わせて調整できます。たとえば、ブロック (Parquet) またはストライプ (ORC) サイズを大きくすると、状況によっては有益な場合があります。データセットに多数のカラムがある場合は、Parquet のデフォルト 128MB、ORC のデフォルト 64MB から大きくすることをお勧めします。これにより、各カラムの十分な値が一緒に格納され、読み取りの回数が減ります。
また、S3 Tablesでは自動最適化機能があり、ここではparquetファイルがMinimum 64MBで最適化されるようです。
これを目安に1パーティション内のオブジェクト数をaws s3 ls
等で見て確かめたほうが良いと思いました。
また、通常のS3オブジェクトの場合でもGlue Catalogから最適化の自動化ができるようです。
Glue JobのDPUと同料金で動くようなので、金額面は変わりませんが実行は裏でやってくれるというメリットがあるということだと思います。
Icebergオブジェクトのメンテナンスをしない場合
Iceberg最適化の部分ですが、行わない場合どうなるのかというと、createOrReplace()を行うたびにオブジェクト数が増加しパフォーマンスが低下していきます。Icebergの構造上使わないオブジェクトでもメタデータとして読みはするからです。
$ aws s3 ls s3://xxxxxxxx/default.db/iceberg_metadata_check/data/ --recursive --human --sum
Total Objects: 199
Total Size: 283.9 KiB
$ aws s3 ls s3://xxxxxxxx/default.db/iceberg_metadata_check/data/ --recursive --human --sum
Total Objects: 398
Total Size: 567.9 KiB
ALTER TABLE default.iceberg_metadata_check SET TBLPROPERTIES (
'vacuum_max_snapshot_age_seconds'='60'
);
VACUUM default.iceberg_metadata_check;
$ aws s3 ls s3://xxxxxxxx/default.db/iceberg_metadata_check/data/ --recursive --human --sum
Total Objects: 199
Total Size: 283.9 KiB
上記例は主に作り直しにおける過去データがパフォーマンス低下につながる例ですが、Appendで過去データも使う場合、パーティション内の参照データが細かくバラバラになるということもあるので、OPTIMIZEをしたほうが良いと思います。
さいごに
無事100を超えるパーティションテーブルをAthenaに展開することができました。
実務で用いましたが、クエリ実行時間を大分抑えることができたので、今後も活用していこうと思います。
間違いやアドバイス等あればコメントいただけると嬉しいです。
ここまで読んでいただきありがとうございました。
参考