
[Rust] Building a Distributed Processing Platform with Ballista and DataFusion: Aggregating 200 Million Rows in 1.17 Seconds


Hello! I'm Arda, a data engineer based in Turkey.
This is my first post on Qiita. Nice to meet you!

Topics

  • Ballista Local Deployment
  • Rust code walkthrough
  • Code execution
  • Metrics & performance

Environment

  • Colima: 0.9.1
  • Kubernetes: v1.x.x (bundled with Colima)
  • Rust: 1.92 (bookworm)
  • Ballista: 51.0.0
  • DataFusion: 51.0.0
  • Arrow: 57.3.0
  • Parquet: 57.3.0
  • Tokio: 1.0
  • Protobuf: libprotoc 33.4
  • Helm: 3.x.x

Ballista Local Deployment

Because Ballista is a distributed engine, we first need to deploy a cluster. Let's get started!

Helm Configs

values.yaml
clusterName: ballista-cluster 
image:
  repository: ballista-local
  pullPolicy: IfNotPresent
  tag: "latest"

scheduler:
  replicaCount: 1
  port: 50050
  uiPort: 80
  args:
    - "--bind-host=0.0.0.0"
    - "--bind-port=50050"
    - "--external-host=ballista-cluster-scheduler"
  resources:
    requests:
      cpu: "500m"
      memory: "512Mi"
    limits:
      cpu: "1"
      memory: "1Gi"

executor:
  replicaCount: 2
  port: 50051
  resources:
    requests:
      cpu: "1"
      memory: "1Gi"
    limits:
      cpu: "2"
      memory: "2Gi"
  args:
    - "--bind-port=50051"
    - "--bind-grpc-port=50052"
    - "--scheduler-host=ballista-cluster-scheduler"
    - "--scheduler-port=50050"
  env:
    - name: BALLISTA_SCHEDULER_HOST
      value: "ballista-cluster-scheduler"
    - name: BALLISTA_SCHEDULER_PORT
      value: "50050"
    - name: RUST_LOG
      value: "info"

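storage:
  # Referenced by templates/pv.yaml and templates/pvc.yaml. Any class name works
  # as long as the PV and PVC use the same one, so the claim binds to the hostPath PV.
  storageClassName: "manual"
  capacity: 20Gi
  accessMode: ReadWriteOnce
  hostPath: "/mnt"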
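The requests and limits above also determine how large the Colima VM has to be. Here is a small back-of-the-envelope sketch, assuming one scheduler and two executor replicas as configured, and not counting the data generator, the query runner, or Kubernetes system pods (their manifests are not part of this chart). The `Resources` type and `total` function are hypothetical helpers for illustration:

```rust
/// CPU in millicores and memory in MiB for one pod (hypothetical helper type).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Resources {
    cpu_millis: u32,
    memory_mib: u32,
}

/// Sums each pod's resources multiplied by its replica count.
fn total(pods: &[(Resources, u32)]) -> Resources {
    pods.iter().fold(
        Resources { cpu_millis: 0, memory_mib: 0 },
        |acc, &(r, replicas)| Resources {
            cpu_millis: acc.cpu_millis + r.cpu_millis * replicas,
            memory_mib: acc.memory_mib + r.memory_mib * replicas,
        },
    )
}

fn main() {
    // (per-pod resources, replicas) copied from values.yaml: scheduler x1, executor x2.
    let requests = total(&[
        (Resources { cpu_millis: 500, memory_mib: 512 }, 1),
        (Resources { cpu_millis: 1000, memory_mib: 1024 }, 2),
    ]);
    let limits = total(&[
        (Resources { cpu_millis: 1000, memory_mib: 1024 }, 1),
        (Resources { cpu_millis: 2000, memory_mib: 2048 }, 2),
    ]);
    println!("requests: {} mCPU, {} MiB", requests.cpu_millis, requests.memory_mib);
    println!("limits:   {} mCPU, {} MiB", limits.cpu_millis, limits.memory_mib);
}
```

That works out to 2.5 CPUs and 2.5 GiB requested, and 5 CPUs and 5 GiB of limits. Kubernetes schedules pods by their requests, so the VM (sized with the `--cpu` and `--memory` flags of `colima start`) needs headroom above the requested totals; limits may exceed the node's capacity, but then executors can be throttled or OOM-killed under heavy load.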
templates/executor-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ballista.fullname" . }}-executor
spec:
  replicas: {{ .Values.executor.replicaCount }}
  selector:
    matchLabels:
      app: ballista-executor
  template:
    metadata:
      labels:
        app: ballista-executor
        ballista-cluster: {{ .Values.clusterName }}
    spec:
      enableServiceLinks: false
      containers:
        - name: ballista-executor
          image: "{{ .Values.image.repository }}/ballista-executor:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - containerPort: {{ .Values.executor.port }}
              name: flight
            - containerPort: 50052
              name: grpc
          args:
            {{- range .Values.executor.args }}
            - {{ . | quote }}
            {{- end }}
          env:
            {{- if .Values.executor.env }}
            {{- toYaml .Values.executor.env | nindent 12 }}
            {{- end }}
          resources:
            {{- toYaml .Values.executor.resources | nindent 12 }}
          volumeMounts:
            - mountPath: /mnt
              name: data
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: {{ include "ballista.fullname" . }}-pvc
templates/pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: {{ include "ballista.fullname" . }}-pv
  labels:
    type: local
spec:
  storageClassName: {{ .Values.storage.storageClassName }}
  capacity:
    storage: {{ .Values.storage.capacity }}
  accessModes:
    - {{ .Values.storage.accessMode }}
  hostPath:
    path: {{ .Values.storage.hostPath }}
templates/pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ include "ballista.fullname" . }}-pvc
spec:
  storageClassName: {{ .Values.storage.storageClassName }}
  accessModes:
    - {{ .Values.storage.accessMode }}
  resources:
    requests:
      storage: {{ .Values.storage.capacity }}
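The claim above only binds to the hostPath volume from templates/pv.yaml when the two are compatible. Below is a simplified sketch of the main static-binding checks: the same storageClassName, an access mode the PV offers, and enough capacity. The real Kubernetes binder also considers label selectors, volumeMode, node affinity and more, and the class name `manual` is only an assumed example. One practical consequence: if the class is left unset, the PVC may be assigned the cluster's default StorageClass (for example `local-path` on k3s) and then no longer match this PV.

```rust
/// Simplified PersistentVolume (hypothetical type; only the fields this check uses).
struct Pv<'a> {
    storage_class: &'a str,
    access_modes: &'a [&'a str],
    capacity_gib: u64,
}

/// Simplified PersistentVolumeClaim (hypothetical type).
struct Pvc<'a> {
    storage_class: &'a str,
    access_mode: &'a str,
    request_gib: u64,
}

/// True if the claim could bind to the volume under these simplified static-binding rules.
fn can_bind(pv: &Pv<'_>, pvc: &Pvc<'_>) -> bool {
    pv.storage_class == pvc.storage_class
        && pv.access_modes.iter().any(|m| *m == pvc.access_mode)
        && pv.capacity_gib >= pvc.request_gib
}

fn main() {
    // Values mirroring values.yaml; "manual" is an assumed storage class name.
    let pv = Pv { storage_class: "manual", access_modes: &["ReadWriteOnce"], capacity_gib: 20 };
    let pvc = Pvc { storage_class: "manual", access_mode: "ReadWriteOnce", request_gib: 20 };
    println!("same class: {}", can_bind(&pv, &pvc));
    // A claim that picked up a different (e.g. default) class no longer matches this PV.
    let other = Pvc { storage_class: "local-path", ..pvc };
    println!("different class: {}", can_bind(&pv, &other));
}
```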
templates/scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ballista.fullname" . }}-scheduler
  labels:
    app: ballista-scheduler
spec:
  replicas: {{ .Values.scheduler.replicaCount }}
  selector:
    matchLabels:
      app: ballista-scheduler
  template:
    metadata:
      labels:
        app: ballista-scheduler
        ballista-cluster: {{ .Values.clusterName }}
    spec:
      containers:
        - name: ballista-scheduler
          image: "{{ .Values.image.repository }}/ballista-scheduler:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          args:
            {{- range .Values.scheduler.args }}
            - {{ . | quote }}
            {{- end }}
          ports:
            - containerPort: {{ .Values.scheduler.port }}
              name: flight
          resources:
            {{- toYaml .Values.scheduler.resources | nindent 12 }}
          volumeMounts:
            - mountPath: /mnt
              name: data
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: {{ include "ballista.fullname" . }}-pvc
templates/scheduler-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: {{ include "ballista.fullname" . }}-scheduler
spec:
  ports:
    - port: {{ .Values.scheduler.port }}
      name: scheduler
    - port: {{ .Values.scheduler.uiPort }}
      targetPort: 80
      name: scheduler-ui
  selector:
    app: ballista-scheduler
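Every hostname in values.yaml (`--external-host`, `--scheduler-host`, `BALLISTA_SCHEDULER_HOST`) assumes the Service above is named `ballista-cluster-scheduler`. The article does not show `_helpers.tpl`, so the following assumes `ballista.fullname` is the default helper generated by `helm create` and that the chart is named `ballista`. Under that assumption, the naming rule ported to Rust (as a hypothetical `fullname` function) looks like this:

```rust
/// Hypothetical Rust port of the default `<chart>.fullname` helper that `helm create` generates.
fn fullname(release: &str, chart: &str, name_override: Option<&str>, fullname_override: Option<&str>) -> String {
    let raw = match fullname_override {
        Some(name) => name.to_string(),
        None => {
            let name = name_override.unwrap_or(chart);
            // `contains $name .Release.Name`: reuse the release name if it already contains the chart name.
            if release.contains(name) {
                release.to_string()
            } else {
                format!("{release}-{name}")
            }
        }
    };
    // `trunc 63 | trimSuffix "-"`: names used as DNS labels are limited to 63 characters.
    let truncated: String = raw.chars().take(63).collect();
    truncated.strip_suffix('-').unwrap_or(truncated.as_str()).to_string()
}

fn main() {
    // Release name from `helm upgrade --install ballista-cluster deployment/ballista`; chart name assumed.
    let base = fullname("ballista-cluster", "ballista", None, None);
    println!("scheduler Service: {base}-scheduler");
}
```

With the release name `ballista-cluster` used by the deployment script, the chart name is already contained in the release name, so the Service resolves to `ballista-cluster-scheduler`. Installing under a different release name would change the Service name, and the hard-coded hostnames would then need updating too.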

A simple deployment script

ballista-deploy.sh
#!/bin/bash

set -e

REPO_URL="https://github.com/apache/datafusion-ballista.git"
TARGET_DIR="deployment/datafusion-ballista"
DOCKER_REGISTRY="ballista-local"

echo "[1/5] Cloning repository: $REPO_URL"
if [ ! -d "$TARGET_DIR" ]; then
    git clone "$REPO_URL" "$TARGET_DIR"
else
    echo "Directory already exists, skipping clone"
fi

echo "[2/5] Action: Building Ballista Docker Images"
cd "$TARGET_DIR"
./dev/build-ballista-docker.sh
cd - > /dev/null

echo "[3/5] Action: Tagging Ballista Images"
docker tag apache/datafusion-ballista-scheduler:latest "$DOCKER_REGISTRY/ballista-scheduler:latest"
docker tag apache/datafusion-ballista-executor:latest "$DOCKER_REGISTRY/ballista-executor:latest"

echo "[4/5] Action: Deploying Ballista Cluster via Helm"
helm upgrade --install ballista-cluster deployment/ballista

echo "[5/5] Status: Waiting for Scheduler to be Ready"
kubectl wait --for=condition=ready pod -l app=ballista-scheduler --timeout=60s
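Step 3 retags the images because both Deployments build their image reference as `{{ .Values.image.repository }}/ballista-<component>:{{ .Values.image.tag }}`. A tiny sketch with hypothetical helper names that renders both sides from the values and the script variables, and checks that they agree:

```rust
/// Mirrors the image string in the Deployment templates:
/// "{{ .Values.image.repository }}/ballista-<component>:{{ .Values.image.tag }}".
fn rendered_image(repository: &str, component: &str, tag: &str) -> String {
    format!("{repository}/ballista-{component}:{tag}")
}

/// Mirrors the target of `docker tag ... "$DOCKER_REGISTRY/ballista-<component>:latest"`.
fn retagged_image(registry: &str, component: &str) -> String {
    format!("{registry}/ballista-{component}:latest")
}

fn main() {
    // repository/tag from values.yaml, registry from DOCKER_REGISTRY in ballista-deploy.sh.
    for component in ["scheduler", "executor"] {
        let expected = rendered_image("ballista-local", component, "latest");
        let created = retagged_image("ballista-local", component);
        println!("{component}: {expected} (matches: {})", expected == created);
    }
}
```

Since `pullPolicy` is `IfNotPresent`, Kubernetes uses these locally built images instead of pulling from a registry. This assumes Colima's Kubernetes shares the image store of the local Docker daemon (the usual setup with Colima's Docker runtime).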

Run the script with the following commands:

chmod +x ballista-deploy.sh
./ballista-deploy.sh

Next comes the Rust code, which consists of two steps:

1. Data Generation
use arrow::array::*;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;
use rand::Rng;
use std::fs;
use std::fs::File;
use std::sync::Arc;
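// get_log_schema() comes from the project's rusty_cloud library crate (not shown in this article);
// it returns the SchemaRef for the four log columns written below, in the same order.
use rusty_cloud::get_log_schema;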

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = get_log_schema();
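    // Shared PVC mount point; no trailing slash, since paths are built as "{dir}/logs_part_{n}.parquet".
    let output_dir = "/mnt";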
    fs::create_dir_all(output_dir)?;

    let num_files = 40;
    let rows_per_file = 5_000_000;
    let batch_size = 1_000_000;
    let mut rng = rand::rng();

    println!("Starting generation of {} files, total ~200M rows...", num_files);

    for file_idx in 0..num_files {
        let file_path = format!("{}/logs_part_{}.parquet", output_dir, file_idx);
        let file = File::create(&file_path)?;
        let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;

        println!("Generating file: {}", file_path);

        for batch_idx in 0..(rows_per_file / batch_size) {
            let mut timestamps = Vec::with_capacity(batch_size);
            let mut user_ids = Vec::with_capacity(batch_size);
            let mut response_times = Vec::with_capacity(batch_size);
            let mut status_codes = Vec::with_capacity(batch_size);

            for row_idx in 0..batch_size {
                // One-second steps from a fixed base epoch, offset by the row's global index.
                let global_row =
                    file_idx as i64 * rows_per_file as i64 + (batch_idx * batch_size + row_idx) as i64;
                timestamps.push(1708527600 + global_row);
                user_ids.push(rng.random_range(1000..9999) as i64);
                response_times.push(rng.random_range(10.0..500.0));

                let status = if rng.random_bool(0.05) { 500 } else { 200 };
                status_codes.push(status);
            }

            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(timestamps)),
                    Arc::new(Int64Array::from(user_ids)),
                    Arc::new(Float64Array::from(response_times)),
                    Arc::new(Int32Array::from(status_codes)),
                ],
            )?;

            writer.write(&batch)?;
        }
        writer.close()?;
        println!("File {} completed.", file_idx + 1);
    }

    println!("Success! High-density dataset created in {}", output_dir);
    Ok(())
}

Sample output:

[Screenshot: data generation job output]

These Parquet files are written to /mnt, which is backed by the PVC, so the executors can read them from the same mount.
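The generator's constants fix the size of the dataset. A quick sketch of the arithmetic, assuming the four columns are Int64, Int64, Float64 and Int32 as in the RecordBatch above; the byte figure counts only the fixed-width values in memory, so the actual Parquet files will differ depending on encoding and compression:

```rust
// Constants copied from the generator above.
const NUM_FILES: u64 = 40;
const ROWS_PER_FILE: u64 = 5_000_000;
const BATCH_SIZE: u64 = 1_000_000;
/// Fixed-width bytes per row: Int64 (timestamp) + Int64 (user_id) + Float64 (response time) + Int32 (status code).
const BYTES_PER_ROW: u64 = 8 + 8 + 8 + 4;
/// Probability that a generated row gets status 500 (`random_bool(0.05)`).
const ERROR_RATE: f64 = 0.05;

fn main() {
    let total_rows = NUM_FILES * ROWS_PER_FILE;
    let batches_per_file = ROWS_PER_FILE / BATCH_SIZE;
    let raw_bytes = total_rows * BYTES_PER_ROW;
    let batch_bytes = BATCH_SIZE * BYTES_PER_ROW;
    // Expected (not exact) number of rows that pass `status_code = 200`.
    let expected_ok = total_rows as f64 * (1.0 - ERROR_RATE);

    println!("total rows:        {total_rows}");
    println!("batches per file:  {batches_per_file}");
    println!("raw values:        {:.2} GiB", raw_bytes as f64 / (1u64 << 30) as f64);
    println!("one batch:         {:.1} MiB", batch_bytes as f64 / (1u64 << 20) as f64);
    println!("expected 200 rows: {expected_ok:.0}");
}
```

That is exactly 200,000,000 rows, written as 5 batches of about 27 MiB per file, and roughly 5.2 GiB of raw values, which fits in the 20Gi volume even before compression. Because each row gets status 500 with probability 0.05, about 190 million rows are expected to pass the status filter in the query.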

2. Query Execution
use anyhow::Result;
use ballista::datafusion::execution::SessionStateBuilder;
use ballista::datafusion::prelude::{SessionConfig, SessionContext};
use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::count::count;
use datafusion::prelude::{col, lit, ParquetReadOptions};
// DATA_PATH comes from the project's rusty_cloud crate and points at the Parquet files on the PVC.
use rusty_cloud::DATA_PATH;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<()> {
    // Ballista client URLs use the df:// scheme; the host is the scheduler Service name.
    let url = "df://ballista-cluster-scheduler:50050";

    let session_config = SessionConfig::new_with_ballista()
        .with_information_schema(true)
        .with_ballista_job_name("Super Cool Ballista App");

    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_config(session_config)
        .build();

    let ctx: SessionContext = SessionContext::remote_with_state(url, state).await?;

    ctx.register_parquet("logs", DATA_PATH, ParquetReadOptions::default()).await?;

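    // Note: the timer also covers building the plan, printing the EXPLAIN output and showing the results.
    let start = Instant::now();

    let df = ctx.table("logs").await?
        // Keep only successful requests.
        .filter(col("status_code").eq(lit(200)))?
        // Per-user request count and average latency.
        .aggregate(vec![col("user_id")], vec![count(col("response_time_ms")), avg(col("response_time_ms"))])?
        // Aggregate outputs are referenced by their auto-generated names, e.g. "count(logs.response_time_ms)".
        .filter(col("count(logs.response_time_ms)").gt(lit(10)))?
        // Slowest users first (descending, nulls first), top 5 only.
        .sort(vec![col("avg(logs.response_time_ms)").sort(false, true)])?
        .limit(0, Some(5))?;

    // Print the logical and physical plans (verbose = false, analyze = false).
    df.clone().explain(false, false)?.show().await?;

    df.show().await?;

    println!("Query Time: {:?}", start.elapsed());

    Ok(())
}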
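To make the DataFrame pipeline concrete, here is a single-process sketch in plain Rust of what it computes logically, roughly `SELECT user_id, count(response_time_ms), avg(response_time_ms) FROM logs WHERE status_code = 200 GROUP BY user_id HAVING count(...) > 10 ORDER BY avg(...) DESC LIMIT 5`. It uses a hypothetical in-memory `Log` struct instead of Parquet and ignores NULL handling, so it illustrates the semantics, not how Ballista executes the plan:

```rust
use std::collections::HashMap;

/// One log row: a hypothetical in-memory stand-in for the Parquet columns used by the query.
struct Log {
    user_id: i64,
    response_time_ms: f64,
    status_code: i32,
}

/// Top `k` users by average response time among users with more than `min_count`
/// successful requests. Returns (user_id, count, avg) tuples.
fn top_slow_users(logs: &[Log], min_count: u64, k: usize) -> Vec<(i64, u64, f64)> {
    // WHERE status_code = 200, then GROUP BY user_id keeping a running (count, sum).
    let mut groups: HashMap<i64, (u64, f64)> = HashMap::new();
    for log in logs.iter().filter(|l| l.status_code == 200) {
        let entry = groups.entry(log.user_id).or_insert((0, 0.0));
        entry.0 += 1;
        entry.1 += log.response_time_ms;
    }
    // HAVING count > min_count, and avg = sum / count.
    let mut rows: Vec<(i64, u64, f64)> = groups
        .into_iter()
        .filter(|(_, (count, _))| *count > min_count)
        .map(|(user_id, (count, sum))| (user_id, count, sum / count as f64))
        .collect();
    // ORDER BY avg DESC (ties broken by user_id so the output is deterministic), then LIMIT k.
    rows.sort_by(|a, b| b.2.total_cmp(&a.2).then(a.0.cmp(&b.0)));
    rows.truncate(k);
    rows
}

fn main() {
    let logs = vec![
        Log { user_id: 1, response_time_ms: 100.0, status_code: 200 },
        Log { user_id: 1, response_time_ms: 300.0, status_code: 200 },
        Log { user_id: 2, response_time_ms: 50.0, status_code: 200 },
        Log { user_id: 2, response_time_ms: 900.0, status_code: 500 },
    ];
    // With min_count = 1 only user 1 (two successful requests) survives; prints [(1, 2, 200.0)].
    println!("{:?}", top_slow_users(&logs, 1, 5));
}
```

In Ballista the same logic is split into stages and run in parallel across the executors over the Parquet files; the sketch only shows what the result means.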

Deployment script for the Rust binaries

#!/bin/bash

set -e

TAG="ballista-tools:v1"

echo "[1/3] Building Rust binaries: $TAG"
docker build -t "$TAG" .
echo "[2/3] Action: Creating Test Data"
kubectl delete job ballista-data-generator --ignore-not-found
kubectl apply -f deployment/data-gen-job.yaml

echo "[...] Status: Waiting for data-generator to complete (Timeout: 10m)"
if ! kubectl wait --for=condition=complete job/ballista-data-generator --timeout=600s; then
    echo "[ERROR] Data generation failed or timed out!"
    kubectl logs job/ballista-data-generator
    exit 1
fi

echo "[3/3] Action: Launching Query Runner"
kubectl delete pod query-runner --ignore-not-found
kubectl apply -f deployment/query-runner.yaml

echo "----------------------------------------------------------"
echo "[SUCCESS] Pipeline Finished Successfully."
echo "Check Query Runner logs: kubectl logs -f query-runner"

Sample output:
[Screenshot: query runner output]

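We filtered and aggregated 200 million rows in just 1.17 seconds on a local Colima cluster with two executors (each limited to 2 CPUs and 2 GiB of memory). Ballista and Rust make a very powerful combination.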
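Part of what lets a grouped aggregation scale across executors is that it can be split into a partial aggregate over each partition and a final step that merges the partial states. DataFusion physical plans typically show this as an `AggregateExec` in partial mode followed by a final one, with a repartition (a shuffle between stages in Ballista) in between; the EXPLAIN output printed by the query runner shows the actual plan. For `avg`, the mergeable state is a (count, sum) pair rather than the average itself. A minimal std-only sketch of that idea, with hypothetical names:

```rust
use std::collections::HashMap;

/// Partial state per user: (count, sum) of response times.
type Partial = HashMap<i64, (u64, f64)>;

/// Phase 1: each partition aggregates its own (user_id, response_time_ms) rows.
fn partial_aggregate(rows: &[(i64, f64)]) -> Partial {
    let mut state = Partial::new();
    for &(user, rt) in rows {
        let e = state.entry(user).or_insert((0, 0.0));
        e.0 += 1;
        e.1 += rt;
    }
    state
}

/// Phase 2: merge partial states (counts and sums add), then return (count, avg) per user.
fn final_aggregate(partials: Vec<Partial>) -> HashMap<i64, (u64, f64)> {
    let mut merged = Partial::new();
    for p in partials {
        for (user, (count, sum)) in p {
            let e = merged.entry(user).or_insert((0, 0.0));
            e.0 += count;
            e.1 += sum;
        }
    }
    merged.into_iter().map(|(u, (c, s))| (u, (c, s / c as f64))).collect()
}

fn main() {
    // Two "partitions" that both contain rows for user 7.
    let p1 = partial_aggregate(&[(7, 100.0), (7, 200.0)]);
    let p2 = partial_aggregate(&[(7, 600.0), (8, 50.0)]);
    let result = final_aggregate(vec![p1, p2]);
    // Averaging the per-partition averages (150 and 600) would give 375; merging (count, sum) gives 300.
    println!("user 7: {:?}", result[&7]);
}
```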

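Conclusion

In this post, I walked through a distributed processing setup built with Ballista and Rust and looked at its performance.
I plan to keep exploring what Rust can do for data engineering.

If you have any questions or advice, please leave a comment!
Thank you for reading to the end.

Source code for the completed project: https://github.com/ardvci/ballista-k8s-implementation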
