発端
AWS Glue を S3 にあるデータを整形する ETL として使ってみる上で、 Glue の重要な要素である DynamicFrame について整理する
そもそも AWS Glue とは?
AWS Glue はフルマネージドな ETL サービス。
その中でデータの抽出・変換をする際に、 spark の DataFrame そのままではなくその Wrapper である DynamicFrame が使われている。
DynamicFrame とは?
DynamicFrame とは何なのかを理解するために、公式ドキュメント と GitHub 上のコード を参考にする。
Glue には Python と Scala 両方の API が用意されており、ドキュメントも若干説明が違うため、まずはそれぞれ確認してみる。
DynamicFrame class (Python)
Python の DynamicFrame の説明 を下記に引用する。
DataFrames は、強力で広く使用されていますが、抽出、変換、およびロード (ETL) 操作に関しては制限があります。
最も重要なのは、データをロードする前にスキーマを指定する必要があることです。
(中略)
これらの制限に対応するために、AWS Glue により DynamicFrame が導入されました。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。
つまり、Spark の DataFrame ではスキーマを事前に定義する必要があることが ETL のサービスを実現するうえでデメリットなので、Wrapper である DynamicFrame を用意したということ。
例えば ETL 前のデータの中に、基本的には数値型・不正なデータは文字列型になっているカラムがあることを想定していると思われる。
この場合、 Spark の DataFrame だと文字列型にしか扱えないが、 DynamicFrame だと選択型として扱ってくれるみたい。
DynamicFrame class (Scala)
次に Scalaの DynamicFrame の説明 を下記に引用する。
DynamicFrame は、自己記述型の DynamicRecord オブジェクトの分散コレクションです。
DynamicFrame は、ETL (抽出、変換、ロード) オペレーションの柔軟なデータモデルを提供するように設計されています。これらのオブジェクトを作成するのにスキーマは必要なく、乱雑または不整合な値や型を持つデータの読み取りと変換に使用できます。スキーマは、スキーマを必要とするオペレーションでオンデマンドで計算できます。
DynamicFrame は、データクリーニングと ETL 用の広範な変換を提供します。また、既存のコードと統合するための SparkSQL DataFrames との相互変換や、DataFrames が提供する多くの分析オペレーションをサポートしています。
基本的には Python の説明と同様。
追加の情報として、 DynamicRecord というワードが出てきた。
DynamicRecord とは?
DynamicRecord の説明 をみると、
DynamicRecord は、処理対象のデータセット内のデータ行を表す自己記述型のデータ構造体です。自己記述型とは、DynamicRecord が表す行のスキーマを、レコード自体を検査することで取得できるという意味です。DynamicRecord は Apache Spark の Row に似ています。
つまり、 Spark における DataFrame, Row が Glue における DynamicFrame, DynamicRecord に対応しているみたい。
実際の変換処理のコードを参照してみる
ドキュメントだけだと限界があるので、実際の変換処理のコードを参照してみる。
filter 処理をピックアップした結果が下記コードになる。
def filter(self, f, transformation_ctx = "", info="", stageThreshold=0, totalThreshold=0):
def wrap_dict_with_dynamic_records(x):
rec = _create_dynamic_record(x["record"])
try:
return f(rec)
except Exception as E:
if isinstance(E, KeyError) or isinstance(E, ValueError) or isinstance(E, TypeError):
return False
x['isError'] = True
x['errorMessage'] = E.message
return True
def func(iterator):
return ifilter(wrap_dict_with_dynamic_records, iterator)
return self.mapPartitions(func)
def mapPartitions(self, f, preservesPartitioning=True):
def func(s, iterator):
return f(iterator)
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitionsWithIndex(self, f, preservesPartitioning=True):
return DynamicFrame(self.glue_ctx._jvm.DynamicFrame.fromPythonRDD(
PipelinedRDD(self._rdd, f, preservesPartitioning)._jrdd, self.glue_ctx._ssql_ctx), self.glue_ctx, self.name)
残念ながら、このコードからは下記程度しかわからない。
- 1要素ずつ dict 型を DynamicRecord に変換して扱っていること
- pyspark の PipelinedRDD から RDD を生成していること
- 変換処理のたび JVM の DynamicFrame.fromPythonRDD 経由で DynamicFrame が生成されていること
JVM の DynamicFrame コードを参照してみようと思ったものの、公開されていない模様。
さいごに
不正なデータにより型が統一されていないデータを扱う場合は、 DynamicFrame の威力が発揮されそう。
ただ、 操作ごとに JVM の処理を呼び出すオーバーヘッドがかかっているので、パフォーマンスは多少なりとも悪化していそう。
肝心な JVM の DynamicFrame コードが参照できないため、中途半端な調査に終わってしまったのが心残り。