Batch inference is one of Ray Data’s flagship use cases: load data, run a model, write results. Ray Data handles streaming, GPU placement, and backpressure for you.
Pattern
```python
import ray


class Predictor:
    def __init__(self, model_path: str):
        import torch

        # Load the model once per actor and keep it resident on the GPU.
        self.model = torch.load(model_path).cuda().eval()

    def __call__(self, batch):
        import torch

        with torch.no_grad():
            x = torch.from_numpy(batch["features"]).cuda()
            preds = self.model(x).argmax(dim=-1).cpu().numpy()
        return {**batch, "prediction": preds}


ds = ray.data.read_parquet("s3://bucket/inputs/")
ds = ds.map_batches(
    Predictor,
    fn_constructor_args=("model.pt",),
    concurrency=4,
    num_gpus=1,
    batch_size=256,
)
ds.write_parquet("s3://bucket/outputs/")
```
Why this works
- **Class-based UDF**: the model loads once per actor, not once per batch.
- **`concurrency=4, num_gpus=1`**: four replicas, each on its own GPU. Ray Data places them across the cluster.
- **Streaming execution**: reading, predicting, and writing run concurrently.
- **Lazy evaluation**: `map_batches` and `write_parquet` together produce a streaming pipeline that never materializes the full dataset (see the sketch after this list).
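To see the laziness concretely, here is a minimal, self-contained sketch using a toy `ray.data.range` dataset in place of the S3 inputs above. Nothing executes until a consuming operation such as `write_parquet` runs.

```python
import ray

ds = ray.data.range(10_000)                              # no data produced yet
ds = ds.map_batches(lambda b: {"double": b["id"] * 2})   # still lazy: only the plan grows
ds.write_parquet("/tmp/range_out/")                      # execution starts here, streaming block by block
```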
CPU pre-processing + GPU inference
Mix stages in the same pipeline. Each stage gets its own resource budget.
```python
ds = ray.data.read_images("s3://bucket/images/", size=(224, 224))
ds = ds.map_batches(decode_and_normalize, num_cpus=2)      # CPU stage
ds = ds.map_batches(Predictor, num_gpus=1, concurrency=4)  # GPU stage
ds.write_parquet("s3://bucket/predictions/")
```
The streaming executor pipelines the CPU and GPU stages, so the GPU stays busy while the CPU pre-processes the next batch.
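`decode_and_normalize` isn't defined above. One plausible shape for it, as an assumption rather than the canonical implementation, given that `read_images` already decodes files into a uint8 `"image"` column:

```python
import numpy as np

def decode_and_normalize(batch):
    # read_images yields decoded uint8 arrays; scale them to float32 in [0, 1].
    batch["image"] = batch["image"].astype(np.float32) / 255.0
    return batch
```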
Image classification
```python
import torchvision


class ImageClassifier:
    def __init__(self):
        self.model = torchvision.models.resnet50(weights="DEFAULT").cuda().eval()

    def __call__(self, batch):
        import torch

        # NHWC uint8 images -> NCHW float tensors in [0, 1].
        x = torch.from_numpy(batch["image"]).cuda().permute(0, 3, 1, 2).float() / 255.0
        with torch.no_grad():
            logits = self.model(x)
        batch["pred"] = logits.argmax(dim=-1).cpu().numpy()
        return batch


ds = ray.data.read_images("s3://bucket/images/", size=(224, 224))
ds = ds.map_batches(ImageClassifier, num_gpus=1, batch_size=128, concurrency=4)
ds.write_parquet("s3://bucket/preds/")
```
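If you want human-readable class names rather than raw indices, an extra CPU stage can map them before the write. A hedged sketch, assuming torchvision ≥ 0.13 so the weight metadata exposes the ImageNet category list (`add_labels` is a hypothetical helper):

```python
import numpy as np
import torchvision

weights = torchvision.models.ResNet50_Weights.DEFAULT
categories = weights.meta["categories"]  # ImageNet class names

def add_labels(batch):
    batch["label"] = np.array([categories[i] for i in batch["pred"]])
    return batch

# Insert this stage after ImageClassifier and before write_parquet.
ds = ds.map_batches(add_labels)
```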
Tabular models
For lightweight models (sklearn, XGBoost), skip the GPU and use a function UDF:
```python
def predict(batch, model):
    batch["pred"] = model.predict_proba(batch["features"])
    return batch

ds.map_batches(predict, fn_kwargs={"model": model}, num_cpus=2, batch_size=10_000)
```
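The snippet assumes a `model` object already exists on the driver. One way to get there (hypothetical file name; any picklable sklearn or XGBoost estimator works):

```python
import joblib

# Load a previously trained estimator on the driver; Ray ships it to the workers.
model = joblib.load("model.joblib")
```

For anything heavier, prefer the class-based UDF pattern above so the model is loaded once per actor instead of being shipped with the function.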
Throughput tuning
| Knob | Effect |
| --- | --- |
| `batch_size` | Larger batches → higher GPU utilization, more memory. |
| `concurrency` | More replicas → more parallelism if you have spare GPUs. |
| `num_cpus` (CPU stages) | Avoid over-subscribing the host. |
| Block size (`target_max_block_size`) | Smaller blocks → lower latency, higher overhead. |
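`batch_size`, `concurrency`, and `num_cpus` are per-`map_batches` arguments; the block size target is a global `DataContext` setting. A hedged sketch (attribute name as of recent Ray releases, check your version):

```python
import ray

ctx = ray.data.DataContext.get_current()
ctx.target_max_block_size = 64 * 1024 * 1024  # target 64 MiB blocks instead of the default
```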
Multi-model pipelines
Compose models by adding more stages:
```python
ds = ds.map_batches(Detector, num_gpus=1, concurrency=2)
ds = ds.map_batches(Classifier, num_gpus=1, concurrency=2)
ds.write_parquet("s3://bucket/out/")
```
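`Detector` and `Classifier` stand for any two class-based UDFs where the second stage reads the columns the first one adds. A toy, model-free sketch of that contract (the dummy box and class values are placeholders, not real inference):

```python
import numpy as np

class Detector:
    def __call__(self, batch):
        # Stand-in detection: emit one dummy box per image.
        n = len(batch["image"])
        batch["boxes"] = np.tile(np.array([[0, 0, 32, 32]], dtype=np.float32), (n, 1))
        return batch

class Classifier:
    def __call__(self, batch):
        # Stand-in classification: label every detected box as class 0.
        batch["class"] = np.zeros(len(batch["boxes"]), dtype=np.int64)
        return batch
```

Because both stages stream, intermediate detections never need to fit in cluster memory all at once.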
Next steps
- Working with LLMs: LLM-specific batch inference.
- Performance tips: squeeze out the last 20% of throughput.