Handling large queries in interactive workflows | Databricks on AWS [2021/6/11時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
インタラクティブなデータワークフローにおける課題は大規模クエリーの取り扱いです。これには、大量の行を出力するクエリー、大量の外部パーティションのフェッチ、極端に大きいデータセットの計算などが含まれます。これらのクエリーは極端に遅くなり、クラスターのリソースを食い尽くし、他の人とのクラスターの共有を困難にします。
クエリーウォッチドッグ(Query Watchdog)は、大規模クエリーの最も一般的な原因を検証し、閾値を超えるクエリーを停止させることで、クラスター資源の占有を防ぐプロセスです。本書では、クエリーウォッチドッグの有効化、設定方法を説明します。
重要!
UIを用いて作成されたall-purposeクラスターでは、クエリーウォッチドッグは有効化されています。
破壊的クエリーの例
あるアナリストがジャストインタイムのデータウェアハウスでアドホックなクエリーを実行しています。アナリストは、同時に複数のユーザーが同じクラスターを使用する共有オートスケーリングクラスターを使用しています。百万行の行を持つ二つのテーブルを考えてみます。
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_x")
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_y")
これらのテーブルサイズはApache Sparkで対応可能なものです。しかし、それぞれのテーブルには、各行に空の文字列のjoin_key
カラムが含まれています。これは、データが完全にクリーンではない、あるいは、いくつかのキーが他のキーよりも大勢を占めており偏りが発生している場合に起こり得ることです。これらの空のジョインキーは他のいかなる値よりも大勢を占めています。
以下のコードでは、アナリストはこれらのキーを用いて二つのテーブルをジョインしており、(キー" "を受け取る)一台のエグゼキューターで1兆のアウトプットを生成することになります。
SELECT
id, count()
FROM
(SELECT
x.id
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key)
GROUP BY id
このクエリーは動作しているように見えます。しかし、アナリストはデータに対する理解なしに、ジョブの実行原因を引き起こした単一のタスク「のみ」を見ています。クエリーは決して完了せず、アナリストのフラストレーションは増加し、どうして動作しないのかについて混乱することになります。
この場合、一つの問題のあるジョインキーのみです。別のケースではより多くの問題があることでしょう。
クエリーウォッチドッグの有効化
入力行に対して多すぎる出力行を生成するクエリーを防ぐために、クエリーウォッチドッグを有効化し、入力行の倍数として出力行の最大数を設定することができます。この例では、比率として1000(デフォルト)を使用します。
spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)
設定の二行目では、いかなるタスクも入力行の1000倍の出力行を生成してはならないということを宣言しています。
ティップス
出力の比率は完全にカスタマイズ可能です。皆様のチームにおいては、少ない値からスタートし閾値がどのように動作するのかをみていくことをお勧めします。1,000から10,000からスタートすることをお勧めします。
クエリーウォッチドッグは、決して終了しないジョブがクラスターのリソースを占有することを防ぐだけでなく、決して終了しないクエリーを早い段階で失敗させることで時間を節約することができます。例えば、以下のクエリーは閾値を超過するので数分後に失敗します。
SELECT
join_key,
sum(x.id),
count()
FROM
(SELECT
x.id,
y.join_key
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key)
GROUP BY join_key
通常はクエリーウォッチドッグを有効化し、入出力の比率を指定するだけで十分ですが、追加で二つのプロパティspark.databricks.queryWatchdog.minTimeSecs
、spark.databricks.queryWatchdog.minOutputRows
を設定することができます。これらのプロパティでは、特定のタスクをキャンセルする前に最低実行すべき時間とクエリーにおけるタスクが生成すべき行の最小値を指定します。
例えば、タスクごとにより多くの行を生成するチャンスを与えるために、大きいminTimeSecs
を設定するコオができます。同様に、クエリーが1000万行を生成した後にのみクエリーを停止したい場合には、spark.databricks.queryWatchdog.minOutputRows
を1000万に設定することができます。入出力比を超過したとしても、これらより少ない場合にはクエリーは成功します。
spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
ティップス
ノートブックでクエリーウォッチドッグの設定を行なった場合、クラスターを再起動した際には設定は永続化されません。クラスターを使用している全てのユーザーに適用したい場合には、クラスター設定を使用することをお勧めします。
極大データセットに対するクエリーの検知
典型的な大規模クエリーの別の例としては、大規模テーブル・データセットに対するスキャンが挙げられます。このスキャンは長い時間を要し、クラスターの資源を消費します(大規模なHiveテーブルのメタデータの読み込みでも大量のリソースを必要とする場合があります)。大規模Hiveテーブルから多数のパーティションのフェッチを防ぐためにmaxHivePartitions
を設定することができます。同様に、極大データセットに対するクエリーを制限するためにmaxQueryTasks
を設定することができます。
spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)
いつクエリーウォッチドッグを有効化すべきか?
クエリーウォッチドッグは、SQLアナリストとデータサイエンティストが特定のクラスターを共有しており、管理者が彼らのクエリーが互いに「適切に計画」されることを保証したい場合には有効化すべきです。
いつクエリーウォッチドッグを無効化すべきか?
通常のETLシナリオにおいては、エラーを修正するのは人間ではないため、ETLで用いられるクエリーを積極的にキャンセルすることはあまりお勧めしません。アドホックな分析用クラスター以外では、クエリーウォッチドッグを無効化することをお勧めします。