#Apache Spark on Kubernetes とは
Apache Spark(以下、Spark)は、v2.3.0よりKubernetesをNativeサポートしました。これまで、管理者により事前に用意されたSpark/Hadoopと、それに割り当てられたコンピュートリソースを使って並列計算を実行していました。今回のNativeサポートでは、Kubernetesを管理する管理者がSpark用のPod(コンテナ)をあらかじめ準備する必要はありません。Kubernetes環境さえあれば、Spark用に用意されたコンピュートリソースを気にせずに実行することが可能となります。
大まかな実行の流れとしては、SparkのユーザがSparkのCLI(spark-submit
コマンド)を実行すると、KubernetesのAPI Serverを通じ、SchedulerにてSparkを実行するコンピュートリソース(Node)を定めます。次にNodeにSpark DriverのPod(Executorの管理を行うPod)と、計算を行うSpark ExecutorのPodがデプロイされ、並列計算を実行します。また、Sparkでの計算を完了する毎に、Podを削除/停止するため、計算を行っていない時に無駄なコンピュートリソースを消費することがありません。つまり、AWSやGCPなどの従量課金のパブリッククラウドを使う場合において、常に起動しているデーモンプロセスのようなPodが存在しないため、コンピュートリソース(CPU,メモリ)については実際に計算に使った分のみの課金となります。
このように、Sparkのユーザは、spark-submit
コマンドで指定するKubernetesのURLさえ知っていれば、多数のコンピュートリソースを使った並列計算環境を簡単に手にいれることが出来ます。これによって、自分が所有するコンピュートリソースで計算パワーが不足している場合に、手間と時間をかけず(お金だけで)解決できる並列計算環境が実現できます。
#処理の流れ
Spark on Kubernetesの処理の流れを下図を使い順番に追っていきます。
- (Step1)ユーザにて'spark-submit'コマンドを使い、計算の実行を指示します
- (Step2) KubernetesのNode上にSpark DriverのPod(コンテナ)がデプロイされます
- (Step3) SparkDriverが
spark-submit
で指定された数のExecutorのPodをデプロイし、各Executorにて計算が実行されます - (Step4) 計算が終了したExecutorのPodは自動で削除されます。全てのExecutorのPodが削除された後、最後にSpark DriverのPodが停止(Terminate)されます。計算結果は、Spark DriverのPodのログに出力されます。
Spark DriverのPodは自動で削除されないため、計算結果を確認後にkubectl
コマンドを使い手動で削除します。
#Spark v2.3.0での制限事項
Spark on Kubernetesは、まだまだ開発途上のプロジェクトです。
そのため、以下がFuture Workとなっています。
- PySpark
- R
- Dynamic Executor Scaling
- Local File Dependency Management
- Spark Application Management
- Job Queues and Resource Management
PythonやRでSpark on Kubernetesを試せるのは、少し先になりそうです。
詳細はこちらを参照ください。
#動作検証
次に、実際にSpark on Kubernetesを動かし検証します。
動作検証の環境は、次の環境を使います。
- Sparkのコンパイル/CLI実行環境: Ubuntu 16.04.4 LTS + Docker 18.03.1-ce
- Kubernetes環境: Kubernetes v1.10.2
(kubeadmを使いVirtual Box上に構築したMaster, WorkerNode x 2VMの合計3VMのクラスタ環境) - コンテナレポジトリ: Docker Hub
- サンプルアプリ: Sparkに付属の
SparkPi
(Javaで書かれたπの計算プログラム)
OS, Docker, Kubernetes自身のセットアップおよびDocker Hubのユーザ作成については割愛します。
##ダウンロード&コンパイル
SparkのダウンロードサイトからSpark 2.3.0をダウンロードします。
$ wget http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.3.0/spark-2.3.0.tgz
$ gzip -cd spark-2.3.0.tgz | tar xvf -
$ cd spark-2.3.0/
以下、spark-2.3.0のディレクトリで作業します。
次に、ダウンロードしたSparkのソースをコンパイルします(約45分)。
$ build/mvn -DskipTests clean package
<snip>
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [09:56 min]
[INFO] Spark Project Tags ................................. SUCCESS [05:12 min]
[INFO] Spark Project Sketch ............................... SUCCESS [ 9.913 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 58.391 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 12.961 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 32.245 s]
[INFO] Spark Project Launcher ............................. SUCCESS [02:26 min]
[INFO] Spark Project Core ................................. SUCCESS [05:59 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [02:44 min]
[INFO] Spark Project GraphX ............................... SUCCESS [ 25.961 s]
[INFO] Spark Project Streaming ............................ SUCCESS [01:10 min]
[INFO] Spark Project Catalyst ............................. SUCCESS [02:45 min]
[INFO] Spark Project SQL .................................. SUCCESS [05:33 min]
[INFO] Spark Project ML Library ........................... SUCCESS [02:18 min]
[INFO] Spark Project Tools ................................ SUCCESS [ 10.025 s]
[INFO] Spark Project Hive ................................. SUCCESS [02:19 min]
[INFO] Spark Project REPL ................................. SUCCESS [ 6.327 s]
[INFO] Spark Project Assembly ............................. SUCCESS [ 3.664 s]
[INFO] Spark Project External Flume Sink .................. SUCCESS [ 33.437 s]
[INFO] Spark Project External Flume ....................... SUCCESS [ 12.331 s]
[INFO] Spark Project External Flume Assembly .............. SUCCESS [ 3.056 s]
[INFO] Spark Integration for Kafka 0.8 .................... SUCCESS [ 39.349 s]
[INFO] Kafka 0.10 Source for Structured Streaming ......... SUCCESS [ 33.933 s]
[INFO] Spark Project Examples ............................. SUCCESS [ 25.154 s]
[INFO] Spark Project External Kafka Assembly .............. SUCCESS [ 4.582 s]
[INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [ 12.030 s]
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 4.851 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 45:57 min
[INFO] Finished at: 2018-05-1T23:21:21+09:00
[INFO] Final Memory: 86M/861M
[INFO] ------------------------------------------------------------------------
BUILD SUCCESSが出力されればコンパイル成功です。
今回、筆者は、ソースファイルからコンパイルしましたが、Apache SparkのWebサイトからコンパイル済みのバイナリをダウンロードしても構いません。
##Sparkのコンテナ作成&レポジトリ登録
KubernetesにデプロイするSparkのコンテナイメージを作成します。
作成には、docker-image-tool.sh
を使います。
コンテナをBuildするためのDockerfileは、kubernetes/dockerfiles/spark
ディレクトリ配下にあるので、必要に応じて自身のSparkのプログラムなどを追加してください。
$ sudo bin/docker-image-tool.sh -r ysakashita -t v2.3.0 build
<snap>
Successfully built f9cd85baa796
Successfully tagged ysakashita/spark:v2.3.0
$ sudo docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ysakashita/spark v2.3.0 f9cd85baa796 2 minutes ago 350MB
次に、Buildしたコンテナをレポジトリに登録します。
$ sudo docker login
Login with your Docker ID to push and pull images from Docker Hub. If you don't have a Docker ID, head over to https://hub.docker.com to create one.
Username (ysakashita): ysakashita
Password:
$ sudo bin/docker-image-tool.sh -r ysakashita -t v2.3.0 push
The push refers to a repository [docker.io/ysakashita/spark]
<snip>
v2.3.0: digest: sha256:3252c88b5527a97b9743824ae283c40e9d69b8587155ebd8c4d6f1b451d972f8 size: 2626
Docker Hubにログインし、実際に登録されているかを確認します。
登録されていれば、コンテナイメージの完了です。
次に、Spark on Kubernetesを実行するアカウントをKubernetesに作成します。
$ kubectl create serviceaccount spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
今回の検証では、defaultネームスペースを利用するので、defaultに権限を与えておきます。
以上で、検証の準備は完了です。
次は、いよいよ Spark on Kubernetesを使った並列計算を実行していきます。
Spark on Kubernetesを使った並列計算の実行
Spark on Kubernetesを使った並列計算を実行します。
実行には、spark-submit
コマンドを使います。
$ kubectl cluster-info
Kubernetes master is running at https://192.168.0.23:6443
KubeDNS is running at https://192.168.0.23:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
$ bin/spark-submit \
--master k8s://https://192.168.0.23:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=ysakashita/spark:v2.3.0 \
--class org.apache.spark.examples.SparkPi \
--name spark-pi \
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
Option | Description |
---|---|
--master | Kubernetes master(API Server)のアドレス |
--deploy-mode | Spark on Kubernetesではclusterを指定 |
--conf spark.executor.instances | Spark Executorの数 |
--conf spark.kubernetes.authenticate.driver.serviceAccountName | Service Account名 |
--conf spark.kubernetes.container.image | Spark のコンテナイメージ |
--class | 実行するJavaアプリのクラス名 |
--name | 実行するJavaアプリのプログラム名 |
<app jar> | 実行するJavaアプリのjar ファイル名 |
その他のオプションについては、こちらをご参照ください。
spark-submit
コマンドの実行結果にて、Exit codeが0(=正常終了)と表示されていれば、計算が正しく終了しています1。
Container name: spark-kubernetes-driver
Container image: ysakashita/spark:v2.3.0
Container state: Terminated
Exit code: 0
Spark DriverのPodのログをkubectl logs
を使い計算結果を確認します。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 0/1 Completed 0 29s
$ kubectl logs spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver |less
<snip>
2018-05-03 01:04:22 INFO DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 1.594295 s
Pi is roughly 3.144675723378617
<snip>
SparkPiの実行結果である3.144675723378617が出力されているのが確認できます。
ユーザが実行結果を確認できるように、Spark DriverのPodはReadyが0/1と停止している状態で削除されずに残り続けています。
実行結果が確認したら、Podを削除してください。
$ kubectl delete pods spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver
##計算処理中のPodデプロイメントの検証
上記でspark-submit
コマンドにて計算処理を実行している最中の、Kubernetes上のPodのデプロイメントについて見ていきます。
まずは、spark-submit
コマンドを実行した直後です。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 1/1 Running 0 4s
Spark DriverのPodがデプロイされています。
処理の流れの章で述べた(Step2)の状態です。
数秒後に再度Podの状態を確認してみます。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 1/1 Running 0 14s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-1 1/1 Running 0 7s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-2 1/1 Running 0 7s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-3 1/1 Running 0 7s
すると、今度はSpark ExecutorのPodが3つデプロイされているのが確認できます。
処理の流れの章で述べた(Step3)の状態です。
今回はspark-submit
コマンドでExecutorの数を3つと指定しているので、3Podsです。並列数を上げて計算したい人は、Spark Executorの数を増やしてみてください。
さらに、しばらくの間待ちます。
すると、各Spark Executorは、計算処理が完了すると自動的に削除されます。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 0/1 Completed 0 19s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-1 0/1 Terminating 0 12s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-2 0/1 Terminating 0 12s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-3 0/1 Terminating 0 12s
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 0/1 Completed 0 29s
最後に、Spark DriverのPodがREADY 0/1になった状態で残っているのが確認できます。
処理の流れの章で述べた(Step4)の状態です。
このように、Sparkの並列計算の処理にあわせ、Kubernetes上にPodがデプロイ&削除されていきます。
##クリーンアップ
作成したSparkのコンテナイメージをレポジトリから削除します。
Docker Hubにログインし、Settingsのタブを選択後、Delete RepositoryのDelete
ボタンをクリックし、削除します。
ローカルのDockerからもコンテナイメージを削除します。
$ sudo docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ysakashita/spark v2.3.0 f9cd85baa796 14 hours ago 350MB
$ sudo docker rmi f9cd85baa796
Untagged: ysakashita/spark:v2.3.0
Untagged: ysakashita/spark@sha256:3252c88b5527a97b9743824ae283c40e9d69b8587155ebd8c4d6f1b451d972f8
Deleted: sha256:f9cd85baa79655ccefe0d7b646b499cbf568acd064c77add7e1773dfe8e14eaa
<snip>
最後に、ダウンロード&コンパイルしたSparkを削除します。
$ cd ..
$ rm -rf spark-2.3.0
#Know-How
今回の検証中に、Spark ExecutorのPodが、Pendingのまま実行されないという現象が発生しました。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
<snip>
spark-pi-8d89987d72ba32c2a89b99876ca81129-exec-3 0/1 Pending 0 10s
<snip>
そこで、失敗したPodをkubectl describe
コマンドで確認してみると、
$ kubectl describe pods spark-pi-8d89987d72ba32c2a89b99876ca81129-exec-3
<snip>
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning FailedScheduling 12s (x6 over 27s) default-scheduler 0/3 nodes are available: 1 Insufficient memory, 3 Insufficient cpu.
CPUとMemoryが不足しているため、Kubernetes SchedulerがPodを動作するNodeを割り当てられない状況になっているのがわかります。
Spark 2.3.0 の公式ドキュメントを確認してみると
We recommend 3 CPUs and 4g of memory to be able to start a simple Spark application with a single executor.
と記載にあるように、3CPU, Memory 4Gbytes以上必要そうです。
そこで、VirtualBoxのVMのCPU数とメモリを増やしてみたところ、無事にPodが実行されるようになりました。
以下のコマンドでは、VirtualBox上のVMに割り当てられたCPU数とMemoryを変更しています。
事前にKubernetesのVMを全てShutdownしておいた状態で、VirtualBoxの母艦のPCで実行してください。
$ VBoxManage list vms
"k8s" {3c2bf51f-3344-431f-8a3e-5925e065389b}
"k8s-node1" {82226c81-30ac-4292-9987-e401d9f41356}
"k8s-node2" {e82c1953-1d60-4f3f-9a2b-282299249f78}
$ VBoxManage modifyvm k8s --memory 4096 --cpus 3
$ VBoxManage modifyvm k8s-node1 --memory 4096 --cpus 3
$ VBoxManage modifyvm k8s-node2 --memory 4096 --cpus 3
上記変更では、仮想CPU数が物理マシンに搭載しているCPU数を超えてしまう人が大半かと思います。
VirtualBoxのGUIでは、以下の警告メッセージが表示されたりしますが、仮想CPUのコンテキストスイッチ&割り込み処理で多少性能が落ちるだけで動作します。
性能を気にしない検証であれば無視してください。
感想
Spark on Kubernetesを触って見た第一印象は、「Kubernetes」ユーザのためのSpark連携ではなく、「Spark」ユーザのためのKubernetes連携ということです。Kubernetesを利用しているユーザであれば、KubernetesのReplicasetを使わずに、独自にExecutorの数をコントロールしている点など、少し違和感を感じるかもしれません。
SparkとKubernetesの両方を使いこなせるユーザは、世界的にも希有なのかもしれません。Sparkのユーザは、データサイエンティストのようなデータ分析に特化したスキルを持っている人達かと思います。そのようなユーザ向けには、従来のSparkでも利用していたspark-submit
コマンドのみで、並列計算を指示できるのは、嬉しいかもしれません。--conf 以外の拡張では、--masterにk8s://
の指定が出来るようになったのみで、他のオプションは変わりません。そのため、インフラやKubernetesを知らないSparkのユーザでも、敷居は低いかと思います。
また、別の観点としては、コンテナイメージの作成&レポジトリ登録が、Sparkのユーザに可能なのか?という心配が生まれるかと思います。今回の検証では、サンプルプログラムを使ったため、Sparkのアプリ自体はコンテナに含めていましたが、コンテナに含めずに外部に置くことも可能です(Using Remote Dependencies参照)。これによりSparkのアプリを更新するたびに、コンテナイメージを再作成&リポジトリ登録する作業がなくなります。Kubernetesのクラスタを管理するインフラのスキルを持った管理者が、一度Spark on Kubernetesのコンテナイメージを作成&レポジトリ登録しておけば、Sparkユーザは、spark.kubernetes.container.image
を指定するだけでSpark on Kubernetesを利用できます。
上述のように、Spark on Kubernetesは「Spark」ユーザ向けには、使い易いのではないかと思います。残念な点をあげるとすれば、Job Queuesがv2.3.0の時点では未サポートの点です。Sparkのユーザは、Intaractiveな計算よりは、バッチなどのJobを利用したケースが多いかと思います。この点については、Future Workにあがっているので、楽しみに待ちたいと思います。
#参考情報
#追記(2018/8)
HDFS on Kubernetesの検証の記事を公開しました。
これにより、Using Remote Dependenciesに記載されているspark-submit
コマンドで指定できるspark.files
(or --files)のデータ格納先HDFSもKubernetes上で管理することが可能となります。