はじめに
DataLake利用時によく使われるparquet formatでは、大量のデータの一部を特定し、削除するような対応が難しいのですが、こちらのユースケースに書かれている通り、個人情報を削除する対応を行う場合に非常に重宝しそうなのでどんなものか調べてみることにしました。
ちなみにHudiはDeltaLake, Icebergと並びLakeHouseで必要とされる機能を沢山持っています。
今回はAWS Glue, Athenaを使ってHudiを利用する手順を明らかにし、削除することでS3上のparquetがどのような動きになるのかを把握してみようと思います。
環境・データ
-
Glue 4.0(Spark3.3) + Athena + S3
-
データ定義はOMOP CDM personテーブルを使って試してみます。
"PERSON_ID","YEAR_OF_BIRTH","GENDER_CONCEPT_ID","RACE_CONCEPT_ID","LOCATION_CONCEPT_ID","SOURCE_PERSON_KEY","SOURCE_GENDER_CODE","SOURCE_LOCATION_CODE","SOURCE_RACE_CODE"| "1375550","1938","8507","","","","","",""| "1375551","1985","8532","","","","","",""| "1375552","1993","8532","","","","","",""| "1375553","2003","8507","","","","","",""| "1375554","1993","8507","","","","","",""| "1375555","1999","8532","","","","","",""| "1375556","2003","8507","","","","","",""| "1375557","1983","8532","","","","","",""| "1375558","1997","8532","","","","","",""| "1375559","2005","8507","","","","","",""| "5125481","2001","8507","","","","","",""|
AWSでHudiを利用する
まずはAthenaでHiveテーブルを作る
変換元のテーブルを作ります
CREATE EXTERNAL TABLE IF NOT EXISTS raw_person (
person_id string,
year_of_birth string,
gender_concept_id string,
race_concept_id string,
location_concept_id string,
source_person_key string,
source_gender_code string,
source_location_code string,
source_race_code string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 's3://bucket/osim2/raw-small/person/'
TBLPROPERTIES (
'has_encrypted_data'='false',
'skip.header.line.count'='1'
)
GlueでDataframeをHudiに変換する
実装はこんな感じ。Hudiのオプションを利用するくらいでparquet変換のコードとそんなに大差ないことがわかると思います。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
### person
df = spark.sql("""
select uuid() as uuid, *
FROM cdm.raw_person
WHERE person_id != 'PERSON_ID'
""")
df = df.withColumn("source_race_code", regexp_replace(col("source_race_code"), "\"\|",""))
# データの情報を表示
print('Count: {0}'.format(df.count()))
df.printSchema()
df.show(5)
hudi_database_name = 'cdm'
hudi_table_name = 'person'
base_path = f's3://bucket/osim2/hudi/{hudi_table_name}/'
hudi_options = {
'hoodie.table.name': hudi_table_name,
'hoodie.datasource.write.table.name': hudi_table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'gender_concept_id',
'hoodie.datasource.write.precombine.field': 'uuid',
'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': hudi_database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.partition_fields': 'gender_concept_id',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false'
}
# Dataframe -> Hudi
df.write.format('hudi').options(**hudi_options).mode('append').save(base_path)
# ジョブコミット
job.commit()
公式ガイドに従ってGlueJobのJob details→JobParameterをセットアップしていく
1つ目
- Key:—conf
- Value:spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
2つ目
- Key:-datalake-formats
- Value:hudi
トラブルシューティング
実行時に発生したエラー
Caused by: java.lang.ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 69 more
Githubを見るとGlue Catalog Syncを利用するにはcalcite-core.jarを追加しろとのこと
calcite-core-1.32.0.jarをダウンロードし、S3上に配置する
- 今回はs3:///library/hudi/に配置しました。
「GlueJob」→「Job details」→「Dependent JARs path」に以下を追加します
- s3:///library/hudi/
Athenaで検索してみる
マニュアルに従ってテーブルを作る
テーブルを作る
CREATE EXTERNAL TABLE IF NOT EXISTS person (
uuid string,
person_id string,
year_of_birth string,
gender_concept_id string,
race_concept_id string,
location_concept_id string,
source_person_key string,
source_gender_code string,
source_location_code string,
source_race_code string
)
PARTITIONED BY (
`gender_concept_id` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://bucket/osim2/hudi/person/'
ALTER TABLE ADD PARTITIONはすでにS3に作られているので不要
ALTER TABLE person ADD
PARTITION (gender_concept_id = '8507') LOCATION 's3://bucket/osim2/hudi/person/8507/'
PARTITION (gender_concept_id = '8532') LOCATION 's3://bucket/osim2/hudi/person/8532/'
テーブルをプレビューしてみるとHudiの管理用カラムが追加されてる
データを削除してみる
現状のS3の状態
$ aws s3 ls s3://bucket/osim2/hudi/person/8532/
2023-01-26 04:53:33 96 .hoodie_partition_metadata
2023-01-26 04:53:34 439120 bf187b2a-d716-4642-9af4-e44b5a2cd242-0_1-5-0_20230126045321057.parquet
2023-01-26 04:53:33 438964 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-4-0_20230126045321057.parquet
person_id='1375555’のレコードを削除する(履歴は2世代保持する)
削除対象のレコードのdataframeを作り、それをhudi_optionとともにappendすればOKです。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# dataframeを取得
hudi_database_name = 'cdm'
hudi_table_name = 'person'
base_path = f's3://bucket/osim2/hudi/{hudi_table_name}/'
query = f'SELECT * FROM {hudi_database_name}.{hudi_table_name} WHERE gender_concept_id = "8532" AND person_id="1375555"'
df = spark.sql(query)
# データの情報を表示
print('Count: {0}'.format(df.count()))
df.printSchema()
df.show(5)
hudi_options = {
'hoodie.table.name': hudi_table_name,
'hoodie.datasource.write.table.name': hudi_table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'gender_concept_id',
'hoodie.datasource.write.precombine.field': 'uuid',
'hoodie.datasource.write.operation': 'delete', # 削除
'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
'hoodie.cleaner.fileversions.retained': 2, # 最新バージョン+1世代保持する
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': hudi_database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.partition_fields': 'gender_concept_id',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false'
}
# Dataframe -> Parquet
df.write.format('hudi').options(**hudi_options).mode('append').save(base_path)
# ジョブコミット
job.commit()
Athenaで検索すると99件になり、少し小さいparquet(①)が別にできる。②は以前のparquetであり、履歴として保持している。
$ aws s3 ls s3://bucket/osim2/hudi/person/8532/
2023-01-26 04:53:33 96 .hoodie_partition_metadata
2023-01-26 04:53:34 439120 bf187b2a-d716-4642-9af4-e44b5a2cd242-0_1-5-0_20230126045321057.parquet
2023-01-26 04:58:55 438884 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-34-750_20230126045829167.parquet①
2023-01-26 04:53:33 438964 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-4-0_20230126045321057.parquet②
今度はperson_id=1375573のレコードを削除し、最新バージョンのみを保持するようにする
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# dataframeを取得
hudi_database_name = 'cdm'
hudi_table_name = 'person'
base_path = f's3://bucket/osim2/hudi/{hudi_table_name}/'
query = f'SELECT * FROM {hudi_database_name}.{hudi_table_name} WHERE gender_concept_id = "8532" AND person_id="1375573"'
df = spark.sql(query)
# データの情報を表示
print('Count: {0}'.format(df.count()))
df.printSchema()
df.show(5)
hudi_options = {
'hoodie.table.name': hudi_table_name,
# 書き込みオプション
'hoodie.datasource.write.table.name': hudi_table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'gender_concept_id',
'hoodie.datasource.write.precombine.field': 'uuid',
'hoodie.datasource.write.operation': 'delete',
# hudi cleaner オプション
'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
'hoodie.cleaner.fileversions.retained': 1, # 最新バージョンのみ保持する
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': hudi_database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.partition_fields': 'gender_concept_id',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false'
}
# Dataframe -> Parquet
df.write.format('hudi').options(**hudi_options).mode('append').save(base_path)
# ジョブコミット
job.commit()
Athenaで検索すると98件になり、履歴を最新しか保持しないようにしたのでparquetももとの個数にもどる
スゴイ!ちゃんと消えてる!!!
$ aws s3 ls s3://bucket/osim2/hudi/person/8532/
2023-01-26 04:53:33 96 .hoodie_partition_metadata
2023-01-26 05:12:18 439022 bf187b2a-d716-4642-9af4-e44b5a2cd242-0_0-34-750_20230126051145835.parquet
2023-01-26 04:58:55 438884 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-34-750_20230126045829167.parquet
その他注意点
-
Athenaでデータの追加はサポートされていない
Athena は、Hudi データに対する CTAS または INSERT INTO
をサポートしません。Athena による Hudi データセットへの書き込みのサポートをご希望の場合は、<[athena-feedback@amazon.com](mailto:athena-feedback@amazon.com)>
までフィードバックをお送りください。
- データの削除はIcebergしかサポートされていない
最後に
- S3にある生のデータはAthenaの機能を使うことで特定できるのですが、Hudiを使うことでparquetでデータ内でレコードを特定して削除できるのは大変ありがたいですね。
- 現状はGlueやEMRなどからpysparkを実行する必要がありますが、Athenaで削除ができるととても使いやすくなりそうですね。