0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Amazon EMRの入門チュートリアルを試してみた

Posted at

背景・目的

こちらでEMRを整理しましたが、今回は、ドキュメントをもとにEMRのチュートリアルを試してみます。

まとめ

  • わずか数クリックでクラスタが起動できます。

概要

EMR を使用すると、ビッグデータフレームワークを使用してデータを処理および分析するクラスターをわずか数分でセットアップできる。
このチュートリアルでは、Spark を使用してサンプルクラスターを起動する方法と、S3 バケットに格納された簡単なPySpark スクリプトを実行する方法について試す。
次の 3 つの主要なワークフローカテゴリにおける EMR の必須タスクを取り上げる。

  • 計画と設定
  • 管理
  • クリーンアップ

image.png
※出典:Tutorial: Getting started with Amazon EMR

実践

チュートリアル: Amazon EMR の開始方法を元に試します。

前提

VPCのセットアップ

VPCとプライベートサブネットを設定します。

Amazon EMR のセットアップ

Amazon EC2 のキーペアと Linux インスタンスを作成

作成済みの場合は、スキップします。

  1. EC2のサイドバーで「キーペア」をクリックします。
    image.png

  2. 「キーペアを作成する」をクリックします。
    image.png

  3. 下記を入力し、「キーペアを作成」をクリックします。

    1. キーペア名
    2. キーペアタイプを選択(RSA or ED25519)
    3. プライベートキーファイル形式(.pem or .ppk)

ステップ 1: Amazon EMR クラスターを計画して設定する

Amazon EMR 用のストレージを準備する

  1. S3バケットとフォルダを作成します。
  • 今回は、インプットデータ、アウトプットデータ、コード配置用のバケット3種類を作成しました。

Amazon EMR の入力データを使用してアプリケーションを準備する

入力データを準備する

  1. food_establishment_data.zipからデータをダウンロードします。

    • ワシントン州キング郡にある保健局の 2006~2020 年の検査結果の修正版
  2. ファイルの中身を確認してみます。

    • ヘッダーが含まれていました。
    $ head food_establishment_data.csv           
    name,inspection_result,inspection_closed_business,violation_type,violation_points
    100 LB CLAM,Incomplete,FALSE,,0
    100 LB CLAM,Unsatisfactory,FALSE,BLUE,5
    100 LB CLAM,Unsatisfactory,FALSE,RED,5
    100 LB CLAM,Unsatisfactory,FALSE,RED,10
    100 LB CLAM,Unsatisfactory,FALSE,RED,5
    100 LB CLAM,Complete,FALSE,,0
    100 LB CLAM,Complete,FALSE,,0
    100 PERCENT NUTRICION,Unsatisfactory,FALSE,BLUE,5
    100 PERCENT NUTRICION,Unsatisfactory,FALSE,BLUE,5
    $
    
  3. ファイルをアップロードします。

    • 71MB程度のファイルでした。
      image.png

スクリプトを準備する

  1. 下記のコードをファイル名「health_violations.py」として保存します。ロジックは下記の通り。

    • argparseを使って、入力パラメータからインプット/アウトプットのS3パスを取得
    • 指定されたS3パスのファイルを読み込みます。
    • Sparkセッションを作成後、violation_type='RED'を絞り込み、カウントします。
    • 指定されたS3のパスに上書きします。
    import argparse
    
    from pyspark.sql import SparkSession
    
    def calculate_red_violations(data_source, output_uri):
        """
        Processes sample food establishment inspection data and queries the data to find the top 10 establishments
        with the most Red violations from 2006 to 2020.
    
        :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
        :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
        """
        with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
            # Load the restaurant violation CSV data
            if data_source is not None:
                restaurants_df = spark.read.option("header", "true").csv(data_source)
    
            # Create an in-memory DataFrame to query
            restaurants_df.createOrReplaceTempView("restaurant_violations")
    
            # Create a DataFrame of the top 10 restaurants with the most Red violations
            top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations 
              FROM restaurant_violations 
              WHERE violation_type = 'RED' 
              GROUP BY name 
              ORDER BY total_red_violations DESC LIMIT 10""")
    
            # Write the results to the specified output URI
            top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.")
        parser.add_argument(
            '--output_uri', help="The URI where output is saved, like an S3 bucket location.")
        args = parser.parse_args()
    
        calculate_red_violations(args.data_source, args.output_uri)
    			
    
  2. コード用のS3バケットにアップロードします。
    image.png

Amazon EMR クラスターを起動する

  1. マネコンのEMRで「クラスターを作成」をクリックします。
    image.png

  2. 下記を入力して、「クラスターを作成」をクリックします。
    image.png
    image.png

  3. しばらくすると「待機中」になります。
    image.png

ステップ 2: Amazon EMR クラスターを管理する

Amazon EMR に作業を送信する

  1. 作成したクラスターを選択します。
    image.png

  2. 「ステップ」タブをクリックし、「ステップの追加」をクリックします。
    image.png

  3. 下記を入力し「追加」をクリックします。

    • ステップタイプ:Sparkアプリケーションを選択
    • 名前:Sparkアプリケーション(デフォルト)
    • デプロイモード:クラスタ
    • Spark-submitオプション:未入力
    • アプリケーションの場所:コードを指定
    • 引数:下記を指定(バケット名やファイル名は適宜変更)
    • 失敗時の操作:次へ
    --data_source s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv
    --output_uri s3://DOC-EXAMPLE-BUCKET/myOutputFolder						
    

    image.png

  4. 追加直後は、「保留中」になります。
    image.png

  5. しばらくすると、「実行中」に変わります。
    image.png

  6. 処理が終了すると「完了」に変わります。
    image.png

結果を表示する

データの確認

  1. S3の出力パスを確認します。
    image.png

  2. S3 SELECTを実行し、データを確認します。

    • 赤の違反が多い上位10施設がリストされます。
      image.png

ジョブの実行の確認

  1. アプリケーションの履歴をクリックします。
    image.png

  2. Spark History Serverが起動されます。

  3. App IDをクリックします。
    image.png

  4. 直近のJob IDのDescriptionをクリックします。
    image.png

  5. Stageを確認します。
    image.png

  6. メトリクスも確認できます。
    image.png

(オプション) 実行中の Amazon EMR クラスターに接続する

  1. セキュリティグループのインバウンドルールにSSHを追加します。
  2. SSHでName Nodeに接続します。
$ ssh -i ~/.ssh/emr-key.pem hadoop@xxxxxxxxxx.ap-northeast-1.compute.amazonaws.com
Last login: Sun Jan 22 13:50:56 2023

       __|  __|_  )
       _|  (     /   Amazon Linux 2 AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-2/
12 package(s) needed for security, out of 14 available
Run "sudo yum update" to apply all updates.
                                                                    
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR    
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R   
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R 
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R 
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR   
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R  
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR
                                                                    
[hadoop@ip-XXX-XX-XX-XX ~]$ 

考察

簡単にクラスタが起動できました。History Serverなども自動でセットアップされるのはいいですね。

参考

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?