LoginSignup
2
2

More than 1 year has passed since last update.

Hudiを利用しParquetの特定のデータを削除する

Last updated at Posted at 2023-01-26

はじめに

DataLake利用時によく使われるparquet formatでは、大量のデータの一部を特定し、削除するような対応が難しいのですが、こちらのユースケースに書かれている通り、個人情報を削除する対応を行う場合に非常に重宝しそうなのでどんなものか調べてみることにしました。

ちなみにHudiはDeltaLake, Icebergと並びLakeHouseで必要とされる機能を沢山持っています。

今回はAWS Glue, Athenaを使ってHudiを利用する手順を明らかにし、削除することでS3上のparquetがどのような動きになるのかを把握してみようと思います。

環境・データ

  • Glue 4.0(Spark3.3) + Athena + S3

  • データ定義はOMOP CDM personテーブルを使って試してみます。

    "PERSON_ID","YEAR_OF_BIRTH","GENDER_CONCEPT_ID","RACE_CONCEPT_ID","LOCATION_CONCEPT_ID","SOURCE_PERSON_KEY","SOURCE_GENDER_CODE","SOURCE_LOCATION_CODE","SOURCE_RACE_CODE"|
    "1375550","1938","8507","","","","","",""|
    "1375551","1985","8532","","","","","",""|
    "1375552","1993","8532","","","","","",""|
    "1375553","2003","8507","","","","","",""|
    "1375554","1993","8507","","","","","",""|
    "1375555","1999","8532","","","","","",""|
    "1375556","2003","8507","","","","","",""|
    "1375557","1983","8532","","","","","",""|
    "1375558","1997","8532","","","","","",""|
    "1375559","2005","8507","","","","","",""|
    "5125481","2001","8507","","","","","",""|
    

AWSでHudiを利用する

まずはAthenaでHiveテーブルを作る

変換元のテーブルを作ります

CREATE EXTERNAL TABLE IF NOT EXISTS raw_person (
  person_id string,
  year_of_birth string,
  gender_concept_id string,
  race_concept_id string,
  location_concept_id string,
  source_person_key string,
  source_gender_code string,
  source_location_code string,
  source_race_code string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 's3://bucket/osim2/raw-small/person/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
)

GlueでDataframeをHudiに変換する

実装はこんな感じ。Hudiのオプションを利用するくらいでparquet変換のコードとそんなに大差ないことがわかると思います。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

### person 
df = spark.sql("""
    select uuid() as uuid, *
    FROM cdm.raw_person
    WHERE person_id != 'PERSON_ID'
""")
df = df.withColumn("source_race_code", regexp_replace(col("source_race_code"), "\"\|",""))

# データの情報を表示
print('Count: {0}'.format(df.count()))
df.printSchema()
df.show(5)

hudi_database_name = 'cdm'
hudi_table_name = 'person'
base_path = f's3://bucket/osim2/hudi/{hudi_table_name}/'

hudi_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'gender_concept_id',
    'hoodie.datasource.write.precombine.field': 'uuid',
    'hoodie.datasource.write.operation': 'bulk_insert',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': hudi_database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.partition_fields': 'gender_concept_id',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false'
}

# Dataframe -> Hudi
df.write.format('hudi').options(**hudi_options).mode('append').save(base_path)

# ジョブコミット
job.commit()

公式ガイドに従ってGlueJobのJob details→JobParameterをセットアップしていく

1つ目

  • Key:—conf
  • Value:spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false

2つ目

  • Key:-datalake-formats
  • Value:hudi

トラブルシューティング

実行時に発生したエラー

Caused by: java.lang.ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 69 more

Githubを見るとGlue Catalog Syncを利用するにはcalcite-core.jarを追加しろとのこと

calcite-core-1.32.0.jarをダウンロードし、S3上に配置する

  • 今回はs3:///library/hudi/に配置しました。

「GlueJob」→「Job details」→「Dependent JARs path」に以下を追加します

  • s3:///library/hudi/

Athenaで検索してみる

マニュアルに従ってテーブルを作る

テーブルを作る

