Spark
pandas
sparksql

Apache Sparkの初心者が環境構築とPySparkでのデータ集計までやってみる

はじめに

Apache Sparkの初心者がPySparkで、DataFrame API、SparkSQL、Pandasを動かしてみた際のメモです。
Hadoop、Sparkのインストールから始めていますが、インストール方法等は何番煎じか分からないほどなので自分用のメモの位置づけです。

環境

お試し用に以下のバージョンのHadoop(HDFS)とSparkをインストールします。

・Apache Hadoop 3.1.1
・Apache Spark 2.3.2

OSはCentOS 7.4を使用します。
お試しですので、Hadoopは疑似分散を使用します。
HadoopはSpark付属のものでも良かったのですが、今回は別にインストールしています。
Sparkはスタンドアロンモードを使用します。

JDKインストール

最初にOpenJDK 8をインストールします。

# yum -y install java-1.8.0-openjdk-devel

# java -version
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)

JAVA_HOMEを設定し、PATHにJavaのパスを追加しています。

# echo "export JAVA_HOME=$(readlink -e $(which java)|sed 's:/bin/java::')" > /etc/profile.d/java.sh
# echo "PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile.d/java.sh
# source /etc/profile

Hadoopインストール

Hadoopを疑似分散モードでインストールします。
以下のサイト・投稿を参考にしています。

まず、Hadoopのモジュールを以下からダウンロードします。

 http://hadoop.apache.org/releases.html

以下の3.1.1.をダウンロードし、「/opt」以下に展開しました。

image.png

# tar xvzf hadoop-3.1.1.tar.gz -C /opt
# ln -s /opt/hadoop-3.1.1 /opt/hadoop
HDFSのデータ用ディレクトリを作成します。
# mkdir -p /var/hadoop/data

必要な環境変数を定義します。

/etc/profile.d/hadoop.sh
export HADOOP_HOME=/opt/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Hadoop/Spark用のユーザ(Hadoop)を作成します。

# useradd -s /bin/bash -m hadoop -p XXXXXXXX
# chown -R hadoop:hadoop /opt/hadoop-3.1.1
# chown -R hadoop:hadoop /opt/hadoop
# chown -R hadoop:hadoop /var/hadoop/data
# su - hadoop

擬似分散モードで動かすための設定の以下のファイルを修正します。

・HDFS用のディレクトリを/var/hadoop/dataに指定

/opt/hadoop/etc/hadoop/core-site.xml
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/var/hadoop/data</value>
    </property>
</configuration>

・今回は1ノードのためレプリケーションを"1"に設定

/opt/hadoop/etc/hadoop/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Hadoop は SSH 経由でホストを操作するので、"ssh localhost" でパスフレーズなしでログインできる必要があります。

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
$ chmod 600 ~/.ssh/authorized_keys
パスフレーズなしで設定できることを確認します。
$ ssh localhost

ファイルシステムをフォーマット

$ hdfs namenode -format
WARNING: /opt/hadoop/logs does not exist. Creating.
2018-10-19 23:00:14,379 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = sparkserver1/127.0.0.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 3.1.1

~省略~
2018-10-19 23:00:32,388 INFO common.Storage: Storage directory /var/hadoop/data/dfs/name has been successfully formatted.
~省略~

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at sparkserver1/127.0.0.1
************************************************************/

HDFSの起動

$ start-dfs.sh
Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [sparkserver1]

jpsでDataNode、NameNode、SecondaryNameNodeの3つのプロセスが起動していることを確認します。

$ jps 
21505 DataNode
21829 Jps
21388 NameNode
21709 SecondaryNameNode

HDFSを参照できることを確認するために以下のコマンドを実行します。
フォーマット直後のため、結果は何も表示されません。

$ $hadoop fs -ls /

Apache Sparkのインストール

以下を参考にしています。

Spark Standalone Mode
Apache Spark で分散処理入門

以下から、version 2.3.2をダウンロードし、インストールします。

https://spark.apache.org/downloads.html

package typeは、既にHadoopをインストールしているため、Hadoopを含まない「Pre-build with user-provided Apache Hadopp」を選択しています。

image.png

「/opt」以下に展開しました。

