AWS Glueは、ETL(Extract, Transform, Load)ジョブを簡単に作成できるサービスです。
AWS GlueのETLジョブでは、SparkのDataFrameとDynamicFrameを使用することができます。
違いを理解して、使い分けることで効果的な操作を行う事が出来るようになります。
概要
DataFrameは、Sparkが提供する分散データフレームです。DataFrameは、テーブルのような構造でデータを保持し、SQLライクな操作を行うことができます。DataFrameは、Python、Scala、Java、Rなどの多くの言語で使用できます。
一方、DynamicFrameは、AWS Glueが提供するデータフレームです。DynamicFrameは、DataFrameに比べて、より柔軟なスキーマ設計が可能です。DynamicFrameは、PythonまたはScalaで使用できます。
DynamicFrameは、DataFrameと同様に、データソースからデータを読み込んだり、データを書き込んだりすることができます。ただし、DynamicFrameは、DataFrameに比べてスキーマが動的であるため、スキーマ変換やデータのマッピングがより柔軟に行えます。また、DynamicFrameは、テーブルのような構造ではなく、1つ以上のダイナミックなスキーマを持つコレクションであるため、データを変換するための操作をより簡単に実行することができます。
AWS GlueのETLジョブでは、DataFrameとDynamicFrameの両方を使用できます。ただし、DynamicFrameはAWS Glue固有の機能であり、Sparkで直接使用することはできません。したがって、Sparkで開発されたコードをAWS Glueで実行する場合、SparkのDataFrameを使用することが推奨されます。
AWS GlueのDynamicFrameとDataFrameは、両方ともAWS Glueで使用されるデータ処理の抽象化ですが、いくつかの重要な違いがあります。
🔸データ型の柔軟性
DataFrameは、列のデータ型が一貫している必要がありますが、DynamicFrameは列のデータ型が動的に変更されることができます。これにより、DynamicFrameは、DataFrameよりも柔軟なデータ変換が可能です。
🔸データの参照方法
DataFrameは、列名または列インデックスによって列にアクセスしますが、DynamicFrameは、列名、列インデックス、またはJSONパスによって列にアクセスします。JSONパスを使用すると、より柔軟なデータ参照が可能になります。
🔸パーティショニング
DataFrameは、パーティションに基づいたデータ操作をサポートしますが、DynamicFrameは、パーティショニングをサポートしません。つまり、DynamicFrameは、DataFrameよりもパーティショニングに適していません。
🔸入力と出力
DataFrameは、Sparkのファイル形式やデータベース、または外部データソースとの相互運用性が高く、AWS Glueのジョブを実行するための入力および出力に使用することができます。DynamicFrameは、AWS Glueでのみ使用されるため、入力と出力はAWS Glueのデータストアとのみ対話します。
これらの違いから、DynamicFrameは、より柔軟なデータ変換とJSONパスを使用したより高度なデータ操作に向いています。一方、DataFrameは、外部データソースとの相互運用性が高く、パーティショニングに適しています。
AWS GlueでPythonスクリプトを使用してデータ変換を行うためのサンプルコード
この例では、Amazon S3からCSVファイルを読み込み、数値列を合計し、合計値を含む新しいCSVファイルを書き込みます。
# 必要なモジュールをインポート
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
# AWS Glueの引数を取得
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Sparkコンテキストを作成
sc = SparkContext()
# AWS Glueコンテキストを作成
glueContext = GlueContext(sc)
# Sparkセッションを取得
spark = glueContext.spark_session
# GlueContextオブジェクトからloggerを取得
logger = glueContext.get_logger()
## Read data from S3
# 読み込むファイルのパスを指定
input_file_path = "s3://your-bucket-name/path/to/input/csv/file"
# AWS Glue DynamicFrameを作成
input_dyf = glueContext.create_dynamic_frame_from_options(
connection_type="s3",
connection_options={"paths": [input_file_path]},
format="csv",
format_options={"withHeader": True} # ヘッダーがあることを指定
)
## Convert dynamic frame to data frame
# DynamicFrameをDataFrameに変換
input_df = input_dyf.toDF()
## Sum the numeric columns
# 合計を計算するカラムを指定
sum_columns = ['column1', 'column2', 'column3']
# selectExpr()を使用して、数値カラムの合計を計算する
sum_df = input_df.selectExpr("cast(" + "+".join(sum_columns) + " as double) as total")
## Write output to S3
# 書き込むファイルのパスを指定
output_file_path = "s3://your-bucket-name/path/to/output/csv/file"
# AWS Glue DynamicFrameを作成
output_dyf = DynamicFrame.fromDF(sum_df, glueContext, "output_dyf")
# S3に書き込む
glueContext.write_dynamic_frame.from_options(
frame=output_dyf,
connection_type="s3",
connection_options={"path": output_file_path},
format="csv",
format_options={"writeHeader": True} # ヘッダーを出力することを指定
)
## Job completed
# ジョブの完了をloggerで出力する
logger.info("Job completed: {}".format(args['JOB_NAME']))
このコードでは、AWS GlueのPythonモジュールを使用して、データ変換ジョブを定義しています。最初に、S3からCSVファイルを読み込むためのDynamicFrameを作成します。次に、DynamicFrameをSpark DataFrameに変換して、数値列を合計します。最後に、合計値を含む新しいCSVファイルを書き込みます。AWS GlueのDynamicFrameとwrite_dynamic_frameメソッドを使用することで、データを柔軟に変換し、異なる形式で出力することができます。
詳細
RDD(Resilient Distributed Dataset)とは?
不変(イミュータブル)で並列実行可能な(分割された)コレクション
RDD(Resilient Distributed Dataset)は、Apache Sparkのコアデータモデルの1つであり、分散処理のための不変性と耐障害性を備えたデータセットを表現します。RDDは、大規模なデータセットを複数のノードに分割して並列処理することができます。
RDDの特徴と機能
1.分散性と耐障害性
RDDは、データを複数のノードに分割して格納し、ノードの故障に対して耐性を持ちます。また、RDDは不変であり、変更不可なデータセットとして扱われます。
2.トランスフォーメーション
RDDは、変換操作を適用してデータセットを変換することができます。例えば、map、filter、reduceなどの操作を使用してRDD内のデータを変換、フィルタリング、集計することができます。
3.アクション
RDDは、アクション操作を使用して結果を取得することができます。例えば、count、collect、saveなどのアクションを使用してRDDの要素数を数えたり、結果を収集したり、結果を外部ストレージに保存したりすることができます。
4.データの再計算
RDDは、変換操作の履歴を保持し、必要に応じて再計算することができます。これにより、データの不変性と耐障害性を保ちながら、効率的なデータ処理が可能になります。
5.データのキャッシュ
RDDは、計算結果をメモリ内にキャッシュすることができます。これにより、頻繁にアクセスされるデータに対して高速なアクセスが可能になります。
RDDは、Sparkの分散データ処理エンジンの基盤となっており、Sparkの高速な並列処理とデータの不変性を実現するための重要な概念です。また、RDDはSparkの高レベルAPIであるDataFrameやDatasetの基盤となっています。
RDDメソッド
1.Transformations(変換操作)
RDDの要素を変換する操作で、新しいRDDを返します。主な変換操作として、map、filter、flatMap、distinct、union、intersection、subtract、groupByKey、reduceByKey、joinなどがあります。
2.Actions(アクション操作)
RDDの要素に対して計算を実行し、結果を返します。主なアクション操作として、count、collect、reduce、foreach、saveAsTextFile、saveAsSequenceFile、saveAsObjectFileなどがあります。
3.PairRDDFunctions(キーバリューペアRDDに対する操作)
RDDの要素がキーバリューペアである場合に使用する操作で、主にグループ化、結合、集計などの処理を実行します。主なPairRDDFunctionsの操作として、groupByKey、reduceByKey、join、cogroup、combineByKey、mapValues、flatMapValuesなどがあります。
4.DoubleRDDFunctions(数値型のRDDに対する操作)
RDDの要素が数値型である場合に使用する操作で、主に平均値、分散、標準偏差、相関係数などの計算を実行します。主なDoubleRDDFunctionsの操作として、mean、variance、stdev、corr、histogramなどがあります。
5.SparkContextFunctions(SparkContextに対する操作)
SparkContextを管理するための操作で、主にデータのロード、保存、クラスターの管理などを実行します。主なSparkContextFunctionsの操作として、textFile、wholeTextFiles、newAPIHadoopFile、newAPIHadoopRDD、setCheckpointDir、setJobGroup、cancelJobGroupなどがあります。
RDDは、Sparkの分散処理の中核を担うデータモデルであり、これらのメソッドによって柔軟なデータ処理が可能になります。