Processing Petabytes of Data in Seconds with Databricks Delta - The Databricks Blogの翻訳です。
イントロダクション
Databricks Deltaは、クラウド上のデータレイクにおけるデータの信頼性と高速な分析を実現する統合データ管理システムです。本記事においては、Databricks Deltaがどのようにして、ペタバイトオーダーのデータを秒で処理するのかを詳かにします。特にデータスキッピングとZORDERクラスタリングに関して議論します。
これら二つの機能によって、Databricksランタイムが大規模なDeltaテーブルに対して検索クエリーを実行する際に必要となるスキャンの時間を劇的に削減することができ、結果的にランタイム自身の改善およびコスト削減につながります。
これらの機能の実践については、2018年のSpark + AI Summitで、AppleのDominique Brezinski氏によるキーノートスピーチで説明された、サイバーセキュリティにおける脅威への対策の文脈でのデータエンジニア、データサイエンスに対する統合的ソリューションとしてのDatabricks Deltaのユースケースで確認することができます。
データスキッピングとZORDERクラスタリングの使い方
データスキッピングを活用するために必要なことは、Databricks Deltaを使うことだけです。SQLクエリーが実行された場合、あるいはデータセットに対して"カラム 演算子 リテラル"
という形式でフィルタリング操作が行われたときに、自動的にデータスキッピングが実行されます。
-
カラム
はDatabricks Deltaテーブルの属性となります。トップレベル、あるいはネストされたものでも構いません。データタイプは文字列、数字、日付、タイムスタンプとなります。 -
演算子
はバイナリー比較演算子、StartsWith / LIKE ‘パターン%’、あるいは IN <値のリスト>
となります。 -
リテラル
はカラムと同じデータタイプの明示的な値となります。
上記述語に加えて、AND / OR / NOT
もサポートされています。
以下で説明するように、上記の条件に合致する際に常にデータスキッピングは実行されますが、必ずしも常に効果的であるとは限りません。しかし、あなたが頻繁に検索条件に指定するカラムが存在しており、その検索を高速にしたいのであれば、以下のコマンドを実行して明示的にデータレイアウトを最適化することで、データスキッピングの効果を固めることができます。
OPTIMIZE <table> [WHERE <partition_filter>]
ZORDER BY (<column>[, …])
後ほど詳細を説明します。まず初めに、一歩下がって文脈を捉えましょう。
データスキッピングとZORDERクラスタリングの動作原理
これらの機能の一般的なユースケースは、膨大なデータセットに対して「藁山から針を見つけ出す」タイプの検索性能を改善するというものです。RDBMSにおける典型的なソリューションであるセカンダリインデックスは、スケーラビリティの観点からビッグデータでは実用的ではありません。
ビッグデータシステム(Apache Spark/Hive/Impala/Verticaなど)に慣れ親しんでいる方であれば、すでに(水平)パーティショニングのことを考えているかもしれません。
ちょっとした注意 SparkはHiveと同様に、パーティションカラムの一意の値を持つサブディレクトリを作成することでパーティショニング1を行います。パーティションカラムに対するフィルタ操作を行うクエリーは、フィルタに合致しないパーティションのスキャンをスキップする、*パーティションプルーニング(刈り取り)*によって性能が改善されます。
ここでの疑問は「どのカラムでパーティショニングするか?」ということです。そして、典型的な回答は「性能が求められるクエリーにおいて、頻繁に検索条件に指定されるカラム」となります。しかし、もし複数のカラム(例えば4つ)がある場合にはどうでしょうか?
その場合、それらの値の膨大な組み合わせを作成することとなり、結果的に膨大なパーティション、すなわちファイルを生成することになります。一つのデータを大量の小規模のファイルに分割することは、以下の問題を引き起こします:
- データ同様にメタデータも膨大になり、様々なドライバーノードにおけるオペレーションの性能問題を引き起こします。
- 特にファイルの一覧取得が影響を受け、非常に遅くになります。
- 圧縮の効果も限定的となり、無駄な容量、IO速度の低下を招きます。
Sparkにおけるパーティショニングは日付型やカテゴリー変数のカラムには有効に動作しますが、カーディナリティの高いカラムにおいてはあまり有効ではありません。また、パーティションのカラム数も1つか2つにすべきです。
データスキッピング
パーティションプルーニングの他に、データウェアハウスではよく使われますが、現在SparkではサポートしていないテクニックはSmall Materialized Aggregatesに基づくI/Oプルーニングです。簡単に言えば、以下のような考え方です:
- I/Oの単位と同じ粒度での最大値、最小値などのシンプルな統計情報を追跡します。
- 不要なI/Oを避けるためにクエリー計画時に上記統計情報を活用します。
これがまさに、Databricks Deltaのデータスキッピングが行なっていることです。Databricks Deltaテーブルにデータがインサートされると、サポートされている全てのカラム(ネストされたものも含みます)の最小値、最大値の統計情報がファイルレベルで収集されます。そして、テーブルに対して検索クエリーが実行された際には、Databricks Deltaはまず初めにこれらの統計情報を見に行き、どのファイルが安全にスキップできるのかを決定します。とは言え、よく言われるように「百聞はGIFにしかず」ですので、以下のGIFを参照ください。
この手法は実装が容易で、軽量かつ柔軟性がある(粒度をチューニングすることが可能です)ものです。しかし、これはパーティショニングと完全に直交する概念です。それぞれは互いにうまく動作しますし、互いに依存はしません。一方、bloomフィルターのように、データがクラスターを形成していない場合に偽陽性を返す可能性がある統計的インデクシング手法も存在します。これは我々に次の手を提供するものです。
ZOREDERクラスタリング
I/Oプルーニングが効果的に動作するためには、データの最小値、最大値の幅が狭く、理想的には、オーバーラップしないようにクラスターを構成する必要があります。あるポイントに対する検索が参照する最小値、最大値のセットの数を最小化し、スキッピングの効果を最大化します。
時には、データは最初からクラスターを構成していることがあります。単調増加するID、インサート時間に基づくカラム(日付/タイムスタンプ)、パーティションキー(pk_brand_name、model_name)など。そうでない場合、データをインサートする前に明示的なソートやレンジパーティションを指定することができます。
ここで再び、異なる複数のカラム(例:n=4カラム)に対して、同等の重要度・頻度の検索条件から構成されるワークロードを想定します。
この場合、全てのカラムに対する「線形」、すなわち「lexicographic(辞書学)」や「major-minor」ソートを行うことで、最初に指定されたカラムを強力に支援する形で、値を完全にクラスタリングします。しかし、最初のカラムにどれだけ重複する値があるかによりますが、2カラム目以降であまり効果が出ない場合もあります。このため、場合によってはn番目のカラムでクラスターが形成されず、そのカラムに対する検索においてデータスキッピングが動作しない場合があります。
この場合どうしたらいいのでしょうか?より正確に言えば、どうすればそれぞれの次元において同様に効果的なデータスキッピングを実現できるのでしょうか?
考えてみると、我々が探しているのはn次元のデータポイントをデータファイルに割り当てる方法であり、同じファイルに割り当てられるデータポイントは、それぞれの次元において近いものになる方法を探しているのです。言い換えれば、我々は多次元のデータポイントを、局所性を維持したまま一次元の値にマッピングしたいのです。
これはよく知られている問題であり、データベースの世界だけで生じているものではなく、コンピュータグラフィックスやジオハッシングでも生じているものです。これに対する回答は、局所性を維持する空間充填曲線であり、よく知られるものとしては、Z-order、Hilbert曲線があります。
下の図は、データスキッピングを効果的に行うために使用できるZ-orderingをシンプルに図示したものです。凡例は以下の通りです。
灰色の点 = データポイント(例:チェス版の座標)
灰色の箱 = データファイル。この例では1ファイルあたり4ポイントとなっています。
黄色の箱 = クエリーで読み込まれるデータファイル
緑の点 = クエリーのフィルターを通過し、返却されるデータポイント
赤の点 = 読み込まれるがフィルターを通過しないデータポイント。すなわち「偽陽性」。
サイバーセキュリティ分析の事例
さあ、理論は十分です。Spark + AI Summitのキーノートに戻って、サイバーセキュリティの脅威に対するリアルタイム対策を実現するために、Databricks Deltaがどのように活用されたのかを見てみましょう。
あなたは、リアルタイムで包括的なネットワークアクティビティ情報を提供する、有名なオープンソースのネットワークトラフィックアナライザーのBroを使っているとします2。あなたが提供するサービスが有名になると、サービスの使用量が増え、結果的にはBroは大量のアウトプットを生成するようになります。将来データを処理できるように、充分に早いペースでデータをストレージに永続化するというものが、あなたが直面しているビッグデータの課題です。
このタスクを高信頼かつ容易に行えるようにすることが、Databricks Deltaが設計された目的となります。あなたが行うことは、構造化ストリーミングをBroのデータに接続して、日付でバーティショニングされたDatabricks Deltaテーブルに流し込み、合理的なサイズのデータファイルに均等にログレコードが分配されるように、定期的にOPTIMIZEを実行するというものです。しかし、これは本記事の焦点ではありません。説明をわかりやすくするために、均等に分散されたランダムデータから構成される、非ストリーミング、パーティショニング無しのDatabricks Deltaを使用します。
潜在的なサイバー攻撃の脅威に対して、あなたが実行するであろうある種のアドホックなデータ分析は、記録されたネットワークコネクションデータに対するインタラクティブな「点の検索」になるでしょう。例えば、「この疑わしいIPアドレスを含む全てのネットワークアクティビティを全て検索する」と言うものになるでしょう。ここでは、ランダムあるいはサンプリングされたIPアドレスとポートを用いて、単一カラムに対するフィルターから構成される基本的な検索クエリーで、このワークロードをモデル化します。このようなクエリーは、IOに制約を受けます。すなわち、クエリーの実行時間はスキャンされるデータ量に対して、線形に依存します。
どれだけのデータを格納しているのか、どこまでのデータを検索するのかに依存して、場合によっては、このクエリーは数時間を要するテーブルのフルスキャンになり得ます。ここでのゴールは、これらのクエリーの合計実行時間を最小化するということですが、説明をわかりやすくするために、ここでは代わりにスキャンされるレコードの総数をコスト関数として定義します。このメトリックは合計実行時間の近似値になりますし、定義され、決定論的であるというメリットがあり、興味を持った読者が容易に実験を再現できます。
さっそくやってみましょう。これが我々の具体的な作業内容です。
case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)
def randomConnRecord(r: Random) = ConnRecord(
src_ip = randomIPv4(r), src_port = randomPort(r),
dst_ip = randomIPv4(r), dst_port = randomPort(r))
case class TestResult(numFilesScanned: Long, numRowsScanned: Long, numRowsReturned: Long)
def testFilter(table: String, filter: String): TestResult = {
val query = s"SELECT COUNT(*) FROM $table WHERE $filter"
val(result, metrics) = collectWithScanMetrics(sql(query).as[Long])
TestResult(
numFilesScanned = metrics("filesNum"),
numRowsScanned = metrics.get("numOutputRows").getOrElse(0L),
numRowsReturned = result.head)
}
// 全てのフィルターにtestFilter()を実行し、スキップされるレコードの比率を返却します
// データスキッピングの効果の指標として0は悪く、1は良いという結果になります
def skippingEffectiveness(table: String, filters: Seq[String]): Double = { ... }
こちらが、ランダムに生成されたテーブルに対応する100のファイルの作成方法となります。それぞれのファイルには、ランダムな1000レコードが含まれます。これらは、以下のようになっています。
SELECT row_number() OVER (ORDER BY file) AS file_id,
count(*) as numRecords, min(src_ip), max(src_ip), min(src_port),
max(src_port), min(dst_ip), max(dst_ip), min(dst_port), max(dst_port)
FROM (
SELECT input_file_name() AS file, * FROM conn_random)
GROUP BY file
全てのファイルの最小値、最大値のレンジを見ると、ほぼ全てのドメインの値をカバーしているので、ファイルスキッピングの恩恵は少ないということは容易に予想できます。我々の評価関数はそのことを裏付けてくれます。
skippingEffectiveness(connRandom, singleColumnFilters)
効果は0と出ています。データはランダムに生成されたものなので、スキャンされるデータと返却されるデータに相関が存在しません。これは期待通りです。次に明示的なソートをしてみましょう。
spark.read.table(connRandom)
.repartitionByRange($"src_ip", $"src_port", $"dst_ip", $"dst_port")
// or just .sort($"src_ip", $"src_port", $"dst_ip", $"dst_port")
.write.format("delta").saveAsTable(connSorted)
skippingEffectiveness(connRandom, singleColumnFilters)
ふむ、メトリックは改善されましたが、25%であり素晴らしいとは言えない結果です。より詳細に見てみましょう。
val src_ip_eff = skippingEffectiveness(connSorted, srcIPv4Filters)
val src_port_eff = skippingEffectiveness(connSorted, srcPortFilters)
val dst_ip_eff = skippingEffectiveness(connSorted, dstIPv4Filters)
val dst_port_eff = skippingEffectiveness(connSorted, dstPortFilters)
上からは、src_ip
の検索は非常に高速(スキャンされるレコードと返却されるレコードがほぼ一致)である一方、他のカラムにおいてはフルテーブルスキャンが実行されていることがわかります。とは言え、これは驚くことではありません。上で説明したように、これが線形ソートで得られる結果だからです。ソートの結果は最初の次元(この場合src_ip
)においては完璧にクラスタリングが行われますが、それ以外の次元ではそうはなりません。
それではどうしたら良いのでしょうか?ここでZORDERクラスタリングの出番です。
spark.read.table(connRandom)
.write.format("delta").saveAsTable(connZorder)
sql(s"OPTIMIZE $connZorder ZORDER BY (src_ip, src_port, dst_ip, dst_port)")
skippingEffectiveness(connZorder, singleColumnFilters)
線形ソートの25%よりは良い結果になっていると思いませんか?ここでも詳細に見ていきましょう。
val src_ip_eff = skippingEffectiveness(connZorder, srcIPv4Filters)
val src_port_eff = skippingEffectiveness(connZorder, srcPortFilters)
val dst_ip_eff = skippingEffectiveness(connZorder, dstIPv4Filters)
val dst_port_eff = skippingEffectiveness(connZorder, dstPortFilters)
特筆すべき事象がいくつかあります。
- src_ipにおけるスキッピングの効果は線形ソートよりも低下していますが、線形ソートはz-orderingと違い、完全なクラスタリングを行うものですので、これは期待通りと言えます。しかし、他のカラムのスコアは、前回の0からすべて改善しています。
- 多くのカラムでz-order byを行うほど、効果が下がることも予想できます。例えば、
ZORDER BY (src_ip, dst_ip)
は0.82を達成します。ですので、どのようなフィルタリングを行うのかはあなた次第と言えます。
Spark + AI summitで説明された実世界のユースケースにおいては、典型的なWHERE src_ip = x AND dst_ip = y
クエリーにおけるデータスキッピングの効果はさらに高いものでした。504テラバイト(11兆レコード以上)において、データスキッピングのおかげで36.5テラバイトをスキャンするだけで済むようになりました。これは、バイト数にすると92.4%、レコード数にすると**93.2%**の劇的な削減となっています。
まとめ
Databricks DeltaのデータスキッピングとZORDERクラスタリングを活用することで、クエリーに関係のないファイルをスキップし、大規模なクラウドデータレイクのデータを秒で処理できるようになります。実世界のサイバーセキュリティ分析ユースケースにおいては、504テラバイトのデータセットに対する典型的なクエリーにおいて、**93.2%**のレコードがスキップされ、クエリーの実行時間を二桁削減しました。
言い換えると、Databricks Deltaはクエリーのスピードを100倍にまで改善すると言えます。
注意: 過去に行われた別のプレビューでは、データスキッピング機能はDatabricks Deltaの外でも独立した選択肢として提供されていました。このオプションは近い将来廃止される予定です。データスキッピングを活用する際には、Databricks Deltaへの移行を強くお勧めします。
参考情報
- Databricks Delta製品ページ
- Databricksクイックスタートガイド - Qiita
- Delta Lakeクイックスタートガイド - Qiita
- Databricks Delta ユーザーガイド(英語)(AWS/Azure)
- Databricks Deltaに関するブログ記事(英語)
オープンソースのDelta Lakeに興味がありますか?
Delta Lakeのオンラインハブで最新コードのダウンロード、Delta Lakeコミュニティへの参加が可能です。
Databricks 無料トライアル
-
具体的には
write.partitionBy()
のことを指しています。RDDのパーティションと混同しないでください。 ↩ -
どのようなものかは、www.secrepo.comで提供されているサンプルBroデータを参照してください。 ↩