はじめに
数TBクラスの大きなデータを変換してDBへputするような場合、
データをいくつかのグループに分割して、複数台のコンバータで分担して処理したい。
AWS Glueを使えば分散処理ライブラリSparkを利用した並列処理をサーバレスで簡単に実現できる。
サンプルコード↓
AWS Glue + PySparkによる分散処理のメリット
- フルマネージドなサーバレスサービスで、煩雑な保守作業は不要
- データ量や処理の重さに応じて、コンソールから分散処理の台数をいつでも変更できる (2台 〜 299台)
- 利用台数 x 利用時間分のみ課金
公式ドキュメント↓
やること
- S3に置いたPySparkスクリプトをAWS Glueと紐付けて実行する
- 要素が5000個の整数型リスト1000個用意して、それぞれ(重い処理の疑似として)バブルソートする
- 分散処理する場合としない場合でパフォーマンスを比較する
もくじ
- PySparkスクリプトを用意する
- S3 bucketにPySparkスクリプトを設置する
- AWS Glueに割り当てるIAMロールを作成する
- AWS Glue Jobを新規作成する
- Jobを実行してパフォーマンスを比較する
1. PySparkスクリプトを用意する
データを分割 & 複数のworkerで分担できていることを確認するために、ある程度まとまった量のダミーデータを用意して、それらを順にバブルソートしていくようなスクリプトを2種類用意した。
- 分散処理なしでソートする ->
non_parallel_processor.py
- 分散処理ありでソートする ->
parallel_processor.py
non_parallel_processor.py
では愚直にループを回して2次元リストを1要素ずつ捌いている。
一方で、parallel_processor.py
ではrdd(レジリエントな分散データセット)と呼ばれるSparkで扱うのに特化したフォーマットにダミーデータを変換してから、Sparkへ受け渡している。
ここではあくまでも重い処理のダミーとしてバブルソートさせており、どのようなデータ・処理でも同様。
rddについて ↓
import traceback
from random import randint
# 用意するランダム値リストの要素数
# バブルソートの計算量はこの値の2乗
ITEM_COUNT_IN_A_DUMMY_LIST = 5000
# 用意するランダム値リストの個数
DUMMY_LIST_COUNT = 1000
def main():
"""AWS Glue Jobをキックして最初に呼ばれるエントリポイント
Args:
Returns:
"""
pre_process_data_queue = create_dummy_data_queue()
process_in_non_parallel(pre_process_data_queue)
def create_dummy_data_queue():
"""ランダム値を規定の個数だけ詰めた2次元リストを返却(ここでは5000列 x 1000行)
Args:
Returns:
dummy_data_queue: ランダム値の整数型リスト
"""
dummy_data_queue = []
for i in range(DUMMY_LIST_COUNT):
dummy_data = [randint(0,100000) for _ in range(ITEM_COUNT_IN_A_DUMMY_LIST)]
dummy_data_queue.append(dummy_data)
return dummy_data_queue
def process_in_non_parallel(pre_process_data_queue):
"""分散処理なしで愚直にループを回して結果を標準出力
Args:
pre_process_data_queue: 変換前データ(ランダム値の整数型リスト)
Returns:
"""
whole_process_count = len(pre_process_data_queue)
for index, pre_process_data in enumerate(pre_process_data_queue):
print(sort_by_bubble_sort(pre_process_data))
print(f"iteration_count: {index + 1}/{whole_process_count} completed")
def sort_by_bubble_sort(num_list):
"""重い処理の疑似としてバブルソート
Args:
num_list: ランダム値の整数型リスト
Returns:
num_list: 昇順ソート済の整数型リスト
"""
for m in range(len(num_list)):
for n in range(len(num_list)-1, m, -1):
if num_list[n] < num_list[n-1]:
num_list[n], num_list[n-1] = num_list[n-1], num_list[n]
return num_list
if __name__ == "__main__":
try:
main()
except:
traceback.print_exc()
import traceback
from random import randint
from pyspark import SparkConf, SparkContext
# 用意するランダム値リストの要素数
# バブルソートの計算量はこの値の2乗
ITEM_COUNT_IN_A_DUMMY_LIST = 5000
# 用意するランダム値リストの個数
DUMMY_LIST_COUNT = 1000
# rddをいくつのgroupに分割するか (1000itemを200groupに分ける)
RDD_DIVISION_COUNT = 200
def main():
"""AWS Glue Jobをキックして最初に呼ばれるエントリポイント
Args:
Returns:
"""
pre_process_data_queue = create_dummy_data_queue()
process_in_parallel(pre_process_data_queue)
def create_dummy_data_queue():
"""ランダム値を規定の個数だけ詰めた2次元リストを返却(ここでは5000列 x 1000行)
Args:
Returns:
dummy_data_queue: ランダム値の整数型リスト
"""
dummy_data_queue = []
for i in range(DUMMY_LIST_COUNT):
dummy_data = [randint(0,100000) for _ in range(ITEM_COUNT_IN_A_DUMMY_LIST)]
dummy_data_queue.append(dummy_data)
return dummy_data_queue
def process_in_parallel(pre_process_data_queue):
"""分散処理ありでデータ処理(rdd作成 -> sparkを呼ぶ -> 結果を標準出力)
Args:
pre_process_data_queue: 変換前データ(ランダム値の整数型リスト)
Returns:
"""
whole_process_count = len(pre_process_data_queue)
rdd = organize_rdd(pre_process_data_queue)
spark_artifacts = rdd.map(entry_spark_process).collect()
for artifact in spark_artifacts:
print(artifact["data"])
def organize_rdd(pre_process_data_queue):
"""受け取ったデータをtupleに入れてrddを作る
Args:
pre_process_data_queue: 変換前データ(ランダム値の整数型リスト)
Returns:
rdd: レジリエントな分散データセット
"""
rdd_elements = []
for index, pre_process_data in enumerate(pre_process_data_queue):
new_tuple = (index, pre_process_data)
rdd_elements.append(new_tuple)
spark_context = SparkContext.getOrCreate()
rdd = spark_context.parallelize(rdd_elements, RDD_DIVISION_COUNT)
return rdd
###
### ↓ ここから下の処理が分散処理される ↓
###
def entry_spark_process(spark_arg_obj):
"""分散処理のworker内で実行される処理のエントリポイント
Args:
spark_arg_obj: rddの1要素(tuple)
Returns:
dict: rdd生成時に振られたindexと変換後データの組
"""
index = spark_arg_obj[0]
pre_process_data = spark_arg_obj[1]
post_process_data = sort_by_bubble_sort(pre_process_data)
return dict(
{
"index": index,
"data": post_process_data
}
)
def sort_by_bubble_sort(num_list):
"""重い処理の疑似としてバブルソート
Args:
num_list: ランダム値の整数型リスト
Returns:
num_list: 昇順ソート済の整数型リスト
"""
for m in range(len(num_list)):
for n in range(len(num_list)-1, m, -1):
if num_list[n] < num_list[n-1]:
num_list[n], num_list[n-1] = num_list[n-1], num_list[n]
return num_list
if __name__ == "__main__":
try:
main()
except:
traceback.print_exc()
2. S3 bucketにPySparkスクリプトを設置する
S3 bucketを作ってファイルをアップロードする手順 (1.を参照) ↓
本ページではuni-glue-script
という名称でS3 bucketを作成した。
先の項で触れた2つのPySparkスクリプトを下記のパスにそれぞれ設置した。
s3://uni-glue-script/dev/glue-spark-test/non_parallel_processor.py
s3://uni-glue-script/dev/glue-spark-test/parallel_processor.py
3. AWS Glueに割り当てるIAMロールを作成する
IAMロールを作成する手順 (2.を参照) ↓
本ページではglue-service-role-basic
という名称でロールを作成した。
付与するポリシーはAWSGlueServiceRole
とAmazonS3ReadOnlyAccess
が最低限。
4. AWS Glue Jobを新規作成する
やっと本題。
まずは分散処理なし
のスクリプトでjobを作ってみる。
AWS Glueのコンソールを開いたら画面左のJobs
をクリックしてから、画面上部のAdd job
をクリックする。
Jobの作成画面に移ったら、次のように設定する。
① Jobの名称は適当に(ここではglue_spark_non_parallel_processing
とした)。
IAMロールは手順3で作成したものを割り当てる。
②TypeはSparkを選択。
GlueのVersionは現時点で最新のSpark3.1(Python)
を選択。
(お好みでScalaも選択できる。)
③今回はS3に自作scriptを配置済なので、An existing script that you provide
を選択。
④自作scriptを置いているS3のファイルパスを指定する。
Temporary directory
の欄では中間ファイルが生成された際に置かれるパスを指定する(どこでも良い)。
特に変更しない。
Save job and edit script
をクリック。
スクリプトの編集画面が表示されて、期待通りの内容が表示されていればok。
画面右上のXを押して、AWS Glueのコンソールに戻る。
分散処理あり
のスクリプトについても、同じ要領でjobを作成できる。
紐付けるPysparkスクリプトのみ変更して、glue_spark_parallel_processing
を追加した。
5. Jobを実行してパフォーマンスを比較する
ジョブを実行する際は、実行したいジョブにチェックを入れてから、Action -> Run job
で実行できる。
分散処理あり
で処理した場合、実行時間はworker2台でおよそ11分
、worker8台でおよそ3分
だった。
workerは1台の中でさらにExcutorが2つ走るため、処理時間がそれぞれ1/4、1/16
程度…とまではいかないが、おおよそ順当に早く処理が終わるようになる様子を読み取れた。
(workerの立ち上げにも時間がかかるため、台数を倍にすれば処理時間半減、とはいかないらしい。)
なお、workerの数と性能は、Action -> Edit job
からSecurity configuration, script libraries, and job parameters (optional)
の項で変更できる。