LoginSignup
6
3

More than 5 years have passed since last update.

PySparkカンペ:mapPartitions()を使った分散処理のテンプレート

Last updated at Posted at 2018-11-13

このカンペの説明

PySparkを使った分散処理の作法のカンペである。

想定状況

  • 大きなデータファイルを各エグゼキューターに配って、分散処理をしたい。
  • パーティション単位で分散処理の定義をする。
    • 受け取ったデータファイルに対して、どんな条件で処理をするのかパラメータも渡したい。
    • 分析する範囲を指定するパラメータなど。

カンペ本文

(0) SparkContextの初期化

  from pyspark import SparkContext, SparkConf, SparkFiles

  # SparkContext を初期化する
  spark_conf = SparkConf()

  # RDDは圧縮して配る。そうしないとメモリ不足でエグゼキュータが落ちることがある
  spark_conf.set(u'spark.rdd.compress', u'True')

  spark_conf.set(u'spark.serializer', u'org.apache.spark.serializer.KryoSerializer')
  spark_conf.set(u'spark,kryo.referenceTracking', u'False')

  sc = SparkContext(appName="Some Distributed Application(アプリケーション名を入力)",
                    conf = spark_conf)

(1) パーティションの定義

  # 各分散ジョブの定義を格納する
  job_config = []

  for _idx, _item in enumerate(param_list):
      # この例では、各ジョブごとの開始日付、終了日付、対象カテゴリを定義している
      job_config.jappend((_idx, (_item["start_date"], _item["end_date"], _item["category"])))

  job_rdd = sc.parallelize(job_config, len(job_config))

(2) 各エグゼキューターに配るファイルの準備

  datafile_path_a = '/tmp/data/some_data_a.csv'
  datafile_path_b = '/tmp/data/some_data_b.csv'

  sc.addFile(datafile_path_a)
  sc.addFile(datafile_path_b)

  # 各エグゼキューターが SparkContext 経由でファイルを読み込む際には、
  # ファイル名でアクセスする。
  filepath_to_read_a = datafile_path_a.split('/')[-1]
  filepath_to_read_b = datafile_path_a.split('/')[-1]

(3) 分散処理部分の定義

  def _distributed_job(iterator, datafile_name_list, global_const_a, global_const_b, global_const_c):
    u"""
        iterator には、job_config がセットされている。
        分散処理全体で共有する変数 datafile_name_list, global_const_a, global_const_c, global_const_c を
        受け取る。

    """
    # yield を使って処理結果を返す。
    for _idx, _item in iterator:
        # 各エグゼキューターに配られたデータファイルを読み込む。
        with open(SparkFiles(datafile_name_list["a"])) as datafile_a,
             open(SparkFiles(datafile_name_list["b"])) as datafile_b:
             # データフレームとして読み込む。
             df_data_a = pd.read_csv(datafile_a)
             df_data_b = pd.read_csv(datafile_b)

             result_a, result_b, result_c = some_complex_calc(df_data_a, df_data_b,
                                                              start_date = _item[0],
                                                              end_date = _item[1],
                                                              category = _item[2],
                                                              const_a = global_const_a,
                                                              const_b = global_const_b,
                                                              const_c = global_const_c
                                                              )
            yield (_idx, _item[0], _item[1], _item[2], 
                   result_a, result_b, result_c )

(4) 分散処理の実行

      result_rdd = job_rdd.mapPartitions(lambda iterator:
                                         _distributed_job(iterator,
                                                          {"a": filepath_to_read_b, "b":filepath_to_read_a},
                                                          global_const_a,
                                                          global_const_b,
                                                          global_const_c))

      result_collected = result_rdd.collect()

(5) 分散処理の結果を利用する

      for _idx, _start_date, _end_date, _category, _result_a, _result_b, _result_c in result_collected:
        # 分散処理の結果を用いた何らかの処理
        # 
        #
6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3