目的
IKS 上で Celery を動かすとき、負荷がない時間帯も worker を常時起動していると無駄なリソース消費になる。
KEDA の Redis Lists スケーラを使い、Redis キュー長に応じて worker を自動増減(アイドル時は 0)させる。
KEDA はイベント駆動で HPA を生成・制御する仕組みで、外部キューの長さなどをトリガに Deployment/Job をスケールできる。(KEDA)
Redis Lists スケーラは Redis list の長さ(listLength)をメトリクスにする。(KEDA)
全体アーキテクチャ
- Producer / Web / Scheduler が Celery task を Redis broker に push
- Celery worker Deployment は replicas=0 を許容
- KEDA Operator が Redis list 長を監視
- キュー長が閾値を超えたら KEDA が HPA を作成し worker をスケール
- キューが空になったら cooldown 後に 0 へ縮退
[Producer/Web] --(task enqueue)--> [Redis broker] --(dequeue)--> [Celery worker pods]
^
|
[KEDA ScaledObject]
前提
-
IKS クラスタが作成済み
-
kubectl が IKS に向いている
-
Helm v3
-
Celery アプリをコンテナ化済み(今回はサンプルを後述)
-
Redis が用意済み
- IBM Cloud Databases for Redis でも、クラスタ内 Redis でもよい
- 外部 Redis の場合は TLS/認証の扱いに注意
IKS で Helm は標準的に使える。(Helm)
1. サンプル Celery アプリ
1-1. Celery 設定
broker_url を Redis にする。デフォルトキューは celery。Celery は Redis broker に list としてキューを保持するため、KEDA 側はこの list を見る。(ThinhDA)
# app/celery_app.py
from celery import Celery
celery_app = Celery(
"app",
broker="redis://:REDIS_PASSWORD@REDIS_HOST:6379/0",
backend="redis://:REDIS_PASSWORD@REDIS_HOST:6379/1",
)
celery_app.conf.update(
task_default_queue="celery",
worker_prefetch_multiplier=1, # スケール時に過剰に先読みしない
task_acks_late=True,
)
1-2. タスク
# app/tasks.py
import time
from .celery_app import celery_app
@celery_app.task
def heavy_task(x):
time.sleep(10)
return x * 2
1-3. Producer
# app/producer.py
from .tasks import heavy_task
def enqueue(n=100):
for i in range(n):
heavy_task.delay(i)
2. Worker コンテナ
2-1. Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY app ./app
ENV PYTHONUNBUFFERED=1
CMD ["celery", "-A", "app.celery_app:celery_app", "worker", "--loglevel=INFO", "--concurrency=2"]
-
--concurrencyは 1 pod あたりの処理並列数。
KEDA のlistLengthと合わせて最終スループットを決める。
3. Kubernetes リソース
3-1. Namespace
apiVersion: v1
kind: Namespace
metadata:
name: celery
3-2. Redis 接続情報 Secret
外部 Redis を想定。
apiVersion: v1
kind: Secret
metadata:
name: redis-secret
namespace: celery
type: Opaque
stringData:
address: "REDIS_HOST:6379"
password: "REDIS_PASSWORD"
TLS を使う場合は CA 証明書も Secret に入れてマウントする。
3-3. Celery worker Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker
namespace: celery
spec:
replicas: 0 # KEDA に任せる
selector:
matchLabels:
app: celery-worker
template:
metadata:
labels:
app: celery-worker
spec:
containers:
- name: worker
image: <your-registry>/celery-worker:latest
env:
- name: REDIS_HOST
valueFrom:
secretKeyRef:
name: redis-secret
key: address
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
key: password
resources:
requests:
cpu: "200m"
memory: "256Mi"
limits:
cpu: "1"
memory: "1Gi"
4. KEDA の導入
KEDA は Helm で入れるのが簡単。(KEDA)
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda \
--namespace keda \
--create-namespace
確認:
kubectl get pods -n keda
kubectl get crd | grep keda
5. Redis Lists Trigger で Celery をスケール
5-1. KEDA の TriggerAuthentication
Redis の認証情報を Secret から参照する。
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: redis-auth
namespace: celery
spec:
secretTargetRef:
- parameter: address
name: redis-secret
key: address
- parameter: password
name: redis-secret
key: password
TLS の場合は tls/enableTLS 相当のメタデータと証明書マウントを追加する。
5-2. ScaledObject
Celery デフォルトキューが celery の場合、Redis list 名も celery。
ここがズレると一切スケールしない。
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: celery-worker-scaledobject
namespace: celery
spec:
scaleTargetRef:
name: celery-worker
pollingInterval: 5 # Redis を見る間隔(秒)
cooldownPeriod: 30 # キューが空になってから縮退するまで(秒)
minReplicaCount: 0
maxReplicaCount: 20
triggers:
- type: redis
authenticationRef:
name: redis-auth
metadata:
address: REDIS_HOST:6379 # TriggerAuthentication で上書きされる
listName: celery
listLength: "5" # 5件/Pod を目安にスケール
databaseIndex: "0"
KEDA の Redis list スケールは概ね下の計算で HPA の desiredReplicas が決まる。(Stack Overflow)
desired = ceil(queueLength / listLength)
desired = min(max(desired, minReplicaCount), maxReplicaCount)
listLength=5 は「キューが 5 件増えるごとに 1 pod 追加」の意味。
6. 動作確認
6-1. 何も積んでいない状態
kubectl get deploy -n celery
# celery-worker 0/0 pods
6-2. タスクを投入
Producer をどこかで実行して 100 件 enqueue。
python -c "from app.producer import enqueue; enqueue(100)"
Redis list が伸びる。
redis-cli -h REDIS_HOST -a REDIS_PASSWORD LLEN celery
# 例: 100
数秒後:
kubectl get hpa -n celery
kubectl get pods -n celery -w
- KEDA が
celery-worker用の HPA を自動生成する - キュー長の増加に応じて pod が増える
6-3. キューが空になる
redis-cli LLEN celery # 0 になればOK
cooldownPeriod(30秒)後に worker が 0 に戻る。
7. チューニングと注意点
7-1. listLength と concurrency の整合
1 pod の実処理能力は概ね:
podスループット ≒ concurrency / 平均タスク時間
listLength はここに合わせる。
タスク時間が長いのに listLength が大きいと、スケールが追いつかずキュー遅延が増える。
目安:
listLength ≒ concurrency * (スケール許容遅延 / 平均タスク時間)
7-2. Prefetch の罠
Celery はデフォルトで prefetch が大きく、少数 pod が大量に task を抱え込むと Redis のキュー長が先に減ってしまい、早期にスケールダウンする。
worker_prefetch_multiplier=1 を推奨。
7-3. スケールは「キューに積まれてから」起動
KEDA のキュー長ベースは「積まれて初めて増える」ため、低遅延が最重要なら厳しいケースがある。
即時性が必要な場合は、ワーカー側の active task 数や到着率を元にスケールするカスタム scaler を検討する。実装例として Celery 専用 scaler を提供する OSS もある。(GitHub)
7-4. 複数キュー運用
Celery キューを分けると Redis list も分かれる。
高優先度キューだけを KEDA で監視する、あるいは ScaledObject をキューごとに作る。
まとめ
- IKS 上でも KEDA はそのまま Helm で導入できる
- Celery + Redis broker は Redis list の長さで自然にスケール可能
-
listName(=Celeryキュー名) とlistLengthの設計が肝 - Prefetch とタスク特性を踏まえたチューニングが必要
- 低遅延要件が強い場合はキュー長以外の指標も検討
この構成で、アイドル時の worker = 0 を実現しつつ、バースト負荷にも追従できる。
by 花村勇輝(Hanamura Yuki)