Documentation Index: fetch the complete documentation index at https://ray-preview.mintlify.app/llms.txt and use it to discover all available pages before exploring further.
Ray Data exposes a small set of operators that compose into rich pipelines.
map and map_batches
map applies a function to each row; map_batches applies a function to each batch of rows (a block, by default). Prefer map_batches: it amortizes per-call overhead and works with vectorized libraries such as NumPy and pandas.
```python
def add_score(batch):
    batch["score"] = batch["x"] ** 2
    return batch

ds = ds.map_batches(add_score)
```
The function must return a dict of column-arrays, a pandas DataFrame, or an Arrow Table.
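As a minimal, Ray-free sketch of that batch contract: a dict-of-columns batch maps column names to equal-length arrays. Plain Python lists stand in for NumPy arrays here purely for illustration.

```python
# Sketch of the batch contract without Ray: a batch is a dict mapping
# column names to equal-length arrays (plain lists here for simplicity).
def add_score(batch):
    batch["score"] = [x ** 2 for x in batch["x"]]
    return batch

batch = {"x": [1, 2, 3]}
out = add_score(batch)
print(out["score"])  # [1, 4, 9]
```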
Class-based UDFs
For stateful or expensive setup (loading a model, opening a connection), pass a class. Ray Data instantiates one actor per worker.
```python
class Predictor:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        batch["pred"] = self.model.predict(batch["features"])
        return batch

ds = ds.map_batches(Predictor, concurrency=4, num_gpus=1, batch_size=64)
```
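The point of a class-based UDF is that `__init__` runs once per actor while `__call__` runs once per batch. A self-contained sketch of that lifecycle, with a doubling function standing in for a real model (an assumption for illustration):

```python
class Predictor:
    def __init__(self):
        # Expensive setup runs once per worker. A doubling function
        # stands in for a real loaded model here.
        self.model = lambda feats: [f * 2 for f in feats]

    def __call__(self, batch):
        batch["pred"] = self.model(batch["features"])
        return batch

predictor = Predictor()                   # setup happens once
out = predictor({"features": [1, 2, 3]})  # called once per batch
print(out["pred"])  # [2, 4, 6]
```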
Resource requests
num_cpus, num_gpus, memory, and concurrency apply per stage:
```python
ds.map_batches(score, num_gpus=1, concurrency=8, batch_size=128)
```
filter
```python
ds = ds.filter(lambda row: row["score"] > 0.5)
```
flat_map
Emit zero or more rows per input row.
```python
ds = ds.flat_map(lambda row: [{"token": t} for t in row["text"].split()])
```
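To see the "zero or more rows" semantics without Ray, here is the same tokenizer applied to plain dicts; note the empty-text row contributes no output rows:

```python
def tokenize(row):
    # One input row can expand into many output rows (or none).
    return [{"token": t} for t in row["text"].split()]

rows = [{"text": "ray data"}, {"text": ""}]
flat = [out for row in rows for out in tokenize(row)]
print(flat)  # [{'token': 'ray'}, {'token': 'data'}]
```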
select_columns and drop_columns
```python
ds = ds.select_columns(["x", "y"])
ds = ds.drop_columns(["debug_info"])
```
groupby and aggregate
```python
ds.groupby("category").mean("value")
ds.groupby("category").count()
```
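A Ray-free sketch of what `mean` computes per group, using a running sum and count over plain rows:

```python
from collections import defaultdict

rows = [
    {"category": "a", "value": 1.0},
    {"category": "a", "value": 3.0},
    {"category": "b", "value": 2.0},
]

totals = defaultdict(lambda: [0.0, 0])  # category -> [sum, count]
for row in rows:
    acc = totals[row["category"]]
    acc[0] += row["value"]
    acc[1] += 1

means = {cat: s / n for cat, (s, n) in totals.items()}
print(means)  # {'a': 2.0, 'b': 2.0}
```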
Custom aggregators subclass AggregateFn, which takes its callbacks as constructor arguments, so the subclass passes them via super().__init__:

```python
from ray.data.aggregate import AggregateFn

class TopK(AggregateFn):
    def __init__(self, k):
        super().__init__(
            init=lambda key: [],
            accumulate_row=lambda acc, row: sorted(acc + [row["v"]])[-k:],
            merge=lambda a, b: sorted(a + b)[-k:],
            finalize=lambda acc: acc,
            name=f"top{k}",
        )

ds.groupby("cat").aggregate(TopK(10))
```
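The accumulate/merge split is what makes the aggregation parallel: each block builds a partial top-k independently, then the partials merge pairwise. A Ray-free sketch with k = 3:

```python
K = 3

def accumulate_row(acc, row):
    # Fold one row into a partial top-K accumulator.
    return sorted(acc + [row["v"]])[-K:]

def merge(a, b):
    # Combine two partial accumulators into one.
    return sorted(a + b)[-K:]

# Two "blocks" accumulate independently...
acc1 = []
for v in [5, 1, 9]:
    acc1 = accumulate_row(acc1, {"v": v})
acc2 = []
for v in [7, 2]:
    acc2 = accumulate_row(acc2, {"v": v})

# ...and their partial results merge into the global top-3.
top = merge(acc1, acc2)
print(top)  # [5, 7, 9]
```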
sort
```python
ds = ds.sort("score", descending=True)
```
random_shuffle and randomize_block_order
```python
ds = ds.random_shuffle()         # global shuffle
ds = ds.randomize_block_order()  # cheap block-level shuffle
```
random_shuffle is far more expensive because every row may move between blocks, but it yields a uniformly random row ordering; randomize_block_order only permutes whole blocks and moves no data.
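The difference is easy to see in a Ray-free sketch over three in-memory "blocks":

```python
import random

random.seed(0)
blocks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

# randomize_block_order: only the block list is permuted; rows inside
# each block keep their order, so no data moves between blocks.
block_order = list(blocks)
random.shuffle(block_order)

# random_shuffle: every row can land anywhere, which requires moving
# rows across blocks (and, in Ray, across the cluster).
rows = [r for block in blocks for r in block]
random.shuffle(rows)

assert all(block in blocks for block in block_order)  # blocks stayed intact
assert sorted(rows) == list(range(9))                 # same rows, new order
```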
repartition
```python
ds = ds.repartition(64)  # split into 64 blocks
```
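As a rough sketch of the effect (an even ceil-divide split; Ray's actual block balancing differs):

```python
def repartition(rows, num_blocks):
    # Even-split sketch: ceil-divide rows into chunks. For some inputs
    # this yields fewer chunks than requested; it only illustrates the idea.
    size = -(-len(rows) // num_blocks)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

blocks = repartition(list(range(10)), 4)
print(blocks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```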
union and zip
```python
combined = ds_a.union(ds_b)  # concat
zipped = ds_a.zip(ds_b)      # row-wise zip
```
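The semantics, sketched on plain lists of rows: union concatenates rows, while zip pairs rows at the same position and merges their columns, so both sides must have matching row counts.

```python
ds_a = [{"x": 1}, {"x": 2}]
ds_b = [{"y": 10}, {"y": 20}]

# union: concatenate the rows of both datasets.
combined = ds_a + ds_b

# zip: pair rows at the same index and merge their columns.
zipped = [{**a, **b} for a, b in zip(ds_a, ds_b)]
print(zipped)  # [{'x': 1, 'y': 10}, {'x': 2, 'y': 20}]
```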
Next steps
- Iterating: consume transformed data in training and inference loops.
- Batch inference: run a model over a dataset.