背景・目的
こちらでEMRを整理しましたが、今回は、ドキュメントをもとにEMRのチュートリアルを試してみます。
まとめ
- わずか数クリックでクラスタが起動できます。
概要
EMR を使用すると、ビッグデータフレームワークを使用してデータを処理および分析するクラスターをわずか数分でセットアップできる。
このチュートリアルでは、Spark を使用してサンプルクラスターを起動する方法と、S3 バケットに格納された簡単なPySpark スクリプトを実行する方法について試す。
次の 3 つの主要なワークフローカテゴリにおける EMR の必須タスクを取り上げる。
- 計画と設定
- 管理
- クリーンアップ
※出典:Tutorial: Getting started with Amazon EMR
実践
チュートリアル: Amazon EMR の開始方法を元に試します。
前提
VPCのセットアップ
VPCとプライベートサブネットを設定します。
Amazon EMR のセットアップ
Amazon EC2 のキーペアと Linux インスタンスを作成
作成済みの場合は、スキップします。
-
下記を入力し、「キーペアを作成」をクリックします。
- キーペア名
- キーペアタイプを選択(RSA or ED25519)
- プライベートキーファイル形式(.pem or .ppk)
ステップ 1: Amazon EMR クラスターを計画して設定する
Amazon EMR 用のストレージを準備する
- S3バケットとフォルダを作成します。
- 今回は、インプットデータ、アウトプットデータ、コード配置用のバケット3種類を作成しました。
Amazon EMR の入力データを使用してアプリケーションを準備する
入力データを準備する
-
food_establishment_data.zipからデータをダウンロードします。
- ワシントン州キング郡にある保健局の 2006~2020 年の検査結果の修正版
-
ファイルの中身を確認してみます。
- ヘッダーが含まれていました。
$ head food_establishment_data.csv name,inspection_result,inspection_closed_business,violation_type,violation_points 100 LB CLAM,Incomplete,FALSE,,0 100 LB CLAM,Unsatisfactory,FALSE,BLUE,5 100 LB CLAM,Unsatisfactory,FALSE,RED,5 100 LB CLAM,Unsatisfactory,FALSE,RED,10 100 LB CLAM,Unsatisfactory,FALSE,RED,5 100 LB CLAM,Complete,FALSE,,0 100 LB CLAM,Complete,FALSE,,0 100 PERCENT NUTRICION,Unsatisfactory,FALSE,BLUE,5 100 PERCENT NUTRICION,Unsatisfactory,FALSE,BLUE,5 $
-
ファイルをアップロードします。
スクリプトを準備する
-
下記のコードをファイル名「health_violations.py」として保存します。ロジックは下記の通り。
- argparseを使って、入力パラメータからインプット/アウトプットのS3パスを取得
- 指定されたS3パスのファイルを読み込みます。
- Sparkセッションを作成後、violation_type='RED'を絞り込み、カウントします。
- 指定されたS3のパスに上書きします。
import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri)
Amazon EMR クラスターを起動する
ステップ 2: Amazon EMR クラスターを管理する
Amazon EMR に作業を送信する
-
下記を入力し「追加」をクリックします。
- ステップタイプ:Sparkアプリケーションを選択
- 名前:Sparkアプリケーション(デフォルト)
- デプロイモード:クラスタ
- Spark-submitオプション:未入力
- アプリケーションの場所:コードを指定
- 引数:下記を指定(バケット名やファイル名は適宜変更)
- 失敗時の操作:次へ
--data_source s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv --output_uri s3://DOC-EXAMPLE-BUCKET/myOutputFolder
結果を表示する
データの確認
ジョブの実行の確認
-
Spark History Serverが起動されます。
(オプション) 実行中の Amazon EMR クラスターに接続する
- セキュリティグループのインバウンドルールにSSHを追加します。
- SSHでName Nodeに接続します。
$ ssh -i ~/.ssh/emr-key.pem hadoop@xxxxxxxxxx.ap-northeast-1.compute.amazonaws.com
Last login: Sun Jan 22 13:50:56 2023
__| __|_ )
_| ( / Amazon Linux 2 AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-2/
12 package(s) needed for security, out of 14 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
[hadoop@ip-XXX-XX-XX-XX ~]$
考察
簡単にクラスタが起動できました。History Serverなども自動でセットアップされるのはいいですね。
参考