Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

【pyspark】バッチ処理を複数のPCに分担する

More than 1 year has passed since last update.

はじめに

10GBの大きめなテキストファイルに対して、laptopPCを複数台使ってバッチ処理をする手順について書きます
- ローカルモードで処理する方法
- standaloneモードで複数PCに処理を分散する方法
の2つの方法で実践してみます

環境構築に関しては、【pyspark】タスクを複数PCで処理させるための環境構築の通りに進んだ想定です。
※諸事情でPCは2台のみでやってみます

目次

  • 処理内容
    • ID列のフォーマットを変更する
  • 環境
  • 方法
    • 0. 大きめのデータ用意
    • 1. ローカルモードで処理
    • 2. standaloneモードで分散処理
  • 結果
    • 1. の処理時間
    • 2. の処理時間
  • 感想

バッチ処理の内容

  • メモリに乗り切らないテキストファイル(csv)に対して、ある1列のみフォーマットを変更する という処理を想定します

image.png

データはなんでも良いのですが、Breast Cancer Wisconsin (Diagnostic) Data Setを増幅して使ってみます

便宜上1列目をID列ということにしています

環境

  • 2台のPCで以下の様なクラスタを組んでおります

image.png

  1. master : windows10 (Virtual Box, ubuntu)

  2. worker : Mac OS X (local)

1のみで処理する方法と、1から2に処理を割り振る方法を比較してみます

python : 3.6.1
spark : 2.4.0

  • ディレクトリは以下のようにしております 
BatchProcessing/
.
├── Makefile
├── README.md
├── data
│   ├── breast-cancer-wisconsin.csv
│   └── sample_10g.csv
├── notebooks
│   ├── format_local.ipynb
│   └── format_standalone.ipynb
├── others
├── output
└── src
    └── bulkout_tp.py

方法

0. 大きめのデータ用意

  • sampeのデータを10GBまでとりあえず増幅させます
  • src/bulkout.pyを使います
python src/bulkout.py "10G" sample_10g.csv

data/sample_10g.csvというかさ増しされたファイルができます

$ tree data/
data/
├── breast-cancer-wisconsin.csv
└── sample_10g.csv

sparkのspark.read.csv()を使う場合、pandasのread_csv()とは違い、ファイルの置き場所を外部からもアクセス可能な箇所に設定する必要があります

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

参考 External Datasets
参考 Pyspark - Load file: Path does not exist

master_ubuntu_terminal
chmod a+w /etc/hadoop/
hadoop fs -get /data/sample_10g.csv /etc/hadoop/hadoop_sample_10g.csv

これで指定した箇所にhadoop_sample_10g.csvというファイルができます

1. ローカルモードで処理

【pyspark】タスクを複数PCで処理させるための環境構築で設定したとおり、jupyter上で実行していきます

  1. pyspark起動
master_ubuntu_terminal
$ pyspark
  • 以下のコードでID列のフォーマット変更の処理を行います
format_local(.ipynb).py
# 読み込み
whole_data = spark.read.csv("file:///<path-to-hadoop>/hadoop/hadoop_sample_10g.csv")

# ID列のフォーマット変更
whole_data_formatted = whole_data.withColumn('ID', sf.concat(sf.col('ID'),sf.lit('-00')))
whole_data_formatted.show()
  • %%timeitでセルの実行時間を計測してみます

local_cell.PNG

2. standaloneモードで分散処理

  • クラスタを組む
  1. masterを起動
master_ubuntu_terminal
sudo /usr/local/spark/sbin/start-master.sh -h <your_master_ip>
  1. workerを起動
master_ubuntu_terminal
sudo /usr/local/spark/sbin/start-slave.sh spark://<your_master_ip>:7077
  1. 他のPCでもworkernodeを起動
  • pc1から、masterに参加
worker1-OSX-terminal
sudo /usr/local/spark/sbin/start-slave.sh spark://<your_master_ip>:7077
  1. pyspark起動
master_ubuntu_terminal
/usr/local/spark/bin/pyspark --master spark://<your_master_ip>:7077 --packages org.apache.hadoop:hadoop-aws:2.7.0
  • これで以下のようなクラスタができます

standalone_image.PNG

  • localと同様、以下のコードでID列のフォーマット変更の処理を行います
format_standalone.py
# 読み込み
whole_data = spark.read.csv("file:///<path-to-hadoop>/hadoop/hadoop_sample_10g.csv")

# ID列のフォーマット変更
whole_data_formatted = whole_data.withColumn('ID', sf.concat(sf.col('ID'),sf.lit('-00')))
whole_data_formatted.show()

結果

ID列のフォーマット変更は無事にできました

  • before
    csv_before.PNG

  • after
    after_result.PNG

  • 1.の処理時間

local_result.PNG

  • 2.の処理時間

standalone_result.PNG

感想

  • sparkのチューニングを全くしていないので、この段階で実行時間を比べる意味はあまりない気がします
  • 色々詰まりましたが、ひとまず別のPCをworkerとして動かすことはできました!
  • 起動のコマンドなどはMakefileに書くとちょっと楽になりました

困ったこと

  1. masterとworkerの通信がうまくいかない
    • pingでお互いに通信を確認(IPなどに間違いがないか)
    • spark/conf/spark-env.shの設定が正しいか確認
  2. masterとworkerのpythonのバージョンが異なる
    • Exception: Python in worker has different version 3.1 than that in driver 2.7, PySpark cannot run with different minor versions
    • もしmasterとworkerで別々のversionになっていれば、ひとまず同じversionに揃えます
    • それでも上記のエラーが出る場合は、明示的にconf/spark-env.shPYSPARK_PYTHONのpathを記載します
/usr/local/spark/conf/spark-env.sh
export PYSPARK_PYTHON=<worker_python_path>
export PYSPARK_DRIVER_PYTHON=/home/name/.pyenv/shims/Jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
export SPARK_MASTER_HOST=<your_master_ip>
export SPARK_MASTER_IP=<your_master_ip>
export SPARK_LOCAL_IP=<your_master_ip>

参考

色々とその場しのぎの書き方になってしまったので、もっと勉強します!

sasaki_K_sasaki
学生時代は医療レセプトを扱っていました。 今は予約のトランザクションデータを扱っています。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away