# tar xvzf spark-2.3.2-bin-without-hadoop.tgz -C /opt
# ln -s /opt/spark-2.3.2-bin-without-hadoop /opt/spark
/etc/profile.d/spark.sh
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
export SPARK_HOME=/opt/spark
# chown -R hadoop:hadoop /opt/spark-2.3.2-bin-without-hadoop
# chown -R hadoop:hadoop /opt/spark
# su - hadoop

Using Spark's "Hadoop Free" Build」に記載されていましたが、HadoopなしのSparkをインストールする場合は、以下のようにSPARK_DIST_CLASSPATHにHadoopのクラスパスを設定します。

$ cp -p /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
$ vi /opt/spark/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=`hadoop classpath`

Sparkを起動します。

$ start-all.sh 
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-hadoop-org.apache.spark.deploy.master.Master-1-sparkserver1.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-hadoop-org.apache.spark.deploy.worker.Worker-1-sparkserver1.out

Sparkを起動すると、Spark-UIも利用できるようになります。

http://[Sparkが起動しているサーバのIPアドレス]:8080/

image.png

いよいよPySparkを使う

HDFS上のCSVを読み込んで、PySparkからデータを操作しています。

データを準備する

まずは手頃なCSVファイルが欲しかったので、kaggleから以下のデータをダウンロードしました。

https://www.kaggle.com/datasets

image.png

データは以下のように、パン屋さんのお客さんごとの購入物になっています。
PySparkで商品ごとの売り上げを集計してみます。

PySparkといっても色々やり方があるようで、DataFrame API、SparkSQL、Pandasの3つで同じように試してみます。

Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:07:57,3,Hot chocolate
2016-10-30,10:07:57,3,Jam
2016-10-30,10:07:57,3,Cookies
2016-10-30,10:08:41,4,Muffin
2016-10-30,10:13:03,5,Coffee
2016-10-30,10:13:03,5,Pastry
2016-10-30,10:13:03,5,Bread
2016-10-30,10:16:55,6,Medialuna
2016-10-30,10:16:55,6,Pastry
2016-10-30,10:16:55,6,Muffin

DataFrame API

以下を参考にしています。

Spark SQLとDataFrame API入門
PySparkのチートシート

まずは、HDFS にディレクトリを作成し、サンプルデータのCSVファイルを格納します。

$ hadoop fs -mkdir /data/
$ hadoop fs -put BreadBasket_DMS.csv /data/
$ hadoop fs -ls /data
Found 1 items
-rw-r--r--   1 hadoop supergroup     710518 2018-10-21 02:56 /data/BreadBasket_DMS.csv

PySparkを起動します。

$ pyspark --name sample-app --master spark://localhost:7077

まず、HDFS上のCSVファイルを読み込んで、最初の5行を表示します。

from pyspark.sql.types import *
struct = StructType([
        StructField('bb_date', DateType(), False),
        StructField('bb_time', StringType(), False),
        StructField('bb_tran', IntegerType(), False),
        StructField('bb_item', StringType(), False)
     ])

df = spark.read.csv('hdfs://localhost:9000/data/BreadBasket_DMS.csv', schema=struct)

df.show(5)
+----------+--------+-------+-------------+
|   bb_date| bb_time|bb_tran|      bb_item|
+----------+--------+-------+-------------+
|      null|    null|   null|         null|
|2016-10-30|09:58:11|      1|        Bread|
|2016-10-30|10:05:34|      2| Scandinavian|
|2016-10-30|10:05:34|      2| Scandinavian|
|2016-10-30|10:07:57|      3|Hot chocolate|
+----------+--------+-------+-------------+
only showing top 5 rows

最初のnullはCSVのヘッダのようです。集計に影響はないのでこのまま進めます。

次に、商品ごとにグルーピングし、売上をカウントして降順に並び替えます。

>>> df.groupBy('bb_item').count().sort('count', ascending=False).show(5)
+-------+-----+                                                                 
|bb_item|count|
+-------+-----+
| Coffee| 5471|
|  Bread| 3325|
|    Tea| 1435|
|   Cake| 1025|
| Pastry|  856|
+-------+-----+
only showing top 5 rows

トップはCoffeeで、5471杯の売り上げでした。

