5. Data Operations: Shuffling, Grouping and Aggregation#
Let’s look at some more involved transformations.
Shuffling data#
Ray Data offers several options for shuffling data, with varying degrees of randomness and performance.
File based shuffle on read#
To randomly shuffle the ordering of input files before reading, pass the shuffle="files" parameter to the read call.
ds_file_shuffled = ray.data.read_parquet(DATA_PATH, columns=COLUMNS, shuffle="files")
ds_file_shuffled
Shuffling block order#
This option randomizes the order of blocks in a dataset.
Applying this operation alone doesn't involve heavy computation or communication. However, it requires Ray Data to materialize all blocks before applying the operation.
Let’s read the data and shuffle the block order.
ds = ray.data.read_parquet(
    "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2011-05.parquet",
    columns=COLUMNS,
)
To perform block order shuffling, use randomize_block_order.
ds_block_based_shuffle = ds.randomize_block_order()
ds_block_based_shuffle.to_pandas()
Shuffle all rows globally#
To randomly shuffle all rows globally, call random_shuffle(). This is the slowest shuffle option because it transfers data across the network between workers, but it achieves the best randomness of all the options.
ds_row_based_shuffle = ds.random_shuffle()
ds_row_based_shuffle.to_pandas()
Custom batching using groupby and aggregations#
If you want to generate batches according to a specific key, you can use groupby to group the data by that key and then map_groups to apply a transformation to each group.
For instance, let's compute the average trip distance per payment type. Here is how we would do it with pandas:
df.groupby("payment_type")["trip_distance"].mean()
Here is how we would do the same operation with Ray Data:
num_cpus = 8  # repartition to match the available CPUs and maximize parallelism
ds.repartition(num_cpus).groupby("payment_type").mean("trip_distance").to_pandas()
Here are the main aggregation functions available in Ray Data:
count
max
mean
min
sum
std
Note: This is an area of active development in Ray Data, and the current implementation of groupby is not as optimized as it could be. We are working on improving the performance of groupby and map_groups operations.
In particular, the current implementation relies on a sort operation, which could instead be done with a hash-based implementation. Additionally, we had to repartition the data manually to maximize parallelism; in the future, Ray Data should be able to repartition the data dynamically.