以下の内容は記事公開時の情報に基づいています。最新情報は公式ドキュメントをご確認ください。
はじめに
Google Cloud Next '21 で Dataproc Serverless for Spark が発表されました。
- Spark jobs that autoscale and made seamless for all data users | Google Cloud Blog
- Google Cloud Next におけるデータ分析に関するまとめ | Google Cloud Blog
その機能をいくつか試してみましたので、本記事ではそれを紹介致します。
Dataproc Serverless for Spark を含む Spark on Google Cloud の機能は、一般公開待ち、及び非公開プレビューとなっています。利用されたい場合は以下からお申し込みください。
https://docs.google.com/forms/d/e/1FAIpQLSccIXlE5gJNE0dNs6vQvCfrCcSjnoHqaW2lxpoVkAh56KLOwA/viewform
Dataproc Serverless for Sparkとは
Dataproc上でジョブを実行する際には
- ジョブに合わせたスペックのクラスタを選択、作成
- ジョブを送信
- (必要に応じて)クラスタを削除
というフローを利用者側で行う必要があります。
Dataproc Serverless for Spark では、ジョブに合わせて Google Cloud がクラスタを作成、オートスケーリング、削除し、利用者側でクラスタ管理を行う必要が無くなります。
触ってみる
Google Cloud の利用には料金が発生する場合があります。
以下を実行する際は、各自の責任でお願い致します。
実行環境は以下の通りです。
$ gcloud --version
Google Cloud SDK 366.0.0
alpha 2021.12.03
beta 2021.12.03
bq 2.0.72
core 2021.12.03
gsutil 5.5
kubectl 1.20.8
$ docker --version
Docker version 20.10.11, build dea9396
以下の変数を適宜設定ください。以降のコマンドで必要となります。
リージョン名は以下を参考ください。
https://cloud.google.com/compute/docs/regions-zones#available
$ PROJECT=<GCP プロジェクト名>
$ REGION=<リージョン名>
$ NETWORK=<VPC ネットワーク名>
$ SUB_NETWORK=<VPC サブネットワーク名>
$ FW_RULE=<ファイアウォールルール名>
基本編
VPC ネットワークの作成
Dataproc Serverless for Spark では限定公開の Google アクセスを有効にしたサブネットワークが必要なため、まずは VPC ネットワークの作成を行います。
$ gcloud compute networks create ${NETWORK} \
--project=${PROJECT} \
--subnet-mode=custom \
--mtu=1460 \
--bgp-routing-mode=regional
サブネットを作成します。
IP アドレス範囲は default ネットワークの us-central1 サブネットに合わせており、深い意味はございません。
$ gcloud compute networks subnets create ${SUB_NETWORK} \
--project=${PROJECT} \
--range=10.128.0.0/20 \
--network=${NETWORK} \
--region=${REGION} \
--enable-private-ip-google-access
クラスタ内での通信を許可するファイアウォールルールを設定します。
こちらもIP アドレス範囲は default の default-allow-internal に合わせており、深い意味はございません。
ルールに関しては以下もご参考ください。
https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/network#firewall_rule_requirement
$ gcloud compute firewall-rules create ${FW_RULE} \
--direction=INGRESS \
--priority=65534 \
--network=${NETWORK} \
--action=ALLOW \
--rules=tcp:0-65535,udp:0-65535,icmp \
--source-ranges=10.128.0.0/9 \
--project=${PROJECT}
ジョブの実行
Sparkジョブを実行します。前述した通り、クラスタの作成は不要です。
ジョブの内容は以下の通りで、円周率の近似値を計算するプログラムです。
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
$ gcloud beta dataproc batches submit spark \
--region ${REGION} \
--subnet ${SUB_NETWORK} \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
--class=org.apache.spark.examples.SparkPi \
--project ${PROJECT}
実行結果は以下となります。
(前略)
Pi is roughly 3.140255701278506
(後略)
GUI でも結果を確認してみます。
基本的には、4コアのExecutor × 2で動作するようです。
Cloud Logging でも確認してみます。クエリは以下となります。
resource.type="cloud_dataproc_batch"
resource.labels.batch_id="<バッチ ID>"
オプションパラメータ
以下コマンドで、ジョブ実行時のオプションパラメータが確認できます。
$ gcloud beta dataproc batches submit --help
NAME
gcloud beta dataproc batches submit - submit a Dataproc batch job
SYNOPSIS
gcloud beta dataproc batches submit COMMAND [--async] [--batch=BATCH]
[--container-image=CONTAINER_IMAGE]
[--history-server-cluster=HISTORY_SERVER_CLUSTER] [--kms-key=KMS_KEY]
[--labels=[KEY=VALUE,...]] [--metastore-service=METASTORE_SERVICE]
[--properties=[PROPERTY=VALUE,...]] [--region=REGION]
[--request-id=REQUEST_ID] [--service-account=SERVICE_ACCOUNT]
[--tags=[TAGS,...]] [--version=VERSION]
[--network=NETWORK | --subnet=SUBNET] [GCLOUD_WIDE_FLAG ...]
(後略)
応用編
カスタムコンテナイメージの使用
Docker コンテナイメージを基に、クラスタをカスタマイズすることが可能です。イメージは Container Registry にアップロードする必要があります。
まずは Dockerfile を作成します。
一例としてSpark、PySpark、SparkRが実行できる環境を構築します。Miniconda3をPythonディストリビューションとして使用します。
Dockerfile
# Debian 11 is recommended.
FROM debian:11-slim
# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive
# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini wget
# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "\${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "\${SPARK_EXTRA_JARS_DIR}"
# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh \
&& bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
&& ${CONDA_HOME}/bin/conda config --system --set always_yes True \
&& ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
&& ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
&& ${CONDA_HOME}/bin/conda config --system --set channel_priority strict \
&& rm Miniconda3-py39_4.10.3-Linux-x86_64.sh
# (Optional) Install Conda packages.
#
# The following packages are installed in the default image, it is strongly
# recommended to include all of them.
#
# Use mamba to install packages, because it is much faster.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
&& ${CONDA_HOME}/bin/mamba install \
conda=4.10 \
cython \
fastavro \
fastparquet \
gcsfs \
google-cloud-bigquery-storage \
google-cloud-bigquery[pandas] \
google-cloud-bigtable \
google-cloud-container \
google-cloud-datacatalog \
google-cloud-dataproc \
google-cloud-datastore \
google-cloud-language \
google-cloud-logging \
google-cloud-monitoring \
google-cloud-pubsub \
google-cloud-redis \
google-cloud-spanner \
google-cloud-speech \
google-cloud-storage \
google-cloud-texttospeech \
google-cloud-translate \
google-cloud-vision \
koalas \
matplotlib \
nltk \
numba \
numpy \
openblas \
orc \
pandas \
pyarrow \
pysal \
pytables \
python=3.9 \
regex \
requests \
rtree \
scikit-image \
scikit-learn \
scipy \
seaborn \
sqlalchemy \
sympy \
virtualenv
# (Optional) Install PIP packages.
RUN ${CONDA_HOME}/bin/pip install 'google-cloud-translate==3.0.*'
# (Optional) Add extra Python modules.
ENV PYTHONPATH=/opt/python/packages
RUN mkdir -p "\${PYTHONPATH}"
# (Optional) Install R and R libraries.
RUN apt update \
&& apt install -y gnupg \
&& apt-key adv --no-tty \
--keyserver "hkp://keyserver.ubuntu.com:80" \
--recv-keys E19F5F87128899B192B1A2C2AD5F960A256A04AF \
&& echo "deb http://cran.microsoft.com/snapshot/2021-10-28/bin/linux/debian bullseye-cran40/" \
> /etc/apt/sources.list.d/cran-r.list \
&& apt update \
&& apt install -y \
libopenblas-base \
libssl-dev \
r-base \
r-base-dev \
r-recommended \
r-cran-blob
# (Required) Create group/user `spark`.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark
必要ファイルのダウンロードを行います。
$ gsutil cp \
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar .
イメージをビルドし、Container Registry にアップロードします。
$ IMAGE=gcr.io/${PROJECT}/my-dataproc-image:1.0.0
$ docker build -t "${IMAGE}" .
$ docker push "${IMAGE}"
基本編と同様のジョブをカスタムイメージで実行します。
$ gcloud beta dataproc batches submit spark \
--region ${REGION} \
--subnet ${SUB_NETWORK} \
--container-image=${IMAGE} \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
--class=org.apache.spark.examples.SparkPi \
--project ${PROJECT}
実行結果は基本編と同様ですが、カスタムイメージを使用している分、処理完了までの時間は遅くなるようです。
オートスケーリング
次に、オートスケーリング機能について試してみます。
使用するのは、Spark のパフォーマンステストでよく使われる Terasort のサンプルです。このプログラムはパラメータで処理負荷を変えることが可能です。
プログラムは以下にあります。
gs://dataproc-spark-preview/spark_terasort.py
まずは作業用のCloud Storageバケットを作成します。
$ BUCKET=<作業用のバケット名>
$ gsutil mb -p ${PROJECT} \
-l ${REGION} \
-c standard \
-b on \
gs://${BUCKET}
デフォルトパラメータで実行します。
デフォルトでは gbs(Size of data in GBs = データ量)バラメータが1、partitions(Number of partitions = 分散処理数)バラメータが10となっています。
$ gcloud beta dataproc batches submit pyspark \
--region ${REGION} \
--subnet ${SUB_NETWORK} \
--project ${PROJECT} \
gs://dataproc-spark-preview/spark_terasort.py \
-- --base_dir gs://${BUCKET}/spark_terasort
Executor の実行数は以下となり maximum-needed の数によってrunning 数が最適化されていることが分かります。(おそらくExecutorの下限は2と思われます。)
gbs、partitions共に100倍にして実行してみます。
$ gcloud beta dataproc batches submit pyspark \
--region ${REGION} \
--subnet ${SUB_NETWORK} \
--project ${PROJECT} \
gs://dataproc-spark-preview/spark_terasort.py \
-- --gbs 100 \
--partitions 1000 \
--base_dir gs://${BUCKET}/spark_terasort
maximum-needed の最大数までとはいきませんでしたが、running 数は18まで自動で増加しています。
サポートチームに確認した所、Executor は徐々にオートスケーリングしていくので Terasort のような10分弱の短い処理では maximum-needed の最大数に達するまでに処理が完了してしまう、とのことでした。
また、properties パラメータで spark.executor.instances
を設定することで、指定した数の Executor を最初から動かすことが可能です。
(おそらく)サポート対象外の機能
- properties パラメータでは環境変数( spark-env )は指定できません
- YARN は使えないようで、例えば PySpark で以下のような記述はエラーとなります
from pyspark.sql import SparkSession
spark = SparkSession \
.builder\
.master('yarn')\
.appName('hoge')\
.getOrCreate()
最後に
Red Frasco は Google Cloud の Service パートナー認定を取得しており、不動産業界を対象に、BigQuery、Cloud Composer、Dataproc 等の Google Cloud サービスを利用したデータ分析、活用支援を行っています。ご興味ある方は以下からご確認ください。
https://www.red-frasco.com/recruit/