1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【AWS】AWS Glue + PySparkでお手軽なサーバレス分散処理

Last updated at Posted at 2021-11-03

はじめに

数TBクラスの大きなデータを変換してDBへputするような場合、
データをいくつかのグループに分割して、複数台のコンバータで分担して処理したい。
AWS Glueを使えば分散処理ライブラリSparkを利用した並列処理をサーバレスで簡単に実現できる。

サンプルコード↓

AWS Glue + PySparkによる分散処理のメリット

  • フルマネージドなサーバレスサービスで、煩雑な保守作業は不要
  • データ量や処理の重さに応じて、コンソールから分散処理の台数をいつでも変更できる (2台 〜 299台)
  • 利用台数 x 利用時間分のみ課金

公式ドキュメント↓

やること

  • S3に置いたPySparkスクリプトをAWS Glueと紐付けて実行する
  • 要素が5000個の整数型リスト1000個用意して、それぞれ(重い処理の疑似として)バブルソートする
  • 分散処理する場合としない場合でパフォーマンスを比較する

もくじ

  1. PySparkスクリプトを用意する
  • S3 bucketにPySparkスクリプトを設置する
  • AWS Glueに割り当てるIAMロールを作成する
  • AWS Glue Jobを新規作成する
  • Jobを実行してパフォーマンスを比較する

1. PySparkスクリプトを用意する

データを分割 & 複数のworkerで分担できていることを確認するために、ある程度まとまった量のダミーデータを用意して、それらを順にバブルソートしていくようなスクリプトを2種類用意した。

  • 分散処理なしでソートする -> non_parallel_processor.py
  • 分散処理ありでソートする -> parallel_processor.py

non_parallel_processor.pyでは愚直にループを回して2次元リストを1要素ずつ捌いている。
一方で、parallel_processor.pyではrdd(レジリエントな分散データセット)と呼ばれるSparkで扱うのに特化したフォーマットにダミーデータを変換してから、Sparkへ受け渡している。

ここではあくまでも重い処理のダミーとしてバブルソートさせており、どのようなデータ・処理でも同様。

rddについて ↓

non_parallel_processor.py
import traceback
from random import randint

# 用意するランダム値リストの要素数
# バブルソートの計算量はこの値の2乗
ITEM_COUNT_IN_A_DUMMY_LIST = 5000

# 用意するランダム値リストの個数
DUMMY_LIST_COUNT = 1000


def main():
    """AWS Glue Jobをキックして最初に呼ばれるエントリポイント 

    Args:

    Returns:

    """
    pre_process_data_queue = create_dummy_data_queue()
    process_in_non_parallel(pre_process_data_queue)


def create_dummy_data_queue():
    """ランダム値を規定の個数だけ詰めた2次元リストを返却(ここでは5000列 x 1000行)

    Args:

    Returns:
        dummy_data_queue: ランダム値の整数型リスト
    """
    dummy_data_queue = []
    for i in range(DUMMY_LIST_COUNT):
        dummy_data = [randint(0,100000) for _ in range(ITEM_COUNT_IN_A_DUMMY_LIST)]
        dummy_data_queue.append(dummy_data)

    return dummy_data_queue


def process_in_non_parallel(pre_process_data_queue):
    """分散処理なしで愚直にループを回して結果を標準出力

    Args:
        pre_process_data_queue: 変換前データ(ランダム値の整数型リスト)

    Returns:

    """
    whole_process_count = len(pre_process_data_queue)

    for index, pre_process_data in enumerate(pre_process_data_queue):
        print(sort_by_bubble_sort(pre_process_data))
        print(f"iteration_count: {index + 1}/{whole_process_count} completed")


def sort_by_bubble_sort(num_list):
    """重い処理の疑似としてバブルソート

    Args:
        num_list: ランダム値の整数型リスト

    Returns:
        num_list: 昇順ソート済の整数型リスト
    """
    for m in range(len(num_list)):
        for n in range(len(num_list)-1, m, -1):
            if num_list[n] < num_list[n-1]:
                num_list[n], num_list[n-1] = num_list[n-1], num_list[n]

    return num_list


if __name__ == "__main__":
    try:
        main()
    except:
        traceback.print_exc()

parallel_processor.py
import traceback
from random import randint
from pyspark import SparkConf, SparkContext

# 用意するランダム値リストの要素数
# バブルソートの計算量はこの値の2乗
ITEM_COUNT_IN_A_DUMMY_LIST = 5000

# 用意するランダム値リストの個数
DUMMY_LIST_COUNT = 1000

# rddをいくつのgroupに分割するか (1000itemを200groupに分ける)
RDD_DIVISION_COUNT = 200


def main():
    """AWS Glue Jobをキックして最初に呼ばれるエントリポイント 

    Args:

    Returns:

    """
    pre_process_data_queue = create_dummy_data_queue()
    process_in_parallel(pre_process_data_queue)


def create_dummy_data_queue():
    """ランダム値を規定の個数だけ詰めた2次元リストを返却(ここでは5000列 x 1000行)

    Args:

    Returns:
        dummy_data_queue: ランダム値の整数型リスト
    """
    dummy_data_queue = []
    for i in range(DUMMY_LIST_COUNT):
        dummy_data = [randint(0,100000) for _ in range(ITEM_COUNT_IN_A_DUMMY_LIST)]
        dummy_data_queue.append(dummy_data)

    return dummy_data_queue


