Hello! I'm Arda, and I work as a data engineer in Turkey.
This is my first post on Qiita. Nice to meet you, everyone.
Topics
- Ballista Local Deployment
- Rust code walkthrough
- Running the code
- Metrics & performance
Environment
- Colima: 0.9.1
- Kubernetes: v1.x.x (bundled with Colima)
- Rust: 1.92 (bookworm)
- Ballista: 51.0.0
- DataFusion: 51.0.0
- Arrow: 57.3.0
- Parquet: 57.3.0
- Tokio: 1.0
- Protobuf: libprotoc 33.4
- Helm: 3.x.x
Ballista Local Deployment
Ballista is a distributed query engine, so it has to be deployed to a cluster before we can use it. Let's get started!
Helm Configs
values.yaml
clusterName: ballista-cluster
image:
  repository: ballista-local
  pullPolicy: IfNotPresent
  tag: "latest"
scheduler:
  replicaCount: 1
  port: 50050
  uiPort: 80
  args:
    - "--bind-host=0.0.0.0"
    - "--bind-port=50050"
    - "--external-host=ballista-cluster-scheduler"
  resources:
    requests:
      cpu: "500m"
      memory: "512Mi"
    limits:
      cpu: "1"
      memory: "1Gi"
executor:
  replicaCount: 2
  port: 50051
  resources:
    requests:
      cpu: "1"
      memory: "1Gi"
    limits:
      cpu: "2"
      memory: "2Gi"
  args:
    - "--bind-port=50051"
    - "--bind-grpc-port=50052"
    - "--scheduler-host=ballista-cluster-scheduler"
    - "--scheduler-port=50050"
  env:
    - name: BALLISTA_SCHEDULER_HOST
      value: "ballista-cluster-scheduler"
    - name: BALLISTA_SCHEDULER_PORT
      value: "50050"
    - name: RUST_LOG
      value: "info"
storage:
  # Used by templates/pv.yaml and templates/pvc.yaml; the same class on both lets the PVC bind to the hostPath PV
  storageClassName: manual
  capacity: 20Gi
  accessMode: ReadWriteOnce
  hostPath: "/mnt"
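The scheduler and executor resources in values.yaml must fit into the Colima VM, because Kubernetes schedules pods by their resource requests. Below is a minimal, std-only sketch that adds them up. `cpu_millis`, `mem_mib` and `totals` are hypothetical helpers. They understand only the `m`, `Mi` and `Gi` suffixes used in this file, not the full Kubernetes quantity syntax.

```rust
/// CPU quantity ("500m", "1", "2") -> millicores. Hypothetical helper.
fn cpu_millis(q: &str) -> Option<u64> {
    match q.strip_suffix('m') {
        Some(millis) => millis.parse().ok(),
        None => q.parse::<u64>().ok().map(|cores| cores * 1000),
    }
}

/// Memory quantity ("512Mi", "1Gi") -> MiB. Only Mi and Gi are handled.
fn mem_mib(q: &str) -> Option<u64> {
    if let Some(gi) = q.strip_suffix("Gi") {
        gi.parse::<u64>().ok().map(|g| g * 1024)
    } else if let Some(mi) = q.strip_suffix("Mi") {
        mi.parse().ok()
    } else {
        None
    }
}

/// (replicas, cpu, memory) per component -> (total millicores, total MiB).
fn totals(components: &[(u64, &str, &str)]) -> Option<(u64, u64)> {
    let mut cpu = 0;
    let mut mem = 0;
    for &(replicas, c, m) in components {
        cpu += replicas * cpu_millis(c)?;
        mem += replicas * mem_mib(m)?;
    }
    Some((cpu, mem))
}

fn main() {
    // From values.yaml: scheduler x1, executor x2
    let requests = [(1, "500m", "512Mi"), (2, "1", "1Gi")];
    let limits = [(1, "1", "1Gi"), (2, "2", "2Gi")];
    let (req_cpu, req_mem) = totals(&requests).expect("unsupported quantity");
    let (lim_cpu, lim_mem) = totals(&limits).expect("unsupported quantity");
    println!("requests: {req_cpu} millicores, {req_mem} MiB");
    println!("limits:   {lim_cpu} millicores, {lim_mem} MiB");
}
```

With these values the pods request 2.5 CPUs and 2.5 GiB in total (limits: 5 CPUs, 5 GiB), and the k3s system pods come on top of that. If the VM is too small, the executor pods stay `Pending`. In that case, start Colima with more headroom, for example `colima start --kubernetes --cpu 4 --memory 8`, adjusted to your machine.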
templates/executor-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ballista.fullname" . }}-executor
spec:
  replicas: {{ .Values.executor.replicaCount }}
  selector:
    matchLabels:
      app: ballista-executor
  template:
    metadata:
      labels:
        app: ballista-executor
        ballista-cluster: {{ .Values.clusterName }}
    spec:
      enableServiceLinks: false
      containers:
        - name: ballista-executor
          image: "{{ .Values.image.repository }}/ballista-executor:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - containerPort: {{ .Values.executor.port }}
              name: flight
            - containerPort: 50052
              name: grpc
          args:
            {{- range .Values.executor.args }}
            - {{ . | quote }}
            {{- end }}
          env:
            {{- if .Values.executor.env }}
            {{- toYaml .Values.executor.env | nindent 12 }}
            {{- end }}
          resources:
            {{- toYaml .Values.executor.resources | nindent 12 }}
          volumeMounts:
            - mountPath: /mnt
              name: data
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: {{ include "ballista.fullname" . }}-pvc
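The executor's `containerPort`s come from `executor.port` and a literal `50052`. The ports the process actually listens on come from the `--bind-port` and `--bind-grpc-port` args in values.yaml. Nothing ties the two together, so changing one without the other silently leaves the declared ports out of sync. Below is a small, hypothetical consistency check using only `std`. `parse_args` and `port_mismatches` are illustrative names, not part of Helm or Ballista.

```rust
use std::collections::HashMap;

/// Turn ["--bind-port=50051", ...] into {"bind-port": "50051", ...}.
/// Flags without "=" are skipped in this sketch.
fn parse_args<'a>(args: &[&'a str]) -> HashMap<&'a str, &'a str> {
    args.iter()
        .filter_map(|&arg| arg.strip_prefix("--")?.split_once('='))
        .collect()
}

/// Compare ports declared as containerPort with the `--flag=port` args.
/// Returns one message per mismatch (empty = consistent).
fn port_mismatches(args: &[&str], declared: &[(&str, u16)]) -> Vec<String> {
    let parsed = parse_args(args);
    declared
        .iter()
        .filter_map(|&(flag, port)| {
            let from_args = parsed.get(flag).and_then(|v| v.parse::<u16>().ok());
            if from_args == Some(port) {
                None
            } else {
                Some(format!("--{flag}: containerPort {port}, args {from_args:?}"))
            }
        })
        .collect()
}

fn main() {
    // Executor args from values.yaml and the ports declared in executor-deployment.yaml
    let args = [
        "--bind-port=50051",
        "--bind-grpc-port=50052",
        "--scheduler-host=ballista-cluster-scheduler",
        "--scheduler-port=50050",
    ];
    let declared = [("bind-port", 50051), ("bind-grpc-port", 50052)];
    let problems = port_mismatches(&args, &declared);
    if problems.is_empty() {
        println!("executor ports are consistent");
    }
    for p in problems {
        println!("mismatch: {p}");
    }
}
```

Running a check like this against the values you deploy catches the drift before a pod starts listening on a port nobody declared.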
templates/pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: {{ include "ballista.fullname" . }}-pv
  labels:
    type: local
spec:
  storageClassName: {{ .Values.storage.storageClassName }}
  capacity:
    storage: {{ .Values.storage.capacity }}
  accessModes:
    - {{ .Values.storage.accessMode }}
  hostPath:
    path: {{ .Values.storage.hostPath }}
templates/pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ include "ballista.fullname" . }}-pvc
spec:
  storageClassName: {{ .Values.storage.storageClassName }}
  accessModes:
    - {{ .Values.storage.accessMode }}
  resources:
    requests:
      storage: {{ .Values.storage.capacity }}
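Both templates read `.Values.storage.storageClassName`, so the PV and PVC bind to each other only if they end up with the same class. If that key is missing from values.yaml, Helm renders the PVC's `storageClassName` as empty (null). On a cluster with a default StorageClass, the default is then filled in, and the claim can be dynamically provisioned instead of binding to the hostPath PV.

The sketch below is a deliberately simplified model of static binding. It checks only class, capacity and access modes, and ignores selectors, volumeMode and node affinity. `Pv`, `Pvc` and `can_bind` are hypothetical names. `manual` is just an example class name, and `local-path` is assumed as k3s's default class.

```rust
/// Simplified model of matching a PVC to a pre-created PV.
struct Pv<'a> {
    storage_class: &'a str, // "" = PV without a class
    capacity_gi: u64,
    access_modes: &'a [&'a str],
}

struct Pvc<'a> {
    storage_class: Option<&'a str>, // None = field omitted in the manifest
    request_gi: u64,
    access_modes: &'a [&'a str],
}

/// `default_class` is the cluster's default StorageClass, if any;
/// it is filled in for PVCs that omit storageClassName.
fn can_bind(pv: &Pv, pvc: &Pvc, default_class: Option<&str>) -> bool {
    let wanted = pvc.storage_class.or(default_class).unwrap_or("");
    pv.storage_class == wanted
        && pv.capacity_gi >= pvc.request_gi
        && pvc.access_modes.iter().all(|m| pv.access_modes.contains(m))
}

fn main() {
    let rwo: &[&str] = &["ReadWriteOnce"];
    // PV as rendered with storageClassName "manual" and 20Gi
    let pv = Pv { storage_class: "manual", capacity_gi: 20, access_modes: rwo };
    // Same class on the PVC: it can bind to the hostPath PV
    let explicit = Pvc { storage_class: Some("manual"), request_gi: 20, access_modes: rwo };
    // Class omitted, default class filled in: the "manual" PV is skipped
    let omitted = Pvc { storage_class: None, request_gi: 20, access_modes: rwo };
    println!("explicit class binds: {}", can_bind(&pv, &explicit, Some("local-path")));
    println!("omitted class binds:  {}", can_bind(&pv, &omitted, Some("local-path")));
}
```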
templates/scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ballista.fullname" . }}-scheduler
  labels:
    app: ballista-scheduler
spec:
  replicas: {{ .Values.scheduler.replicaCount }}
  selector:
    matchLabels:
      app: ballista-scheduler
  template:
    metadata:
      labels:
        app: ballista-scheduler
        ballista-cluster: {{ .Values.clusterName }}
    spec:
      containers:
        - name: ballista-scheduler
          image: "{{ .Values.image.repository }}/ballista-scheduler:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          args:
            {{- range .Values.scheduler.args }}
            - {{ . | quote }}
            {{- end }}
          ports:
            - containerPort: {{ .Values.scheduler.port }}
              name: flight
          resources:
            {{- toYaml .Values.scheduler.resources | nindent 12 }}
          volumeMounts:
            - mountPath: /mnt
              name: data
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: {{ include "ballista.fullname" . }}-pvc
templates/scheduler-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: {{ include "ballista.fullname" . }}-scheduler
spec:
  ports:
    - port: {{ .Values.scheduler.port }}
      name: scheduler
    - port: {{ .Values.scheduler.uiPort }}
      targetPort: 80
      name: scheduler-ui
  selector:
    app: ballista-scheduler
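Three places hard-code `ballista-cluster-scheduler`: the `--external-host` and `--scheduler-host` args, and the executor env. This works only because the Service is named `{{ include "ballista.fullname" . }}-scheduler` and the release is installed as `ballista-cluster`.

The `_helpers.tpl` that defines `ballista.fullname` isn't shown in the post. Assuming it is the default helper generated by `helm create`, and that the chart is named `ballista` like its `deployment/ballista` directory, its naming logic can be sketched as follows. `helm_fullname` and `scheduler_url` are hypothetical helpers. The URL uses the `df://host:port` form from Ballista's examples.

```rust
/// Sketch of the `<chart>.fullname` helper generated by `helm create`
/// (assumed to match this chart's _helpers.tpl).
fn helm_fullname(
    release: &str,
    chart: &str,
    name_override: Option<&str>,
    fullname_override: Option<&str>,
) -> String {
    let raw = match fullname_override {
        Some(full) => full.to_string(),
        None => {
            let name = name_override.unwrap_or(chart);
            // If the release name already contains the chart name, use it as-is
            if release.contains(name) {
                release.to_string()
            } else {
                format!("{release}-{name}")
            }
        }
    };
    // `trunc 63 | trimSuffix "-"`: some Kubernetes name fields are limited to 63 characters
    let truncated: String = raw.chars().take(63).collect();
    truncated.strip_suffix('-').unwrap_or(truncated.as_str()).to_string()
}

/// Address a client in the same namespace would use for the scheduler Service.
fn scheduler_url(fullname: &str, port: u16) -> String {
    format!("df://{fullname}-scheduler:{port}")
}

fn main() {
    let fullname = helm_fullname("ballista-cluster", "ballista", None, None);
    println!("fullname:      {fullname}");
    println!("scheduler URL: {}", scheduler_url(&fullname, 50050));
}
```

The release name `ballista-cluster` already contains `ballista`, so the fullname is simply `ballista-cluster`. Installing under a different release name would rename the Service and break the hard-coded hosts.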
A simple deployment script (ballista-deploy.sh)
#!/bin/bash
set -e
REPO_URL="https://github.com/apache/datafusion-ballista.git"
TARGET_DIR="deployment/datafusion-ballista"
DOCKER_REGISTRY="ballista-local"
echo "[1/5] Action: Cloning repo: $REPO_URL"
if [ ! -d "$TARGET_DIR" ]; then
    git clone "$REPO_URL" "$TARGET_DIR"
else
    echo "Directory already exists, skipping clone"
fi
echo "[2/5] Action: Building Ballista Docker Images"
cd "$TARGET_DIR"
./dev/build-ballista-docker.sh
cd - > /dev/null
echo "[3/5] Action: Tagging Ballista Images"
docker tag apache/datafusion-ballista-scheduler:latest "$DOCKER_REGISTRY/ballista-scheduler:latest"
docker tag apache/datafusion-ballista-executor:latest "$DOCKER_REGISTRY/ballista-executor:latest"
echo "[4/5] Action: Deploying Ballista Cluster via Helm"
helm upgrade --install ballista-cluster deployment/ballista
echo "[5/5] Status: Waiting for Scheduler to be Ready"
# rollout status waits for the Deployment's pods to be created and become ready;
# `kubectl wait -l ...` fails right away if no pod matches the label yet
kubectl rollout status deployment/ballista-cluster-scheduler --timeout=60s
Run the script with the following commands:
chmod +x ballista-deploy.sh
./ballista-deploy.sh
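The scheduler Deployment defines no readiness probe. A Ready pod therefore only means its container has started, not that port 50050 is already accepting connections. A client such as the query runner could retry briefly before connecting.

Below is a minimal sketch using only `std`. `wait_for_tcp` is a hypothetical helper. It checks TCP reachability only, not gRPC health.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::thread::sleep;
use std::time::Duration;

/// Try to open a TCP connection to `addr` up to `attempts` times,
/// sleeping `delay` between failed attempts.
fn wait_for_tcp(addr: &str, attempts: u32, delay: Duration) -> bool {
    for attempt in 1..=attempts {
        let connected = addr
            .to_socket_addrs()
            .map(|mut addrs| {
                addrs.any(|a| TcpStream::connect_timeout(&a, Duration::from_secs(2)).is_ok())
            })
            .unwrap_or(false); // unresolvable name (e.g. Service not created yet) = not ready
        if connected {
            return true;
        }
        if attempt < attempts {
            sleep(delay);
        }
    }
    false
}

fn main() {
    // Local demo: a listener on an ephemeral port stands in for the scheduler
    let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
    let addr = listener.local_addr().expect("local addr").to_string();
    println!("{addr} reachable: {}", wait_for_tcp(&addr, 3, Duration::from_millis(100)));
}
```

Inside the cluster, the same call would target the Service, e.g. `wait_for_tcp("ballista-cluster-scheduler:50050", 30, Duration::from_secs(2))`, before the `SessionContext` is created.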
The next step is implementing the Rust code. It consists of two parts:
1. Data Generation
use arrow::array::{Float64Array, Int32Array, Int64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;
use rand::Rng;
use rusty_cloud::get_log_schema;
use std::fs::{self, File};
use std::path::Path;
use std::sync::Arc;
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Schema shared with the query job (defined in the rusty_cloud crate)
    let schema = get_log_schema();
    // PVC mount point inside the pod
    let output_dir = Path::new("/mnt");
    fs::create_dir_all(output_dir)?;
    let num_files: usize = 40;
    let rows_per_file: usize = 5_000_000;
    let batch_size: usize = 1_000_000;
    let mut rng = rand::rng();
    println!("Starting generation of {} files, total ~{}M rows...", num_files, num_files * rows_per_file / 1_000_000);
    for file_idx in 0..num_files {
        let file_path = output_dir.join(format!("logs_part_{}.parquet", file_idx));
        let file = File::create(&file_path)?;
        let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;
        println!("Generating file: {}", file_path.display());
        for batch_idx in 0..(rows_per_file / batch_size) {
            let mut timestamps = Vec::with_capacity(batch_size);
            let mut user_ids = Vec::with_capacity(batch_size);
            let mut response_times = Vec::with_capacity(batch_size);
            let mut status_codes = Vec::with_capacity(batch_size);
            for row_idx in 0..batch_size {
                // One-second steps, so every row gets a distinct, increasing timestamp
                let row_offset = (file_idx * rows_per_file + batch_idx * batch_size + row_idx) as i64;
                timestamps.push(1_708_527_600_i64 + row_offset);
                user_ids.push(rng.random_range(1000..9999) as i64);
                response_times.push(rng.random_range(10.0..500.0));
                // Roughly 5% of requests fail with HTTP 500
                let status = if rng.random_bool(0.05) { 500 } else { 200 };
                status_codes.push(status);
            }
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(timestamps)),
                    Arc::new(Int64Array::from(user_ids)),
                    Arc::new(Float64Array::from(response_times)),
                    Arc::new(Int32Array::from(status_codes)),
                ],
            )?;
            writer.write(&batch)?;
        }
        // close() flushes the last row group and writes the Parquet footer
        writer.close()?;
        println!("File {}/{} completed.", file_idx + 1, num_files);
    }
    println!("Success! High-density dataset created in {}", output_dir.display());
    Ok(())
}
[Figure: example output]
These Parquet files are mounted into the pods through the PVC.
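The generator's settings determine how much data the query has to scan. Below is a quick check of the arithmetic in plain Rust. The constants are copied from the generator. The column widths assume the four arrays built above (`Int64`, `Int64`, `Float64`, `Int32`, none with null bitmaps). The sizes are raw Arrow value bytes, not on-disk Parquet sizes.

```rust
// Generator settings copied from the data generation code
const NUM_FILES: u64 = 40;
const ROWS_PER_FILE: u64 = 5_000_000;
const BATCH_SIZE: u64 = 1_000_000;
// timestamp (Int64) + user_id (Int64) + response time (Float64) + status (Int32)
const BYTES_PER_ROW: u64 = 8 + 8 + 8 + 4;
// rng.random_bool(0.05) -> about 5% status 500, 95% status 200
const OK_PERCENT: u64 = 95;

fn total_rows() -> u64 {
    NUM_FILES * ROWS_PER_FILE
}

fn batches_per_file() -> u64 {
    ROWS_PER_FILE / BATCH_SIZE
}

/// Raw value bytes of one in-memory RecordBatch
fn batch_bytes() -> u64 {
    BATCH_SIZE * BYTES_PER_ROW
}

/// Raw value bytes of the whole dataset, before Parquet encoding/compression
fn raw_total_bytes() -> u64 {
    total_rows() * BYTES_PER_ROW
}

/// Expected number of rows passing `status_code = 200`
fn expected_ok_rows() -> u64 {
    total_rows() * OK_PERCENT / 100
}

fn main() {
    println!("total rows:       {}", total_rows());
    println!("batches per file: {}", batches_per_file());
    println!("one batch:        {:.1} MB", batch_bytes() as f64 / 1e6);
    println!("raw data:         {:.1} GB", raw_total_bytes() as f64 / 1e9);
    println!("expected 200s:    ~{}", expected_ok_rows());
}
```

So the generator keeps only about 28 MB of column data in memory per batch. The dataset is about 140 MB of raw values per file and 5.6 GB in total, and the files on the PVC will be smaller after Parquet encoding.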
2. Query Execution
use anyhow::Result;
use ballista::datafusion::execution::SessionStateBuilder;
use ballista::datafusion::prelude::{SessionConfig, SessionContext};
use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::count::count;
use datafusion::prelude::{col, lit, ParquetReadOptions};
use rusty_cloud::DATA_PATH;
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<()> {
    // Scheduler Service created by the Helm chart (same namespace)
    let url = "df://ballista-cluster-scheduler:50050";
    let session_config = SessionConfig::new_with_ballista()
        .with_information_schema(true)
        .with_ballista_job_name("Super Cool Ballista App");
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_config(session_config)
        .build();
    let ctx: SessionContext = SessionContext::remote_with_state(url, state).await?;
    // DATA_PATH (from rusty_cloud) should point at the PVC directory the generator wrote to
    ctx.register_parquet("logs", DATA_PATH, ParquetReadOptions::default()).await?;
    let start = Instant::now();
    // Successful requests only -> per-user count and average latency
    // -> users with more than 10 requests -> top 5 by average latency
    let df = ctx.table("logs").await?
        .filter(col("status_code").eq(lit(200)))?
        .aggregate(
            vec![col("user_id")],
            vec![count(col("response_time_ms")), avg(col("response_time_ms"))],
        )?
        .filter(col("count(logs.response_time_ms)").gt(lit(10)))?
        .sort(vec![col("avg(logs.response_time_ms)").sort(false, true)])?
        .limit(0, Some(5))?;
    // Print the plan first, then execute the query and print the result
    df.clone().explain(false, false)?.show().await?;
    df.show().await?;
    println!("Query Time: {:?}", start.elapsed());
    Ok(())
}
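To make the DataFrame chain easier to follow, here is the same logic written against in-memory rows in plain Rust. No DataFusion or Ballista is involved. `LogRow` and `top_users_by_avg` are hypothetical names. Ballista's job is to run this filter, group, having, sort and limit pipeline in parallel across the executors, and the `explain` output shows the physical plan it actually chose.

```rust
use std::collections::HashMap;

/// One log row; field names follow the columns used in the query.
struct LogRow {
    user_id: i64,
    response_time_ms: f64,
    status_code: i32,
}

/// WHERE status_code = 200, GROUP BY user_id, HAVING count > min_count,
/// ORDER BY avg DESC, LIMIT n. Returns (user_id, count, avg).
fn top_users_by_avg(rows: &[LogRow], min_count: u64, n: usize) -> Vec<(i64, u64, f64)> {
    let mut groups: HashMap<i64, (u64, f64)> = HashMap::new();
    for r in rows.iter().filter(|r| r.status_code == 200) {
        let entry = groups.entry(r.user_id).or_insert((0, 0.0));
        entry.0 += 1; // count
        entry.1 += r.response_time_ms; // running sum for the average
    }
    let mut result: Vec<(i64, u64, f64)> = groups
        .into_iter()
        .filter(|&(_, (count, _))| count > min_count)
        .map(|(user, (count, sum))| (user, count, sum / count as f64))
        .collect();
    // Descending by average; total_cmp gives a total order over f64
    result.sort_by(|a, b| b.2.total_cmp(&a.2));
    result.truncate(n);
    result
}

fn main() {
    // Tiny hand-made sample instead of the 200M-row dataset
    let sample = [(1, 100.0, 200), (1, 300.0, 200), (2, 50.0, 200), (2, 70.0, 200), (2, 900.0, 500), (3, 1000.0, 200)];
    let rows: Vec<LogRow> = sample
        .iter()
        .map(|&(user_id, response_time_ms, status_code)| LogRow { user_id, response_time_ms, status_code })
        .collect();
    for (user, count, avg) in top_users_by_avg(&rows, 1, 5) {
        println!("user {user}: count={count}, avg={avg:.1} ms");
    }
}
```

The generator's parameters have a consequence for the query. `user_id` is drawn from `1000..9999` (upper bound exclusive, so 8,999 distinct IDs), and about 95% of the 200M rows have status 200. Each user therefore gets roughly 190,000,000 / 8,999, or about 21,000, successful requests in expectation, so the `count > 10` filter keeps essentially every user.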
Deployment script for the Rust jobs
#!/bin/bash
set -e
# Image tag for the data generator and query runner
TAG="v1"
echo "[1/3] Building Rust Bins: ballista-tools:$TAG"
docker build -t "ballista-tools:$TAG" .
echo "[2/3] Action: Creating Test Data"
kubectl delete job ballista-data-generator --ignore-not-found
kubectl apply -f deployment/data-gen-job.yaml
echo "[...] Status: Waiting for data-generator to complete (Timeout: 10m)"
if ! kubectl wait --for=condition=complete job/ballista-data-generator --timeout=600s; then
    echo "[ERROR] Data generation failed or timed out!"
    kubectl logs job/ballista-data-generator
    exit 1
fi
echo "[3/3] Action: Launching Query Runner"
kubectl delete pod query-runner --ignore-not-found
kubectl apply -f deployment/query-runner.yaml
echo "----------------------------------------------------------"
echo "[SUCCESS] Pipeline Finished Successfully."
echo "Check Query Runner logs: kubectl logs -f query-runner"
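On this local cluster with two executors, the query filtered and aggregated 200 million rows in just 1.17 seconds. Ballista and Rust make a very powerful combination.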
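To put the 1.17 s into perspective, it can be converted into throughput. This is only a rough figure. The timer starts after `register_parquet`, so it also covers the `explain` call and printing the results. The query also uses only three columns, so the Parquet scan does not need to read the timestamp column. `rows_per_sec` is a trivial hypothetical helper.

```rust
/// Rows processed per second of wall-clock time.
fn rows_per_sec(rows: u64, secs: f64) -> f64 {
    rows as f64 / secs
}

fn main() {
    // Figures from this post: 200M rows, 1.17 s, 2 executors (values.yaml)
    let overall = rows_per_sec(200_000_000, 1.17);
    let executors = 2.0;
    println!("overall:      {:.1} M rows/s", overall / 1e6);
    println!("per executor: {:.1} M rows/s", overall / executors / 1e6);
}
```

That works out to roughly 171M rows/s overall, or about 85M rows/s per executor, with each executor limited to 2 CPUs by values.yaml.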
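Conclusion
In this post, I showed how to implement distributed processing with Ballista and Rust, and looked at its performance.
I plan to keep exploring what Rust can do for data engineering.
If you have any questions or advice, please leave a comment!
Thank you for reading to the end.
Source code for the finished project: https://github.com/ardvci/ballista-k8s-implementation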

