2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SparkのEXPLAINによる論理計画・物理計画の確認

Last updated at Posted at 2024-03-03

2024/4/12に翔泳社よりApache Spark徹底入門を出版します!

書籍のサンプルノートブックをウォークスルーしていきます。Python/Chapter12/12-1 Dataset and SQL Explainとなります。

翻訳ノートブックのリポジトリはこちら。

ノートブックはこちら

EXPLAINのマニュアルはこちらです。

The EXPLAIN statement is used to provide logical/physical plans for an input statement. By default, this clause provides information about a physical plan only.

EXPLAIN文は入力された文の論理/物理計画を提供するために使用されます。デフォルトでは、この句は物理計画のみに関する情報を提供します。

テキストファイルを読み込んでSparkという単語を含む行の数をカウントしています。

strings = spark.read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")
filtered = strings.filter(strings.value.contains("Spark"))
filtered.count()

simple

デフォルトはsimple

filtered.explain(mode="simple")

物理計画(Physical Plan)のみが表示されます。

== Physical Plan ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>
  • FileScan text
  • DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)]

などからどのような処理が行われているのかがわかります。

extended

パースされた論理計画、解析された論理計画、最適化された論理計画、そして、物理計画を生成します。パースされた論理計画はクエリーから抽出された未解決の計画です。解析された論理計画は未解決の属性や未解決のリレーションを完全に型付けされたオブジェクトに変換します。最適化された論理計画は一連の最適化ルールを通じて変換され、物理計画になります。

filtered.explain(mode="extended")
== Parsed Logical Plan ==
'Filter 'contains(value#1244, Spark)
+- Relation [value#1244] text

== Analyzed Logical Plan ==
value: string
Filter Contains(value#1244, Spark)
+- Relation [value#1244] text

== Optimized Logical Plan ==
Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- Relation [value#1244] text

== Physical Plan ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>

Parsed Logical PlanからAnalyzed Logical Plan、Optimized Logical Planを見ていくとどのように最適化されているのかがわかります。このケースでは、フィルタ条件にisnotnullが追加されています。

cost

計画のノードの統計情報がある場合には、論理計画と統計情報を生成します。

filtered.explain(mode="cost")
== Optimized Logical Plan ==
Filter (isnotnull(value#1244) AND Contains(value#1244, Spark)), Statistics(sizeInBytes=3.3 KiB, ColumnStat: N/A)
+- Relation [value#1244] text, Statistics(sizeInBytes=3.3 KiB, ColumnStat: N/A)

== Physical Plan ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>

この場合ですと、Statistics(sizeInBytes=3.3 KiB, ColumnStat: N/A)と読み込んでいるファイルのサイズが表示されています。

codegen

文に対応するコードと物理計画を生成します。

filtered.explain(mode="codegen")
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:138; maxConstantPoolSize:105(0.16% used); numInnerClasses:0) ==
*(1) Filter (isnotnull(value#1244) AND Contains(value#1244, Spark))
+- FileScan text [value#1244] Batched: false, DataFilters: [isnotnull(value#1244), Contains(value#1244, Spark)], Format: Text, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md], PartitionFilters: [], PushedFilters: [IsNotNull(value), StringContains(value,Spark)], ReadSchema: struct<value:string>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */     this.references = references;
/* 014 */   }
/* 015 */
/* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */     partitionIndex = index;
/* 018 */     this.inputs = inputs;
/* 019 */     inputadapter_input_0 = inputs[0];
/* 020 */     filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 021 */
/* 022 */   }
/* 023 */
/* 024 */   protected void processNext() throws java.io.IOException {
/* 025 */     while ( inputadapter_input_0.hasNext()) {
/* 026 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 027 */
/* 028 */       do {
/* 029 */         final boolean inputadapter_isNull_0 = (inputadapter_row_0.isNullAt(0));
/* 030 */         final UTF8String inputadapter_value_0 = (inputadapter_isNull_0 ? null : (inputadapter_row_0.getUTF8String(0)));
/* 031 */
/* 032 */         if (!!inputadapter_isNull_0) continue;
/* 033 */
/* 034 */         if (!(inputadapter_value_0.contains(((UTF8String) references[1] /* literal */)))) continue;
/* 035 */
/* 036 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 037 */
/* 038 */         filter_mutableStateArray_0[0].reset();
/* 039 */
/* 040 */         filter_mutableStateArray_0[0].write(0, inputadapter_value_0);
/* 041 */         append((filter_mutableStateArray_0[0].getRow()));
/* 042 */
/* 043 */       } while(false);
/* 044 */       if (shouldStop()) return;
/* 045 */     }
/* 046 */   }
/* 047 */
/* 048 */ }

formatted

物理計画の概要とノードの詳細の2つのセクションを生成します。

filtered.explain(mode="formatted")
== Physical Plan ==
* Filter (2)
+- Scan text  (1)


(1) Scan text 
Output [1]: [value#1244]
Batched: false
Location: InMemoryFileIndex [dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md]
PushedFilters: [IsNotNull(value), StringContains(value,Spark)]
ReadSchema: struct<value:string>

(2) Filter [codegen id : 1]
Input [1]: [value#1244]
Condition : (isnotnull(value#1244) AND Contains(value#1244, Spark))

SQLでのEXPLAIN

strings.createOrReplaceTempView("tmp_spark_readme")
%sql
EXPLAIN FORMATTED SELECT * FROM tmp_spark_readme WHERE value LIKE '%Spark%'
== Physical Plan ==
* Filter (2)
+- Scan text  (1)


(1) Scan text 
Output [1]: [value#1244]
Batched: false
Location: InMemoryFileIndex [dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md]
PushedFilters: [IsNotNull(value), StringContains(value,Spark)]
ReadSchema: struct<value:string>

(2) Filter [codegen id : 1]
Input [1]: [value#1244]
Condition : (isnotnull(value#1244) AND Contains(value#1244, Spark))

デバッグやパフォーマンスチューニングなどで活用できそうです。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?