2024/4/12に翔泳社よりApache Spark徹底入門を出版します!
書籍のサンプルノートブックをウォークスルーしていきます。Python/Chapter12/12-1 Dataset and SQL Explain
となります。
翻訳ノートブックのリポジトリはこちら。
ノートブックはこちら
EXPLAINのマニュアルはこちらです。
The EXPLAIN statement is used to provide logical/physical plans for an input statement. By default, this clause provides information about a physical plan only.
EXPLAIN文は入力された文の論理/物理計画を提供するために使用されます。デフォルトでは、この句は物理計画のみに関する情報を提供します。
テキストファイルを読み込んでSpark
という単語を含む行の数をカウントしています。
strings = spark.read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")
filtered = strings.filter(strings.value.contains("Spark"))
filtered.count()
simple
デフォルトはsimple
。
filtered.explain(mode="simple")
物理計画(Physical Plan)のみが表示されます。
== Physical Plan ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>
- FileScan text
- DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)]
などからどのような処理が行われているのかがわかります。
extended
パースされた論理計画、解析された論理計画、最適化された論理計画、そして、物理計画を生成します。パースされた論理計画はクエリーから抽出された未解決の計画です。解析された論理計画は未解決の属性や未解決のリレーションを完全に型付けされたオブジェクトに変換します。最適化された論理計画は一連の最適化ルールを通じて変換され、物理計画になります。
filtered.explain(mode="extended")
== Parsed Logical Plan ==
'Filter 'contains(value#1244, Spark)
+- Relation [value#1244] text
== Analyzed Logical Plan ==
value: string
Filter Contains(value#1244, Spark)
+- Relation [value#1244] text
== Optimized Logical Plan ==
Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- Relation [value#1244] text
== Physical Plan ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>
Parsed Logical PlanからAnalyzed Logical Plan、Optimized Logical Planを見ていくとどのように最適化されているのかがわかります。このケースでは、フィルタ条件にisnotnull
が追加されています。
cost
計画のノードの統計情報がある場合には、論理計画と統計情報を生成します。
filtered.explain(mode="cost")
== Optimized Logical Plan ==
Filter (isnotnull(value#1244) AND Contains(value#1244, Spark)), Statistics(sizeInBytes=3.3 KiB, ColumnStat: N/A)
+- Relation [value#1244] text, Statistics(sizeInBytes=3.3 KiB, ColumnStat: N/A)
== Physical Plan ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>
この場合ですと、Statistics(sizeInBytes=3.3 KiB, ColumnStat: N/A)
と読み込んでいるファイルのサイズが表示されています。
codegen
文に対応するコードと物理計画を生成します。
filtered.explain(mode="codegen")
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:138; maxConstantPoolSize:105(0.16% used); numInnerClasses:0) ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private scala.collection.Iterator inputadapter_input_0;
/* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */ this.references = references;
/* 014 */ }
/* 015 */
/* 016 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */ partitionIndex = index;
/* 018 */ this.inputs = inputs;
/* 019 */ inputadapter_input_0 = inputs[0];
/* 020 */ filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 021 */
/* 022 */ }
/* 023 */
/* 024 */ protected void processNext() throws java.io.IOException {
/* 025 */ while ( inputadapter_input_0.hasNext()) {
/* 026 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 027 */
/* 028 */ do {
/* 029 */ final boolean inputadapter_isNull_0 = (inputadapter_row_0.isNullAt(0));
/* 030 */ final UTF8String inputadapter_value_0 = (inputadapter_isNull_0 ? null : (inputadapter_row_0.getUTF8String(0)));
/* 031 */
/* 032 */ if (!!inputadapter_isNull_0) continue;
/* 033 */
/* 034 */ if (!(inputadapter_value_0.contains(((UTF8String) references[1] /* literal */)))) continue;
/* 035 */
/* 036 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 037 */
/* 038 */ filter_mutableStateArray_0[0].reset();
/* 039 */
/* 040 */ filter_mutableStateArray_0[0].write(0, inputadapter_value_0);
/* 041 */ append((filter_mutableStateArray_0[0].getRow()));
/* 042 */
/* 043 */ } while(false);
/* 044 */ if (shouldStop()) return;
/* 045 */ }
/* 046 */ }
/* 047 */
/* 048 */ }
formatted
物理計画の概要とノードの詳細の2つのセクションを生成します。
filtered.explain(mode="formatted")
== Physical Plan ==
* Filter (2)
+- Scan text (1)
(1) Scan text
Output [1]: [value#1244]
Batched: false
Location: InMemoryFileIndex [dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md]
PushedFilters: [IsNotNull(value), StringContains(value,Spark)]
ReadSchema: struct<value:string>
(2) Filter [codegen id : 1]
Input [1]: [value#1244]
Condition : (isnotnull(value#1244) AND Contains(value#1244, Spark))
SQLでのEXPLAIN
strings.createOrReplaceTempView("tmp_spark_readme")
%sql
EXPLAIN FORMATTED SELECT * FROM tmp_spark_readme WHERE value LIKE '%Spark%'
== Physical Plan ==
* Filter (2)
+- Scan text (1)
(1) Scan text
Output [1]: [value#1244]
Batched: false
Location: InMemoryFileIndex [dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md]
PushedFilters: [IsNotNull(value), StringContains(value,Spark)]
ReadSchema: struct<value:string>
(2) Filter [codegen id : 1]
Input [1]: [value#1244]
Condition : (isnotnull(value#1244) AND Contains(value#1244, Spark))
デバッグやパフォーマンスチューニングなどで活用できそうです。