def process_in_parallel(pre_process_data_queue):
    """分散処理ありでデータ処理(rdd作成 -> sparkを呼ぶ -> 結果を標準出力)

    Args:
        pre_process_data_queue: 変換前データ(ランダム値の整数型リスト)

    Returns:

    """
    whole_process_count = len(pre_process_data_queue)

    rdd = organize_rdd(pre_process_data_queue)
    spark_artifacts = rdd.map(entry_spark_process).collect()

    for artifact in spark_artifacts:
        print(artifact["data"])


def organize_rdd(pre_process_data_queue):
    """受け取ったデータをtupleに入れてrddを作る

    Args:
        pre_process_data_queue: 変換前データ(ランダム値の整数型リスト)

    Returns:
        rdd: レジリエントな分散データセット
    """
    rdd_elements = []
    
    for index, pre_process_data in enumerate(pre_process_data_queue):
        new_tuple = (index, pre_process_data)
        rdd_elements.append(new_tuple)

    spark_context = SparkContext.getOrCreate()
    rdd = spark_context.parallelize(rdd_elements, RDD_DIVISION_COUNT)

    return rdd


###
### ↓ ここから下の処理が分散処理される ↓
###


def entry_spark_process(spark_arg_obj):
    """分散処理のworker内で実行される処理のエントリポイント

    Args:
        spark_arg_obj: rddの1要素(tuple)

    Returns:
        dict: rdd生成時に振られたindexと変換後データの組
    """
    index = spark_arg_obj[0]
    pre_process_data = spark_arg_obj[1]

    post_process_data = sort_by_bubble_sort(pre_process_data)

    return dict(
      {
        "index": index,
        "data": post_process_data
      }
    )


def sort_by_bubble_sort(num_list):
    """重い処理の疑似としてバブルソート

    Args:
        num_list: ランダム値の整数型リスト

    Returns:
        num_list: 昇順ソート済の整数型リスト
    """
    for m in range(len(num_list)):
        for n in range(len(num_list)-1, m, -1):
            if num_list[n] < num_list[n-1]:
                num_list[n], num_list[n-1] = num_list[n-1], num_list[n]

    return num_list


if __name__ == "__main__":
    try:
        main()
    except:
        traceback.print_exc()

2. S3 bucketにPySparkスクリプトを設置する

S3 bucketを作ってファイルをアップロードする手順 (1.を参照) ↓

本ページではuni-glue-scriptという名称でS3 bucketを作成した。
先の項で触れた2つのPySparkスクリプトを下記のパスにそれぞれ設置した。

s3://uni-glue-script/dev/glue-spark-test/non_parallel_processor.py
s3://uni-glue-script/dev/glue-spark-test/parallel_processor.py

Screen Shot 2021-11-03 at 21.53.17.png

3. AWS Glueに割り当てるIAMロールを作成する

IAMロールを作成する手順 (2.を参照) ↓

本ページではglue-service-role-basicという名称でロールを作成した。
付与するポリシーはAWSGlueServiceRoleAmazonS3ReadOnlyAccessが最低限。

Screen Shot 2021-11-02 at 22.10.10.png

4. AWS Glue Jobを新規作成する

やっと本題。
まずは分散処理なしのスクリプトでjobを作ってみる。

AWS Glueのコンソールを開いたら画面左のJobsをクリックしてから、画面上部のAdd jobをクリックする。
Screen Shot 2021-11-02 at 22.06.17.png

Jobの作成画面に移ったら、次のように設定する。

① Jobの名称は適当に(ここではglue_spark_non_parallel_processingとした)。
IAMロールは手順3で作成したものを割り当てる。

②TypeはSparkを選択。
GlueのVersionは現時点で最新のSpark3.1(Python)を選択。
(お好みでScalaも選択できる。)

③今回はS3に自作scriptを配置済なので、An existing script that you provideを選択。

④自作scriptを置いているS3のファイルパスを指定する。
Temporary directoryの欄では中間ファイルが生成された際に置かれるパスを指定する(どこでも良い)。

設定完了したら画面下部のNextを押す。
Screen Shot 2021-11-03 at 22.10.46.png

特に変更しない。
Save job and edit scriptをクリック。
Screen Shot 2021-11-03 at 22.10.53.png

スクリプトの編集画面が表示されて、期待通りの内容が表示されていればok。
画面右上のXを押して、AWS Glueのコンソールに戻る。
Screen Shot 2021-11-03 at 22.11.26.png

分散処理ありのスクリプトについても、同じ要領でjobを作成できる。
紐付けるPysparkスクリプトのみ変更して、glue_spark_parallel_processing を追加した。
Screen Shot 2021-11-03 at 22.28.10.png

5. Jobを実行してパフォーマンスを比較する

ジョブを実行する際は、実行したいジョブにチェックを入れてから、Action -> Run jobで実行できる。

Screen Shot 2021-11-03 at 22.11.54.png

分散処理なしで処理した場合の実行時間はおよそ37分だった。
Screen Shot 2021-11-03 at 22.05.37.png

分散処理ありで処理した場合、実行時間はworker2台でおよそ11分、worker8台でおよそ3分だった。
Screen Shot 2021-11-03 at 22.05.06.png

workerは1台の中でさらにExcutorが2つ走るため、処理時間がそれぞれ1/4、1/16程度…とまではいかないが、おおよそ順当に早く処理が終わるようになる様子を読み取れた。
(workerの立ち上げにも時間がかかるため、台数を倍にすれば処理時間半減、とはいかないらしい。)

なお、workerの数と性能は、Action -> Edit jobからSecurity configuration, script libraries, and job parameters (optional)の項で変更できる。

Screen Shot 2021-11-03 at 22.42.57.png

最大299台まで増やせるが、請求額がとんでもないことになりそうなので、試していない。
Screen Shot 2021-11-03 at 22.43.38.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?