このカンペの説明
PySparkを使った分散処理の作法のカンペである。
想定状況
- 大きなデータファイルを各エグゼキューターに配って、分散処理をしたい。
- パーティション単位で分散処理の定義をする。
- 受け取ったデータファイルに対して、どんな条件で処理をするのかパラメータも渡したい。
- 分析する範囲を指定するパラメータなど。
カンペ本文
(0) SparkContextの初期化
from pyspark import SparkContext, SparkConf, SparkFiles
# SparkContext を初期化する
spark_conf = SparkConf()
# RDDは圧縮して配る。そうしないとメモリ不足でエグゼキュータが落ちることがある
spark_conf.set(u'spark.rdd.compress', u'True')
spark_conf.set(u'spark.serializer', u'org.apache.spark.serializer.KryoSerializer')
spark_conf.set(u'spark,kryo.referenceTracking', u'False')
sc = SparkContext(appName="Some Distributed Application(アプリケーション名を入力)",
conf = spark_conf)
(1) パーティションの定義
# 各分散ジョブの定義を格納する
job_config = []
for _idx, _item in enumerate(param_list):
# この例では、各ジョブごとの開始日付、終了日付、対象カテゴリを定義している
job_config.jappend((_idx, (_item["start_date"], _item["end_date"], _item["category"])))
job_rdd = sc.parallelize(job_config, len(job_config))
(2) 各エグゼキューターに配るファイルの準備
datafile_path_a = '/tmp/data/some_data_a.csv'
datafile_path_b = '/tmp/data/some_data_b.csv'
sc.addFile(datafile_path_a)
sc.addFile(datafile_path_b)
# 各エグゼキューターが SparkContext 経由でファイルを読み込む際には、
# ファイル名でアクセスする。
filepath_to_read_a = datafile_path_a.split('/')[-1]
filepath_to_read_b = datafile_path_a.split('/')[-1]
(3) 分散処理部分の定義
def _distributed_job(iterator, datafile_name_list, global_const_a, global_const_b, global_const_c):
u"""
iterator には、job_config がセットされている。
分散処理全体で共有する変数 datafile_name_list, global_const_a, global_const_c, global_const_c を
受け取る。
"""
# yield を使って処理結果を返す。
for _idx, _item in iterator:
# 各エグゼキューターに配られたデータファイルを読み込む。
with open(SparkFiles(datafile_name_list["a"])) as datafile_a,
open(SparkFiles(datafile_name_list["b"])) as datafile_b:
# データフレームとして読み込む。
df_data_a = pd.read_csv(datafile_a)
df_data_b = pd.read_csv(datafile_b)
result_a, result_b, result_c = some_complex_calc(df_data_a, df_data_b,
start_date = _item[0],
end_date = _item[1],
category = _item[2],
const_a = global_const_a,
const_b = global_const_b,
const_c = global_const_c
)
yield (_idx, _item[0], _item[1], _item[2],
result_a, result_b, result_c )
(4) 分散処理の実行
result_rdd = job_rdd.mapPartitions(lambda iterator:
_distributed_job(iterator,
{"a": filepath_to_read_b, "b":filepath_to_read_a},
global_const_a,
global_const_b,
global_const_c))
result_collected = result_rdd.collect()
(5) 分散処理の結果を利用する
for _idx, _start_date, _end_date, _category, _result_a, _result_b, _result_c in result_collected:
# 分散処理の結果を用いた何らかの処理
#
#