環境
- OS:macOS Catalina バージョン10.15.1
- Python:3.4.6
JDKのインストール
Pysparkの裏では実はjavaが動いています。したがって、事前にJDKをインストールする必要があります。
% brew tap homebrew/cask-versions
% brew cask install adoptopenjdk8
余談ですが、Java8をインストールしようとするとJava8のサポートが終了した関係でエラーになります。ご注意ください。
% brew cask install java8
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 2 taps (homebrew/cask-versions and homebrew/core).
No changes to formulae.
Error: Cask 'java8' is unavailable: No Cask with this name exists.
インストールが完了したら、環境変数 JAVA_HOMEの設定をします。
bashの場合はvi ~/.bash_profile
、zshの場合はvi ~/.zprofile
で以下を追記します。
export JAVA_HOME=`/usr/libexec/java_home -v "1.8"`
PATH=${JAVA_HOME}/bin:${PATH}
追記したら設定を有効化します。
% source ~/.zprofile
これでJDKの準備が完了いたしました。
Pysparkのインストール
Pysparkをローカルで起動させる場合、
- Pythonのパッケージ
- インタラクティブシェル
- スタンドアローン
のいずれかの方法で実行させることができます。
個人的におすすめなのは手軽でわかりやすい1のPythonのパッケージを用いる方法です。
先に環境構築後に実行するスクリプトを共有いたします。
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
sc = pyspark.SparkContext(appName='hello_spark')
spark = SparkSession(sc).builder.getOrCreate()
sample = ['Hello', 'Spark', 'World']
rdd = sc.parallelize(sample)
print(rdd.collect())
spark.stop()
このプログラムを実行し、以下のように出力されれば、無事に環境構築できたことを意味します。
> ['Hello', 'Spark', 'World']
では、それぞれの方法で実行できるようにインストール方法を紹介していきます。
1. Pythonのパッケージ
インストール
numpyやpandasのようにpipでインストールします。
% pip install pyspark
実行
通常のpythonを実行させます。今回はsample.pyに前述したサンプルコードを記述したものとします。
% python sample.py
> ['Hello', 'Spark', 'World']
2. インタラクティブシェル
メリット
- ターミナルを開いて
% pyspark
と打つだけで手軽に開始できる - 手軽にpysparkの動作が検証できる
デメリット
- 長いスクリプトには不向き
インストール
% brew install apache-spark
==> Summary
🍺 /usr/local/Cellar/apache-spark/2.4.4: 1,059 files, 248.4MB, built in 14 minutes 45 seconds
インストールが終わったらインストールされたディレクトリにPATHを通します。
※インストールされる場所、バージョンは環境によって変化します。ご注意ください
bashの場合は~/.bash_profile
に、zshの場合は~/.zprofile
に以下を追記します。
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.4.4/libexec/
export PATH=${PATH}:${SPARK_HOME}
これでインストールは完了です。起動してみましょう。
実行
% pyspark
Python 3.4.6 (default, Mar 16 2019, 07:39:35)
[GCC 4.2.1 Compatible Apple LLVM 10.0.0 (clang-1000.10.44.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
19/12/08 00:20:55 WARN Utils: Your hostname, pacion.local resolves to a loopback address: 127.0.0.1; using 192.168.11.3 instead (on interface en0)
19/12/08 00:20:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/12/08 00:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Python version 3.4.6 (default, Mar 16 2019 07:39:35)
SparkSession available as 'spark'.
>>>
これで前述したサンプルコードを実行して問題なければ、環境構築は完了になります。
3. スタンドアローン
メリット
- Spark UIなどでモニタリングできる
- Spark-submitコマンドのオプションが試せる
デメリット
- プログラムを実行するまでが手間
インストール
前述したインタラクティブシェルと同じ方法になります。
実行
まず前述したサンプルコードのファイル(今回はsample.py)を作成します。
ファイルを作成したらマスターを立ち上げます。
${SPARK_HOME}/sbin/start-master.sh
次に、http://localhost:8080 (デフォルト設定)にアクセスしてSpark UIからマスターのURLをメモします。
私の環境だと spark://XXXX.local:7077 のようです。
ワーカーを立ち上げます。先ほどメモしたURLを入力してください。
% ${SPARK_HOME}/sbin/start-slave.sh {マスターのURL}
そしたらいよいよプログラムを実行します。
% spark-submit --master spark://XXXX.local:7077 sample.py
19/12/08 00:49:20 WARN Utils: Your hostname, pacion.local resolves to a loopback address: 127.0.0.1; using 192.168.11.3 instead (on interface en0)
19/12/08 00:49:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/12/08 00:49:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/12/08 00:49:21 INFO SparkContext: Running Spark version 2.4.4
19/12/08 00:49:21 INFO SparkContext: Submitted application: hello_spark
19/12/08 00:49:22 INFO SecurityManager: Changing view acls to: ete
19/12/08 00:49:22 INFO SecurityManager: Changing modify acls to: ete
19/12/08 00:49:22 INFO SecurityManager: Changing view acls groups to:
19/12/08 00:49:22 INFO SecurityManager: Changing modify acls groups to:
19/12/08 00:49:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ete); groups with view permissions: Set(); users with modify permissions: Set(ete); groups with modify permissions: Set()
19/12/08 00:49:22 INFO Utils: Successfully started service 'sparkDriver' on port 58280.
19/12/08 00:49:22 INFO SparkEnv: Registering MapOutputTracker
19/12/08 00:49:22 INFO SparkEnv: Registering BlockManagerMaster
19/12/08 00:49:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/12/08 00:49:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/12/08 00:49:22 INFO DiskBlockManager: Created local directory at /private/var/folders/bb/4rvqk2d102s796201cpvbnjh0000gn/T/blockmgr-cf1f804f-2e26-4c5e-bb5f-79541b701799
19/12/08 00:49:22 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/12/08 00:49:22 INFO SparkEnv: Registering OutputCommitCoordinator
19/12/08 00:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/12/08 00:49:22 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/12/08 00:49:22 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.11.3:4041
19/12/08 00:49:22 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://pacion.local:7077...
19/12/08 00:49:22 INFO TransportClientFactory: Successfully created connection to pacion.local/127.0.0.1:7077 after 35 ms (0 ms spent in bootstraps)
19/12/08 00:49:22 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20191208004922-0000
19/12/08 00:49:22 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58282.
19/12/08 00:49:22 INFO NettyBlockTransferService: Server created on 192.168.11.3:58282
19/12/08 00:49:22 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/12/08 00:49:22 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20191208004922-0000/0 on worker-20191208004745-192.168.11.3-58271 (192.168.11.3:58271) with 4 core(s)
19/12/08 00:49:22 INFO StandaloneSchedulerBackend: Granted executor ID app-20191208004922-0000/0 on hostPort 192.168.11.3:58271 with 4 core(s), 1024.0 MB RAM
19/12/08 00:49:22 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.11.3, 58282, None)
19/12/08 00:49:22 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.11.3:58282 with 366.3 MB RAM, BlockManagerId(driver, 192.168.11.3, 58282, None)
19/12/08 00:49:22 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.11.3, 58282, None)
19/12/08 00:49:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.11.3, 58282, None)
19/12/08 00:49:23 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20191208004922-0000/0 is now RUNNING
19/12/08 00:49:23 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
19/12/08 00:49:24 INFO SparkContext: Starting job: collect at /Users/ete/Desktop/sample.py:8
19/12/08 00:49:24 INFO DAGScheduler: Got job 0 (collect at /Users/ete/Desktop/sample.py:8) with 2 output partitions
19/12/08 00:49:24 INFO DAGScheduler: Final stage: ResultStage 0 (collect at /Users/ete/Desktop/sample.py:8)
19/12/08 00:49:24 INFO DAGScheduler: Parents of final stage: List()
19/12/08 00:49:24 INFO DAGScheduler: Missing parents: List()
19/12/08 00:49:24 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195), which has no missing parents
19/12/08 00:49:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1432.0 B, free 366.3 MB)
19/12/08 00:49:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 948.0 B, free 366.3 MB)
19/12/08 00:49:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.11.3:58282 (size: 948.0 B, free: 366.3 MB)
19/12/08 00:49:24 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1161
19/12/08 00:49:24 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195) (first 15 tasks are for partitions Vector(0, 1))
19/12/08 00:49:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
19/12/08 00:49:25 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.11.3:58285) with ID 0
19/12/08 00:49:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.11.3, executor 0, partition 0, PROCESS_LOCAL, 7885 bytes)
19/12/08 00:49:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.11.3, executor 0, partition 1, PROCESS_LOCAL, 7914 bytes)
19/12/08 00:49:25 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.11.3:58287 with 366.3 MB RAM, BlockManagerId(0, 192.168.11.3, 58287, None)
19/12/08 00:49:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.11.3:58287 (size: 948.0 B, free: 366.3 MB)
19/12/08 00:49:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 576 ms on 192.168.11.3 (executor 0) (1/2)
19/12/08 00:49:26 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 601 ms on 192.168.11.3 (executor 0) (2/2)
19/12/08 00:49:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/12/08 00:49:26 INFO DAGScheduler: ResultStage 0 (collect at /Users/ete/Desktop/sample.py:8) finished in 2.083 s
19/12/08 00:49:26 INFO DAGScheduler: Job 0 finished: collect at /Users/ete/Desktop/sample.py:8, took 2.148719 s
['Hello', 'Spark', 'World']
19/12/08 00:49:26 INFO SparkUI: Stopped Spark web UI at http://192.168.11.3:4041
19/12/08 00:49:26 INFO StandaloneSchedulerBackend: Shutting down all executors
19/12/08 00:49:26 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
19/12/08 00:49:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/12/08 00:49:26 INFO MemoryStore: MemoryStore cleared
19/12/08 00:49:26 INFO BlockManager: BlockManager stopped
19/12/08 00:49:26 INFO BlockManagerMaster: BlockManagerMaster stopped
19/12/08 00:49:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/12/08 00:49:26 INFO SparkContext: Successfully stopped SparkContext
19/12/08 00:49:26 INFO ShutdownHookManager: Shutdown hook called
19/12/08 00:49:26 INFO ShutdownHookManager: Deleting directory /private/var/folders/bb/4rvqk2d102s796201cpvbnjh0000gn/T/spark-b3816dff-c801-489a-bb36-1452295d0d1c
19/12/08 00:49:26 INFO ShutdownHookManager: Deleting directory /private/var/folders/bb/4rvqk2d102s796201cpvbnjh0000gn/T/spark-667f3326-a798-445a-a5eb-ec301b49eea1/pyspark-83c12670-4752-4871-b930-b95297d42255
19/12/08 00:49:26 INFO ShutdownHookManager: Deleting directory /private/var/folders/bb/4rvqk2d102s796201cpvbnjh0000gn/T/spark-667f3326-a798-445a-a5eb-ec301b49eea1
ログに埋もれてしまっていますが、['Hello', 'Spark', 'World']がちゃんと出力されていますね。
Sparkを停止させるときは以下のコマンドを実行します。
${SPARK_HOME}/sbin/stop-all.sh
spark-submitのオプションについての参考
Web UIの見方についての参考
最後に
今回はMacでpysparkを実行する方法を3通り紹介させていただきました。
繰り返しになりますが、個人的にはPythonのパッケージで扱う方法が検証しやすくてオススメです。
この記事が環境構築の一助になれば幸いです。