1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWS EMRこと始め

Last updated at Posted at 2021-08-21

これは何

Spark を使用してサンプルクラスターを起動し、Amazon S3 バケットに格納する単純な PySpark スクリプトを実行していきます。

S3作成

適当にS3を作成します。

スクリーンショット 2021-08-21 21.28.59.png

フォルダを作成する

以下のフォルダを作成します。

output:ログと進行中のアウトプットを入れます。
script:PySparkスクリプト、csvファイルを入れます。

先ほど作成したオブジェクトへ行き、上記フォルダを作成します。

スクリーンショット 2021-08-21 21.36.33.png

scriptファイルを置く

import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Processes sample food establishment inspection data and queries the data to find the top 10 establishments
    with the most Red violations from 2006 to 2020.

    :param data_source: The URI where the food establishment data CSV is saved, typically
			  an Amazon S3 bucket, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where the output is written, typically an Amazon S3
                       bucket, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        if data_source is not None:
            restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Create an in-memory DataFrame to query
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("SELECT name, count(*) AS total_red_violations " +
          "FROM restaurant_violations " +
          "WHERE violation_type = 'RED' " +
          "GROUP BY name " +
          "ORDER BY total_red_violations DESC LIMIT 10 ")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', help="The URI where the CSV restaurant data is saved, typically an S3 bucket.")
    parser.add_argument(
        '--output_uri', help="The URI where output is saved, typically an S3 bucket.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)

このpyファイルを、S3コンソールで、scriptフォルダにアップロードします。

また、food_establishment_data.zipファイルをダウンロードします。

コンテンツを解凍し、food_establishment_data.csv としてローカルに保存します。
これも同様に、scriptフォルダにアップロードします。

EMR作成

スクリーンショット 2021-08-21 21.51.37.png

スクリーンショット 2021-08-21 21.57.28.png

デフォルトで3つのノード(1つのマスターノードと2つのコアノード)を設定されます。
実行時に、マスターノードはデータセットの処理を処理するコアノードにワークロードを分散します。

スクリーンショット 2021-08-21 21.59.17.png

EMR クラスターを調整

[Steps (ステップ)]、[Add step (ステップの追加)] の順に選択します。

スクリーンショット 2021-08-21 22.17.40.png

アプリケーションの場所:S3に入れたPySparkスクリプトの場所
引数:
--data_source s3://作成したバケット名/script/food_establishment_data.csv
--output_uri s3://作成したバケット名/output

スクリーンショット 2021-08-21 22.21.48.png

結果の表示

ステップが正常に実行されると、ステップの送信時に指定した Amazon S3 出力フォルダに出力結果を表示できます。

次の項目が出力フォルダにあることを確認します。

_SUCCESS
ステップの成功を示します。
CSVファイルpart-***
結果を持つオブジェクトです。

スクリーンショット 2021-08-21 22.30.49.png

なんと、pyファイルは、出力ファイルに、最も赤い違反が多い食品施設の上位 10 件がリストするものでした(!?)
面白いです。

name, total_red_violations
SUBWAY, 322
T-MOBILE PARK, 315
WHOLE FOODS MARKET, 299
PCC COMMUNITY MARKETS, 251
TACO TIME, 240
MCDONALD'S, 177
THAI GINGER, 153
SAFEWAY INC #1508, 143
TAQUERIA EL RINCONSITO, 134
HIMITSU TERIYAKI, 128	

EMR リソースのクリーンアップ

EMRを終了するを押下します。
5 ~ 10 分かかる場合があります。

S3 リソースを削除します。
バケットを削除する前に、クラスターを完全にシャットダウンする必要があります。そうしないと、バケットを空にしようとすると問題が発生する可能性があります。

総括

最初の Amazon EMR クラスターを最初から最後まで起動し、ビッグデータアプリケーションの準備と送信、結果の表示、クラスターのシャットダウンなど、EMR 諸々を実行しました。

なんとなくイメージを掴むにはハンズオンが一番ですね。

今回はAWSから提供されているファイルを使用しましたが、COVID-19 Dataset by Our World in DataというGit Hubから、データを使って分析してみるのも面白いかもしれません。

参考
https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/emr-gs.html
https://github.com/owid/covid-19-data
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?