Ray: A Distributed Execution Framework for AI Applications
This post announces Ray, a framework for efficiently running Python code on clusters and large multi-core machines. The project is open source. You can check out the code and the documentation.
Many AI algorithms are computationally intensive and exhibit complex communication patterns. As a result, many researchers spend most of their time building custom systems to efficiently distribute their code across clusters of machines.
However, the resulting systems are often specific to a single algorithm or class of algorithms. We built Ray to help eliminate much of the redundant engineering effort that is currently repeated for each new algorithm. Our hope is that a few basic primitives can be reused to implement and efficiently execute a broad range of algorithms and applications.
Simple Parallelization of Existing Code
Ray enables Python functions to be executed remotely with minimal modifications.
With regular Python, when you call a function, the call blocks until the function has been executed. This example would take 8 seconds to execute.
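A minimal sketch of such a serial program is shown here. The helper run_serially and the delay parameter are illustrative assumptions added so the timing can be measured; f stands in for one second of real work.

```python
import time


def f(delay=1.0):
    # Stand-in for real work: block the caller for `delay` seconds.
    time.sleep(delay)
    return delay


def run_serially(num_calls=8, delay=1.0):
    """Call f num_calls times in a row; return the results and elapsed seconds."""
    start = time.perf_counter()
    # Each call blocks until f has finished, so the delays add up.
    results = [f(delay) for _ in range(num_calls)]
    return results, time.perf_counter() - start
```

With the defaults, run_serially() performs eight one-second calls back to back, so it takes roughly 8 seconds.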
With Ray, when you call a remote function, the call immediately returns a future (we will refer to these as object IDs). A task is then created, scheduled, and executed somewhere in the cluster. This example would take 1 second to execute.
Note that the only changes are that we add the @ray.remote decorator to the function definition, we call the function with f.remote(), and we call ray.get on the list of object IDs (remember that object IDs are futures) in order to block until the corresponding tasks have finished executing.
Flexible Encoding of Task Dependencies
In contrast with bulk-synchronous parallel frameworks like MapReduce or Apache Spark, Ray is designed to support AI applications that require fine-grained task dependencies. Rather than computing aggregate statistics over an entire dataset, a training procedure may operate on a small subset of data or on the outputs of a handful of tasks.
Dependencies can be encoded by passing object IDs (which are the outputs of tasks) into other tasks.
By passing the outputs of some calls to aggregate_data into subsequent calls to aggregate_data, we encode dependencies between these tasks which can be used by the system to make scheduling decisions and to coordinate the transfer of objects. Note that when object IDs are passed into remote function calls, the actual values will be unpacked before the function is executed, so when the aggregate_data function is executed, x and y will be numpy arrays.
Shared Mutable State with Actors
Ray uses actors to share mutable state between tasks. Here is an example in which multiple tasks share the state of an Atari simulator. Each task runs the simulator for several steps picking up where the previous task left off.
Each call to simulator.step.remote generates a task that is scheduled on the actor. These tasks mutate the state of the simulator object, and they are executed one at a time.
Like remote functions, actor methods return object IDs (that is, futures) that can be passed into other tasks and whose values can be retrieved with ray.get.
Waiting for a Subset of Tasks to Finish
Sometimes when running tasks with variable durations, we don’t want to wait for all of the tasks to finish. Instead, we may wish to wait for half of the tasks to finish or perhaps to use whichever tasks have completed after one second.
In this example ready_ids is a list of object IDs whose corresponding tasks have finished executing, and remaining_ids is a list of the remaining object IDs.
This primitive makes it easy to implement other behaviors. For example, we may wish to process tasks in the order in which they complete.
Note that it would be straightforward to modify the above example to adaptively launch new tasks whenever a previous one completes.
Efficient Shared Memory and Serialization with Apache Arrow
Serializing and deserializing data is often a bottleneck in distributed computing. Ray lets worker processes on the same machine access the same objects through shared memory. To facilitate this, Ray uses an in-memory object store on each machine to serve objects.
To illustrate the problem, suppose we create some neural network weights and wish to ship them from one Python process to another.
To ship the neural network weights around, we need to first serialize them into a contiguous blob of bytes. This can be done with standard serialization libraries like pickle.
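As a rough sketch, the round trip through pickle might be timed as follows. The helpers make_weights and pickle_round_trip, the variable names, and the array sizes are all illustrative assumptions, not measurements from a real model.

```python
import pickle
import time

import numpy as np


def make_weights(num_variables=10, size=1_000_000):
    # Hypothetical "neural network weights": a dict of random numpy arrays.
    return {f"Variable{i}": np.random.normal(size=size) for i in range(num_variables)}


def pickle_round_trip(weights):
    """Serialize and deserialize weights; return the copy and both timings."""
    start = time.perf_counter()
    serialized = pickle.dumps(weights)  # one contiguous blob of bytes
    serialize_seconds = time.perf_counter() - start

    start = time.perf_counter()
    deserialized = pickle.loads(serialized)  # rebuilds every array in the heap
    deserialize_seconds = time.perf_counter() - start
    return deserialized, serialize_seconds, deserialize_seconds
```

Both timings grow with the size of the weights, and the deserialization cost is paid again each time another process receives the blob.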
The time required for deserialization is particularly important because one of the most common patterns in machine learning is to aggregate a large number of values (for example, neural net weights, rollouts, or other values) in a single process, so the deserialization step could happen hundreds of times in a row.
To minimize the time required to deserialize objects in shared memory, we use the Apache Arrow data layout. This allows us to compute offsets into the serialized blob without scanning through the entire blob. In practice, this can translate into deserialization that is several orders of magnitude faster.
The call to ray.put serializes the weights using Arrow and copies the result into the object store’s memory. The call to ray.get then deserializes the serialized object and constructs a new dictionary of numpy arrays. However, the underlying arrays backing the numpy arrays live in shared memory and are not copied into the Python process’s heap.
Note that if the call to ray.get happens from a different machine, the relevant serialized object will be copied from a machine where it lives to the machine where it is needed.
In this example, we call ray.put explicitly. However, normally this call would happen under the hood when a Python object is passed into a remote function or returned from a remote function.
Feedback is Appreciated
This project is in its early stages. If you try it out, we’d love to hear your thoughts and suggestions.