背景・目的
- jsonファイルでArray&Structになっているデータセットを、Glueで行方向に展開する。
前提(はじめに)
- 対象のデータセットは、以下の通り。
- 実際のファイルはJSON lineの形式で一行です。
- ファイルフォーマット
- id:レコード単位に一意に振るID
- actions:Array型。複数のactionをまとめています。
- action:Struct型。locationとtimestampを有しています。
{
"id": "0123456789",
"actions": [
{
"action": {
"location": "Kanagawa",
"timestamp": "2021-12-28T10:00:00.000+0900"
}
},
{
"action": {
"location": "Tokyo",
"timestamp": "2021-12-28T11:00:00.000+0900"
}
},
{
"action": {
"location": "Chiba",
"timestamp": "2021-12-28T12:00:00.000+0900"
}
}
]
}
やりたいこと
- actionsを行方向に展開して、idとジョインします。
結論
- 行方向に展開するには、explodeを利用します。
内容
配列を行方向に展開する。
- 以下の要素を組み合わせて作成する。
DataFrameに変換する。
- 配列を行方向に展開するために、DynamicFrameからDataFrameに変換する。
- toDF()関数を利用する。
ApplyMapping_node2.toDF()
Arrayを行方向に展開してジョインする。
- 展開して新しいカラムとしてactionを作成する。
- explodeとwithColumnを使う。
withColumn("action", explode(col("actions")))
従来のカラムを削除する。
- actionsカラムを削除する。
- dropを使う。
drop("actions")
まとめると
- 以下のようになる。
explodeDf = ApplyMapping_node2.toDF().withColumn("action", explode(col("actions"))).drop("actions")
- 以下のように表示される。(printの結果)
- actions列の配列が分割&行方向に展開され、idとジョインされる事がわかります。
root
|-- id: string (nullable = true)
|-- action: struct (nullable = true)
| |-- location: string (nullable = true)
| |-- timestamp: string (nullable = true)
+----------+----------------------------------------+
| id| action|
+----------+----------------------------------------+
|0123456789|{Kanagawa, 2021-12-28T10:00:00.000+0900}|
|0123456789| {Tokyo, 2021-12-28T11:00:00.000+0900}|
|0123456789| {Chiba, 2021-12-28T12:00:00.000+0900}|
+----------+----------------------------------------+
DynamicFrameに戻す
- DataFrameからDynamicFrameに戻します。(DataFrameのまま書き込むことも可能です。)
write_dynamic_frame = DynamicFrame.fromDF(explodeDf,glueContext,"write_dynamic_frame")
全体のコード
- インプットのS3バケットは、Glueデータカタログを使用しています。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext,DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import explode,col
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database="spark_test", table_name="unnestinput", transformation_ctx="S3bucket_node1"
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=S3bucket_node1,
mappings=[
("id", "string", "id", "string"),
("actions", "array", "actions", "array"),
],
transformation_ctx="ApplyMapping_node2",
)
explodeDf = ApplyMapping_node2.toDF().withColumn("action", explode(col("actions"))).drop("actions")
explodeDf.printSchema()
print(explodeDf.show(truncate=300))
write_dynamic_frame = DynamicFrame.fromDF(explodeDf,glueContext,"write_dynamic_frame")
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=write_dynamic_frame,
connection_type="s3",
format="glueparquet",
connection_options={"path": "s3://${バケット名}/output/", "partitionKeys": []},
format_options={"compression": "snappy"},
transformation_ctx="S3bucket_node3",
)
job.commit()
出力結果
- 出力したParquetファイルをS3 Selectで参照しています。
- id(string)と、action(struct)の要素になっている。3行表示されているのが分かると思います。
{
"id": "0123456789",
"action": {
"location": "Kanagawa",
"timestamp": "2021-12-28T10:00:00.000+0900"
}
}
{
"id": "0123456789",
"action": {
"location": "Tokyo",
"timestamp": "2021-12-28T11:00:00.000+0900"
}
}
{
"id": "0123456789",
"action": {
"location": "Chiba",
"timestamp": "2021-12-28T12:00:00.000+0900"
}
}
考察
- 頻繁に使うわけではないが、覚えておいて損はないと思いました。
参考