SparkUIではDAGが以下のように表示できます。

image.png

SparkSQL

次にSparkSQLで同様のことを試してみます。
CSVファイルを読み込むところは同じです。

参考:
Spark SQLサンプルアプリの実行

struct = StructType([
        StructField('bb_date', DateType(), False),
        StructField('bb_time', StringType(), False),
        StructField('bb_tran', IntegerType(), False),
        StructField('bb_item', StringType(), False)
     ])

df = spark.read.csv('hdfs://localhost:9000/data/BreadBasket_DMS.csv', schema=struct)

bbという一時的なテーブルを作成し、最初の5行を表示しています。

>>> df.registerTempTable('bb')
>>> spark.sql('select * from bb').show(5)
+----------+--------+-------+-------------+
|   bb_date| bb_time|bb_tran|      bb_item|
+----------+--------+-------+-------------+
|      null|    null|   null|         null|
|2016-10-30|09:58:11|      1|        Bread|
|2016-10-30|10:05:34|      2| Scandinavian|
|2016-10-30|10:05:34|      2| Scandinavian|
|2016-10-30|10:07:57|      3|Hot chocolate|
+----------+--------+-------+-------------+
only showing top 5 rows

最後に集計しています。当然ですが、DataFrame APIを用いた時と同じ結果になっています。

>>> spark.sql('select bb_item, count(*) as n from bb group by bb_item order by n desc').show(5)
+-------+----+                                                                  
|bb_item|   n|
+-------+----+
| Coffee|5471|
|  Bread|3325|
|    Tea|1435|
|   Cake|1025|
| Pastry| 856|
+-------+----+
only showing top 5 rows

Pandas

最後はPandasで試してみます。
以下を参考にしました。

Pandas 公式チートシートを翻訳しました
https://qiita.com/ysdyt/items/9ccca82fc5b504e7913a

まずは、pipが入っていなかったのでインストールして、そのままPandasもインストールします。

# wget https://bootstrap.pypa.io/get-pip.py
# python get-pip.py
# pip install pandas

PySparkで、最初にCSVファイルを読み込んでいます。DataFrame API、SparkSQLではHDFSから読み込んでいたのですが、Pandasでは若干面倒なようなので今回はローカルファイルから読み込んでいます。

import pandas as pd
names = ['bb_date', 'bb_time', 'bb_tran', 'bb_item']
pandasdf = pd.read_csv('/tmp/BreadBasket_DMS.csv', names=names)

最初の5行を表示。

>>> pandasdf.head(5)
      bb_date   bb_time      bb_tran        bb_item
0        Date      Time  Transaction           Item
1  2016-10-30  09:58:11            1          Bread
2  2016-10-30  10:05:34            2   Scandinavian
3  2016-10-30  10:05:34            2   Scandinavian
4  2016-10-30  10:07:57            3  Hot chocolate

最後に集計結果を表示します。

>>> pandasdf['bb_item'].value_counts().head(5)
Coffee    5471
Bread     3325
Tea       1435
Cake      1025
Pastry     856
Name: bb_item, dtype: int64

最後に

最後にpyファイルで実行しようとしたが失敗。
pythonファイルは以下のとおり。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd

names = ['bb_date', 'bb_time', 'bb_tran', 'bb_item']
pandasdf = pd.read_csv('/tmp/BreadBasket_DMS.csv', names=names)
pandasdf['bb_item'].value_counts().head(5)

これをpysparkで実行しようとすると、spark-submitを使えと怒られる。

pyspark --name sample-app --master spark://localhost:7077 sample.py
Running python applications through 'pyspark' is not supported as of Spark 2.0.
Use ./bin/spark-submit <python file> 

spark-submitで実行すると、エラーにはならなかったが、結果が表示されず。これはまた今度調べてみよう。

$ spark-submit --name sample-app --master spark://localhost:7077 sample.py 
2018-10-21 06:49:39 WARN  Utils:66 - Your hostname, sparkserver1 resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
2018-10-21 06:49:39 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-21 06:49:40 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-10-21 06:49:40 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-10-21 06:49:40 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-0edd9099-fe73-4ebb-b7dd-5b9b9cc7829e