4. Transforming data#

To transform data, we can use the map_batches API.

map_batches takes a user-defined function that accepts a batch of data and returns a batch of transformed data. By default, a batch is a dictionary mapping column names to NumPy arrays (dict[str, np.ndarray]).

import numpy as np
from torchvision.transforms import Compose, Normalize, ToTensor

def normalize(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    # ToTensor scales pixels to [0, 1]; Normalize((0.5,), (0.5,)) shifts/scales to [-1, 1].
    transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
    batch["image"] = [transform(image) for image in batch["image"]]
    return batch

Calling ds.map_batches will add a map_batches operator to the execution plan.

ds_normalized = ds.map_batches(normalize)
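Ray Data builds plans lazily: no work runs until the dataset is consumed. Printing the dataset shows the pending operator (the output below is illustrative and may differ across Ray versions):

print(ds_normalized)
# MapBatches(normalize)
# +- Dataset(num_rows=..., schema=...)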

To tune the batch size for the transformation, specify the batch_size parameter.

ds_normalized = ds.map_batches(normalize, batch_size=32)

Note: batching only helps with performance when a transformation is vectorized, i.e. it benefits from processing multiple rows at once.
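For illustration, here is a sketch of a fully vectorized variant of normalize that applies the same (x - 0.5) / 0.5 scaling to the whole batch in a few array operations, assuming the images arrive as a single uint8 NumPy array:

def normalize_vectorized(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    # Scale pixels to [0, 1], then shift/scale to [-1, 1], batch-wide.
    images = batch["image"].astype(np.float32) / 255.0
    batch["image"] = (images - 0.5) / 0.5
    return batch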

Finding the optimal batch size depends on the hardware available and the target utilization.

4.1 On resource specification#

To control the exact resources used for a map_batches transformation, specify the num_cpus, num_gpus, memory, and resources parameters:

  • num_cpus: Number of CPUs to use for each task (use >1 if the task performs multithreaded operations).

  • num_gpus: Number of GPUs to use for each task.

  • memory: Amount of RAM to use for each task (in bytes).

  • resources: Custom resources in Ray, a way to target specific node types for each task.

ds_normalized = ds.map_batches(normalize, batch_size=32, num_cpus=1, memory=100 * 1024**2)
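Custom resources can restrict a transformation to specific nodes. As a sketch, assuming the cluster has nodes started with a custom resource named high_memory_node (a hypothetical name, not a Ray built-in):

ds_normalized = ds.map_batches(
    normalize,
    batch_size=32,
    # Only schedule tasks on nodes that advertise this custom resource.
    resources={"high_memory_node": 1},
)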

Note: Ray only performs a logical allocation of resources and does not physically enforce resource limits.

By default, Ray retries tasks that fail with out-of-memory (OOM) errors, and Ray Data infinitely retries tasks that fail due to system failures.

Specifying resources helps avoid resource contention, which in turn prevents unnecessary retries and confusing OOM errors.

4.2 On concurrency limiting#

Ray Data will attempt to use all the resources available in the cluster.

In particular, it will schedule as many tasks as there are input blocks for each operator (stage) in the pipeline.

To limit the concurrency for a particular transformation, specify the concurrency parameter.

concurrency_limit = 10  # Don't schedule more than 10 tasks at a time

ds_normalized = ds.map_batches(
    normalize,
    batch_size=32,
    num_cpus=1,
    memory=100 * 1024**2,
    concurrency=concurrency_limit,
)

Note: Limiting concurrency can be helpful when you have an unbounded compute configuration (the maximum number of nodes is set very high) and you want to avoid aggressive scaling for a fast step in the pipeline (e.g. light preprocessing of data).

Additionally, note that Ray Data will attempt to fuse transformations together to reduce data transfer between stages. Setting different concurrency limits for different transformations might prevent this optimization.
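For example, two back-to-back transformations with compatible settings can be fused into a single stage, while mismatched concurrency limits may keep them separate (a sketch; augment is a hypothetical second transformation):

# Compatible settings: Ray Data can fuse both steps into one stage.
ds_fused = ds.map_batches(normalize, batch_size=32).map_batches(augment, batch_size=32)

# Mismatched concurrency limits may prevent fusion, adding a data transfer
# between the two stages.
ds_unfused = (
    ds.map_batches(normalize, batch_size=32, concurrency=10)
    .map_batches(augment, batch_size=32, concurrency=2)
)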

To verify the output of normalize(), call take_batch() on the dataset.

normalized_batch = ds_normalized.take_batch(batch_size=10)

Check the normalized pixel value range:

for image in normalized_batch["image"]:
    assert image.shape == (1, 28, 28)  # channel, height, width
    assert image.min() >= -1 and image.max() <= 1  # normalized to [-1, 1]

Activity

Add the ground truth label extracted from the image path.

Starting point:

ds = ray.data.read_images("s3://anyscale-public-materials/ray-ai-libraries/mnist/50_per_index/", include_paths=True)
ds_normalized = ds.map_batches(normalize)
# batch = ds_normalized.take_batch(batch_size=3)

# add_label
ds_labeled = ds_normalized.map_batches(add_label)
labeled_batch = ds_labeled.take_batch(10)
print(labeled_batch)

The task

Implement an add_label function that takes a batch of data and returns the batch with an image label column added.

The image path is in the format: s3://anyscale-public-materials/ray-ai-libraries/mnist/50_per_index/{label}/{image_id}.png.

Hint: Define the add_label function; use take_batch() to better understand the data format.

# Write your solution here
def add_label(batch):
    ...
    return batch
Solution:
def add_label(batch):
    labels = []
    for item in batch["path"]:
        # The label is the parent directory name in the image path.
        label = int(item.split("/")[-2])
        labels.append(label)

    batch["label"] = np.array(labels)
    return batch

ds = ray.data.read_images("s3://anyscale-public-materials/ray-ai-libraries/mnist/50_per_index/", include_paths=True)
ds_normalized = ds.map_batches(normalize)
ds_labeled = ds_normalized.map_batches(add_label)
labeled_batch = ds_labeled.take_batch(10)
print(labeled_batch)