こちらのサンプルノートブックをウォークスルーします。
翻訳版のノートブックです。
イントロダクション
Bloomフィルターは、偽陽性の確率を用いてセットにキーがあるかどうか、セットにアイテムが あるかもしれない ことを教えてくれます。インデックスで使用すると、Bloomフィルターは他のテクニックでは高速化できない様なフィールドに対して「藁山から針を探す」ようなクエリーの高速化に役立ちます。
このノートブックでは以下のことを行います:
- テーブルをセットアップし、テーブルにBloomフィルターを設定し、テーブルをハッシュ値で埋めます。
- 「藁山から針を探す」クエリーを実行します:
- インデックスが作成されていないカラム
- インデックスが作成されているカラム
- 結果が0になることが予想されるインデックスが作成されたカラム(藁山に針がない場合にどうなるか)
- サンプルデータセットのクリーンアップ
テーブル、インデックスの作成、データのロード
Bloomフィルターインデックスの有効化
注意
デフォルトで以下の設定は有効になっていますが、デモのため明示的に設定しています。
SQL
SET spark.databricks.io.skipping.bloomFilter.enabled = true;
ユーザー間でデータベース名やファイルパスが重複しない様にします。
Python
%python
import re
from pyspark.sql.types import *
# Username を取得
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
# Username の英数字以外を除去し、全て小文字化。Username をファイルパスやデータベース名の一部で使用可能にするため。
username = re.sub('[^A-Za-z0-9]+', '', username_raw).lower()
# データベース名
db_name = f"bloom_filter_{username}"
# パス
data_path = f"dbfs:/tmp/{username_raw}/bloom_test"
# Hiveメタストアのデータベースの準備:データベースの作成
spark.sql(f"DROP DATABASE IF EXISTS {db_name} CASCADE")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
# Hiveメタストアのデータベースの選択
spark.sql(f"USE {db_name}")
print("database name: " + db_name)
テーブルの作成
Python
%python
spark.sql(f"""
CREATE OR REPLACE TABLE bloom_test (
id BIGINT NOT NULL,
str1 STRING NOT NULL,
sha STRING NOT NULL,
sha1 STRING NOT NULL,
sha2_256 STRING NOT NULL,
row_hash_too_big STRING NOT NULL,
row_hash STRING NOT NULL
)
USING DELTA
LOCATION '{data_path}'
""")
データを追加する前にインデックスを作成
SQL
CREATE BLOOMFILTER INDEX
ON TABLE bloom_test
FOR COLUMNS(sha OPTIONS (fpp=0.1, numItems=50000000))
データ生成
SQL
TRUNCATE TABLE bloom_test;
WITH sample (
SELECT
id,
'windows.exe' as str1,
monotonically_increasing_id() mono_id,
hash(id) hash,
sha (cast(id % 50000000 as string)) sha,
sha1(cast(id % 50000000 as string)) sha1,
sha2(cast(id as string), 256) sha2_256
from
RANGE(0, 1000000000, 1, 448) -- start, end, step, numPartitions
)
INSERT INTO bloom_test
SELECT id,
str1,
sha,
sha1,
sha2_256,
sha2(concat_ws('||',id, str1, mono_id, hash, sha, sha1, sha2_256),512) row_hash_too_big,
sha2(concat_ws('||',id, str1, mono_id, hash, sha, sha1, sha2_256),256) row_hash
FROM sample
-- LIMIT 20000
以下の様なデータがインサートされます。10億レコードのテーブルとなります。
Z-orderingしておきます。
SQL
SET spark.databricks.delta.optimize.maxFileSize = 1610612736;
OPTIMIZE bloom_test
ZORDER BY id
物理テーブルとインデックスを確認
Python
%python
display(dbutils.fs.ls(data_path))
Python
%python
display(dbutils.fs.ls(f"{data_path}/_delta_index"))
SQL
DESCRIBE EXTENDED bloom_test
テストクエリーの実行
こちらは8コアのワーカー、最大10台のオートスケール構成のクラスターを使用しています。
同じファイルに存在しないであろうハッシュ値の検索
SQL
SELECT * FROM bloom_test WHERE id in ( 0, 1, 999999998, 999999999)
Bloomフィルターのインデックスが無いカラムに対するクエリー
SQL
SELECT count(*) FROM bloom_test WHERE sha1 = 'b2f9544427aed7b712b209fffc756c001712b7ca'
Bloomフィルターのインデックスがあるカラムに対するクエリー
SQL
SELECT count(*) FROM bloom_test WHERE sha = 'b6589fc6ab0dc82cf12099d1c2d40ab994e8410c'
Bloomフィルターのインデックスに対して存在しないものを検索
ハッシュ値の末尾に_
を追加しています。
SQL
SELECT count(*) FROM bloom_test WHERE sha = 'b6589fc6ab0dc82cf12099d1c2d40ab994e8410_'
このように10億レコードという比較的大きな規模のテーブルに対しても、Bloomフィルターインデックスを設定することで、「藁山から針を探す(needle in a haystack)」ようなクエリーを高速に処理できる様になります。