Ray Crash Course - Tasks¶
© 2019-2021, Anyscale. All Rights Reserved
Let's quickly explore the Ray API using some examples that demonstrate how Ray enables horizontal scalability.
Tip: For more about Ray, see ray.io or the Ray documentation.
Evaluate the next three notebook cells (shift+return, if you're new to notebooks), down to and including run_simulations(...), and watch what happens:
Ns = [1000, 10000, 100000]
dmaps = make_dmaps(Ns)
dmaps[0] + dmaps[1] + dmaps[2]
run_simulations(dmaps)
# TIP: If you want to stop them, uncomment and run the next line:
# stop_simulations(dmaps)
(If you can't see it, click here for a screen grab.)
What we just did was estimate $\pi$ (~3.14159) using a Monte Carlo technique, where we randomly sampled a uniform distribution, one with equal probability of picking any point in a square.
It works like this. Imagine each blue square is a piece of paper 2 meters by 2 meters you put on a wall. The circle inside each one has radius 1 meter.
Now suppose you throw $N$ darts at each paper. We're seeing $N = {\sim}1000, {\sim}10000, {\sim}100000$ examples. (This will be hard on your wall, so don't try this at home...)
Some darts will land inside the circle, call them $n$, and the rest will land outside, $N-n$. The area of a circle is ${\pi}r^{2}$ and the area of a square is $(2r)^{2} = 4r^{2}$. The ratio $n/N$ approximately equals the ratio of the circle area to the square area, ${\pi}r^{2}/4r^{2} = {\pi}/4$. (Does it make sense that this ratio is independent of the actual radius value?)
In other words,
$\pi/4 \approx n/N$
$\pi \approx 4n/N$
So, to approximate $\pi$, we can count the number of darts thrown and the number that land inside the circle.
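To make the arithmetic concrete, here is a minimal pure-Python sketch of the dart-throwing estimate (the helper name throw_darts is ours; the simulations above and the estimate_pi function below do the same thing, but vectorized with NumPy):
import random

def throw_darts(N):
    n = 0                                                    # Darts that land inside the circle.
    for _ in range(N):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)  # A random point in the 2x2 square.
        if x*x + y*y <= 1.0:                                 # Inside the unit circle?
            n += 1
    return 4.0 * n / N                                       # pi ~ 4n/N

throw_darts(100000)                                          # Returns a value near 3.14159.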
You probably noticed three things while the simulations were running or after they finished:
- The accuracy improved for larger $N$... well usually. Sometimes a lower $N$ simulation gets "lucky" and does as well as a higher $N$. In a real experiment, we would do many runs and then compute the average and standard deviation. (We'll do that below.)
- Because each $N$ is 10 times the $N$ to the left, it took roughly 10 times as long for the second to finish compared to the first, etc.
- The updates in the second and third simulations appeared to go faster as the neighbors to the left finished.
What this means is that if we really want a good estimate of $\pi$, we have to do runs with large $N$, but then we wait longer. Ideally, to get fast and accurate results, we would do as much work as possible in parallel, leveraging all the CPU cores available on our machine ... or our cluster.
Let's use Ray to achieve this.
Note: There is a lot of Python code used in this notebook, both for calculating Pi and for the graphs. We won't show most of it, but all the code can be found in this directory: pi_calc.py (calculating $\pi$) and task_lesson_util.py (graphics).
Parallelism with Ray¶
We did the previous calculation without fully exploiting all available cores. In a cluster, the rest of the cores on the rest of the machines would be idle, too.
We can use Ray to parallelize a lot of this work. Let's see how.
import numpy as np
import math, statistics, time
import ray
Some constants we'll use (and explain) below.
num_workers = 8
trials = 20
Let's Start Ray¶
If you are running these tutorials on your laptop, each notebook will start a mini-cluster when we call ray.init() below. Then we'll shut it down at the end of the notebook, so be sure to evaluate the last cell in each notebook that calls ray.shutdown().
If you are learning on the Anyscale platform, your environment is already running a Ray cluster with one or more nodes. So, when we call ray.init(), it will connect to that running cluster. You should still run the ray.shutdown() cell at the end of each notebook, but it won't shut down the whole cluster.
For more details on the options you can pass to ray.init(), see lesson 6, Exploring Ray API Calls.
If you are interested in the details of running a Ray cluster using the ray CLI, see lesson 7, Running Ray Clusters, and see also the corresponding Ray documentation. There is also a script, tools/start-ray.sh, that you can play with. (It was used in an earlier version of these tutorials.)
The ignore_reinit_error=True argument tells Ray not to complain if we rerun the cell; Ray will just ignore the request to initialize.
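Relatedly, you can check whether Ray is already running in the current process; a one-liner using ray.is_initialized(), which is part of the Ray API:
ray.is_initialized()   # False before ray.init() has been called, True afterwards.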
ray.init(ignore_reinit_error=True)
2021-10-17 17:00:11,652 INFO services.py:1252 -- View the Ray dashboard at http://127.0.0.1:8265
{'node_ip_address': '192.168.86.44', 'raylet_ip_address': '192.168.86.44', 'redis_address': '192.168.86.44:6379', 'object_store_address': '/tmp/ray/session_2021-10-17_17-00-09_857234_12126/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2021-10-17_17-00-09_857234_12126/sockets/raylet', 'webui_url': '127.0.0.1:8265', 'session_dir': '/tmp/ray/session_2021-10-17_17-00-09_857234_12126', 'metrics_export_port': 61014, 'node_id': 'c337886b016e6f8383b380b31d709a1efbd2dc0e9bdcb7be5a0431e2'}
Tip: Having trouble starting Ray? See the Troubleshooting tips.
Although we don't need it immediately, we'll use the Ray Dashboard to watch performance metrics like CPU utilization.
Use the webui_url from above to view the dashboard.
If you are running on the Anyscale platform, use the URL provided by your instructor to open the Dashboard.
Let's define a function to do the Pi calculation that simplifies the code we used above for graphing purposes. We won't draw the "dart graphs" from now on, because they add a lot of overhead that would obscure the performance measurements.
This function estimates $\pi$ for the number of samples requested. It uses NumPy. If you're not familiar with it, the implementation details aren't essential to understand, but the comments try to explain them.
def estimate_pi(num_samples):
    xs = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Generate num_samples random samples for the x coordinate.
    ys = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Generate num_samples random samples for the y coordinate.
    xys = np.stack((xs, ys), axis=-1)                              # Like Python's "zip(a,b)"; creates np.array([(x1,y1), (x2,y2), ...]).
    inside = xs*xs + ys*ys <= 1.0                                  # Creates a predicate over all the array elements.
    xys_inside = xys[inside]                                       # Selects only those "zipped" array elements inside the circle.
    in_circle = xys_inside.shape[0]                                # The number of elements inside the circle.
    approx_pi = 4.0*in_circle/num_samples                          # The Pi estimate.
    return approx_pi
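As a quick sanity check, we can call it directly with a modest sample count (the exact values will vary from run to run):
print(estimate_pi(1000))      # A rough estimate; expect something near 3.14.
print(estimate_pi(1000000))   # More samples, so usually closer to 3.14159.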
Let's try it:
Ns = [10000, 50000, 100000, 500000, 1000000] #, 5000000, 10000000] # Larger values take a long time on small VMs and machines!
maxN = Ns[-1]
maxN
1000000
fmt = '{:10.5f} seconds: pi ~ {:7.6f}, stddev = {:5.4f}, error = {:5.4f}%'
def try_it(n, trials):
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n imported above.
    start = time.time()
    pis = [estimate_pi(n) for _ in range(trials)]
    approx_pi = statistics.mean(pis)
    stdev = statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))
    return trials, n, duration, approx_pi, stdev, error
The next cell will take many seconds to complete (depending on your setup). As soon as it starts, switch to the Ray Dashboard you opened above. Notice the total CPU and memory utilizations at the top while the cell runs.
Tip: If all the following trials finish in under a few seconds for the largest n value in Ns and the largest number of trials, consider changing Ns above to add larger values.
data_ns = [try_it(n, trials) for n in Ns]
trials = 20, N = 10000: 0.01174 seconds: pi ~ 3.144980, stddev = 0.0183, error = 0.1078%
trials = 20, N = 50000: 0.05045 seconds: pi ~ 3.141376, stddev = 0.0080, error = 0.0069%
trials = 20, N = 100000: 0.09126 seconds: pi ~ 3.141838, stddev = 0.0064, error = 0.0078%
trials = 20, N = 500000: 0.52302 seconds: pi ~ 3.142480, stddev = 0.0026, error = 0.0282%
trials = 20, N = 1000000: 0.92690 seconds: pi ~ 3.141603, stddev = 0.0019, error = 0.0003%
data_trials = [try_it(maxN, trials) for trials in range(5,20,2)]
trials = 5, N = 1000000: 0.25012 seconds: pi ~ 3.141197, stddev = 0.0013, error = 0.0126%
trials = 7, N = 1000000: 0.31887 seconds: pi ~ 3.141889, stddev = 0.0013, error = 0.0094%
trials = 9, N = 1000000: 0.38537 seconds: pi ~ 3.141680, stddev = 0.0018, error = 0.0028%
trials = 11, N = 1000000: 0.47984 seconds: pi ~ 3.140684, stddev = 0.0013, error = 0.0289%
trials = 13, N = 1000000: 0.58633 seconds: pi ~ 3.141358, stddev = 0.0018, error = 0.0075%
trials = 15, N = 1000000: 0.67253 seconds: pi ~ 3.141415, stddev = 0.0018, error = 0.0056%
trials = 17, N = 1000000: 0.78668 seconds: pi ~ 3.141687, stddev = 0.0012, error = 0.0030%
trials = 19, N = 1000000: 0.85619 seconds: pi ~ 3.140900, stddev = 0.0018, error = 0.0220%
(We'll graph the data below.)
The CPU utilization never gets close to 100%. On a four-core machine, for example, the number will be about 25%. (The Ray process meters will stay at or near zero until later in this notebook.)
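To interpret that utilization number on your machine, you can check how many cores Python sees and how many CPUs Ray has registered; a small sketch using the standard library and the Ray API:
import os
print(os.cpu_count())             # Logical cores visible on this machine.
print(ray.cluster_resources())    # Resources Ray knows about, including 'CPU'.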
So, this runs on one core, while the other cores are idle. Now we'll try with Ray.
From Python Functions to Ray Tasks¶
You create a Ray task by decorating a normal Python function with @ray.remote. These tasks will be scheduled across your Ray cluster (or your laptop CPU cores).
Tip: The Ray Package Reference in the Ray Docs is useful for exploring the API features we'll learn.
Here is a Ray task for estimate_pi. All we need is a wrapper around the original function.
@ray.remote
def ray_estimate_pi(num_samples):
    return estimate_pi(num_samples)
Let's try it. To invoke a task, you use function.remote(args):
ray_estimate_pi.remote(100)
ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000)
What is this ObjectRef? A Ray task is an asynchronous computation. The ObjectRef returned is a future that we use to retrieve the resulting value from the task when it completes. We use ray.get(ref) to retrieve it:
ref = ray_estimate_pi.remote(100)
print(ray.get(ref))
3.24
We can also work with lists of refs:
refs = [ray_estimate_pi.remote(n) for n in [100, 1000, 10000]]
print(ray.get(refs))
[3.24, 3.2, 3.1468]
Okay, let's try our test run again with our Ray task. We'll need a new "try it" function, because of the different task invocation logic. This function doesn't need to be a Ray task, however, so no @ray.remote decorator is required.
def ray_try_it(n, trials):
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n imported above.
    start = time.time()
    refs = [ray_estimate_pi.remote(n) for _ in range(trials)]
    pis = ray.get(refs)
    approx_pi = statistics.mean(pis)
    stdev = statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))
    return trials, n, duration, approx_pi, stdev, error
ray_data_ns = [ray_try_it(n, trials) for n in Ns]
trials = 20, N = 10000: 0.12880 seconds: pi ~ 3.144740, stddev = 0.0150, error = 0.1002%
trials = 20, N = 50000: 0.06693 seconds: pi ~ 3.138804, stddev = 0.0079, error = 0.0888%
trials = 20, N = 100000: 0.02578 seconds: pi ~ 3.141124, stddev = 0.0048, error = 0.0149%
trials = 20, N = 500000: 0.14190 seconds: pi ~ 3.141432, stddev = 0.0022, error = 0.0051%
trials = 20, N = 1000000: 0.27268 seconds: pi ~ 3.141786, stddev = 0.0022, error = 0.0062%
ray_data_trials = [ray_try_it(maxN, trials) for trials in range(5,20,2)]
trials = 5, N = 1000000: 0.09289 seconds: pi ~ 3.142109, stddev = 0.0012, error = 0.0164%
trials = 7, N = 1000000: 0.10057 seconds: pi ~ 3.140675, stddev = 0.0017, error = 0.0292%
trials = 9, N = 1000000: 0.12925 seconds: pi ~ 3.141338, stddev = 0.0008, error = 0.0081%
trials = 11, N = 1000000: 0.14865 seconds: pi ~ 3.141083, stddev = 0.0013, error = 0.0162%
trials = 13, N = 1000000: 0.21580 seconds: pi ~ 3.141640, stddev = 0.0014, error = 0.0015%
trials = 15, N = 1000000: 0.21099 seconds: pi ~ 3.141667, stddev = 0.0018, error = 0.0024%
trials = 17, N = 1000000: 0.23532 seconds: pi ~ 3.142162, stddev = 0.0013, error = 0.0181%
trials = 19, N = 1000000: 0.25824 seconds: pi ~ 3.141541, stddev = 0.0021, error = 0.0016%
The durations should be shorter than the non-Ray numbers. Let's graph our results and see. First we'll convert the data_* lists to NumPy arrays, which are easier to slice.
np_data_ns = np.array(data_ns)
np_data_trials = np.array(data_trials)
np_ray_data_ns = np.array(ray_data_ns)
np_ray_data_trials = np.array(ray_data_trials)
from bokeh_util import two_lines_plot, means_stddevs_plot # Some plotting utilities in `./bokeh_util.py`.
from bokeh.plotting import show, figure
from bokeh.layouts import gridplot
First, a linear plot of the results:
two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray',
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2],
    x_axis_type='linear', y_axis_type='linear')
show(two_lines, plot_width=800, plot_height=400)
(If you can't see it, click here. Note that this image is from a run that included the larger values for N that are commented out in the definition of Ns above.)
For relatively small N values, the overhead of Ray is a larger percentage of the calculation, so the overall performance benefit is less. However, as N increases, the advantage of Ray increases. Both plots are roughly linear, because we are CPU bound, but Ray's execution time per N is lower. On a full cluster, the times could be dramatically better for larger N.
A log-log plot shows the lower-N behavior more clearly:
two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray',
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2])
show(two_lines, plot_width=800, plot_height=400)
(If you can't see it, click here.)
What about execution times as a function of the number of trials, for a fixed N?
two_lines = two_lines_plot(
    "Trials (N=1,000,000) vs. Execution Times (Smaller Is Better)", 'Trials', 'Time', 'No Ray', 'Ray',
    np_data_trials[:,0], np_data_trials[:,2], np_ray_data_trials[:,0], np_ray_data_trials[:,2],
    x_axis_type='linear', y_axis_type='linear')
show(two_lines, plot_width=800, plot_height=400)
(If you can't see it, click here.)
Let's plot the approximate mean values and the standard deviations over the trials for each N.
pi_without_ray_plot = means_stddevs_plot(
    np_data_ns[:,1], np_data_ns[:,3], np_data_ns[:,4], title = 'π Results without Ray')
# Use a grid to make the layout better.
pi_without_ray_grid = gridplot([[pi_without_ray_plot]], plot_width=1000, plot_height=400)
show(pi_without_ray_grid)
BokehDeprecationWarning: plot_width and plot_height was deprecated in Bokeh 2.4.0 and will be removed, use width or height instead.
(If you can't see it, click here.) You may have to use the "crossed-arrows" control to scroll horizontally (click and drag) to see all of the graph.
As you might expect, for low N values the error bars are large and the mean estimate is poor, but for higher N, the errors grow smaller and the results converge to the correct value.
With Ray, the plot will look similar, because we did the same calculation, just faster:
pi_with_ray_plot = means_stddevs_plot(
    np_ray_data_ns[:,1], np_ray_data_ns[:,3], np_ray_data_ns[:,4], title = 'π Results with Ray')
# Use a grid to make the layout better.
pi_with_ray_grid = gridplot([[pi_with_ray_plot]], plot_width=1000, plot_height=400)
show(pi_with_ray_grid)
BokehDeprecationWarning: plot_width and plot_height was deprecated in Bokeh 2.4.0 and will be removed, use width or height instead.
(If you can't see it, click here.)
ray.get() vs. ray.wait()¶
Calling ray.get(ids) blocks until all the tasks corresponding to the input ids have completed. That has been fine for this tutorial so far, but what if you're waiting for a number of tasks, where some will finish more quickly than others? What if you would like to process the completed results as they become available, even while other tasks are still running? That's where ray.wait() comes in. Here we'll provide a brief example. For more details, see the Advanced Ray, Ray Tasks Revisited lesson.
@ray.remote
def ray_estimate_pi2(n, trial):
    time.sleep(trial)
    return n, trial, estimate_pi(n)
def ray_try_it2(ns, trials):
    start = time.time()
    refs = [ray_estimate_pi2.remote(n, trial) for trial in trials for n in ns]
    still_running = list(refs)
    while len(still_running) > 0:
        finished, still_running = ray.wait(still_running)
        ns_trials_pis = ray.get(finished)   # Won't block; these tasks are already finished.
        print(f'{ns_trials_pis}, elapsed time = {time.time() - start} secs')
Observe what happens next:
ray_try_it2([100000,1000000,1000000], [2,4,6])
[(100000, 2, 3.13668)], elapsed time = 2.0221378803253174 secs
[(1000000, 2, 3.142776)], elapsed time = 2.0723750591278076 secs
[(1000000, 2, 3.14478)], elapsed time = 2.0806238651275635 secs
[(100000, 4, 3.13804)], elapsed time = 4.022966146469116 secs
[(1000000, 4, 3.138956)], elapsed time = 4.068006992340088 secs
[(1000000, 4, 3.141256)], elapsed time = 4.074803113937378 secs
[(100000, 6, 3.13828)], elapsed time = 6.025483131408691 secs
[(1000000, 6, 3.140808)], elapsed time = 6.072616100311279 secs
[(1000000, 6, 3.143632)], elapsed time = 6.079546928405762 secs
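Note that ray.wait() also accepts num_returns and timeout arguments, which control how many finished tasks to wait for and the maximum time to block; a minimal sketch using the task defined earlier:
refs = [ray_estimate_pi.remote(n) for n in [1000, 10000, 100000]]
# Return once at least two tasks have finished, or after 5 seconds, whichever comes first:
finished, still_running = ray.wait(refs, num_returns=2, timeout=5.0)
print(ray.get(finished))   # Only the finished results; won't block.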
Exercises¶
Try one or more of the following exercises to practice improving scalable performance using Ray. In particular, think about the granularity of tasks where Ray is most beneficial.
See the solutions notebook for a discussion of solutions to these exercises.
Exercise 1¶
As currently written, the memory footprint of estimate_pi scales linearly with N, because it allocates two NumPy arrays of size N. This limits the size of N we can evaluate (as I confirmed by locking up my laptop...). However, this isn't actually necessary. We could do the same calculation in blocks, for example m blocks of size N/m, and then combine the results. Furthermore, there are no dependencies between the calculations for those blocks, giving us further potential speed-up by parallelizing them with Ray.
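To make the blocking idea concrete, here is a minimal serial sketch (the helper name estimate_pi_blocked and the default block size are our choices; parallelizing the blocks with Ray is the exercise):
def estimate_pi_blocked(num_samples, block_size=1000000):
    # Accumulate the "inside the circle" count block by block, so the memory
    # footprint is bounded by block_size instead of num_samples.
    in_circle = 0
    remaining = num_samples
    while remaining > 0:
        m = min(block_size, remaining)
        xs = np.random.uniform(low=-1.0, high=1.0, size=m)
        ys = np.random.uniform(low=-1.0, high=1.0, size=m)
        in_circle += np.count_nonzero(xs*xs + ys*ys <= 1.0)
        remaining -= m
    return 4.0 * in_circle / num_samples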
Adapt ray_estimate_pi to use this technique. Pick some N value above which the calculation is done in blocks. Compare the performance of the old vs. new implementation.
As you do this exercise, you might ponder the fact that we often averaged multiple trials for a given N, and then ask yourself: what's the difference between averaging 10 trials for N = 1000 vs. 1 trial for N = 10000, for example?
Exercise 2¶
What N value is needed to get a reliable estimate to five significant digits, 3.1415 (for some definition of "reliable")? If you have a powerful machine or a cluster, you could try a higher accuracy. You'll need to use the solution to Exercise 1, or you can make a guess based on the results we've already seen in this notebook.
Exercise 3¶
For small computational problems, Ray adds enough overhead that its benefits are outweighed. You can see from the performance graphs above that smaller N or smaller trial values will likely cause the curves to cross. Try small values of N and small trial numbers. When do the lines cross? Try timing individual runs for small N around the crossing point. What can you infer from this "tipping point" about appropriate sizing of tasks, at least for your test environment?
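One way to time individual runs around the crossing point (a sketch built from the functions defined above; adjust the n values to taste):
def time_one(n):
    start = time.time()
    estimate_pi(n)                        # Local, single-core call.
    local = time.time() - start
    start = time.time()
    ray.get(ray_estimate_pi.remote(n))    # Remote call; includes Ray's task overhead.
    remote = time.time() - start
    print(f'n = {n}: local = {local:.6f}s, remote = {remote:.6f}s')

for n in [100, 1000, 10000, 100000]:
    time_one(n)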
Run the next cell when you are finished with this notebook:
ray.shutdown() # "Undo ray.init()". Terminate all the processes started in this notebook.
The next lesson, Ray Actors, introduces Ray's tool for distributed computation with state, actors, which builds on the familiar Python concept of classes.