背景・目的
- SparkでETL処理を行うときに、Parquetファイルを入出力することは多い
- Parquetファイルでは、Array型の中にStruct型が入っていても、内部的には別々の列として保持している
- しかし、単純なSparkのreadだと、Array型丸ごと取ってきてしまい、Parquetでせっかく別々の列として分けてくれているのに、非効率になってしまう
- Array型の中にStruct型が入っているケースで、必要なフィールドだけ取ってくる方法について調べてみた
Array型の中にStruct型があるってどういうこと?
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
schema = StructType([
StructField("a", IntegerType(), True),
StructField("b", ArrayType(StructType([
StructField("c", IntegerType(), True),
StructField("d", StringType(), True),
StructField("e", IntegerType(), True),
]), True))
]
)
df = spark.createDataFrame([
{'a': 1,
'b': [
{
'c': 1,
'd': "a",
'e': 1,
},
{
'c': 3,
'd': "b",
'e': 1,
}
]}
], schema)
df.printSchema()
df.write.parquet('data.parquet')
例えば、上記のようなデータの場合に、型としては、下記のようになる。
root
|-- a: integer (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- c: integer (nullable = true)
| | |-- d: string (nullable = true)
| | |-- e: integer (nullable = true)
この場合、Parquet内部では、下記のように格納されているはず
列 | 型 |
---|---|
a | int |
b.c | (repeated)int |
b.d | (repeated)byte_array |
b.e | (repeated)int |
読み込む方法について、調べてみた
読み込む方法
1. そのまま読む
b.cにアクセスするときに、単純にselect('b.c')
として読むことができる
# data.parquetは上記のコードで生成されたもの
df = spark.read.parquet('data.parquet').select('a', 'b.c', 'b.d')
df.explain()
df.printSchema()
df.show()
== Physical Plan ==
*(1) Project [a#465, b#466.c AS c#469, b#466.d AS d#470]
+- FileScan parquet [a#465,b#466] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/data.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:array<struct<c:int,d:string>>>
root
|-- a: integer (nullable = true)
|-- c: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- d: array (nullable = true)
| |-- element: string (containsNull = true)
+---+------+------+
| a| c| d|
+---+------+------+
| 1|[1, 3]|[a, b]|
+---+------+------+
このようにデータを取得した場合、b.c,b.d
は列をStructとしてまとめることはできず、別々の独立した列として取得することになる
2. arrays_zipを使って構造を維持
下記のようにarrays_zip関数を使えば、array型の中のStruct型を再現できる
df = spark.read.parquet('data.parquet').select(F.arrays_zip('b.c', 'b.d'))
df.explain()
df.printSchema()
df.show()
== Physical Plan ==
*(1) Project [arrays_zip(b#53.c, b#53.d, c, d) AS arrays_zip(b.c, b.d)#58]
+- FileScan parquet [b#53] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/data.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<b:array<struct<c:int,d:string>>>
root
|-- arrays_zip(b.c, b.d): array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- c: integer (nullable = true)
| | |-- d: string (nullable = true)
+--------------------+
|arrays_zip(b.c, b.d)|
+--------------------+
| [{1, a}, {3, b}]|
+--------------------+
もう少し複雑なケースについて調べてみる
schema = StructType([
StructField("a",
ArrayType(StructType([
StructField("b", IntegerType(), True),
StructField("c",
ArrayType(StructType([
StructField("d", IntegerType(), True),
StructField("e",
ArrayType(StructType([
StructField("f", IntegerType(), True),
StructField("g", FloatType(), True),
]), True), True
),
]), True), True
),
]), True), True
),
])
df = spark.createDataFrame([
{
'a': [
{
'b': 1,
'c': [
{
'd': 1,
'e': [
{
'f': 1,
'g': 1.0
}
]
}
]
},
]
},
], schema)
df.printSchema()
df.write.parquet('data2.parquet')
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- b: integer (nullable = true)
| | |-- c: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- d: integer (nullable = true)
| | | | |-- e: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- f: integer (nullable = true)
| | | | | | |-- g: float (nullable = true)
この場合、Parquetの内部は下記のようになっているはず・・。
列 | 型 |
---|---|
a.b | (repeated)int |
a.c.d | (repeated)(repeated)int |
a.c.e.f | (repeated)(repeated)(repeated)int |
a.c.e.g | (repeated)(repeated)(repeated)float |
1. そのまま読むケース
a.c.d
のように二次元(以上の)配列を読もうとするとエラー
df = spark.read.parquet('data2.parquet').select('a.c.d')
AnalysisException: cannot resolve 'a.`c`['d']' due to data type mismatch: argument 2 requires integral type, however, ''d'' is of string type.;
'Project [a#427.c[d] AS d#429]
+- Relation [a#427] parquet
transformを使う必要があった
df = spark.read.parquet('data2.parquet').select(F.transform('a.c', lambda x: x['d']))
df.explain()
df.printSchema()
df.show()
== Physical Plan ==
Project [transform(a#430.c, lambdafunction(lambda x_0#433.d, lambda x_0#433, false)) AS transform(a.c, lambdafunction(lambda x_0#433.d, namedlambdavariable()))#434]
+- FileScan parquet [a#430] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/data2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:array<struct<c:array<struct<d:int,e:array<struct<f:int,g:float>>>>>>>
root
|-- transform(a.c, lambdafunction(lambda x_0#433.d, namedlambdavariable())): array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: integer (containsNull = true)
+-----------------------------------------------------------------------+
|transform(a.c, lambdafunction(lambda x_0#433.d, namedlambdavariable()))|
+-----------------------------------------------------------------------+
| [[1]]|
+-----------------------------------------------------------------------+
3次元配列を読む場合は、
df = spark.read.parquet('data2.parquet').select(F.transform('a.c', lambda x: F.transform(x['e'], lambda y: y['f'])))
df.explain()
df.printSchema()
df.show()
== Physical Plan ==
Project [transform(a#456.c, lambdafunction(transform(lambda x_5#459.e, lambdafunction(lambda x_6#460.f, lambda x_6#460, false)), lambda x_5#459, false)) AS transform(a.c, lambdafunction(transform(lambda x_5#459.e, lambdafunction(lambda x_6#460.f, namedlambdavariable())), namedlambdavariable()))#461]
+- FileScan parquet [a#456] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/data2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:array<struct<c:array<struct<d:int,e:array<struct<f:int,g:float>>>>>>>
root
|-- transform(a.c, lambdafunction(transform(lambda x_5#459.e, lambdafunction(lambda x_6#460.f, namedlambdavariable())), namedlambdavariable())): array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: array (containsNull = true)
| | | |-- element: integer (containsNull = true)
+-------------------------------------------------------------------------------------------------------------------------------------------+
|transform(a.c, lambdafunction(transform(lambda x_5#459.e, lambdafunction(lambda x_6#460.f, namedlambdavariable())), namedlambdavariable()))|
+-------------------------------------------------------------------------------------------------------------------------------------------+
| [[[1]]]|
+-------------------------------------------------------------------------------------------------------------------------------------------+
一応読めるものの、二次元配列・三次元配列の場合でも、ReadSchema: struct<a:array<struct<c:array<struct<d:int,e:array<struct<f:int,g:float>>>>>>>
となっていて、使っていない余計な列も読み込んでしまっているのが残念なところ・・
内部的には、a.c
を全部読み込んで、後処理で絞り込みをしているということだろうな・・
まとめ
1次元配列までであれば、列の絞り込みが効くということがわかった。
データを保存するときも、この点に気をつけることでパフォーマンスを改善できそう。
また、二次元配列以下の列についても読み込み時に絞り込みできる方法が分かれば追記します。
もし、ご存知の方がいらっしゃればご教授いただけますと嬉しいです!