これは ZOZO Advent Calendar 2024 カレンダー Vol.9 の 1日目の記事です。
はじめに
本記事では、Ray ClusterをGoogle Kubernetes Engine(以下GKE)上に構築し、外部Load Balancer経由でJobのsubmit等を可能にする方法を記載します。
現在、Google Cloudでは、Rayのマネージドサービスとして、Ray on Vertex AIが提供されています。一方でサービス提供当初はオートスケールの機能がありませんでした。
オートスケール可能なRayの利用環境を構築するため、GKE上に自前でRay Clusterを構築する方法を取りました。
現在はRay on Vertex AIにもオートスケール機能が実装されていますが、マネージドサービスではなく自前でRay Clusterを構築し、GKE上で運用する方もいるかと思います。本記事が構成時の参考になれば幸いです。
Ray Clusterとは
Rayとは並列分散処理を手軽に実行できるフレームワークです。
Rayでは簡単なアノテーションを付与することで分散処理として関数を実行できるようになるため既存のコードベースを利用しやすく、分散処理用のコードなどでプログラムが複雑化しにくいというメリットがあります。
Ray Clusterとは、Kubernetes上に構築したRayの実行環境です。ローカル環境と異なり、分散されたRayのプロセスはKubernetesのNodeで実行されるため、スケールアウト可能です。
RayやRay Clusterの詳細については公式ドキュメントをご参照ください。
外部Load BalancerによるRay Clusterの公開
モチベーション
Ray Clusterで分散処理を実行するためには、Ray ClusterのHead NodeにRayのJobをSubmitする必要があります。
Head NodeはKubernetesのPodとして動作するため、KubernetesのServiceリソースを通じてポートフォーワーディングにより外部からアクセスできます。一方で、例えば社内向けにRay Clusterを導入する場合にはポートフォワーディングではなく、Ray Cluster用のエンドポイントを公開し、かつWAF等でアクセス制限を行いたいかと思います。
Serviceの前段にIngressを作成することでこのような構成を取ることが可能です。
GKEではIngressオブジェクトを通じてGoogle CloudのL7 Load Balancerを設定できます。
今回はローカルPCからのアクセスを想定しているため、外部Load Balancerを構築してRay ClusterにJobをSubmitできるようにします。
Kubernetes Serviceの作成
RayのHead NodeのPodに対応するKubernetesのServiceリソースは、RayClusterオブジェクトをKubernetesクラスタにデプロイすることで作成されます。
RayClusterオブジェクトがデプロイされると、Ray Cluster用のCustom ControllerであるKube Ray OperatorがRayClusterオブジェクトに対応するServiceリソースを作成します。
以下はRayClusterオブジェクトのマニフェスト例です。
こちらのサンプル実装を参考に、一部バージョン等を変更し、作成しています。
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-autoscaler
spec:
# The version of Ray you are using. Make sure all Ray containers are running this version of Ray.
rayVersion: '2.22.0'
# If `enableInTreeAutoscaling` is true, the Autoscaler sidecar will be added to the Ray head pod.
# Ray Autoscaler integration is Beta with KubeRay >= 0.3.0 and Ray >= 2.0.0.
enableInTreeAutoscaling: true
# `autoscalerOptions` is an OPTIONAL field specifying configuration overrides for the Ray Autoscaler.
# The example configuration shown below below represents the DEFAULT values.
# (You may delete autoscalerOptions if the defaults are suitable.)
autoscalerOptions:
# `upscalingMode` is "Default" or "Aggressive."
# Conservative: Upscaling is rate-limited; the number of pending worker pods is at most the size of the Ray cluster.
# Default: Upscaling is not rate-limited.
# Aggressive: An alias for Default; upscaling is not rate-limited.
upscalingMode: Default
# `idleTimeoutSeconds` is the number of seconds to wait before scaling down a worker pod which is not using Ray resources.
idleTimeoutSeconds: 60
# `image` optionally overrides the Autoscaler's container image. The Autoscaler uses the same image as the Ray container by default.
## image: "my-repo/my-custom-autoscaler-image:tag"
# `imagePullPolicy` optionally overrides the Autoscaler container's default image pull policy (IfNotPresent).
imagePullPolicy: IfNotPresent
# Optionally specify the Autoscaler container's securityContext.
securityContext: {}
env: []
envFrom: []
# resources specifies optional resource request and limit overrides for the Autoscaler container.
# The default Autoscaler resource limits and requests should be sufficient for production use-cases.
# However, for large Ray clusters, we recommend monitoring container resource usage to determine if overriding the defaults is required.
resources:
limits:
cpu: "500m"
memory: "512Mi"
requests:
cpu: "500m"
memory: "512Mi"
headServiceAnnotations:
cloud.google.com/neg: '{"ingress":true}' # ref. https://cloud.google.com/kubernetes-engine/docs/how-to/container-native-load-balancing
cloud.google.com/backend-config: '{"default":"ray-head-node-backend-config"}'
# Ray head pod template
headGroupSpec:
serviceType: ClusterIP
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available opheadServiceAnnotations in `rayStartParams`.
rayStartParams:
# Setting "num-cpus: 0" to avoid any Ray actors or tasks being scheduled on the Ray head Pod.
num-cpus: "0"
# Use `resources` to optionally specify custom resource annotations for the Ray node.
# The value of `resources` is a string-integer mapping.
# Currently, `resources` must be provided in the specific format demonstrated below:
# resources: '"{\"Custom1\": 1, \"Custom2\": 5}"'
dashboard-host: '0.0.0.0'
# Pod template
template:
spec:
containers:
# The Ray head container
- name: ray-head
image: rayproject/ray:2.22.0-py311
ports:
- containerPort: 6379
name: gcs
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
lifecycle:
preStop:
exec:
command: ["/bin/sh","-c","ray stop"]
resources:
limits:
cpu: "1"
memory: "2G"
requests:
cpu: "1"
memory: "2G"
workerGroupSpecs:
# the Pod replicas in this group typed worker
- replicas: 0
minReplicas: 0
maxReplicas: 3
# logical group name, for this called small-group, also can be functional
groupName: small-group
# If worker pods need to be added, Ray Autoscaler can increment the `replicas`.
# If worker pods need to be removed, Ray Autoscaler decrements the replicas, and populates the `workersToDelete` list.
# KubeRay operator will remove Pods from the list until the desired number of replicas is satisfied.
#scaleStrategy:
# workersToDelete:
# - raycluster-complete-worker-small-group-bdtwh
# - raycluster-complete-worker-small-group-hv457
# - raycluster-complete-worker-small-group-k8tj7
rayStartParams: {}
# Pod template
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.22.0-py311
lifecycle:
preStop:
exec:
command: ["/bin/sh","-c","ray stop"]
resources:
limits:
cpu: "1"
memory: "1G"
requests:
cpu: "1"
memory: "1G"
Head Nodeに対応するKubernetes Serviceの設定値もRayClusterのマニフェストで設定可能です。
例えば、作成されるServiceリソースにアノテーションを付与したい場合、specs.headServiceAnnotations
フィールドを追加することで指定可能です。
またServiceのTypeについてもheadGroupSpec.serviceType
フィールドで指定可能です。
その他、RayClusterオブジェクトで設定可能なフィールドについては、RayClusterオブジェクトに対応する型定義を見るとわかります。
Ingressの作成
上記で作成したServiceリソースに対応するIngressリソースを作成します。
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ray-head-node-ingress
annotations:
kubernetes.io/ingress.global-static-ip-name: "ray-global-ip"
ingress.gcp.kubernetes.io/pre-shared-cert: "ray-external-cert"
kubernetes.io/ingress.allow-http: "false"
networking.gke.io/v1beta1.FrontendConfig: "ray-head-node-frontend-config"
kubernetes.io/ingress.class: "gce"
spec:
defaultBackend:
service:
name: raycluster-autoscaler-head-svc
port:
number: 8265
---
apiVersion: networking.gke.io/v1beta1
kind: FrontendConfig
metadata:
name: ray-head-node-frontend-config
spec:
redirectToHttps:
enabled: true
sslPolicy: example-ssl-policy
---
apiVersion: cloud.google.com/v1
kind: BackendConfig
metadata:
name: ray-head-node-backend-config
spec:
# needed for Google Cloud Armor
securityPolicy:
name: example-waf-rule
timeoutSec: 3600
基本的には通常Ingressを作成するのと同様の手順で問題ないです。一方で、以下の2点に注意する必要があります。
- ServiceとHead Nodeは1対1で作成する
- パスごとのリクエストの振り分けができない
ServiceとHead Nodeは1対1で作成する
例えば1つのRay Clusterに対し、外部Load Balancerと内部Load Balancerをそれぞれ立てたい場合があるかと思います。この時、それぞれのIngressオブジェクトに対応するServiceリソースを作成するかと思います。
一方で、1つのHead NodeのPodに対応するServiceリソースを外部Load Balancer用、内部Load Balancer用で作成するとRayのJobをSubmitできないという問題が発生しました。
そのため、Ingress・Service・Head NodeのPodはそれぞれ1対1で作成するようにしました。
こちらの問題については、Istioでのプロキシを挟むことで解決できたため、明日以降の記事でご紹介します。
パスごとのリクエストの振り分けができない
例えば、1つのIngressオブジェクトの裏側に複数のRay Clusterを構築し、Ingressへのアクセス時のパスでリクエスト先のRay Clusterを振り分けたいケースがあるかと思います。
上記を実現するために、Ingressオブジェクトの作成時に次のようにパスのPrefixを指定してルーティングを行うことを試しましたが、この方法では404のステータスコードが返り、RayのDashboardにアクセスすることはできませんでした。
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ray-head-node-ingress
annotations:
kubernetes.io/ingress.global-static-ip-name: "ray-global-ip"
ingress.gcp.kubernetes.io/pre-shared-cert: "ray-external-cert"
kubernetes.io/ingress.allow-http: "false"
networking.gke.io/v1beta1.FrontendConfig: "ray-head-node-frontend-config"
kubernetes.io/ingress.class: "gce"
spec:
rules:
- http:
paths:
- path: /hoge
pathType: Prefix
backend:
service:
name: raycluster-autoscaler-head-svc
port:
number: 8265
パスごとにリクエストの振り分けを行わない次の構成では、Dashboardへのアクセスを正常に行うことができました。
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ray-head-node-ingress
annotations:
kubernetes.io/ingress.global-static-ip-name: "ray-global-ip"
ingress.gcp.kubernetes.io/pre-shared-cert: "ray-external-cert"
kubernetes.io/ingress.allow-http: "false"
networking.gke.io/v1beta1.FrontendConfig: "ray-head-node-frontend-config"
kubernetes.io/ingress.class: "gce"
spec:
defaultBackend:
service:
name: raycluster-autoscaler-head-svc
port:
number: 8265
こちらの問題については、アクセスパスのrewriteにより解決できたため、明日以降の記事でご紹介します。
Ingress経由でのRay Clusterへのアクセス
Dashboardへのアクセス
上記で構成した外部Load Balancerに紐づくホストを指定することで、Ray ClusterのDashboardへアクセス可能です。
例: https://<sample-ray-cluster-host>/
JobのSubmit
上記で構成した外部Load Balancerに紐づくホストを指定することで、外部Load Balancer経由でRay ClusterへJobをSubmit可能です。
例:RAY_ADDRESS="https://<sample-ray-cluster-host>/"
例えば次のサンプルコードは次のコマンドでSubmitできます。
例:RAY_ADDRESS="https://<sample-ray-cluster-host>/" poetry run ray job submit --working-dir . -- python main.py
import ray
import time
def main():
@ray.remote(num_cpus=1)
def _hello_world():
time.sleep(1)
return "Hello World"
start = time.time()
ray.init()
ray.get([_hello_world.remote() for i in range(30)])
print(f"%s seconds" % (time.time() - start))
ray.shutdown()
if __name__ == "__main__":
main()
RayClusterオブジェクトのマニフェストで指定したHead Nodeのバージョンとray sdkのバージョンが一致していることを確認してください。
[tool.poetry]
name = "use-ray"
version = "0.1.0"
description = ""
authors = [""]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
ray = {version = "2.22.0", extras = ["default"]}
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
SubmitしたJobは次のようにコンソールから確認可能です。