この記事は MicroAd Advent Calendar 2020 の7日目の記事です。
概要
HiveのComplex型のarray<struct>
は曲者で、structオブジェクトに対するちょっとしたフィールド操作も簡単ではなく、クエリのみで実現するとなるとLATERAL VIEW EXPLODE
で展開してオブジェクトを作り直す必要があったりします。
今回はPySparkのUDFを使ってそのようなフィールド操作をやってみました。
実施内容
以下のようなarray<struct>
型のフィールドに対して、フィールド名の変更と型のキャストを行ってみます。
変更前
test_array_struct ARRAY<STRUCT<
id: bigint,
score: decimal(38,18)
>>
変更後
test_array_struct ARRAY<STRUCT<
renamed_id: int,
renamed_score: double
>>
ソースコード
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, struct
from pyspark.sql.types import *
sc = SparkSession \
.builder \
.master("yarn") \
.appName("hogehoge") \
.enableHiveSupport() \
.getOrCreate()
query = """
SELECT
test_array_struct
FROM test_db.test_table
LIMIT 20
"""
df = sc.sql(query)
# 新たなarray<struct>に値を入れ直すことで、フィールド名の変更と型変換を行うudfを定義
cast_udf = udf(lambda array: [(x.asDict()['id'], float(x.asDict()['score'])) for x in array],
ArrayType(
StructType(
[StructField('renamed_id', IntegerType(), False),
StructField('renamed_score', DoubleType(), False)]
)
)
)
df_new = df\
.select("test_array_struct") \
.withColumn("test_array_struct", cast_udf(col("test_array_struct")))
df_new \
.repartition(10) \
.write \
.mode('overwrite') \
.option('compression', 'snappy') \
.parquet('hdfs://test-path/test_db/renamed_table/')
SELECT * FROM test_db.renamed_table LIMIT 20;
+----------------------------------------------------------+--+
| renamed_table.test_array_struct |
+----------------------------------------------------------+--+
| [{"renamed_id":1,"renamed_score":1.73160173160173E-4}] |
| [{"renamed_id":2,"renamed_score":2.5901875901876E-5}] |
| [{"renamed_id":3,"renamed_score":7.5036075036075E-4}] |
| [{"renamed_id":4,"renamed_score":1.44300144300145E-4}] |
| [{"renamed_id":5,"renamed_score":1.73160173160173E-4}] |
| [{"renamed_id":6,"renamed_score":4.3290043290043E-5}] |
| [{"renamed_id":7,"renamed_score":0.005343694083694084}] |
| [{"renamed_id":8,"renamed_score":7.35930735930736E-4}] |
| [{"renamed_id":9,"renamed_score":1.90663780663781E-4}] |
| [{"renamed_id":10,"renamed_score":1.24920634920635E-4}] |
| [{"renamed_id":11,"renamed_score":1.4430014430014E-5}] |
| [{"renamed_id":12,"renamed_score":2.67099567099567E-4}] |
| [{"renamed_id":13,"renamed_score":1.32438672438672E-4}] |
| [{"renamed_id":14,"renamed_score":8.24473304473305E-4}] |
| [{"renamed_id":15,"renamed_score":4.61760461760462E-4}] |
| [{"renamed_id":16,"renamed_score":5.72106782106782E-4}] |
| [{"renamed_id":17,"renamed_score":0.002999437229437229}] |
| [{"renamed_id":18,"renamed_score":6.49350649350649E-4}] |
| [{"renamed_id":19,"renamed_score":5.1948051948052E-4}] |
| [{"renamed_id":20,"renamed_score":2.5974025974026E-4}] |
+----------------------------------------------------------+--+
20 rows selected (0.132 seconds)
変更後のテーブル定義に対しても値がきちんと入っていて、SELECTできることが確認できました。
まとめ
array<struct>
型はデータの構造を表すのに便利な場面も多いですが、一方で操作しづらいというデメリットがあります。フィールド操作のような処理をクエリのみで完結させようとすると処理が重くなりがちですが、PySparkであればその辺りが軽減できるかと思います。
この記事が何らかの参考になれば幸いです。