7. Data Operations: grouping, aggregation, and shuffling
Let’s look at some more involved transformations.
Some operations require all inputs to be materialized in the object store. To determine which ones, look for the methods with the AllToAllAPI decorator in dataset.py.
7.1. Custom batching using groupby
If you want to generate batches according to a specific key, you can use groupby to group the data by that key and then use map_groups to apply a transformation to each group.
For instance, let’s compute the accuracy of the model by “ground truth label”.
def add_label(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    # The label is the name of the directory that contains each image.
    batch["ground_truth_label"] = [int(path.split("/")[-2]) for path in batch["path"]]
    return batch

def compute_accuracy(group: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    # Return a single row: the accuracy over the input rows and their first label.
    return {
        "accuracy": [np.mean(group["predicted_label"] == group["ground_truth_label"])],
        "ground_truth_label": group["ground_truth_label"][:1],
    }

ds_preds.map_batches(add_label).groupby("ground_truth_label").map_groups(compute_accuracy).to_pandas()
Note: ds_preds is not recomputed because we have already materialized the dataset.
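To make the semantics concrete, here is a minimal pure-Python sketch of what a group-then-apply step computes. It is not Ray Data's implementation: the helpers `group_rows` and `accuracy_per_label` and the toy rows are hypothetical, and only the column names are taken from the example above.

```python
from collections import defaultdict

# Toy rows standing in for ds_preds after add_label (values are made up).
rows = [
    {"predicted_label": 0, "ground_truth_label": 0},
    {"predicted_label": 1, "ground_truth_label": 0},
    {"predicted_label": 1, "ground_truth_label": 1},
    {"predicted_label": 1, "ground_truth_label": 1},
]


def group_rows(rows, key):
    """Collect rows that share the same value for `key` (the groupby step)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def accuracy_per_label(rows):
    """Compute one accuracy value per group (the map_groups step)."""
    result = {}
    for label, group in group_rows(rows, "ground_truth_label").items():
        correct = sum(r["predicted_label"] == r["ground_truth_label"] for r in group)
        result[label] = correct / len(group)
    return result


per_label = accuracy_per_label(rows)
print(per_label)
```

Each group produces exactly one output value, which mirrors how compute_accuracy returns a single row per ground truth label.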
7.2. Aggregations
Ray Data also supports a variety of aggregations. For instance, we can compute the accuracy of each batch and then average those values across the entire dataset.
ds_preds.map_batches(add_label).map_batches(compute_accuracy).mean(on="accuracy")
Note: mean is a ConsumptionAPI, so calling it triggers execution of the dataset.
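The pipeline above produces one accuracy value per batch and then averages those values. A small pure-Python sketch, using made-up correctness flags and hypothetical batch boundaries, suggests why that mean of per-batch values can differ from the accuracy computed over all rows when batches have unequal sizes.

```python
# Made-up correctness flags (1 = prediction matched the label), for illustration only.
correct = [1, 1, 1, 1, 0, 0]

# Hypothetical batch boundaries: one batch of four rows, one batch of two rows.
batches = [correct[:4], correct[4:]]

# One accuracy value per batch, analogous to compute_accuracy under map_batches.
batch_accuracies = [sum(b) / len(b) for b in batches]

# Unweighted mean over the per-batch values, analogous to mean(on="accuracy").
mean_of_batches = sum(batch_accuracies) / len(batch_accuracies)

# Accuracy computed directly over every row.
row_level_accuracy = sum(correct) / len(correct)

print(mean_of_batches, row_level_accuracy)
```

When every batch holds the same number of rows the two values coincide. Otherwise, weighting each batch accuracy by its row count recovers the row-level figure.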
Ray Data provides a collection of aggregation functions, including count, max, mean, min, sum, and std.
See the Ray Data documentation on aggregations for details.
7.3. Shuffling data
Ray Data offers several ways to shuffle data, which differ in how random the result is and in how much they cost.
7.3.1. File-based shuffle on read
To randomly shuffle the order of input files before reading, call a read function that supports shuffling, such as read_images(), and pass the shuffle="files" parameter.
ray.data.read_images("s3://anyscale-public-materials/ray-ai-libraries/mnist/50_per_index/", shuffle="files")
7.3.2. Shuffling block order
This option randomizes the order of blocks in a dataset. Blocks are the basic units of data that Ray Data stores in the object store. On its own, this operation doesn't involve heavy computation or communication. However, it requires Ray Data to materialize all blocks in memory before applying the operation. Only use this option when your dataset is small enough to fit into the object store memory.
To perform block order shuffling, use randomize_block_order.
ds_randomized_blocks = ds_preds.randomize_block_order()
ds_randomized_blocks.materialize()
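As a rough illustration of what block order shuffling changes and what it leaves alone, the following pure-Python sketch shuffles a list of toy blocks. The block contents, block size, and seed are made up, and this models only the resulting row order, not how Ray Data performs the operation.

```python
import random

# Hypothetical dataset of 9 rows split into 3 blocks (sizes are made up).
blocks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

rng = random.Random(0)  # fixed seed so the sketch is repeatable
shuffled_blocks = blocks[:]  # copy the outer list; the blocks themselves are untouched
rng.shuffle(shuffled_blocks)  # reorder whole blocks only

# Flatten back into the row order a consumer would see.
row_order = [row for block in shuffled_blocks for row in block]
print(row_order)
```

Rows never move between blocks here, so neighbouring rows inside a block stay neighbours. That is why this option is cheap but gives limited randomness.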
7.3.3. Shuffle all rows globally
To randomly shuffle all rows globally, call random_shuffle(). This is the slowest shuffle option because it requires transferring data across the network between workers. In return, it achieves the best randomness of all the options.
ds_randomized_rows = ds_preds.random_shuffle()
ds_randomized_rows.materialize()
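For intuition about the outcome of a global shuffle, here is a minimal pure-Python sketch in which any row can end up in any position. The toy blocks, block size, and seed are hypothetical, and the sketch models only the result, not how Ray Data distributes the work across workers.

```python
import random

# Hypothetical dataset of 9 rows split into 3 blocks (sizes are made up).
blocks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

# Pool every row from every block. In a distributed setting this is the step
# that forces data to move between workers.
all_rows = [row for block in blocks for row in block]

rng = random.Random(0)  # fixed seed so the sketch is repeatable
rng.shuffle(all_rows)  # permute individual rows, ignoring block boundaries

# Split the permuted rows back into blocks of the original size.
block_size = 3
reshuffled_blocks = [
    all_rows[i:i + block_size] for i in range(0, len(all_rows), block_size)
]
print(reshuffled_blocks)
```

Because the permutation is applied to individual rows rather than to whole blocks, a row's final position does not depend on which block it started in.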