1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Glueで配列を行方向に展開する

Last updated at Posted at 2021-12-29

背景・目的

  • jsonファイルでArray&Structになっているデータセットを、Glueで行方向に展開する。

前提(はじめに)

  • 対象のデータセットは、以下の通り。
    • 実際のファイルはJSON lineの形式で一行です。
    • ファイルフォーマット
      • id:レコード単位に一意に振るID
      • actions:Array型。複数のactionをまとめています。
        • action:Struct型。locationとtimestampを有しています。
{
  "id": "0123456789",
  "actions": [
    {
      "action": {
        "location": "Kanagawa",
        "timestamp": "2021-12-28T10:00:00.000+0900"
      }
    },
    {
      "action": {
        "location": "Tokyo",
        "timestamp": "2021-12-28T11:00:00.000+0900"
      }
    },
    {
      "action": {
        "location": "Chiba",
        "timestamp": "2021-12-28T12:00:00.000+0900"
      }
    }
  ]
}

やりたいこと

  • actionsを行方向に展開して、idとジョインします。

結論

  • 行方向に展開するには、explodeを利用します。

内容

配列を行方向に展開する。

  • 以下の要素を組み合わせて作成する。

DataFrameに変換する。

  • 配列を行方向に展開するために、DynamicFrameからDataFrameに変換する。
  • toDF()関数を利用する。
ApplyMapping_node2.toDF()

Arrayを行方向に展開してジョインする。

  •  展開して新しいカラムとしてactionを作成する。
  • explodeとwithColumnを使う。
withColumn("action", explode(col("actions")))

従来のカラムを削除する。

  • actionsカラムを削除する。
  • dropを使う。
drop("actions")

まとめると

  • 以下のようになる。
explodeDf = ApplyMapping_node2.toDF().withColumn("action", explode(col("actions"))).drop("actions")
  • 以下のように表示される。(printの結果)
  • actions列の配列が分割&行方向に展開され、idとジョインされる事がわかります。
root
 |-- id: string (nullable = true)
 |-- action: struct (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- timestamp: string (nullable = true)

+----------+----------------------------------------+
|        id|                                  action|
+----------+----------------------------------------+
|0123456789|{Kanagawa, 2021-12-28T10:00:00.000+0900}|
|0123456789|   {Tokyo, 2021-12-28T11:00:00.000+0900}|
|0123456789|   {Chiba, 2021-12-28T12:00:00.000+0900}|
+----------+----------------------------------------+

DynamicFrameに戻す

  • DataFrameからDynamicFrameに戻します。(DataFrameのまま書き込むことも可能です。)
write_dynamic_frame = DynamicFrame.fromDF(explodeDf,glueContext,"write_dynamic_frame")

全体のコード

  • インプットのS3バケットは、Glueデータカタログを使用しています。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext,DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import explode,col

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="spark_test", table_name="unnestinput", transformation_ctx="S3bucket_node1"
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("id", "string", "id", "string"),
        ("actions", "array", "actions", "array"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

explodeDf = ApplyMapping_node2.toDF().withColumn("action", explode(col("actions"))).drop("actions")
explodeDf.printSchema()
print(explodeDf.show(truncate=300))

write_dynamic_frame = DynamicFrame.fromDF(explodeDf,glueContext,"write_dynamic_frame")


# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=write_dynamic_frame,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://${バケット名}/output/", "partitionKeys": []},
    format_options={"compression": "snappy"},
    transformation_ctx="S3bucket_node3",
)

job.commit()

出力結果

  • 出力したParquetファイルをS3 Selectで参照しています。
  • id(string)と、action(struct)の要素になっている。3行表示されているのが分かると思います。
{
  "id": "0123456789",
  "action": {
    "location": "Kanagawa",
    "timestamp": "2021-12-28T10:00:00.000+0900"
  }
}
{
  "id": "0123456789",
  "action": {
    "location": "Tokyo",
    "timestamp": "2021-12-28T11:00:00.000+0900"
  }
}
{
  "id": "0123456789",
  "action": {
    "location": "Chiba",
    "timestamp": "2021-12-28T12:00:00.000+0900"
  }
}

考察

  • 頻繁に使うわけではないが、覚えておいて損はないと思いました。

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?