7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Google Cloud PlatformAdvent Calendar 2021

Day 20

Dataproc Serverless for Spark 触ってみた

Last updated at Posted at 2021-12-19

以下の内容は記事公開時の情報に基づいています。最新情報は公式ドキュメントをご確認ください。

はじめに

Google Cloud Next '21 で Dataproc Serverless for Spark が発表されました。

その機能をいくつか試してみましたので、本記事ではそれを紹介致します。

Dataproc Serverless for Spark を含む Spark on Google Cloud の機能は、一般公開待ち、及び非公開プレビューとなっています。利用されたい場合は以下からお申し込みください。
https://docs.google.com/forms/d/e/1FAIpQLSccIXlE5gJNE0dNs6vQvCfrCcSjnoHqaW2lxpoVkAh56KLOwA/viewform

Dataproc Serverless for Sparkとは

Dataproc上でジョブを実行する際には

  1. ジョブに合わせたスペックのクラスタを選択、作成
  2. ジョブを送信
  3. (必要に応じて)クラスタを削除

というフローを利用者側で行う必要があります。
Dataproc Serverless for Spark では、ジョブに合わせて Google Cloud がクラスタを作成、オートスケーリング、削除し、利用者側でクラスタ管理を行う必要が無くなります。

触ってみる

Google Cloud の利用には料金が発生する場合があります。
以下を実行する際は、各自の責任でお願い致します。

実行環境は以下の通りです。

$ gcloud --version
Google Cloud SDK 366.0.0
alpha 2021.12.03
beta 2021.12.03
bq 2.0.72
core 2021.12.03
gsutil 5.5
kubectl 1.20.8

$ docker --version
Docker version 20.10.11, build dea9396

以下の変数を適宜設定ください。以降のコマンドで必要となります。
リージョン名は以下を参考ください。
https://cloud.google.com/compute/docs/regions-zones#available

$ PROJECT=<GCP プロジェクト名>
$ REGION=<リージョン名>
$ NETWORK=<VPC ネットワーク名>
$ SUB_NETWORK=<VPC サブネットワーク名>
$ FW_RULE=<ファイアウォールルール名>

基本編

VPC ネットワークの作成

Dataproc Serverless for Spark では限定公開の Google アクセスを有効にしたサブネットワークが必要なため、まずは VPC ネットワークの作成を行います。

$ gcloud compute networks create ${NETWORK} \
  --project=${PROJECT} \
  --subnet-mode=custom \
  --mtu=1460 \
  --bgp-routing-mode=regional

サブネットを作成します。
IP アドレス範囲は default ネットワークの us-central1 サブネットに合わせており、深い意味はございません。

$ gcloud compute networks subnets create ${SUB_NETWORK} \
  --project=${PROJECT} \
  --range=10.128.0.0/20 \
  --network=${NETWORK} \
  --region=${REGION} \
  --enable-private-ip-google-access

クラスタ内での通信を許可するファイアウォールルールを設定します。
こちらもIP アドレス範囲は default の default-allow-internal に合わせており、深い意味はございません。
ルールに関しては以下もご参考ください。
https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/network#firewall_rule_requirement

$ gcloud compute firewall-rules create ${FW_RULE} \
  --direction=INGRESS \
  --priority=65534 \
  --network=${NETWORK} \
  --action=ALLOW \
  --rules=tcp:0-65535,udp:0-65535,icmp \
  --source-ranges=10.128.0.0/9 \
  --project=${PROJECT}

ジョブの実行

Sparkジョブを実行します。前述した通り、クラスタの作成は不要です。
ジョブの内容は以下の通りで、円周率の近似値を計算するプログラムです。
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala

$ gcloud beta dataproc batches submit spark \
  --region ${REGION} \
  --subnet ${SUB_NETWORK} \
  --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
  --class=org.apache.spark.examples.SparkPi \
  --project ${PROJECT}

実行結果は以下となります。

(前略)
Pi is roughly 3.140255701278506
(後略)

GUI でも結果を確認してみます。

■一覧
01.jpg

■モニタリング
02.jpg
06.png

■詳細
03.jpg

基本的には、4コアのExecutor × 2で動作するようです。
Cloud Logging でも確認してみます。クエリは以下となります。

resource.type="cloud_dataproc_batch"
resource.labels.batch_id="<バッチ ID>"

04.jpg

オプションパラメータ

以下コマンドで、ジョブ実行時のオプションパラメータが確認できます。

$ gcloud beta dataproc batches submit --help

NAME
    gcloud beta dataproc batches submit - submit a Dataproc batch job

SYNOPSIS
    gcloud beta dataproc batches submit COMMAND [--async] [--batch=BATCH]
        [--container-image=CONTAINER_IMAGE]
        [--history-server-cluster=HISTORY_SERVER_CLUSTER] [--kms-key=KMS_KEY]
        [--labels=[KEY=VALUE,...]] [--metastore-service=METASTORE_SERVICE]
        [--properties=[PROPERTY=VALUE,...]] [--region=REGION]
        [--request-id=REQUEST_ID] [--service-account=SERVICE_ACCOUNT]
        [--tags=[TAGS,...]] [--version=VERSION]
        [--network=NETWORK | --subnet=SUBNET] [GCLOUD_WIDE_FLAG ...]
(後略)

応用編

カスタムコンテナイメージの使用

Docker コンテナイメージを基に、クラスタをカスタマイズすることが可能です。イメージは Container Registry にアップロードする必要があります。

まずは Dockerfile を作成します。
一例としてSpark、PySpark、SparkRが実行できる環境を構築します。Miniconda3をPythonディストリビューションとして使用します。

Dockerfile
# Debian 11 is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini wget

# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "\${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "\${SPARK_EXTRA_JARS_DIR}"

# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh \
  && bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict \
  && rm Miniconda3-py39_4.10.3-Linux-x86_64.sh

# (Optional) Install Conda packages.
#
# The following packages are installed in the default image, it is strongly
# recommended to include all of them.
#
# Use mamba to install packages, because it is much faster.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
  && ${CONDA_HOME}/bin/mamba install \
    conda=4.10 \
    cython \
    fastavro \
    fastparquet \
    gcsfs \
    google-cloud-bigquery-storage \
    google-cloud-bigquery[pandas] \
    google-cloud-bigtable \
    google-cloud-container \
    google-cloud-datacatalog \
    google-cloud-dataproc \
    google-cloud-datastore \
    google-cloud-language \
    google-cloud-logging \
    google-cloud-monitoring \
    google-cloud-pubsub \
    google-cloud-redis \
    google-cloud-spanner \
    google-cloud-speech \
    google-cloud-storage \
    google-cloud-texttospeech \
    google-cloud-translate \
    google-cloud-vision \
    koalas \
    matplotlib \
    nltk \
    numba \
    numpy \
    openblas \
    orc \
    pandas \
    pyarrow \
    pysal \
    pytables \
    python=3.9 \
    regex \
    requests \
    rtree \
    scikit-image \
    scikit-learn \
    scipy \
    seaborn \
    sqlalchemy \
    sympy \
    virtualenv

# (Optional) Install PIP packages.
RUN ${CONDA_HOME}/bin/pip install 'google-cloud-translate==3.0.*'
# (Optional) Add extra Python modules.
ENV PYTHONPATH=/opt/python/packages
RUN mkdir -p "\${PYTHONPATH}"

# (Optional) Install R and R libraries.
RUN apt update \
  && apt install -y gnupg \
  && apt-key adv --no-tty \
    --keyserver "hkp://keyserver.ubuntu.com:80" \
    --recv-keys E19F5F87128899B192B1A2C2AD5F960A256A04AF \
  && echo "deb http://cran.microsoft.com/snapshot/2021-10-28/bin/linux/debian bullseye-cran40/" \
    > /etc/apt/sources.list.d/cran-r.list \
  && apt update \
  && apt install -y \
    libopenblas-base \
    libssl-dev \
    r-base \
    r-base-dev \
    r-recommended \
    r-cran-blob

# (Required) Create group/user `spark`.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

必要ファイルのダウンロードを行います。

$ gsutil cp \
  gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar .

イメージをビルドし、Container Registry にアップロードします。

$ IMAGE=gcr.io/${PROJECT}/my-dataproc-image:1.0.0
$ docker build -t "${IMAGE}" .
$ docker push "${IMAGE}"

基本編と同様のジョブをカスタムイメージで実行します。

$ gcloud beta dataproc batches submit spark \
  --region ${REGION} \
  --subnet ${SUB_NETWORK} \
  --container-image=${IMAGE} \
  --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
  --class=org.apache.spark.examples.SparkPi \
  --project ${PROJECT}

実行結果は基本編と同様ですが、カスタムイメージを使用している分、処理完了までの時間は遅くなるようです。
05.jpg

オートスケーリング

次に、オートスケーリング機能について試してみます。
使用するのは、Spark のパフォーマンステストでよく使われる Terasort のサンプルです。このプログラムはパラメータで処理負荷を変えることが可能です。
プログラムは以下にあります。
gs://dataproc-spark-preview/spark_terasort.py

まずは作業用のCloud Storageバケットを作成します。

$ BUCKET=<作業用のバケット名>
$ gsutil mb -p ${PROJECT} \
  -l ${REGION} \
  -c standard \
  -b on \
  gs://${BUCKET}

デフォルトパラメータで実行します。
デフォルトでは gbs(Size of data in GBs = データ量)バラメータが1、partitions(Number of partitions = 分散処理数)バラメータが10となっています。

$ gcloud beta dataproc batches submit pyspark \
  --region ${REGION} \
  --subnet ${SUB_NETWORK} \
  --project ${PROJECT} \
  gs://dataproc-spark-preview/spark_terasort.py \
  -- --base_dir gs://${BUCKET}/spark_terasort

07.png

Executor の実行数は以下となり maximum-needed の数によってrunning 数が最適化されていることが分かります。(おそらくExecutorの下限は2と思われます。)

gbs、partitions共に100倍にして実行してみます。

$ gcloud beta dataproc batches submit pyspark \
  --region ${REGION} \
  --subnet ${SUB_NETWORK} \
  --project ${PROJECT} \
  gs://dataproc-spark-preview/spark_terasort.py \
  -- --gbs 100 \
  --partitions 1000 \
  --base_dir gs://${BUCKET}/spark_terasort

08.png

maximum-needed の最大数までとはいきませんでしたが、running 数は18まで自動で増加しています。

サポートチームに確認した所、Executor は徐々にオートスケーリングしていくので Terasort のような10分弱の短い処理では maximum-needed の最大数に達するまでに処理が完了してしまう、とのことでした。
また、properties パラメータで spark.executor.instances を設定することで、指定した数の Executor を最初から動かすことが可能です。

(おそらく)サポート対象外の機能

  • properties パラメータでは環境変数( spark-env )は指定できません
  • YARN は使えないようで、例えば PySpark で以下のような記述はエラーとなります
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder\
    .master('yarn')\
    .appName('hoge')\
    .getOrCreate()

最後に

Red Frasco は Google Cloud の Service パートナー認定を取得しており、不動産業界を対象に、BigQuery、Cloud Composer、Dataproc 等の Google Cloud サービスを利用したデータ分析、活用支援を行っています。ご興味ある方は以下からご確認ください。
https://www.red-frasco.com/recruit/

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?