はじめに
AWS Glueを利用してデータ処理パイプラインを開発していると、PySparkを利用することが多いと思います。
pandasなどに比べて情報が少なく、データの操作や取り回しなど躓くことも多かったので、利用頻度が高そうな操作をまとめました。
サンプルデータ
以下のデータを利用します。
+---+----+-----------------------------------------+
| id|type| log |
+---+----+-----------------------------------------+
| 1|test|{p1:100,p2:"test1",p3:"a", p4:"01:00:10"}|
| 2|test|{p1:110,p2:"test1",p3:"a", p4:"01:01:10"}|
| 3|test|{p1:110,p2:"test1",p3:"b", p4:"01:02:30"}|
| 4|test|{p1:105,p2:"test2",p3:"a", p4:"01:00:30"}|
| 5|test|{p1:115,p2:"test2",p3:"c", p4:"01:00:30"}|
+---+----+-----------------------------------------+
このデータはcsvの項目にjsonが文字列として格納されています。ログデータなどである形式だと思います。
この記事では、このデータを例に文字列をパースしたりフラット化したり、少し複雑なデータ処理を試します。
データ操作
データの読込
前準備としてGlueContextをimportして定義します。
from pyspark.context import SparkContext
from awsglue.context import GlueContext
glueContext = GlueContext(SparkContext.getOrCreate())
AWS Glueでは、カタログからのデータ取得時、DataFrame(pyspark.sql.DataFrame)の独自拡張であるDynamicFrameになります。
必要に応じてPySpark標準のDataFrameに変換して操作します。
上記のデータをGlueでテーブル化済み(database=glue,table_name=data)として、以下でDynamicFrameとして読込みます。
dyf_data = glueContext.create_dynamic_frame.from_catalog(database="glue", table_name="data")
dyf_data.printSchema()
dyf_data.toDF().show(10)
スキーマやデータはこんな感じ
root
|-- id: long
|-- type: string
|-- log: string
+---+----+--------------------+
| id|type| log|
+---+----+--------------------+
| 1|test|{p1:100,p2:"test1...|
| 2|test|{p1:110,p2:"test1...|
| 3|test|{p1:105,p2:"test2...|
| 4|test|{p1:115,p2:"test2...|
+---+----+--------------------+
以後特別に記述がない場合は、上から順に処理された出力データが次の処理の入力になります。
文字列データのパース -> Unbox
文字列データをパースして分割する場合、DynamicFrameでは Unbox を利用します。
from awsglue.transforms import *
unbox_data = Unbox.apply(frame = dyf_data, path = "log", format="json")
unbox_data.printSchema()
root
|-- id: long
|-- type: string
|-- log: struct
| |-- p1: int
| |-- p2: string
| |-- p3: string
| |-- p4: string
log
のtypeが struct
になり、p1~4
がパースされました。
それぞれのデータタイプは自動で解釈されています。
AWS Glueは、データ型の自動解釈が便利です。
データの構造化(ネスト)解除 -> unnest or Relationalize
構造化の状態のままでは操作しづらいので、フラット化します。
unnest を利用します。
flat_data = unbox_data.unnest()
flat_data.printSchema()
root
|-- id: long
|-- type: string
|-- log.p1: int
|-- log.p2: string
|-- log.p3: string
|-- log.p4: string
p1~p4
が root
直下になりました。項目名は親の項目名(log)がdot(.)で繋がります。
もしくはRelationalizeを利用します。
relationalize_data = Relationalize.apply(frame = unbox_data, staging_path = "", name = dfc_root_table_name, transformation_ctx = "")
flat_data = relationalize_data.select('root')
flat_data.printSchema()
データの抽出 -> select_fields
加工する際に必要なデータのみを抽出するには select_fields を利用できます。
サンプルデータのtype項目がいらないので、type項目を覗いてデータを抽出します。
select_data = flat_data.select_fields([
'id',
'`log.p1`',
'`log.p2`',
'`log.p3`',
'`log.p4`',
])
select_data.printSchema()
root
|-- id: long
|-- log.p1: int
|-- log.p2: string
|-- log.p3: string
|-- log.p4: string
unnest等で項目名にdot(.)が含まれている場合、back-ticks(`) で囲みます。
カラム名の変更 -> withColumnRenamed
カラム名の変更方法は色々ありますが、ここでは指定的に変更できるdataframeの withColumnRenamed を利用します。
この方法だとDynamicFrameをDataFrameに変換するため、再度、DynamicFrameに戻したい場合はfromDF()を使う必要があります。
サンプルデータの log.p1
,log.p2
,log.p3
を項目名を p1
,p2
,p3
にします。
rename_data = select_data.toDF().withColumnRenamed('log.p1', 'p1').withColumnRenamed('log.p2', 'p2').withColumnRenamed('log.p3', 'p3').withColumnRenamed('log.p4', 'p4')
rename_data.printSchema()
root
|-- id: long (nullable = true)
|-- p1: integer (nullable = true)
|-- p2: string (nullable = true)
|-- p3: string (nullable = true)
|-- p4: string (nullable = true)
データをグループ化、加工し新しいデータを作る -> pandasUDF
データをグループ化し、様々な加工を加え、新しいデータテーブルを生成してみます。
最も取り回しがしやすいのが Sparkの pandasUDF だと思います。
pandasUDFを利用することで、SparkのDataFrame をグループごとにpandasのDataFrameに変換して処理し、SparkのDataFrameとしてoutputできます。
なお、2020年11月現在のAWSGlueのSparkは2.43です。Spark3.0からpandasUDFの仕様が変わりましたが、AWSGlueでは3.0以降のpandasUDFはまだ使えません。
ここでは例として、元のデータをp2でグループ化し、適当に加工し以下のデータを作ってみます。
- p2 : グループしたp2の値
- p1_max: グループ毎のp1の値の最大値
- p1_avg: グループ毎のp1の値の平均
- p3_a_p1_min_p4: グループ毎にp3がaかつp1が最小値の時のp4の値
- included_p3_b: グループ毎にp3がbのデータが有るか
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField, BooleanType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
import pandas as pd
schema = StructType([
StructField("p2", StringType(), True),
StructField("p1_max", IntegerType(), True),
StructField("p1_avg", IntegerType(), True),
StructField("p3_a_p1_min_p4", StringType(), True),
StructField("included_p3_b", BooleanType(), True),
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def f(pandas_df):
p3_a_p1_min_p4 = "null"
if(len(pandas_df[pandas_df['p3'] == 'a']) > 0):
p3_a_p1_min_p4 = pandas_df[pandas_df['p3'] == 'a'].sort_values('p1', ascending=True).iloc[0].p4 # グループ毎にp3がaかつp1が最小値のデータのp4の値
df = pd.DataFrame({
'p2' : pandas_df.p2[0], #1つ目のデータのp2
'p1_max' : pandas_df.p1.max(), # p1のmax
'p1_avg' : pandas_df.p1.mean(), # p1のmean
'p3_a_p1_min_p4' : p3_a_p1_min_p4,
'included_p3_b': len(pandas_df[pandas_df['p3'] == 'b']) > 0 # p3がbのデータがあるならTrue
},index=['1',])
return df
new_df = rename_data.groupBy("p2").apply(f)
new_df.show(10)
@pandas_udf
によりSparkのデータをpandasで操作するユーザ関数を作成できます。
出力スキーマを定義し、PandasUDFType.GROUPED_MAP
によりグループ化したデータに対して処理できます。
グループ化されたデータをpandasのDataFrameで受け取り、ユーザ関数で処理できるのでかなり自由度が高いです。
+-----+------+------+--------------+--------------+
| p2|p1_max|p1_avg|p3_a_p1_min_p4| included_p3_b|
+-----+------+------+--------------+--------------+
|test1| 110| 106| 01:00:10| true|
|test2| 115| 110| 01:00:30| false|
+-----+------+------+--------------+--------------+
いくつか注意点もあります。
- groupbyの対象項目の値にnullがあると返却されたデータテーブルの操作時にインデックスエラーが起きます。事前にnullのデータは除去しましょう。
- グループのすべてのデータが強制的にメモリにロードされます。非常に大きなグループがあったときなどメモリ不足になる可能性があります。
まとめ
今回はAWS Glue上で、よく使うデータ操作についてまとめました。
AWS Glueでは、SparkのDataFrameを拡張したDynamicFrameを利用でき、Spark単体に比べてデータ解釈などが便利になっています。
また、Sparkも昔はPandasに比べて操作系が貧弱でしたが、pandasUDFなどが実装され使いやすくなっている印象です。
pandasをこれまで利用していた方々も大きな違和感なく使えると思いますので、ぜひ触ってみてください。
参考
商標
- AWSおよびAWSの各種サービスは、Amazon.com, Inc.またはその関連会社の商標です。
- Apache Sparkは、Apache Software Foundationの米国およびその他の国における登録商標または商標です。
- Pythonは,Python Software Foundationの登録商標です。
- 記載の会社名、製品名、サービス名等はそれぞれの会社の商標または登録商標です。