Batch inference is one of Ray Data’s flagship use cases: load data, run a model, write results. Ray Data handles streaming, GPU placement, and backpressure for you.

Pattern

import ray

class Predictor:
    def __init__(self, model_path: str):
        import torch
        self.model = torch.load(model_path).cuda().eval()

    def __call__(self, batch):
        import torch
        with torch.no_grad():
            x = torch.from_numpy(batch["features"]).cuda()
            preds = self.model(x).argmax(dim=-1).cpu().numpy()
        return {**batch, "prediction": preds}

ds = ray.data.read_parquet("s3://bucket/inputs/")
ds = ds.map_batches(
    Predictor,
    fn_constructor_args=("model.pt",),
    concurrency=4,
    num_gpus=1,
    batch_size=256,
)
ds.write_parquet("s3://bucket/outputs/")

Why this works

  • Class-based UDF: the model loads once per actor, not per batch.
  • concurrency=4, num_gpus=1: four replicas, each on its own GPU. Ray Data places them across the cluster.
  • Streaming execution: reading, predicting, and writing run concurrently.
  • Lazy evaluation: map_batches and write_parquet together produce a streaming pipeline that never materializes the full dataset (see the sketch below).
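
A quick way to see the last two points in action, reusing Predictor from the pattern above (Dataset.stats() prints a per-operator execution summary once the pipeline has run; its exact format varies across Ray versions):

ds = ray.data.read_parquet("s3://bucket/inputs/")                    # lazy: no data read yet
ds = ds.map_batches(Predictor, fn_constructor_args=("model.pt",),
                    concurrency=4, num_gpus=1, batch_size=256)       # still lazy
ds.write_parquet("s3://bucket/outputs/")                             # streaming execution starts here
print(ds.stats())                                                    # per-operator timing, rows, and memory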

CPU pre-processing + GPU inference

Mix stages in the same pipeline. Each stage gets its own resource budget.

ds = ray.data.read_images("s3://bucket/images/", size=(224, 224))
ds = ds.map_batches(decode_and_normalize, num_cpus=2)        # CPU stage
ds = ds.map_batches(Predictor, num_gpus=1, concurrency=4)    # GPU stage
ds.write_parquet("s3://bucket/predictions/")

The streaming executor pipelines the CPU and GPU stages, so the GPU stays busy while the CPU pre-processes the next batch.
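
decode_and_normalize is left undefined above. Since read_images already decodes the files into arrays, a minimal sketch only needs the normalization step (the "image" column name and the float32 scaling are assumptions):

import numpy as np

def decode_and_normalize(batch):
    # batch is a dict of NumPy arrays; scale uint8 pixels to [0, 1] floats
    batch["image"] = batch["image"].astype(np.float32) / 255.0
    return batch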

Image classification

import torchvision

class ImageClassifier:
    def __init__(self):
        self.model = torchvision.models.resnet50(weights="DEFAULT").cuda().eval()

    def __call__(self, batch):
        import torch
        x = torch.from_numpy(batch["image"]).cuda().permute(0, 3, 1, 2).float() / 255.0
        with torch.no_grad():
            logits = self.model(x)
        batch["pred"] = logits.argmax(dim=-1).cpu().numpy()
        return batch

ds = ray.data.read_images("s3://bucket/images/", size=(224, 224))
ds = ds.map_batches(ImageClassifier, num_gpus=1, batch_size=128, concurrency=4)
ds.write_parquet("s3://bucket/preds/")

Tabular models

For lightweight models (sklearn, XGBoost), skip the GPU and use a function UDF:

def predict(batch, model):
    batch["pred"] = model.predict_proba(batch["features"])
    return batch

ds = ds.map_batches(predict, fn_kwargs={"model": model}, num_cpus=2, batch_size=10_000)
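
A fuller sketch of the same idea, assuming the model lives in a local joblib file and the inputs are Parquet (both are assumptions for illustration):

import joblib
import ray

model = joblib.load("model.joblib")   # any picklable estimator works here
ds = ray.data.read_parquet("s3://bucket/tabular/")
ds = ds.map_batches(predict, fn_kwargs={"model": model}, num_cpus=2, batch_size=10_000)
ds.write_parquet("s3://bucket/tabular-preds/")

Because the model is passed through fn_kwargs, Ray serializes it and ships it to each worker, so it must be picklable.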

Throughput tuning

  • batch_size: larger batches → higher GPU utilization, more memory.
  • concurrency: more replicas → more parallelism if you have spare GPUs.
  • num_cpus (CPU stages): avoid over-subscribing the host.
  • Block size (target_max_block_size): smaller blocks → lower latency, higher overhead.
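
A sketch of where each knob is set, reusing the Predictor pipeline from above; the values are illustrative starting points, not recommendations:

import ray

# Block size is a DataContext setting (in bytes); set it before building the pipeline.
ctx = ray.data.DataContext.get_current()
ctx.target_max_block_size = 64 * 1024 * 1024   # 64 MiB

ds = ds.map_batches(
    Predictor,
    fn_constructor_args=("model.pt",),
    batch_size=512,     # raise until GPU memory becomes the limit
    concurrency=8,      # one replica per available GPU
    num_gpus=1,
)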

Multi-model pipelines

Compose models by adding more stages:

ds = ds.map_batches(Detector, num_gpus=1, concurrency=2)
ds = ds.map_batches(Classifier, num_gpus=1, concurrency=2)
ds.write_parquet("s3://bucket/out/")

Next steps

  • Working with LLMs: LLM-specific batch inference.
  • Performance tips: squeeze out the last 20% of throughput.