概要
Databricks(Spark)のPythonにてブルームフィルターが設定されているカラム情報を取得する方法を共有します。
詳細は下記のGithub pagesのページをご確認ください。
コードを実行したい方は、下記のdbcファイルを取り込んでください。
https://github.com/manabian-/databricks_tecks_for_qiita/blob/main/tecks/get_bloom_filter_columns_info/dbc/get_bloom_filter_columns_info.dbc
環境
databricks runtime: 8.3.x-cpu-ml-scala2.12
Python version: 3.8.8
pyspark version: 3.1.2.dev0
手順
事前準備
検証に利用するデータベースとテーブルを作成します。
dbutils.fs.rm("/Filestore/qiita/bloom", True)
%sql
create database if not exists qiita;
drop table if exists qiita.bloomfilter_table;
create table qiita.bloomfilter_table
(
string long
,number int
,date date
)
using delta
location "/Filestore/qiita/bloom"
;
CREATE BLOOMFILTER INDEX
ON TABLE qiita.bloomfilter_table
FOR COLUMNS(string OPTIONS (fpp=0.1, numItems=50000000))
CREATE BLOOMFILTER INDEX
ON TABLE qiita.bloomfilter_table
FOR COLUMNS(number OPTIONS (fpp=0.1, numItems=10000000))
関数を定義
def get_defination(schema, prefix=''):
outstring_format='"name":{0} nullable:{1}'
fields = schema['fields']
list_definations=[]
prefix = prefix+'.' if prefix !='' else ''
for field in fields:
if type(field['type']) is dict:
if field['type']['type'] == 'struct':
list_definations.extend(get_defination(field['type'], prefix+field['name']))
else:
field["name"]=prefix + field["name"]
list_definations.append(field)
else:
field["name"]=prefix + field["name"]
list_definations.append(field)
return list_definations
def get_bloom_filter_columns_info(schema_json):
allcols = get_defination(schema_json)
return list(filter(lambda col: col['metadata'].get('delta.bloomFilter.enabled') is not None, allcols))
階層をフラットにしたDDLのリスト型変数を取得
## dict形式のddl文を取得
table_name = 'qiita.bloomfilter_table'
schema_dict = spark.table(table_name).schema.jsonValue()
not_null_columns_list = get_bloom_filter_columns_info(schema_dict)
利用例)リスト型変数からDDL文を作成
# 利用例
outstring_format='''
CREATE BLOOMFILTER INDEX
ON TABLE {0}
FOR COLUMNS({1} OPTIONS (fpp={2}, numItems={3}))
'''.strip()
for l in not_null_columns_list:
name = l['name']
sample_string = outstring_format.format(table_name,
l['name'],
l['metadata'].get('delta.bloomFilter.fpp'),
l['metadata'].get('delta.bloomFilter.numItems'),
)
print('---ddl_sample---')
print(sample_string)