Ray Crash Course - Exploring Ray API Calls¶
© 2019-2021, Anyscale. All Rights Reserved
This lesson explores a few of the other API calls you might find useful, as well as options that can be used with the API calls we've already learned.
Tip: The Ray Package Reference in the Ray Docs is useful for exploring the API features we'll learn.
import ray, time, sys
import numpy as np
sys.path.append("..")
from util.printing import pd, pnd # convenience methods for printing results.
ray.init(ignore_reinit_error=True)
The Ray Dashboard URL is printed above. Use it on your laptop.
When using the Anyscale platform, use the URL provided by your instructor to access the Ray Dashboard.
ray.init()¶
When we used ray.init(), we used it to start Ray on our local machine. When the optional address=... argument is specified, the driver connects to the corresponding Ray cluster.
There are many optional keyword arguments you can pass to ray.init(). Here are some of them; all options are described in the documentation.
| Name | Type | Example | Description |
|---|---|---|---|
| address | str | address='auto' | The address of the Ray cluster to connect to. If this address is not provided, then this command will start Redis, a raylet, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits. If the driver is running on a node in a Ray cluster, using auto as the value tells the driver to detect the cluster, removing the need to specify a specific node address. |
| num_cpus | int | num_cpus=4 | Number of CPUs the user wishes to assign to each raylet. |
| num_gpus | int | num_gpus=1 | Number of GPUs the user wishes to assign to each raylet. |
| resources | dictionary | resources={'resource1': 4, 'resource2': 16} | Maps the names of custom resources to the quantities of those resources available. |
| memory | int | memory=1000000000 | The amount of memory (in bytes) that is available for use by workers requesting memory resources. By default, this is automatically set based on the available system memory. |
| object_store_memory | int | object_store_memory=1000000000 | The amount of memory (in bytes) for the object store. By default, this is automatically set based on available system memory, subject to a 20GB cap. |
| log_to_driver | bool | log_to_driver=True | If true, the output from all of the worker processes on all nodes will be directed to the driver program. |
| local_mode | bool | local_mode=True | If true, the code will be executed serially. This is useful for debugging. |
| ignore_reinit_error | bool | ignore_reinit_error=True | If true, Ray suppresses errors from calling ray.init() a second time (as we've done in these notebooks). Ray won't be restarted. |
| include_webui | bool | include_webui=False | Whether or not to start the web UI, which displays the status of the Ray cluster. By default, or if this argument is None, the UI will be started if the relevant dependencies are present. |
| webui_host | str | webui_host=1.2.3.4 | The host to bind the web UI server to. Can either be localhost (or 127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is set to localhost to prevent access from external machines. |
| configure_logging | bool | configure_logging=True | If true (default), configuration of logging is allowed here. Otherwise, the user may want to configure it separately. |
| logging_level | int | logging_level=logging.INFO | The logging level; defaults to logging.INFO. Ignored unless configure_logging is true. |
| logging_format | str | logging_format='...' | The logging format to use; defaults to a string containing a timestamp, filename, line number, and message. See the Ray source file ray_constants.py for details. Ignored unless configure_logging is true. |
| temp_dir | str | temp_dir='/tmp/myray' | If provided, specifies the root temporary directory for the Ray process. Defaults to an OS-specific conventional location, e.g., /tmp/ray. |
See also the documentation for ray.shutdown(), which is needed in some contexts.
ray.is_initialized()¶
Is Ray initialized?
ray.is_initialized()
@ray.remote()¶
We've used @ray.remote a lot. You can pass arguments when using it. Here are some of them.
| Name | Type | Example | Description |
|---|---|---|---|
| num_cpus | int | num_cpus=4 | The number of CPU cores to reserve for this task or for the lifetime of the actor. |
| num_gpus | int | num_gpus=1 | The number of GPU cores to reserve for this task or for the lifetime of the actor. |
| num_returns | int | num_returns=2 | (Only for tasks, not actors.) The number of object refs returned by the remote function invocation. |
| resources | map | resources={'flibberts': 5} | The quantity of various custom resources to reserve for this task or for the lifetime of the actor. This is a dictionary mapping strings (resource names) to numbers. |
| max_calls | int | max_calls=5 | Only for remote tasks. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit (this can be used to address memory leaks in third-party libraries or to reclaim resources that cannot easily be released, e.g., GPU memory that was acquired by TensorFlow). By default this is infinite. |
| max_restarts | int | max_restarts=-1 | Only for actors. This specifies the maximum number of times that the actor should be restarted when it dies unexpectedly. The minimum valid value is 0 (default), which indicates that the actor doesn't need to be restarted. A value of -1 indicates that an actor should be restarted indefinitely. |
| max_task_retries | int | max_task_retries=-1 | Only for actors. How many times to retry an actor task if the task fails due to a system error, e.g., the actor has died. If set to -1, the system will retry the failed task until the task succeeds, or the actor has reached its max_restarts limit. If set to n > 0, the system will retry the failed task up to n times, after which the task will throw a RayActorError exception upon ray.get. Note that Python exceptions are not considered system errors and will not trigger retries. |
| max_retries | int | max_retries=-1 | Only for remote functions. This specifies the maximum number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, the default is 4, and a value of -1 indicates infinite retries. |
Here's an example with and without num_returns:
@ray.remote(num_returns=3)
def tuple3(one, two, three):
return (one, two, three)
x_ref, y_ref, z_ref = tuple3.remote("a", 1, 2.2)
x, y, z = ray.get([x_ref, y_ref, z_ref])
print(f'({x}, {y}, {z})')
@ray.remote
def tuple3(one, two, three):
return (one, two, three)
xyz_ref = tuple3.remote("a", 1, 2.2)
x, y, z = ray.get(xyz_ref)
print(f'({x}, {y}, {z})')
@ray.method()¶
Related to @ray.remote(), @ray.method() allows you to specify the number of return values for a method in an actor, by passing the num_returns keyword argument. None of the other @ray.remote() keyword arguments are allowed. Here is an example:
@ray.remote
class Tupleator:
@ray.method(num_returns=3)
def tuple3(self, one, two, three):
return (one, two, three)
tupleator = Tupleator.remote()
x_ref, y_ref, z_ref = tupleator.tuple3.remote("a", 1, 2.2)
x, y, z = ray.get([x_ref, y_ref, z_ref])
print(f'({x}, {y}, {z})')
ray.put()¶
ray.put() explicitly places an object in the distributed object store and returns a ref for it:
ref = ray.put("Hello World!")
print(f'Object returned: {ray.get(ref)}')
There is an optional flag you can pass, weakref=True (the default is False). If true, Ray is allowed to evict the object while a reference to the returned ref still exists. This is useful if you are putting a lot of objects into the object store and many of them might not be needed in the future; it allows Ray to reclaim memory more aggressively.
Fetching Information¶
Many methods return information:
| Method | Brief Description |
|---|---|
| ray.get_gpu_ids() | GPUs |
| ray.nodes() | Cluster nodes |
| ray.cluster_resources() | All the available resources, used or not |
| ray.available_resources() | Resources not in use |
print(f"""
ray.get_gpu_ids(): {ray.get_gpu_ids()}
ray.nodes(): {ray.nodes()}
ray.cluster_resources(): {ray.cluster_resources()}
ray.available_resources(): {ray.available_resources()}
""")
Recall that we used ray.nodes()[0]['Resources']['CPU'] in the second lesson to determine the number of CPU cores on our machines:
ray.nodes()[0]['Resources']['CPU']
ray.timeline()¶
Sometimes you need to find task bottlenecks. ray.timeline() helps. It returns a list of profiling events that can be viewed as a timeline. The easiest way to use the results is to dump the data to a JSON file by passing the filename=... argument. Alternatively, you can call json.dump() on the returned object yourself. In either case, open chrome://tracing in a Chrome browser window (only Chrome supports this) and load the dumped file.
ray.shutdown()¶
Shutdown this instance of Ray or disconnect from the cluster.
ray.shutdown() # "Undo ray.init()".
The next lesson, Running Ray Clusters, takes a brief look at the Ray CLI commands for running Ray clusters.