0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

SparkのPartition FilteringとFilter Pushdownを試してみた

Posted at

背景・目的

こちらの、AWS Glue ETL パフォーマンス・チューニング② チューニングパターン編を読んだ際に、読み取りデータ量を削減する機能の紹介として下記が紹介されてました。

  • Partition Filtering
  • Filter Pushdown

今回は、こちらを実際に試してみました。

まとめ

  • Partition Filteringは、パーティション列を対象としており、パーティション配下のファイルを読み込む機能
  • Filter Pushdownは、パーティション列に指定されていない列を対象としており、該当するブロックのみ絞り込む機能

概要

AWS Glue ETL パフォーマンス・チューニング② チューニングパターン編には、下記の説明があります。

Partition Filtering

  • filter句やWhere句で指定されたパーティション内のファイルのみを読み取る機能
  • Text/CSV/JSON/ORC/Parquetで利用可能

Filter Pushdown

  • パーティション列に利用されていない列に対するfilter句やwhere句にヒットするブロックのみを読み取る機能
  • AWS GlueではParquetを利用した場合に自動的に適用される

実践

事前準備

データ準備

データをパーティションありと、パーティションなしで2つ準備します。

パーティションあり

image.png

image.png

パーティションなし

image.png

image.png

Partition Filteringを確認

フィルタなしで、パーティションありのデータを読み込む

  1. フィルタなしでデータと実行計画を確認します。

    from pyspark.sql import SparkSession,types as T,functions as F
    
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    df = spark.read.parquet("s3://xxxx/input/partition-test/")
    df.show()
    df.explain()
    
    
  2. 結果を確認します。

    • 結果は当然絞られません。
    • PartitionFilters:[]のままです。
    +---+------+---+
    | id| value| pk|
    +---+------+---+
    | 10|test10| 10|
    |  6| test6|  6|
    |  1| test1|  1|
    |  3| test3|  3|
    |  4| test4|  4|
    |  8| test8|  8|
    |  2| test2|  2|
    |  7| test7|  7|
    |  9| test9|  9|
    |  5| test5|  5|
    +---+------+---+
    
    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [id#85L,value#86,pk#87] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/partition-test], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,value:string>
    

フィルタありで、パーティションありのデータを読み込む

  1. フィルタありでデータと実行計画を確認します。
    from pyspark.sql import SparkSession,types as T,functions as F
    
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    df = spark.read.parquet("s3://XXXX/input/partition-test/").filter(F.col('pk')==1)
    df.show()
    df.explain()
    
  2. 結果を確認します。
    • 結果が絞られています。
    • PartitionFilters: [isnotnull(pk#119), (pk#119 = 1)]になっています。
    +---+-----+---+
    | id|value| pk|
    +---+-----+---+
    |  1|test1|  1|
    +---+-----+---+
    
    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [id#117L,value#118,pk#119] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/partition-test], PartitionFilters: [isnotnull(pk#119), (pk#119 = 1)], PushedFilters: [], ReadSchema: struct<id:bigint,value:string>
    

フィルタありで、パーティションありのデータを読み込む。(パーティションキーではない条件)

  1. フィルタありでデータと実行計画を確認します。

    from pyspark.sql import SparkSession,types as T,functions as F
    
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    df = spark.read.parquet("s3://XXXX/input/partition-test/").filter(F.col('id')==1)
    df.show()
    df.explain()
    
  2. 結果を確認します。

    • 結果が絞られています。
    • PartitionFilters:[]のままです。
    +---+-----+---+
    | id|value| pk|
    +---+-----+---+
    |  1|test1|  1|
    +---+-----+---+
    
    == Physical Plan ==
    *(1) Filter (isnotnull(id#137L) AND (id#137L = 1))
    +- *(1) ColumnarToRow
       +- FileScan parquet [id#137L,value#138,pk#139] Batched: true, DataFilters: [isnotnull(id#137L), (id#137L = 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/partition-test], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1)], ReadSchema: struct<id:bigint,value:string>
    

フィルタなしで、パーティションなしのデータを読み込む

  1. フィルタなしでデータと実行計画を確認します。
    from pyspark.sql import SparkSession,types as T,functions as F
    
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    df = spark.read.parquet("s3://XXXX/input/partition-test2/")
    df.show()
    df.explain()
    
  2. 結果を確認します。
    • 結果は当然絞られません。
    • PartitionFilters:[]のままです。
    +---+---+------+
    | id| pk| value|
    +---+---+------+
    | 10| 10|test10|
    |  1|  1| test1|
    |  2|  2| test2|
    |  3|  3| test3|
    |  4|  4| test4|
    |  5|  5| test5|
    |  6|  6| test6|
    |  7|  7| test7|
    |  8|  8| test8|
    |  9|  9| test9|
    +---+---+------+
    
    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [id#157L,pk#158L,value#159] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/partition-test2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,pk:bigint,value:string>
    

フィルタありで、パーティションなしのデータを読み込む

  1. フィルタありでデータと実行計画を確認します。
    from pyspark.sql import SparkSession,types as T,functions as F
    
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    df = spark.read.parquet("s3://XXXX/input/partition-test2/").filter(F.col('pk')==1)
    df.show()
    df.explain()
    
  2. 結果を確認します。
    • 結果は絞られています。
    • PartitionFilters:[]のままです。
    +---+---+-----+
    | id| pk|value|
    +---+---+-----+
    |  1|  1|test1|
    +---+---+-----+
    
    == Physical Plan ==
    *(1) Filter (isnotnull(pk#178L) AND (pk#178L = 1))
    +- *(1) ColumnarToRow
       +- FileScan parquet [id#177L,pk#178L,value#179] Batched: true, DataFilters: [isnotnull(pk#178L), (pk#178L = 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/partition-test2], PartitionFilters: [], PushedFilters: [IsNotNull(pk), EqualTo(pk,1)], ReadSchema: struct<id:bigint,pk:bigint,value:string>
    

Filter Pushdownを確認

フィルタありでデータと実行計画を確認します。

  1. パーティションキーではない列でfilterをします。
    from pyspark.sql import SparkSession,types as T,functions as F
    
    spark = SparkSession \
            .builder \
            .appName("Python example") \
            .getOrCreate()
    
    df = spark.read.parquet("s3://XXXX/input/partition-test2/").filter(F.col('id')==1)
    df.show()
    df.explain()
    
  2. 結果を確認します。
    • 結果は絞られています。
    • PushedFilters: [IsNotNull(id), EqualTo(id,1)]になっています。
    +---+---+-----+
    | id| pk|value|
    +---+---+-----+
    |  1|  1|test1|
    +---+---+-----+
    
    == Physical Plan ==
    *(1) Filter (isnotnull(id#217L) AND (id#217L = 1))
    +- *(1) ColumnarToRow
       +- FileScan parquet [id#217L,pk#218L,value#219] Batched: true, DataFilters: [isnotnull(id#217L), (id#217L = 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/partition-test2], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1)], ReadSchema: struct<id:bigint,pk:bigint,value:string>
    

考察

今回、Partition FilteringとFilter Pushdownを試してみました。実行計画で変わっていることが確認できました。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?