Preprocess


Convert the classes to labels (unique integers) so that you can train a classifier that predicts the class of an input image. Before you do this, apply the same data ingestion and preprocessing as in the previous notebook.

import ray


def add_class(row):
    # The class name is the directory that contains the image file.
    row["class"] = row["path"].rsplit("/", 3)[-2]
    return row


# Preprocess data splits.
train_ds = ray.data.read_images(
    "s3://doggos-dataset/train", include_paths=True, shuffle="files"
)
train_ds = train_ds.map(add_class)
val_ds = ray.data.read_images("s3://doggos-dataset/val", include_paths=True)
val_ds = val_ds.map(add_class)
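
To see what `add_class` extracts, here is a minimal sketch that runs the same function on a plain dictionary instead of a Ray Data row. The path below is hypothetical and only assumes the layout `<bucket>/<split>/<class>/<file>`.

```python
def add_class(row):
    # Split from the right at most 3 times; the second-to-last piece
    # is the directory that holds the image, which is the class name.
    row["class"] = row["path"].rsplit("/", 3)[-2]
    return row


# Hypothetical path, for illustration only.
example_path = "s3://doggos-dataset/train/border_collie/example.jpg"
pieces = example_path.rsplit("/", 3)
row = add_class({"path": example_path})

print(pieces)
print(row["class"])
```

Because the split counts from the right, the result doesn't depend on how many slashes appear in the bucket prefix.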

Define a Preprocessor class that:

  • creates an embedding. A later step moves the embedding layer outside of the model: because you freeze the embedding layer’s weights, you can compute the embeddings once during preprocessing instead of repeating the work in every forward pass of the model, which saves unnecessary compute.

  • converts the classes into labels for the classifier.

Although you could do this step as a simple operation, organizing it as a class lets you save it and load it for inference later.

def convert_to_label(row, class_to_label):
    if "class" in row:
        row["label"] = class_to_label[row["class"]]
    return row
import json
import numpy as np
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor
from doggos.embed import EmbedImages
class Preprocessor:
    """Preprocessor class."""

    def __init__(self, class_to_label=None):
        self.class_to_label = class_to_label or {}  # avoid a mutable default argument
        self.label_to_class = {v: k for k, v in self.class_to_label.items()}

    def fit(self, ds, column):
        self.classes = ds.unique(column=column)
        self.class_to_label = {tag: i for i, tag in enumerate(self.classes)}
        self.label_to_class = {v: k for k, v in self.class_to_label.items()}
        return self

    def transform(self, ds, concurrency=4, batch_size=64, num_gpus=1):
        ds = ds.map(
            convert_to_label,
            fn_kwargs={"class_to_label": self.class_to_label},
        )
        ds = ds.map_batches(
            EmbedImages,
            fn_constructor_kwargs={
                "model_id": "openai/clip-vit-base-patch32",
                "device": "cuda",
            },
            concurrency=concurrency,
            batch_size=batch_size,
            num_gpus=num_gpus,
            accelerator_type="T4",
        )
        ds = ds.drop_columns(["image"])
        return ds

    def save(self, fp):
        with open(fp, "w") as f:
            json.dump(self.class_to_label, f)
# Preprocess.
preprocessor = Preprocessor()
preprocessor = preprocessor.fit(train_ds, column="class")
train_ds = preprocessor.transform(ds=train_ds)
val_ds = preprocessor.transform(ds=val_ds)
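
The label mapping and the save/load round trip can be sketched without Ray. This is a minimal illustration, not the tutorial's implementation: the class names are hypothetical, the list stands in for the result of `ds.unique`, and the loading step is an assumption about how you might restore the mapping that `save` writes.

```python
import json
import os
import tempfile


def convert_to_label(row, class_to_label):
    # Rows without a "class" key (for example, unlabeled inference data)
    # pass through unchanged.
    if "class" in row:
        row["label"] = class_to_label[row["class"]]
    return row


# Hypothetical class names standing in for ds.unique(column="class").
classes = ["beagle", "collie", "pug"]
class_to_label = {tag: i for i, tag in enumerate(classes)}
label_to_class = {v: k for k, v in class_to_label.items()}

labeled = convert_to_label({"class": "collie"}, class_to_label)
unlabeled = convert_to_label({"path": "some/image.jpg"}, class_to_label)

# Save the mapping as JSON, then load it back as you would at inference time.
with tempfile.TemporaryDirectory() as tmp:
    fp = os.path.join(tmp, "class_to_label.json")
    with open(fp, "w") as f:
        json.dump(class_to_label, f)
    with open(fp, "r") as f:
        restored = json.load(f)
```

Only `class_to_label` needs to be stored, because its keys are strings and its values are integers, so it survives JSON unchanged and the inverse mapping can be rebuilt from it.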
Data processing

See this extensive guide on data loading and preprocessing for the last-mile preprocessing you need to do before training your models. Ray Data also supports performant joins, filters, aggregations, and other operations for the more structured data processing your workloads may need.

import os
import shutil
# Write processed data to cloud storage.
preprocessed_data_path = os.path.join(
    "/mnt/cluster_storage", "doggos/preprocessed_data"
)
if os.path.exists(preprocessed_data_path):  # Clean up.
    shutil.rmtree(preprocessed_data_path)
preprocessed_train_path = os.path.join(preprocessed_data_path, "preprocessed_train")
preprocessed_val_path = os.path.join(preprocessed_data_path, "preprocessed_val")
train_ds.write_parquet(preprocessed_train_path)
val_ds.write_parquet(preprocessed_val_path)
(autoscaler +25s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(autoscaler +25s) [autoscaler] [4xT4:48CPU-192GB] Attempting to add 1 node to the cluster (increasing from 0 to 1).
(autoscaler +30s) [autoscaler] [4xT4:48CPU-192GB|g4dn.12xlarge] [us-west-2a] [on-demand] Launched 1 instance.
(autoscaler +1m15s) [autoscaler] Cluster upscaled to {104 CPU, 8 GPU}.
(autoscaler +3m10s) [autoscaler] [8CPU-32GB] Attempting to add 1 node to the cluster (increasing from 0 to 1).
(autoscaler +3m10s) [autoscaler] [8CPU-32GB|m5.2xlarge] [us-west-2a] [on-demand] Launched 1 instance.
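
The clean-up before writing matters because files left over from an earlier run would otherwise remain in the output directory alongside the new ones. The following sketch shows the same pattern on a temporary directory, which stands in for `/mnt/cluster_storage`; the helper name `clear_dir` and the stale file name are hypothetical.

```python
import os
import shutil
import tempfile


def clear_dir(path):
    """Remove a directory tree if it exists, so the next write starts clean."""
    if os.path.exists(path):
        shutil.rmtree(path)


# Temporary directory standing in for the shared storage mount.
root = tempfile.mkdtemp()
out = os.path.join(root, "doggos/preprocessed_data")

# Simulate output left over from a previous run.
os.makedirs(out)
open(os.path.join(out, "stale.parquet"), "w").close()

clear_dir(out)
exists_after_clear = os.path.exists(out)

# Calling it again on a missing directory is a no-op.
clear_dir(out)

shutil.rmtree(root)  # Remove the temporary stand-in directory.
```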
Store often, save compute

Store the preprocessed data into shared cloud storage to:

  • save a record of what this preprocessed data looks like

  • avoid triggering the entire preprocessing for each batch the model processes

  • avoid materializing the preprocessed data, because you shouldn’t force large data to fit in memory