はじめに
- Dataproc Serverless Batchesを試してみたときのメモです
- Sparkの概要については下記参照
手順
- 以下の手順は、CloudShellから実行した
変数設定
$ PROJECT=<GCP プロジェクト名>
$ REGION=<リージョン名>
$ NETWORK=<VPC ネットワーク名>
$ SUB_NETWORK=<VPC サブネットワーク名>
$ FW_RULE=<ファイアウォールルール名>
# pyspark実行に必要
$ BUCKET=<GCSバケット名>
環境構築
Dataproc Serverless Batches環境構築手順
ネットワーク構築
Dataproc Serverless for Spark では限定公開の Google アクセスを有効にしたサブネットワークが必要。
- VPCネットワーク作成
$ gcloud compute networks create ${NETWORK} \
--project=${PROJECT} \
--subnet-mode=custom \
--mtu=1460 \
--bgp-routing-mode=regional
- サブネット作成
$ gcloud compute networks subnets create ${SUB_NETWORK} \
--project=${PROJECT} \
--range=10.128.0.0/20 \
--network=${NETWORK} \
--region=${REGION} \
--enable-private-ip-google-access
- ファイアウォールルール作成
- クラスタ内での通信を許可する
$ gcloud compute firewall-rules create ${FW_RULE} \
--direction=INGRESS \
--priority=65534 \
--network=${NETWORK} \
--action=ALLOW \
--rules=tcp:0-65535,udp:0-65535,icmp \
--source-ranges=10.128.0.0/9 \
--project=${PROJECT}
動作確認
$ gcloud beta dataproc batches submit spark \
--region ${REGION} \
--subnet ${SUB_NETWORK} \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
--class=org.apache.spark.examples.SparkPi \
--project ${PROJECT}
→
# 実行結果
Pi is roughly 3.137495687478437
ジョブ実行
Dataproc Serverless Batches実行手順
PySpark/gcloudコマンドでSUBMIT JOB
- ファイル準備
distinct.py
#! /usr/bin/python
import pyspark
#Create List
numbers = [1,2,1,2,3,4,4,6]
#SparkContext
sc = pyspark.SparkContext()
# Creating RDD using parallelize method of SparkContext
rdd = sc.parallelize(numbers)
#Returning distinct elements from RDD
distinct_numbers = rdd.distinct().collect()
#Print
print('Distinct Numbers:', distinct_numbers)
- GCSアップロード
- 事前にGCSバケットを作成しておくこと
$ gsutil cp distinct.py gs://${BUCKET}/distinct.py
- ジョブ実行
$ gcloud dataproc batches submit pyspark \
--region=${REGION} \
--project=${PROJECT} \
--subnet=${SUB_NETWORK} \
gs://${BUCKET}/distinct.py
→
# 実行結果
Distinct Numbers: [2, 4, 6, 1, 3]
PySpark/Vertex Pipeline ComponentでSUBMIT JOB
- 以下参照
- 抜粋
aiplatform.init(project=PROJECT_ID, staging_bucket=GCS_STAGING_LOCATION)
@dsl.pipeline(
name="dataproc-templates-pyspark",
description="An example pipeline that uses DataprocPySparkBatchOp to run a PySpark Dataproc Template batch workload",
)
def pipeline(
batch_id: str = BATCH_ID,
project_id: str = PROJECT_ID,
location: str = REGION,
main_python_file_uri: str = MAIN_PYTHON_FILE,
python_file_uris: list = PYTHON_FILE_URIS,
jar_file_uris: list = JARS,
subnetwork_uri: str = SUBNET,
args: list = TEMPLATE_SPARK_ARGS,
):
from google_cloud_pipeline_components.experimental.dataproc import \
DataprocPySparkBatchOp
_ = DataprocPySparkBatchOp(
project=project_id,
location=location,
batch_id=batch_id,
main_python_file_uri=main_python_file_uri,
python_file_uris=python_file_uris,
jar_file_uris=jar_file_uris,
subnetwork_uri=subnetwork_uri,
args=args,
)
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")
pipeline = aiplatform.PipelineJob(
display_name="pipeline",
template_path="pipeline.json",
pipeline_root=PIPELINE_ROOT,
enable_caching=False,
)
pipeline.run()
設計のポイント
- w/データサイエンティスト
- PySparkのバージョン
- GPU利用するか
- インタラクティブモードを利用するか
- 処理方式
- コンソールからSUBMIT JOB
- gcloudコマンドでSUBMIT JOB
- python sdkでSUBMIT JOB
- 現時点ではなさそう
- Vertex Pipeline ComponentでSUBMIT JOB
- Composer(Airflow Operator)でSUBMIT JOB
- アーキテクチャ構成
- データ格納先: BQ or GCS or ローカル
- BQのスロット消費に注意(BQコネクタ)
- ワークフロー
- リポジトリ、ディレクトリ構成
- 各種ファイルの配置
- pysparkジョブファイル, workflow, properties, metastore, egg, etc
- CI/CD
- 変数管理
- データ格納先: BQ or GCS or ローカル
- インフラ
- スケール方法
- historyサーバ
- エフェメラルインスタンス
- 課金管理