本記事は、オープンソースのフレームワークを使用したデータ分析、分散処理を AWSマネージドサービスにて使用する事が出来る Amazon EMRの理解を深めるための用途とする。
使用するフレームワークについて
Amazon EMRでは、Apache Sparkや Hadoopなどの、分散処理フレームワークを使用する。
これらは、大量なデータを処理する場合に使用されるフレームワークであり、導入するケースとして以下のようなケースが存在する。
- データ対する処理にリアルタイム性が要求される場合
- 多様な分析手法を用いる場合
- 大規模なデータを扱う機械学習を行う場合
- ETL(データ変換)やクレンジングを行う場合
- 取り扱うデータ量が非常に大きく、通常のコンピューターでは処理が出来ない場合
Apache Spark、Hadoop共にフレームワークとなるため、複数のミドルウェア、コンポーネントが機能して構成される。いずれも特徴があるようだが、今回は Amazon EMRで使用するフレームワークとしての存在という事を理解しておく。
EMR の用語
- クラスター
EMR におけるクラスターは、EC2インスタンスの集合体。EMR on EC2 のセットアップはクラスターがメイン。
クラスター内の各インスタンスは、ノードと呼ばれる。各ノードに分散処理フレームワークのソフトウェアコンポーネントがインストールされて、セットアップ後はロールが各ノードに付与される。
マスターノード : 各ノード間でのデータおよびタスクの分散を調整するソフトウェアを実行し、クラスターを管理するノード。タスクのステータスを追跡し、クラスターの状態を監視する。マスターノードのみで 1 つのノードクラスターを作成することが可能。
コアノード : タスクを実行し、クラスター上の HDFS にデータを保存するソフトウェアを持つノード。マルチノードクラスターには、少なくとも 1 つのコアノードが存在する。
タスクノード : タスクを実行するのみで、HDFS にデータを保存しないソフトウェアコンポーネントを持つノード。タスクノードはオプションのためセットアップ時は省略が可能。
- ユニフォームインスタンスグループ
インスタンスグループとは、各ノード単位に構成された EC2インスタンスの集合体を示し、
クラスターを構成するためのインスタンスグループを、1つのインスタンスタイプと購入オプションで指定する方法。
マスターインスタンスグループ: 1つの マスターノードに該当する EC2インスタンスを含む 1つのグループ
コアインスタンスグループ: 1つ以上の コアノードに該当する EC2インスタンスを含む 1つ以上のグループ
タスクインスタンスグループ: 1つ以上の タスクノードに該当する EC2インスタンスを含む 最大48のグループ
- インスタンスフリート
クラスターを構成するための各ノードタイプ単位にで複数のインスタンスタイプや購入オプションを組み合わせることができる方法。
希望する容量を満たすように、オンデマンドやスポット、インスタンスタイプを組み合わせながら EMR が各ノードに EC2をプロビジョニングしていく。
- EMRFS
Amazon EMRからS3に通常のファイルを直接読み書きするために使用するもので、S3 をHDFSのように扱える。
- ステップ
クラスターにインストールされたソフトウェアが処理する作業単位。
クラスター作成後にステップを追加し、実行する事が可能。
- ブートストラップアクション
クラスターが Hadoopを起動する前にスクリプトを実行するために使用される。
これにより、追加のソフトウェアをインストールし、アプリケーションをカスタマイズが出来る。
- ノートブック
クエリやコードを実行する用途、jupyter notebookの環境から開く事が可能。
コマンドは EMR クラスター上のカーネルを使用して実行される。
- EMR マネージドスケーリング/カスタム自動スケーリング
クラスターとして起動している EC2インスタンスの数を自動でスケーリングする事が可能な EMRのAutoScalingのようなもの。
詳細な比較は以下に記載がある。
但し、これらのスケーリングが有効なのは、クラスター内のコアノードとタスクノードのみであり、マスターノードに関しては Multi-masterサポートを有効にして、3つのマスターノード(同じサブネット上)から構成するかどうかを事前に設定する。
構成情報
クラスタを構成する際にいくつか構成を検討する必要がある。
クラスタを構成する EC2インスタンス
クラスタの構成検討で事前検討が必要であろう情報は以下。
- ユニフォームインスタンスグループかインスタンスフリートから構成するか、またインスタンスタイプと購入オプションを事前に検討。
- マスターノードを単一構成とするか3台構成とするか検討する。
- クラスタを構成するソフトウェアを検討する。ソフトウェアの選択は以下となる。
ストレージ
EMR クラスターで構成された EBS は通常の EC2で構成される EBSと動作が異なる。EMR クラスターにアタッチされた Amazon EBS ボリュームはエフェメラルで、ボリュームは、クラスターとインスタンスを削除すると消失されるため、注意が必要。また、データの存続は一時的だが、クラスター内のノードの数と仕様によっては、HDFS 内のデータがレプリケートされることがある。
EMR は一般的に、クラスターを処理するときに HDFSと EMRFSを使用する。
-
HDFSは、クラスターを管理する Hadoop クラスターノード、個別のステップを管理する Hadoop クラスターノードの間でのデータ連携を行うため、マスターノードまたは、コアノードによって使用される。高速であるが、クラスターが終了すると消失されてしまうため、最適な用途としては、ステップの実行により得られた結果のキャッシュ場所として使用が推奨される。
-
EMRFSは、通常のファイルを Amazon EMRから S3に直接読み書きするために使用されるファイルシステムで、S3に永続的なデータを保存できるようにしながら、暗号化、整合性の機能を提供する。
ネットワーク
Multi-masterを含め、クラスターは、1 つのアベイラビリティーゾーンまたはサブネットにのみ存在する事できる。
そのため、EMR クラスターを配置する VPC およびサブネットを検討する必要がある。
EMR クラスターはインターネットへの接続が必須ではないため、Private Subnetに配置する事も可能である。
但し、S3エンドポイントを作成し、EMR が特定のバケットへのアクセスする経路が無いと、データの受け渡しが出来ない為、S3エンドポイントの作成は推奨されている。
なお、クラスタは全て EC2 のため、アクセス許可に関しては一般的な SGで管理する事が可能。
チュートリアルから理解する
EMR のドキュメントにチュートリアルが記載されていたため、Amazon EMRのコンソールを併せてしながら解説をしていく。
1.事前に PySpark スクリプト、データセット(CSV)、クラスター出力を格納するための Amazon S3 バケットを事前に用意。
2.今回使用するサンプルの python スクリプト health_violations.py を作成したバケットに保存。
import argparse
from pyspark.sql import SparkSession
def calculate_red_violations(data_source, output_uri):
"""
Processes sample food establishment inspection data and queries the data to find the top 10 establishments
with the most Red violations from 2006 to 2020.
:param data_source: The URI where the food establishment data CSV is saved, typically
an Amazon S3 bucket, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
:param output_uri: The URI where the output is written, typically an Amazon S3
bucket, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
"""
with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
# Load the restaurant violation CSV data
if data_source is not None:
restaurants_df = spark.read.option("header", "true").csv(data_source)
# Create an in-memory DataFrame to query
restaurants_df.createOrReplaceTempView("restaurant_violations")
# Create a DataFrame of the top 10 restaurants with the most Red violations
top_red_violation_restaurants = spark.sql("SELECT name, count(*) AS total_red_violations " +
"FROM restaurant_violations " +
"WHERE violation_type = 'RED' " +
"GROUP BY name " +
"ORDER BY total_red_violations DESC LIMIT 10 ")
# Write the results to the specified output URI
top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
'--data_source', help="The URI where the CSV restaurant data is saved, typically an S3 bucket.")
parser.add_argument(
'--output_uri', help="The URI where output is saved, typically an S3 bucket.")
args = parser.parse_args()
calculate_red_violations(args.data_source, args.output_uri)
3.データセットは、以下からダウンロードし、CSV ファイルを同じバケットに格納
4.Apache Sparkがインストールされた EMRクラスターを設定し、起動する。
今回は簡易インストールという事で、インスタンスの詳細オプション等を設定せずクイックオプションから起動する。
クラスター名 : 任意
S3フォルダ(EMRのログ出力用) : 任意のバケット
起動モード : クラスター
ソフトウェア設定 : EMR 6.3 / Spark:Spark 3.1.1 on Hadoop 3.2.1 YARN with and Zeppelin 0.9.0
インスタンス : m5.xlarge 3台(マスターノード 1 / コアノード 2)
EC2キーペア : 任意
EMR ロール : EMR DefaultRole
EC2 インスタンスプロファイル : EMR EC2 DefaultRole
※クイックオプションでは VPC / Subnet を選択する画面が無いので、EMR の左ペインから VPC サブネットを選択し、有効なサブネットをネットワークへ選択しておく。
5.作成後、クラスターのステータスページで、Starting~実行中~待機中というように変化する。ステータスが待機中ですに設定されている場合、クラスターは稼働しており、作業を受け入れる準備が可能。
6.待機中となったクラスタを選択し、ステップの設定を実施。
なお、アプリケーションおよびアプリケーションの引数には初めに作成したバケット/health_violations.py を指定する。引数は以下を指定。
引数
--data_source s3://初めに作成したバケット/food_establishment_data.csv
--output_uri s3://初めに作成したバケット/myOutputFolder
7.ステップが完了すると、S3 で指定したmyOutputFolderへ、CSV 形式のファイルが出力されている。
このチュートリアルでは、食品施設の検査データを処理し、「赤」タイプの違反が最も多い上位 10 の施設を S3 バケットにリストする python スクリプトがデータセットをソースとしてステップ実行される。
最も赤い違反が多い食品施設の上位 10 件が出力先にリストされる結果が得られると、チュートリアルは完了となる。
最も赤い違反が多い食品施設の上位 10 件のリスト
name, total_red_violations
SUBWAY, 322
T-MOBILE PARK, 315
WHOLE FOODS MARKET, 299
PCC COMMUNITY MARKETS, 251
TACO TIME, 240
MCDONALD'S, 177
THAI GINGER, 153
SAFEWAY INC #1508, 143
TAQUERIA EL RINCONSITO, 134
HIMITSU TERIYAKI, 128