CREATE EXTERNAL TABLE IF NOT EXISTS person (
  uuid string,
  person_id string,
  year_of_birth string,
  gender_concept_id string,
  race_concept_id string,
  location_concept_id string,
  source_person_key string,
  source_gender_code string,
  source_location_code string,
  source_race_code string
)
PARTITIONED BY ( 
  `gender_concept_id` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION
  's3://bucket/osim2/hudi/person/'

ALTER TABLE ADD PARTITIONはすでにS3に作られているので不要

ALTER TABLE person ADD
  PARTITION (gender_concept_id = '8507') LOCATION 's3://bucket/osim2/hudi/person/8507/' 
  PARTITION (gender_concept_id = '8532') LOCATION 's3://bucket/osim2/hudi/person/8532/'

テーブルをプレビューしてみるとHudiの管理用カラムが追加されてる

データを削除してみる

現状のS3の状態

$ aws s3 ls s3://bucket/osim2/hudi/person/8532/
2023-01-26 04:53:33         96 .hoodie_partition_metadata
2023-01-26 04:53:34     439120 bf187b2a-d716-4642-9af4-e44b5a2cd242-0_1-5-0_20230126045321057.parquet
2023-01-26 04:53:33     438964 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-4-0_20230126045321057.parquet

person_id='1375555’のレコードを削除する(履歴は2世代保持する)
削除対象のレコードのdataframeを作り、それをhudi_optionとともにappendすればOKです。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# dataframeを取得
hudi_database_name = 'cdm'
hudi_table_name = 'person'
base_path = f's3://bucket/osim2/hudi/{hudi_table_name}/'

query = f'SELECT * FROM {hudi_database_name}.{hudi_table_name} WHERE gender_concept_id = "8532" AND person_id="1375555"'
df = spark.sql(query)

# データの情報を表示
print('Count: {0}'.format(df.count()))
df.printSchema()
df.show(5)

hudi_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'gender_concept_id',
    'hoodie.datasource.write.precombine.field': 'uuid',
    'hoodie.datasource.write.operation': 'delete', # 削除

    'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
    'hoodie.cleaner.fileversions.retained': 2, # 最新バージョン+1世代保持する

    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': hudi_database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.partition_fields': 'gender_concept_id',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false'
}

# Dataframe -> Parquet
df.write.format('hudi').options(**hudi_options).mode('append').save(base_path)

# ジョブコミット
job.commit()

Athenaで検索すると99件になり、少し小さいparquet(①)が別にできる。②は以前のparquetであり、履歴として保持している。

$ aws s3 ls s3://bucket/osim2/hudi/person/8532/
2023-01-26 04:53:33         96 .hoodie_partition_metadata
2023-01-26 04:53:34     439120 bf187b2a-d716-4642-9af4-e44b5a2cd242-0_1-5-0_20230126045321057.parquet
2023-01-26 04:58:55     438884 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-34-750_20230126045829167.parquet
2023-01-26 04:53:33     438964 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-4-0_20230126045321057.parquet

今度はperson_id=1375573のレコードを削除し、最新バージョンのみを保持するようにする

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# dataframeを取得
hudi_database_name = 'cdm'
hudi_table_name = 'person'
base_path = f's3://bucket/osim2/hudi/{hudi_table_name}/'

query = f'SELECT * FROM {hudi_database_name}.{hudi_table_name} WHERE gender_concept_id = "8532" AND person_id="1375573"'
df = spark.sql(query)

# データの情報を表示
print('Count: {0}'.format(df.count()))
df.printSchema()
df.show(5)

hudi_options = {
    'hoodie.table.name': hudi_table_name,

    # 書き込みオプション
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'gender_concept_id',
    'hoodie.datasource.write.precombine.field': 'uuid',
    'hoodie.datasource.write.operation': 'delete', 

    # hudi cleaner オプション
    'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
    'hoodie.cleaner.fileversions.retained': 1, # 最新バージョンのみ保持する

    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': hudi_database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.partition_fields': 'gender_concept_id',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false'
}

# Dataframe -> Parquet
df.write.format('hudi').options(**hudi_options).mode('append').save(base_path)

# ジョブコミット
job.commit()

Athenaで検索すると98件になり、履歴を最新しか保持しないようにしたのでparquetももとの個数にもどる

スゴイ!ちゃんと消えてる!!!

$ aws s3 ls s3://bucket/osim2/hudi/person/8532/
2023-01-26 04:53:33         96 .hoodie_partition_metadata
2023-01-26 05:12:18     439022 bf187b2a-d716-4642-9af4-e44b5a2cd242-0_0-34-750_20230126051145835.parquet
2023-01-26 04:58:55     438884 ddc406df-b089-46dc-8f97-8434eed73f25-1_0-34-750_20230126045829167.parquet

その他注意点

  • Athenaでデータの追加はサポートされていない

    Athena は、Hudi データに対する CTAS または INSERT INTO
     をサポートしません。Athena による Hudi データセットへの書き込みのサポートをご希望の場合は、<[athena-feedback@amazon.com](mailto:athena-feedback@amazon.com)> までフィードバックをお送りください。

  • データの削除はIcebergしかサポートされていない

最後に

  • S3にある生のデータはAthenaの機能を使うことで特定できるのですが、Hudiを使うことでparquetでデータ内でレコードを特定して削除できるのは大変ありがたいですね。
  • 現状はGlueやEMRなどからpysparkを実行する必要がありますが、Athenaで削除ができるととても使いやすくなりそうですね。
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2