
Ray Data exposes a small set of operators that compose into rich pipelines.

map and map_batches

map applies a function to each row; map_batches applies a function to each batch of rows. Prefer map_batches: it amortizes per-call overhead and works with vectorized libraries like NumPy and pandas.
def add_score(batch):
    batch["score"] = batch["x"] ** 2
    return batch

ds = ds.map_batches(add_score)
The function must return a dict of column-arrays, a pandas DataFrame, or an Arrow Table.
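By default, map_batches passes each batch as a dict mapping column names to NumPy arrays. A minimal sketch of that contract using plain NumPy, no Ray required:

```python
import numpy as np

def add_score(batch):
    # batch is a dict of column name -> NumPy array (the default batch format).
    batch["score"] = batch["x"] ** 2
    return batch

# Simulate the dict-of-arrays batch that map_batches would pass.
batch = {"x": np.array([1.0, 2.0, 3.0])}
out = add_score(batch)
print(out["score"])  # [1. 4. 9.]
```

Because the whole column arrives as one array, the squaring is a single vectorized operation rather than a per-row Python call.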

Class-based UDFs

For stateful or expensive setup (loading a model, opening a connection), pass a class instead of a function. Ray Data instantiates it as long-lived actors and reuses them across batches; concurrency controls how many actors run.
class Predictor:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        batch["pred"] = self.model.predict(batch["features"])
        return batch

ds = ds.map_batches(Predictor, concurrency=4, num_gpus=1, batch_size=64)
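The point of the class form is the lifecycle: __init__ runs once per actor, while __call__ runs once per batch. A minimal sketch of that lifecycle without Ray, using a stand-in lambda in place of a real load_model():

```python
class Predictor:
    def __init__(self):
        # Expensive setup happens here, once per actor, not once per batch.
        self.model = lambda xs: [x * 2 for x in xs]  # stand-in for load_model()

    def __call__(self, batch):
        batch["pred"] = self.model(batch["features"])
        return batch

# One instance (standing in for one Ray actor) serves many batches.
predictor = Predictor()
batches = [{"features": [1, 2]}, {"features": [3]}]
results = [predictor(b) for b in batches]
```

With a function UDF, the equivalent of __init__ would rerun on every batch; the class form pays the setup cost once.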

Resource requests

num_cpus, num_gpus, memory, and concurrency apply per stage:
ds.map_batches(score, num_gpus=1, concurrency=8, batch_size=128)

filter

ds = ds.filter(lambda row: row["score"] > 0.5)

flat_map

Emit zero or more rows per input row.
ds = ds.flat_map(lambda row: [{"token": t} for t in row["text"].split()])
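What flat_map does per row can be sketched without Ray: each input row yields a list of output rows, and an empty list drops the row entirely:

```python
def tokenize(row):
    # One input row produces zero or more output rows.
    return [{"token": t} for t in row["text"].split()]

rows = [{"text": "ray data"}, {"text": ""}]
# flat_map flattens the per-row lists into a single stream of rows.
flat = [out for row in rows for out in tokenize(row)]
```

The empty-text row contributes nothing, so the output has two rows, not two-plus-one.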

select_columns and drop_columns

ds = ds.select_columns(["x", "y"])
ds = ds.drop_columns(["debug_info"])

groupby and aggregate

ds.groupby("category").mean("value")
ds.groupby("category").count()
Custom aggregators implement AggregateFn:
from ray.data.aggregate import AggregateFn

class TopK(AggregateFn):
    def __init__(self, k):
        super().__init__(
            init=lambda key: [],
            accumulate_row=lambda acc, row: sorted(acc + [row["v"]])[-k:],
            merge=lambda a, b: sorted(a + b)[-k:],
            finalize=lambda acc: acc,
            name=f"top{k}",
        )

ds.groupby("cat").aggregate(TopK(10))
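The accumulate/merge split is what makes the aggregation parallelizable: each partition folds its rows independently, then the partial top-k lists merge into the global result. A pure-Python sketch of those semantics:

```python
def accumulate(acc, v, k):
    # Fold one value into a partial top-k accumulator.
    return sorted(acc + [v])[-k:]

def merge(a, b, k):
    # Combine two partial accumulators into one.
    return sorted(a + b)[-k:]

k = 3
acc1 = []
for v in [5, 1, 9, 3]:       # partition 1
    acc1 = accumulate(acc1, v, k)
acc2 = []
for v in [8, 2, 7]:          # partition 2
    acc2 = accumulate(acc2, v, k)

top = merge(acc1, acc2, k)   # global top-3: [7, 8, 9]
```

Because merge of two partial top-k lists equals the top-k of the combined data, partitions never need to see each other's rows.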

sort

ds = ds.sort("score", descending=True)

random_shuffle and randomize_block_order

ds = ds.random_shuffle()                # global shuffle
ds = ds.randomize_block_order()         # cheap block-level shuffle
random_shuffle is the more expensive of the two (it moves every row across the cluster) but yields a uniformly random row ordering; randomize_block_order only permutes whole blocks and leaves within-block order intact.
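The difference can be sketched with plain lists standing in for blocks: block-order randomization permutes the blocks but keeps each block's internal order, while a global shuffle permutes individual rows:

```python
import random

random.seed(0)
blocks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# randomize_block_order: permute the blocks; rows inside each block keep their order.
block_shuffled = random.sample(blocks, k=len(blocks))

# random_shuffle: a uniformly random ordering over all rows (requires full data movement).
rows = [r for b in blocks for r in b]
globally_shuffled = random.sample(rows, k=len(rows))
```

Block-order randomization is often enough to decorrelate training batches when the data was written in a sorted order, at a fraction of the cost.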

repartition

ds = ds.repartition(64)  # redistribute the dataset into 64 blocks

union and zip

combined = ds_a.union(ds_b)        # concat
zipped = ds_a.zip(ds_b)            # row-wise zip; both sides need the same row count
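The two combinators can be sketched with plain dicts: zip pairs rows by position and merges their columns, which is why both datasets must have equal row counts, while union simply concatenates rows:

```python
ds_a = [{"x": 1}, {"x": 2}]
ds_b = [{"y": 10}, {"y": 20}]

# zip: pair rows positionally and merge their columns.
zipped = [{**a, **b} for a, b in zip(ds_a, ds_b)]

# union: concatenate the rows of both datasets.
combined = ds_a + ds_b
```
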

Next steps

Iterating

Consume transformed data in training and inference loops.

Batch inference

Run a model over a dataset.