Sparkクラスタを用意していくつかサンプルコードを書いていこうと思います。Pythonのデータ分析や機械学習の実行環境としてJupyterは多くの方が利用していると思います。Apache ToreeでSparkアプリも同じようにJupyterからインタラクティブに書くことが目的です。ブラウザから実行できるScalaのREPLしてもJupyterを使うことができます。
Spark
SparkクラスタをDocker Composeで構築します。Docker HubとGitHubに多くのSpark Standalone Cluster用のイメージとdocker-compose.ymlが公開されています。
いくつか試しましたがsemantive/sparkがシンプルで使いやすい印象です。
Docker Compose
semantive/spark
イメージの使い方はDocker Images For Apache Sparkに書いてあります。Docker Hubはこちら、GitHubはこちらになります。
リポジトリにあるdocker-compose.ymlからいくつか変更しました。主な変更点はSparkのバージョンを合わせるためイメージタグを明示的に指定する、SPARK_PUBLIC_DNS
とSPARK_MASTER_HOST
環境変数にクラウド上の仮想マシンのパブリックIPアドレスを指定することです。
version: '2'
services:
master:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.master.Master -h master
hostname: master
environment:
MASTER: spark://master:7077
SPARK_CONF_DIR: /conf
SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
SPARK_MASTER_HOST: <仮想マシンのパブリックIPアドレス>
ports:
- 4040:4040
- 6066:6066
- 7077:7077
- 8080:8080
volumes:
- spark_data:/tmp/data
worker1:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker1
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8881
SPARK_WORKER_WEBUI_PORT: 8081
SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
depends_on:
- master
ports:
- 8081:8081
volumes:
- spark_data:/tmp/data
worker2:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker2
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8882
SPARK_WORKER_WEBUI_PORT: 8082
SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
depends_on:
- master
ports:
- 8082:8082
volumes:
- spark_data:/tmp/data
volumes:
spark_data:
driver: local
Spark Standalone Clusterを起動します。
$ docker-compose up -d
Spark Master UIを開いてクラスタの状態を確認します。
http://<仮想マシンのパブリックIPアドレス>:8080
Masterコンテナのspark-shellを実行してScalaとSparkのバージョンを確認します。Sparkは開発のスピードがとても速く、Scalaのバージョンも含めてよく確認しないと思わぬエラーに遭遇してしまいます。
- Scala: 2.11.8
- Spark: 2.1.1
$ docker-compose exec master spark-shell
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Jupyter
JupyterのDockerイメージは公式のjupyter/all-spark-notebookを使います。ScalaやSparkまで使える全部入りのイメージです。
Apache Toree
Apache ToreeはSparkクラスタにJupyterから接続するためのツールです。PySparkに加え、Scala、SparkR、SQLのKernelが提供されます。
Dockerfileを見るとApache Toreeもインストールされています。
# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix
docker-compose.yml
Spark Standalone Clusterのdocker-compose.ymlにJupyterサービスを追加します。
jupyter:
image: jupyter/all-spark-notebook:c1b0cf6bf4d6
depends_on:
- master
ports:
- 8888:8888
volumes:
- ./notebooks:/home/jovyan/work
- ./ivy2:/home/jovyan/.ivy2
env_file:
- ./.env
environment:
TINI_SUBREAPER: 'true'
SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000
Jupyterサービスのオプションについて
Spark Standalone ClusterではHadoopを利用していないため分散ファイルシステムにAmazon S3を利用する設定を追加しています。サンプルデータやParquetファイルの保存先にあると便利です。
image
jupyter/all-spark-notebook
イメージは更新が頻繁に入ります。Apache Toreeで使うSparkとSparkクラスタのバージョンがエラーになり起動しなくなります。今回はSparkクラスタのバージョンは2.1.1
なので同じバージョンのイメージのtagを指定します。jupyter/all-spark-notebook
イメージのタグはIDしかわからないのが不便です。
Sparkのバージョンはすでに2.2.0へ上がっているため、2.1.1
のタグを指定します。
タグのDockerイメージをpullしてspark-shell
で確認します。
$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
jupyter/all-spark-notebook:c1b0cf6bf4d6 \
/usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell
SparkクラスタとSparkとScalaのバージョンが同じであることが確認できました。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Jupyterのバージョンも確認しておきます。
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0
TINI_SUBREAPERとSPARK_OPTS
Apache Toreeを利用してJupyterからリモートのSparkに接続するために必須な設定はこの2つです。TINI_SUBREAPER
環境変数はinitにTiniを使います。
Sparkで追加のJarファイルを使わない場合はSPARK_OPTS
環境変数に以下の指定だけでリモートのSpark Standalone Clusterに接続できます。通常のspark-submitのオプションと同じです。
--master spark://master:7077 --deploy-mode client
追加のJarファイルがある場合はさらに--packages
フラグを追加します。この場合はAmazon S3に接続するために必要なパッケージです。
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
--NotebookApp.iopub_data_rate_limit
Bokehなど可視化ツールで大きな画像イメージを扱う場合はJupyterの起動スクリプトのオプションを指定します。
--NotebookApp.password
Jupyterの認証方法はデフォルトはtokenです。Dockerコンテナのように頻繁に起動と破棄を繰り返す場合に毎回異なるtokenを入れるのは面倒なのでパスワード認証に変更しました。ipythonを使いパスワードのハッシュ値を取得します。
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
パスワードは以下のように生成します。出力されたハッシュ値をJupyterの起動オプションに指定します。
In [1]: from notebook.auth import passwd
In [2]: passwd()
Enter password:
Verify password:
Out[2]: 'sha1:xxx'
volumes
/home/jovyan
はJupyterコンテナを実行しているユーザーのホームディレクトリです。作成したnotebookやダウンロードしたJarファイルをDockerホストにマウントします。
env_file
.env
ファイルに環境変数を記述してコンテナに渡します。Amazon S3への接続に使うaccess key と secret keyを指定します。
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
Gitにcommitしないように忘れずに.gitignoreにも追加します。
.env
JupyterからSparkとAmazon S3を使う
JupyterでSparkとAmazon S3を使うサンプルをScalaとPythonで書いてみようと思います。Monitoring Real-Time Uber Data Using Apache APIs, Part 1: Spark Machine Learningの記事で利用しているUberのピックアップデータをサンプルに使います。ここでは単純にCSVファイルをS3から読み込んで表示するだけです。
docker-compose.ymlに定義した全てのサービスを起動します。
$ docker-compose up -d
Jupyterをブラウザで開きさきほど作成したパスワードでログインします。
http://<仮想マシンのパブリックIPアドレス>:8888
データ準備
リポジトリをcloneしたあとuber.csv
ファイルをs3cmd
から適当なバケットにputします。
$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<バケット名>/uber-csv/
Scala
以下のようなコードを確認したいところでセルに分割してインタラクティブに実行することができます。ScalaのNotebookを書く場合は右上のNew
ボタンからApache Toree - Scala
を選択します。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
getOrCreate()
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("dt", TimestampType, true) ::
StructField("lat", DoubleType, true) ::
StructField("lon", DoubleType, true) ::
StructField("base", StringType, true) :: Nil
)
val df =
spark.read.
option("header", false).
schema(schema).
csv("s3a://<バケット名>/uber-csv/uber.csv")
df.printSchema
df.cache
df.show(false)
Scalaの場合スキーマのStructTypeは次のようにも書くことができます。
val schema = (new StructType).
add("dt", "timestamp", true).
add("lat", "double", true).
add("lon", "double", true).
add("base", "string", true)
最後のdf.show(false)
の出力結果です。
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Python
Python 3のNotebookを書く場合は右上のNew
ボタンからPython 3
を選択します。以下のコードを適当なところでセルに分割して実行していきます。Scalaと異なるのは追加JarはPYSPARK_SUBMIT_ARGS
環境変数に指定する点です。
以下のようにPythonでもほぼScalaと同じようにでSparkアプリを書くことができます。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.getOrCreate()
)
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = StructType([
StructField("dt", TimestampType(), True),
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("base", StringType(), True)
])
df = (
spark.read
.option("header", False)
.schema(schema)
.csv("s3a://<バケット名>/uber-csv/uber.csv")
)
df.printSchema()
df.cache()
df.show(truncate=False)
最後のdf.show(truncate=False)
の出力結果は先ほどのScalaのコードと同じです。
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows