Spark 環境の構築手順
1. hadoop と hadoop抜きspark 配布物を取得してきて展開する。
今だと以下の二つ
https://www.apache.org/dyn/closer.lua/spark/spark-2.3.0/spark-2.3.0-bin-without-hadoop.tgz
http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.0.1/hadoop-3.0.1.tar.gz
ほかに jdk がインストールされていること
2. 展開
例えば、${HOME}/UsrLocal/
以下に展開し、以下のように環境変数を設定しておく。
## Hadoop
#export JAVA_HOME=`/usr/libexec/java_home`
export JAVA_HOME=`dirname $(readlink $(readlink $(which java)))`/../../
export HADOOP_HOME=$HOME/UsrLocal/hadoop-3.0.1
export PATH=${HADOOP_HOME}/bin:${PATH}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
## Spark
export PATH=$HOME/UsrLocal/spark-2.3.0-bin-without-hadoop/bin:${PATH}
export SPARK_DIST_CLASSPATH=`hadoop classpath`
export PYSPARK_DRIVER_PYTHON=ipython
JAVA_HOMEのコメントアウトされている方は MacOS 用。PYSPARK_DRIVER_PYTHON は後述の ipython を使うためのもの
3. python module まわり
python3 の venv 環境を作っておいて、その中で pip インストールする。
この際、ipythonが venv の外にあるとエラーになるので ipython も venv でインストールする。
ほかに、 pandas, xlrd (pandasでExcelファイル読み込みに使った), scikit-learn, scipy などをインストールしておいた。
mkdir proj_dir
cd proj_dir
python3 -m venv P
source P/bin/activate
pip3 install ipython
pip3 install pandas
pip3 install xlrd
4. 実例
この状態で pyspark を実行すると IPython terminal で spark が使える。
Python 3.6.5 (default, Apr 2 2018, 10:07:26)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.3.0 -- An enhanced Interactive Python. Type '?' for help.
2018-04-11 16:52:44 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0
/_/
Using Python version 3.6.5 (default, Apr 2 2018 10:07:26)
SparkSession available as 'spark'.
In [1]:
pandas で Excel ファイルを DataFrame として読み込み、これを Spark の DataFrame に変換する。
いつの間にか RDD ではなく DataFrame (DataSet) が基本的なデータ型になっているそうでこちらでデータ操作することになりそう。
In [1]: import pandas as pd
In [2]: pde=pd.ExcelFile('testdata1.xlsx')
In [3]: pds=pde.parse(pde.sheet_names[0])
In [4]: %time sdf=sqlContext.createDataFrame(pds)
CPU times: user 154 ms, sys: 27.1 ms, total: 181 ms
Wall time: 2.45 s
In [5]: %time sdf.groupBy('性別').count().show()
2018-04-11 16:58:05 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavig.maxToStringFields' in SparkEnv.conf.
+---+-----+
| 性別|count|
+---+-----+
| 男| 16|
| 女| 84|
+---+-----+
CPU times: user 22.7 ms, sys: 11.6 ms, total: 34.4 ms
Wall time: 3.45 s
上の pdf が pandas dataframe で sdf が Spark dataframe。