5. Stateful transformations with Ray Actors#
In cases like batch inference, you want to spin up a number of actor processes that are initialized once with your model and reused to process multiple batches.
To implement this, you can use the map_batches API with a Callable class that implements:
__init__: Initialize any expensive state.
__call__: Perform the stateful transformation.
For example, we can implement an MNISTClassifier that:
loads a pre-trained model from a local file
accepts a batch of images and generates the predicted label
import subprocess

import numpy as np
import torch

device = "cuda"  # or "cpu" if you want to run it locally on CPU
class MNISTClassifier:
    def __init__(self, remote_path: str, local_path: str):
        # Download the pre-trained model once per actor process.
        subprocess.run(f"aws s3 cp --no-sign-request {remote_path} {local_path}", shell=True, check=True)
        self.model = torch.jit.load(local_path).to(device).eval()

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        images = torch.tensor(batch["image"]).float().to(device)
        with torch.no_grad():
            logits = self.model(images).cpu().numpy()
        batch["predicted_label"] = np.argmax(logits, axis=1)
        return batch
We can now use the map_batches API to apply the transformation to each batch of data.
storage_folder = '/mnt/cluster_storage'  # Change this path to a local folder if you run in your local environment
local_path = f"{storage_folder}/model.pt"
mnist_classifier_args = {
    "remote_path": "s3://anyscale-public-materials/ray-ai-libraries/mnist/model/model.pt",
    "local_path": local_path,
}
ds_preds = ds_normalized.map_batches(
    MNISTClassifier,
    fn_constructor_kwargs=mnist_classifier_args,
    num_gpus=1 if device == 'cuda' else None,  # Use a GPU if available
    concurrency=3,
    batch_size=100,
)
5.1 Resource specification for stateful transformations#
When your cluster has multiple hardware types, it is common to want to specify which accelerator each stage of your pipeline should use.
Let’s show how to achieve this with the resources parameter.
To use a GPU for the following examples, we suggest running them on an Anyscale Ray cluster.
ds_preds = ds_normalized.map_batches(
    MNISTClassifier,
    fn_constructor_kwargs=mnist_classifier_args,
    num_gpus=1 if device == 'cuda' else None,  # Use a GPU if available
    concurrency=3,
    batch_size=100,
    resources={"accelerator_type:T4": 0.0001},  # Schedule actors only on nodes with a T4 accelerator
)
Note: Pass in the Callable class uninitialized. The driver does not execute the class constructor; Ray passes the constructor arguments to the class when it is actually used in a transformation.
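To make the distinction concrete, here is a minimal sketch reusing the MNISTClassifier and arguments defined above:

# Correct: pass the class itself; Ray constructs one instance per actor
# using fn_constructor_kwargs, so the model loads on the workers.
ds_normalized.map_batches(
    MNISTClassifier,
    fn_constructor_kwargs=mnist_classifier_args,
    concurrency=3,
    batch_size=100,
)

# Not recommended: constructing the instance yourself would run __init__ on the
# driver and serialize the already-loaded model to the workers.
# ds_normalized.map_batches(MNISTClassifier(**mnist_classifier_args), ...)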
5.2 Note on autoscaling for stateful transformations#
For stateless transformations, Ray Data will automatically scale up the number of tasks to match the number of input blocks.
For stateful transformations, Ray Data will schedule tasks proportional to the number of actors (workers) in the pool.
To specify an autoscaling pool, use a tuple of (min_size, max_size) for the concurrency parameter.
Ray Data will start with min_size actors and automatically scale up to max_size as needed.
ds_preds = ds_normalized.map_batches(
    MNISTClassifier,
    fn_constructor_kwargs=mnist_classifier_args,
    num_gpus=1 if device == 'cuda' else None,  # Use a GPU if available
    concurrency=(1, 4),  # Autoscale the actor pool based on blocks, resources, and limits
    batch_size=100,
    # resources={"accelerator_type:T4": 0.0001},  # Optional; omit if you run locally without a T4
)
batch_preds = ds_preds.take_batch(100)
batch_preds
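To spot-check the results, you can inspect the arrays in the returned batch (a minimal sketch; the keys match the batch produced above):

# batch_preds is a dict of NumPy arrays, one entry per column.
print(batch_preds["predicted_label"][:10])  # first 10 predicted digit labels
print(batch_preds["image"].shape)           # shape of the normalized image batch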