Ray Crash Course - Python Multiprocessing with Ray¶
© 2019-2021, Anyscale. All Rights Reserved
This lesson explores how to replace two popular multiprocessing libraries with Ray replacements to break the one-machine boundary:

- `multiprocessing.Pool` for general management of process pools.
- `joblib`, the underpinnings of scikit-learn, which Ray can scale to a cluster.

We also examine how Ray can work with Python's `asyncio`.
Tip: For more about Ray, see ray.io or the Ray documentation.
import ray, time, sys, os
import numpy as np
ray.init(ignore_reinit_error=True)
If you are running this notebook on a local machine, the Ray Dashboard is available at the URL printed in the `ray.init()` output (by default, http://127.0.0.1:8265).
Drop-in Replacements for Popular Single-node, Multiprocessing Libraries¶
The Python community has three popular libraries for breaking out of Python's global interpreter lock to enable better multiprocessing and concurrency. Ray now offers drop-in replacements for two of them, `multiprocessing.Pool` and `joblib`, and integration with the third, Python's `asyncio`.

This section explores the `multiprocessing.Pool` and `joblib` replacements.
Library | Library Docs | Ray Docs | Description |
---|---|---|---|
`multiprocessing.Pool` | docs | Ray | Create a pool of processes for running work. The Ray replacement allows scaling to a cluster. |
`joblib` | docs | Ray | Ray supports running distributed scikit-learn programs by implementing a Ray backend for joblib using Ray Actors instead of local processes. This makes it easy to scale existing applications that use scikit-learn from a single node to a cluster. |
Multiprocessing.Pool¶
If your application already uses `multiprocessing.Pool`, then scaling beyond a single node just requires replacing this import:
from multiprocessing.pool import Pool
To this:
from ray.util.multiprocessing.pool import Pool
A local Ray cluster will be started the first time you create a Pool and your tasks will be distributed across it. See Run on a Cluster in the Ray documentation for details on how to use a multi-node Ray cluster instead.
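For instance, here is a minimal sketch of pointing the Pool at an existing multi-node cluster instead (the address value is an assumption for illustration; use whatever address your cluster was started with):

import ray
from ray.util.multiprocessing import Pool

# Connect to an already-running Ray cluster before constructing the Pool.
# 'auto' assumes this code runs on a machine that is part of the cluster;
# otherwise, pass the head node's address explicitly.
ray.init(address='auto', ignore_reinit_error=True)

pool = Pool()  # tasks submitted through this pool are spread across the cluster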
Here is an example:
from ray.util.multiprocessing import Pool
def f(index):
    return index

def run_with_pool(n=100):
    pool = Pool()
    for result in pool.map(f, range(n)):
        print(f'{result}|', end='')
run_with_pool()
We used a function `run_with_pool()` to wrap a scope around the `pool` construction. That way, it goes out of scope when we're finished and Ray can reclaim the resources.
The full `multiprocessing.Pool` API is supported. Please see Python's multiprocessing documentation for details.
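For instance, here is a minimal sketch using a few of the other Pool methods (apply_async and imap_unordered come from the standard multiprocessing.Pool API; we assume the Ray replacement mirrors them, per the statement above):

from ray.util.multiprocessing import Pool

def square(x):
    return x * x

with Pool() as pool:  # the Pool also works as a context manager
    # apply_async schedules a single call and returns an AsyncResult.
    async_result = pool.apply_async(square, (7,))
    print(async_result.get())  # 49

    # imap_unordered yields results as they complete, in arbitrary order.
    for value in pool.imap_unordered(square, range(5)):
        print(value, end=' ')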
Joblib¶
Ray supports running distributed scikit-learn programs by implementing a Ray backend for joblib using Ray Actors instead of local processes. This makes it easy to scale existing applications that use scikit-learn from a single node to a cluster.
Note: This API is new and may be revised in the future. Please report any issues you encounter.
To get started, use `from ray.util.joblib import register_ray` and then run `register_ray()`. This will register Ray as a `joblib` backend for `scikit-learn` to use. Then run your original `scikit-learn` code inside `with joblib.parallel_backend('ray')`. This will start a local Ray cluster.
See Run on a Cluster in the Ray documentation for details on how to use a multi-node Ray cluster instead.
Here is an example. First, we set up Ray with `joblib`:
import joblib
from ray.util.joblib import register_ray
register_ray()
Now let's use an example taken from the scikit-learn examples, Restricted Boltzmann Machine features for digit classification.
# Authors: Yann N. Dauphin, Vlad Niculae, Gabriel Synnaeve
# License: BSD
import numpy as np
from scipy.ndimage import convolve
from sklearn import linear_model, datasets, metrics
from sklearn.model_selection import train_test_split
from sklearn.neural_network import BernoulliRBM
from sklearn.pipeline import Pipeline
from sklearn.base import clone
# #############################################################################
# Setting up
def nudge_dataset(X, Y):
    """
    This produces a dataset 5 times bigger than the original one,
    by moving the 8x8 images in X around by 1px to left, right, down, up
    """
    direction_vectors = [
        [[0, 1, 0],
         [0, 0, 0],
         [0, 0, 0]],
        [[0, 0, 0],
         [1, 0, 0],
         [0, 0, 0]],
        [[0, 0, 0],
         [0, 0, 1],
         [0, 0, 0]],
        [[0, 0, 0],
         [0, 0, 0],
         [0, 1, 0]]]

    def shift(x, w):
        return convolve(x.reshape((8, 8)), mode='constant', weights=w).ravel()

    X = np.concatenate([X] +
                       [np.apply_along_axis(shift, 1, X, vector)
                        for vector in direction_vectors])
    Y = np.concatenate([Y for _ in range(5)], axis=0)
    return X, Y
# Load Data
X, y = datasets.load_digits(return_X_y=True)
X = np.asarray(X, 'float32')
X, Y = nudge_dataset(X, y)
X = (X - np.min(X, 0)) / (np.max(X, 0) + 0.0001) # 0-1 scaling
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=0)
# Models we will use
logistic = linear_model.LogisticRegression(solver='newton-cg', tol=1)
rbm = BernoulliRBM(random_state=0, verbose=True)
rbm_features_classifier = Pipeline(
    steps=[('rbm', rbm), ('logistic', logistic)])
# #############################################################################
# Training
# Hyper-parameters. These were set by cross-validation,
# using a GridSearchCV. Here we are not performing cross-validation to
# save time.
rbm.learning_rate = 0.06
rbm.n_iter = 10
# More components tend to give better prediction performance, but larger
# fitting time
rbm.n_components = 100
logistic.C = 6000
Now we actually use the Ray backend for `joblib`:
with joblib.parallel_backend('ray'):
    # Training RBM-Logistic Pipeline
    rbm_features_classifier.fit(X_train, Y_train)

    # Training the Logistic regression classifier directly on the pixel values
    raw_pixel_classifier = clone(logistic)
    raw_pixel_classifier.C = 100.
    raw_pixel_classifier.fit(X_train, Y_train)
# #############################################################################
# Evaluation
Y_pred = rbm_features_classifier.predict(X_test)
print("Logistic regression using RBM features:\n%s\n" % (
    metrics.classification_report(Y_test, Y_pred)))

Y_pred = raw_pixel_classifier.predict(X_test)
print("Logistic regression using raw pixel features:\n%s\n" % (
    metrics.classification_report(Y_test, Y_pred)))
If you see warnings about the 'context' argument, you can safely ignore them.
Using Ray with asyncio¶
Python's `asyncio` can be used with Ray actors and tasks.
Note: The Async API support is experimental and work is ongoing to improve it. Please report any issues you encounter.
Actors¶
Here is an actor example, adapted from the Ray documentation.
Note the comment before `run_concurrent`. While normally actor methods are invoked synchronously, in this case there may be concurrent invocations!
import asyncio
@ray.remote
class AsyncActor:
    # Multiple invocations of this method can be running in
    # the event loop at the same time.
    async def run_concurrent(self, index):
        print(f'started {index}')
        await asyncio.sleep(0.2)  # Concurrent workload here
        print(f'finished {index}')
        return index
actor = AsyncActor.remote()
refs = []
values = []
for i in range(10):
    # regular ray.get
    refs.append(actor.run_concurrent.remote(i))
    # async ray.get
    values.append(await actor.run_concurrent.remote(10+i))
print(ray.get(refs))
print(values)
Note that using `await` with a method invocation implicitly invokes `ray.get()` on the returned object ref.

Under the hood, Ray runs all of the methods inside a single Python event loop.
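For instance, here is a minimal sketch of that equivalence, reusing the actor created above (top-level await works here because notebook cells run in an event loop):

ref = actor.run_concurrent.remote(99)

# Both retrievals return the same value: ray.get() blocks the calling
# thread, while await yields to the event loop until the result is ready.
value_blocking = ray.get(ref)
value_async = await ref
assert value_blocking == value_async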
Note: Running blocking `ray.get` and `ray.wait` inside async actor methods is not allowed, because `ray.get` will block the execution of the event loop.
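The usual alternative is to await object refs directly inside the async method. Here is a hedged sketch of that pattern (slow_task and GoodAsyncActor are illustrative names, not part of Ray's API):

@ray.remote
def slow_task():
    return 42

@ray.remote
class GoodAsyncActor:
    async def compute(self):
        ref = slow_task.remote()
        # await instead of the blocking ray.get(ref), so the actor's event
        # loop stays free to run other concurrent method invocations.
        return await ref

good_actor = GoodAsyncActor.remote()
print(ray.get(good_actor.compute.remote()))  # 42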
You can limit the number of concurrent tasks running at once using the `max_concurrency` flag. By default, 1000 tasks can be running concurrently.

In the following cell, we set `max_concurrency` to 3, so it will run the tasks three at a time. Since there are 12 tasks in total, we'll have four groups, each sleeping about 0.2 seconds, so it should take about 0.8 seconds to run.
actor3 = AsyncActor.options(max_concurrency=3).remote()
%time ray.get([actor3.run_concurrent.remote(i) for i in range(12)])
See the 03: Ray Internals lesson in the Advanced Ray tutorial for more details on async actors.
Async Tasks¶
For Ray tasks, the object refs returned by them can be converted to `asyncio.Future` instances.
@ray.remote
def some_task():
    return 1
# The normal Ray way:
ref, _ = ray.wait([some_task.remote()])  # ray.wait returns (ready, not_ready) lists of refs
ray.get(ref)
The `asyncio` alternative way:
await some_task.remote()
future = await asyncio.wait([some_task.remote()])
print(future)
# A tuple is returned:
for x in future:
    print(f' {type(x)} => {x}')
See the asyncio docs for more details on `asyncio` patterns, including timeouts and `asyncio.gather`.
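For instance, here is a minimal sketch applying two of those patterns to Ray object refs (it assumes asyncio.gather and asyncio.wait_for accept arbitrary awaitables, as the examples above suggest):

# Run several tasks and gather their results in submission order.
results = await asyncio.gather(*[some_task.remote() for _ in range(3)])
print(results)  # [1, 1, 1]

# Bound how long we wait for a single result.
try:
    value = await asyncio.wait_for(some_task.remote(), timeout=5.0)
except asyncio.TimeoutError:
    print('task did not finish within 5 seconds')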
ray.shutdown() # "Undo ray.init()". Terminate all the processes started in this notebook.
The next lesson, Ray Parallel Iterators, introduces the parallel iterator API for simple data ingestion and processing. It can be thought of as syntactic sugar around Ray actors and `ray.wait` loops.

NOTE: Since Ray 1.7, the `ray.util.iter` module has been deprecated, so we advise against using that notebook.