execution
Package
ray.rllib.execution.concurrency_ops.Concurrently(ops, *, mode='round_robin', output_indexes=None, round_robin_weights=None)
Operator that runs the given parent iterators concurrently.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mode | str | One of 'round_robin', 'async'. In 'round_robin' mode, we alternate between pulling items from each parent iterator in order deterministically. In 'async' mode, we pull from each parent iterator as fast as they are produced. This is non-deterministic. | 'round_robin' |
output_indexes | list | If specified, only output results from the given ops. For example, if `output_indexes=[0]`, only results from the first op in `ops` will be returned. | None |
round_robin_weights | list | List of weights to use for round robin mode. For example, `[2, 1]` will cause the iterator to pull twice as many items from the first iterator as the second. `[2, 1, *]` will cause as many items to be pulled as possible from the third iterator without blocking. This is only allowed in round robin mode. | None |
Examples:
>>> sim_op = ParallelRollouts(...).for_each(...)
>>> replay_op = LocalReplay(...).for_each(...)
>>> combined_op = Concurrently([sim_op, replay_op], mode="async")
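A round-robin variant of the same pipeline is sketched below; the weights follow the order of the ops list, so [1, 2] pulls twice as many items from replay_op as from sim_op (the values are illustrative only):
>>> combined_op = Concurrently(
...     [sim_op, replay_op], mode="round_robin",
...     round_robin_weights=[1, 2])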
Source code in ray/rllib/execution/concurrency_ops.py
def Concurrently(ops: List[LocalIterator],
*,
mode: str = "round_robin",
output_indexes: Optional[List[int]] = None,
round_robin_weights: Optional[List[int]] = None
) -> LocalIterator[SampleBatchType]:
"""Operator that runs the given parent iterators concurrently.
Args:
mode (str): One of 'round_robin', 'async'. In 'round_robin' mode,
we alternate between pulling items from each parent iterator in
order deterministically. In 'async' mode, we pull from each parent
iterator as fast as they are produced. This is non-deterministic.
output_indexes (list): If specified, only output results from the
given ops. For example, if ``output_indexes=[0]``, only results
from the first op in ops will be returned.
round_robin_weights (list): List of weights to use for round robin
mode. For example, ``[2, 1]`` will cause the iterator to pull twice
as many items from the first iterator as the second. ``[2, 1, *]``
will cause as many items to be pulled as possible from the third
iterator without blocking. This is only allowed in round robin
mode.
Examples:
>>> sim_op = ParallelRollouts(...).for_each(...)
>>> replay_op = LocalReplay(...).for_each(...)
>>> combined_op = Concurrently([sim_op, replay_op], mode="async")
"""
if len(ops) < 2:
raise ValueError("Should specify at least 2 ops.")
if mode == "round_robin":
deterministic = True
elif mode == "async":
deterministic = False
if round_robin_weights:
raise ValueError(
"round_robin_weights cannot be specified in async mode")
else:
raise ValueError("Unknown mode {}".format(mode))
if round_robin_weights and all(r == "*" for r in round_robin_weights):
raise ValueError("Cannot specify all round robin weights = *")
if output_indexes:
for i in output_indexes:
assert i in range(len(ops)), ("Index out of range", i)
def tag(op, i):
return op.for_each(lambda x: (i, x))
ops = [tag(op, i) for i, op in enumerate(ops)]
output = ops[0].union(
*ops[1:],
deterministic=deterministic,
round_robin_weights=round_robin_weights)
if output_indexes:
output = (output.filter(lambda tup: tup[0] in output_indexes).for_each(
lambda tup: tup[1]))
return output
ray.rllib.execution.concurrency_ops.Enqueue
Enqueue data items into a queue.Queue instance.
Returns the input item as output.
The enqueue is non-blocking, so Enqueue operations can be executed with Dequeue via the Concurrently() operator.
Examples:
>>> queue = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(queue))
>>> read_op = Dequeue(queue)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)
ray.rllib.execution.concurrency_ops.Dequeue(input_queue, check=lambda: True)
Dequeue data items from a queue.Queue instance.
The dequeue is non-blocking, so Dequeue operations can execute with Enqueue via the Concurrently() operator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_queue | Queue | Queue to pull items from. | required |
check | fn | Liveness check. When this function returns false, Dequeue() will raise an error to halt execution. | lambda: True |
Examples:
>>> queue = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(queue))
>>> read_op = Dequeue(queue)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)
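The check argument lets the reader stop once a producer shuts down. A minimal sketch, assuming some object with a boolean stopped flag (the learner name here is hypothetical):
>>> read_op = Dequeue(queue, check=lambda: not learner.stopped)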
Source code in ray/rllib/execution/concurrency_ops.py
def Dequeue(input_queue: queue.Queue,
check=lambda: True) -> LocalIterator[SampleBatchType]:
"""Dequeue data items from a queue.Queue instance.
The dequeue is non-blocking, so Dequeue operations can execute with
Enqueue via the Concurrently() operator.
Args:
input_queue (Queue): queue to pull items from.
check (fn): liveness check. When this function returns false,
Dequeue() will raise an error to halt execution.
Examples:
>>> queue = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(queue))
>>> read_op = Dequeue(queue)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)
"""
if not isinstance(input_queue, queue.Queue):
raise ValueError("Expected queue.Queue, got {}".format(
type(input_queue)))
def base_iterator(timeout=None):
while check():
try:
item = input_queue.get(timeout=0.001)
yield item
except queue.Empty:
yield _NextValueNotReady()
raise RuntimeError("Dequeue `check()` returned False! "
"Exiting with Exception from Dequeue iterator.")
return LocalIterator(base_iterator, SharedMetrics())
ray.rllib.execution.learner_thread.LearnerThread (Thread)
Background thread that updates the local model from sample trajectories.
The learner thread communicates with the main thread through Queues. This is needed since Ray operations can only be run on the main thread. In addition, moving the heavyweight gradient ops (session runs) off the main thread improves overall throughput.
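A minimal construction sketch (argument values are illustrative, and local_worker is assumed to be an already-created RolloutWorker); the inqueue/outqueue attributes used below are created in __init__:
>>> learner = LearnerThread(
...     local_worker,
...     minibatch_buffer_size=1,
...     num_sgd_iter=1,
...     learner_queue_size=16,
...     learner_queue_timeout=300)
>>> learner.start()                   # daemon thread; learns in the background
>>> learner.inqueue.put(train_batch)  # main thread feeds train batches in
>>> result = learner.outqueue.get()   # and reads step results back out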
__init__(self, local_worker, minibatch_buffer_size, num_sgd_iter, learner_queue_size, learner_queue_timeout)
special
Initialize the learner thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_worker | RolloutWorker | Process-local rollout worker holding the policies this thread will call learn_on_batch() on. | required |
minibatch_buffer_size | int | Max number of train batches to store in the minibatching buffer. | required |
num_sgd_iter | int | Number of passes to learn on per train batch. | required |
learner_queue_size | int | Max size of the queue of inbound train batches to this thread. | required |
learner_queue_timeout | int | Raise an exception if the queue has been empty for this long, in seconds. | required |
Source code in ray/rllib/execution/learner_thread.py
def __init__(self, local_worker: RolloutWorker, minibatch_buffer_size: int,
num_sgd_iter: int, learner_queue_size: int,
learner_queue_timeout: int):
"""Initialize the learner thread.
Args:
local_worker (RolloutWorker): process local rollout worker holding
policies this thread will call learn_on_batch() on
minibatch_buffer_size (int): max number of train batches to store
in the minibatching buffer
num_sgd_iter (int): number of passes to learn on per train batch
learner_queue_size (int): max size of queue of inbound
train batches to this thread
learner_queue_timeout (int): raise an exception if the queue has
been empty for this long in seconds
"""
threading.Thread.__init__(self)
self.learner_queue_size = WindowStat("size", 50)
self.local_worker = local_worker
self.inqueue = queue.Queue(maxsize=learner_queue_size)
self.outqueue = queue.Queue()
self.minibatch_buffer = MinibatchBuffer(
inqueue=self.inqueue,
size=minibatch_buffer_size,
timeout=learner_queue_timeout,
num_passes=num_sgd_iter,
init_num_passes=num_sgd_iter)
self.queue_timer = TimerStat()
self.grad_timer = TimerStat()
self.load_timer = TimerStat()
self.load_wait_timer = TimerStat()
self.daemon = True
self.weights_updated = False
self.learner_info = {}
self.stopped = False
self.num_steps = 0
add_learner_metrics(self, result)
Add internal metrics to a trainer result dict.
Source code in ray/rllib/execution/learner_thread.py
def add_learner_metrics(self, result: Dict) -> Dict:
"""Add internal metrics to a trainer result dict."""
def timer_to_ms(timer):
return round(1000 * timer.mean, 3)
result["info"].update({
"learner_queue": self.learner_queue_size.stats(),
LEARNER_INFO: copy.deepcopy(self.learner_info),
"timing_breakdown": {
"learner_grad_time_ms": timer_to_ms(self.grad_timer),
"learner_load_time_ms": timer_to_ms(self.load_timer),
"learner_load_wait_time_ms": timer_to_ms(self.load_wait_timer),
"learner_dequeue_time_ms": timer_to_ms(self.queue_timer),
}
})
return result
run(self)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
ray.rllib.execution.metric_ops.StandardMetricsReporting(train_op, workers, config, selected_workers=None, by_steps_trained=False)
Operator to periodically collect and report metrics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
train_op | LocalIterator | Operator for executing training steps. We ignore the output values. | required |
workers | WorkerSet | Rollout workers to collect metrics from. | required |
config | dict | Trainer configuration, used to determine the frequency of stats reporting. | required |
selected_workers | list | Override the list of remote workers to collect metrics from. | None |
by_steps_trained | bool | If True, uses the `STEPS_TRAINED_COUNTER` instead of the `STEPS_SAMPLED_COUNTER` in metrics. | False |
Returns:
Type | Description |
---|---|
LocalIterator[dict] | A local iterator over training results. |
Examples:
>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> metrics_op = StandardMetricsReporting(train_op, workers, config)
>>> next(metrics_op)
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
Source code in ray/rllib/execution/metric_ops.py
def StandardMetricsReporting(
train_op: LocalIterator[Any],
workers: WorkerSet,
config: dict,
selected_workers: List[ActorHandle] = None,
by_steps_trained: bool = False,
) -> LocalIterator[dict]:
"""Operator to periodically collect and report metrics.
Args:
train_op (LocalIterator): Operator for executing training steps.
We ignore the output values.
workers (WorkerSet): Rollout workers to collect metrics from.
config (dict): Trainer configuration, used to determine the frequency
of stats reporting.
selected_workers (list): Override the list of remote workers
to collect metrics from.
by_steps_trained (bool): If True, uses the `STEPS_TRAINED_COUNTER`
instead of the `STEPS_SAMPLED_COUNTER` in metrics.
Returns:
LocalIterator[dict]: A local iterator over training results.
Examples:
>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> metrics_op = StandardMetricsReporting(train_op, workers, config)
>>> next(metrics_op)
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
"""
output_op = train_op \
.filter(OncePerTimestepsElapsed(config["timesteps_per_iteration"],
by_steps_trained=by_steps_trained)) \
.filter(OncePerTimeInterval(config["min_iter_time_s"])) \
.for_each(CollectMetrics(
workers,
min_history=config["metrics_smoothing_episodes"],
timeout_seconds=config["collect_metrics_timeout"],
selected_workers=selected_workers))
return output_op
ray.rllib.execution.metric_ops.CollectMetrics
Callable that collects metrics from workers.
The metrics are smoothed over a given history window.
This should be used with the .for_each() operator. For a higher level API, consider using StandardMetricsReporting instead.
Examples:
>>> output_op = train_op.for_each(CollectMetrics(workers))
>>> print(next(output_op))
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
ray.rllib.execution.metric_ops.OncePerTimeInterval
Callable that returns True once per given interval.
This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.
Examples:
>>> throttled_op = train_op.filter(OncePerTimeInterval(5))
>>> start = time.time()
>>> next(throttled_op)
>>> print(time.time() - start)
5.00001 # will be greater than 5 seconds
ray.rllib.execution.metric_ops.OncePerTimestepsElapsed
Callable that returns True once per given number of timesteps.
This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.
Examples:
>>> throttled_op = train_op.filter(OncePerTimestepsElapsed(1000))
>>> next(throttled_op)
# will only return after 1000 steps have elapsed
__init__(self, delay_steps, by_steps_trained=False)
special
Parameters:
Name | Type | Description | Default |
---|---|---|---|
delay_steps | int | The number of steps (sampled or trained) every which this op returns True. | required |
by_steps_trained | bool | If True, uses the `STEPS_TRAINED_COUNTER` instead of the `STEPS_SAMPLED_COUNTER` in metrics. | False |
Source code in ray/rllib/execution/metric_ops.py
def __init__(self, delay_steps: int, by_steps_trained: bool = False):
"""
Args:
delay_steps (int): The number of steps (sampled or trained) every
which this op returns True.
by_steps_trained (bool): If True, uses the `STEPS_TRAINED_COUNTER`
instead of the `STEPS_SAMPLED_COUNTER` in metrics.
"""
self.delay_steps = delay_steps
self.by_steps_trained = by_steps_trained
self.last_called = 0
ray.rllib.execution.multi_gpu_learner_thread.MultiGPULearnerThread (LearnerThread)
Learner that can use multiple GPUs and parallel loading.
This class is used for async sampling algorithms.
Example workflow: 2 GPUs and 3 multi-GPU tower stacks. -> On each GPU, there are 3 slots for batches, indexed 0, 1, and 2.
Workers collect data from env and push it into inqueue: Workers -> (data) -> self.inqueue
We also have two queues indicating which stacks are loaded and which are not:
- idle_tower_stacks = [0, 1, 2] <- all 3 stacks are free at first.
- ready_tower_stacks = [] <- none of the 3 stacks is loaded with data.
`ready_tower_stacks` is managed by `ready_tower_stacks_buffer` for possible minibatch-SGD iterations per loaded batch (this avoids a reload from CPU to GPU for each SGD iter).
n _MultiGPULoaderThreads: self.inqueue -get()-> policy.load_batch_into_buffer() -> ready_stacks = [0 ...]
This thread: self.ready_tower_stacks_buffer -get()-> policy.learn_on_loaded_batch() -> if SGD-iters done, put stack index back in idle_tower_stacks queue.
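A schematic of that cycle (pseudo-code, not the exact implementation; the queue and Policy method names are the ones referenced above, while batch, policy, and idx are placeholders):
>>> # each _MultiGPULoaderThread:
>>> idx = idle_tower_stacks.get()                         # grab a free stack
>>> policy.load_batch_into_buffer(batch, buffer_index=idx)
>>> ready_tower_stacks.put(idx)                           # mark it as loaded
>>> # this (learner) thread, per loaded stack:
>>> policy.learn_on_loaded_batch(buffer_index=idx)        # num_sgd_iter passes
>>> idle_tower_stacks.put(idx)                            # then recycle the stack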
__init__(self, local_worker, num_gpus=1, lr=None, train_batch_size=500, num_multi_gpu_tower_stacks=1, num_sgd_iter=1, learner_queue_size=16, learner_queue_timeout=300, num_data_load_threads=16, _fake_gpus=False, minibatch_buffer_size=None)
special
Initializes a MultiGPULearnerThread instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_worker | RolloutWorker | Local RolloutWorker holding the policies this thread will call `load_batch_into_buffer` and `learn_on_loaded_batch` on. | required |
num_gpus | int | Number of GPUs to use for data-parallel SGD. | 1 |
train_batch_size | int | Size of batches (minibatches if `num_sgd_iter` > 1) to learn on. | 500 |
num_multi_gpu_tower_stacks | int | Number of buffers to load data into in parallel on one device. Each buffer is of size `train_batch_size` and hence increases GPU memory usage accordingly. | 1 |
num_sgd_iter | int | Number of passes to learn on per train batch (minibatch if `num_sgd_iter` > 1). | 1 |
learner_queue_size | int | Max size of queue of inbound train batches to this thread. | 16 |
num_data_load_threads | int | Number of threads to use to load data into GPU memory in parallel. | 16 |
Source code in ray/rllib/execution/multi_gpu_learner_thread.py
def __init__(
self,
local_worker: RolloutWorker,
num_gpus: int = 1,
lr=None, # deprecated.
train_batch_size: int = 500,
num_multi_gpu_tower_stacks: int = 1,
num_sgd_iter: int = 1,
learner_queue_size: int = 16,
learner_queue_timeout: int = 300,
num_data_load_threads: int = 16,
_fake_gpus: bool = False,
# Deprecated arg, use
minibatch_buffer_size=None,
):
"""Initializes a MultiGPULearnerThread instance.
Args:
local_worker (RolloutWorker): Local RolloutWorker holding
policies this thread will call `load_batch_into_buffer` and
`learn_on_loaded_batch` on.
num_gpus (int): Number of GPUs to use for data-parallel SGD.
train_batch_size (int): Size of batches (minibatches if
`num_sgd_iter` > 1) to learn on.
num_multi_gpu_tower_stacks (int): Number of buffers to parallelly
load data into on one device. Each buffer is of size of
`train_batch_size` and hence increases GPU memory usage
accordingly.
num_sgd_iter (int): Number of passes to learn on per train batch
(minibatch if `num_sgd_iter` > 1).
learner_queue_size (int): Max size of queue of inbound
train batches to this thread.
num_data_load_threads (int): Number of threads to use to load
data into GPU memory in parallel.
"""
# Deprecated: No need to specify as we don't need the actual
# minibatch-buffer anyways.
if minibatch_buffer_size:
deprecation_warning(
old="MultiGPULearnerThread.minibatch_buffer_size",
error=False,
)
super().__init__(
local_worker=local_worker,
minibatch_buffer_size=0,
num_sgd_iter=num_sgd_iter,
learner_queue_size=learner_queue_size,
learner_queue_timeout=learner_queue_timeout,
)
# Delete reference to parent's minibatch_buffer, which is not needed.
# Instead, in multi-GPU mode, we pull tower stack indices from the
# `self.ready_tower_stacks_buffer` buffer, whose size is exactly
# `num_multi_gpu_tower_stacks`.
self.minibatch_buffer = None
self.train_batch_size = train_batch_size
self.policy_map = self.local_worker.policy_map
self.devices = next(iter(self.policy_map.values())).devices
logger.info("MultiGPULearnerThread devices {}".format(self.devices))
assert self.train_batch_size % len(self.devices) == 0
assert self.train_batch_size >= len(self.devices),\
"batch too small"
self.tower_stack_indices = list(range(num_multi_gpu_tower_stacks))
# Two queues for tower stacks:
# a) Those that are loaded with data ("ready")
# b) Those that are ready to be loaded with new data ("idle").
self.idle_tower_stacks = queue.Queue()
self.ready_tower_stacks = queue.Queue()
# In the beginning, all stacks are idle (no loading has taken place
# yet).
for idx in self.tower_stack_indices:
self.idle_tower_stacks.put(idx)
# Start n threads that are responsible for loading data into the
# different (idle) stacks.
for i in range(num_data_load_threads):
self.loader_thread = _MultiGPULoaderThread(
self, share_stats=(i == 0))
self.loader_thread.start()
# Create a buffer that holds stack indices that are "ready"
# (loaded with data). Those are stacks that we can call
# "learn_on_loaded_batch" on.
self.ready_tower_stacks_buffer = MinibatchBuffer(
self.ready_tower_stacks, num_multi_gpu_tower_stacks,
learner_queue_timeout, num_sgd_iter)
ray.rllib.execution.replay_ops.StoreToReplayBuffer
Callable that stores data into replay buffer actors.
If constructed with a local replay actor, data will be stored into that buffer. If constructed with a list of replay actor handles, data will be stored randomly among those actors.
This should be used with the .for_each() operator on a rollouts iterator. The batch that was stored is returned.
Examples:
>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> rollouts = ParallelRollouts(...)
>>> store_op = rollouts.for_each(StoreToReplayBuffer(actors=actors))
>>> next(store_op)
SampleBatch(...)
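The same operator can target a process-local buffer instead of remote actors; a minimal sketch, assuming local_buffer is an existing LocalReplayBuffer (see __init__ below):
>>> store_op = rollouts.for_each(StoreToReplayBuffer(local_buffer=local_buffer))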
__init__(self, *, local_buffer=None, actors=None)
special
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_buffer | LocalReplayBuffer | The local replay buffer to store the data into. | None |
actors | Optional[List[ActorHandle]] | An optional list of replay actors to use instead of `local_buffer`. | None |
Source code in ray/rllib/execution/replay_ops.py
def __init__(
self,
*,
local_buffer: Optional[LocalReplayBuffer] = None,
actors: Optional[List[ActorHandle]] = None,
):
"""
Args:
local_buffer (LocalReplayBuffer): The local replay buffer to store
the data into.
actors (Optional[List[ActorHandle]]): An optional list of replay
actors to use instead of `local_buffer`.
"""
if bool(local_buffer) == bool(actors):
raise ValueError(
"Either `local_buffer` or `replay_actors` must be given, "
"not both!")
if local_buffer:
self.local_actor = local_buffer
self.replay_actors = None
else:
self.local_actor = None
self.replay_actors = actors
ray.rllib.execution.replay_ops.Replay(*, local_buffer=None, actors=None, num_async=4)
Replay experiences from the given buffer or actors.
This should be combined with the StoreToReplayBuffer operation using the Concurrently() operator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_buffer | LocalReplayBuffer | Local buffer to use. Only one of this and `actors` can be specified. | None |
actors | list | List of replay actors. Only one of this and `local_buffer` can be specified. | None |
num_async | int | In async mode, the max number of async requests in flight per actor. | 4 |
Examples:
>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> replay_op = Replay(actors=actors)
>>> next(replay_op)
SampleBatch(...)
Source code in ray/rllib/execution/replay_ops.py
def Replay(*,
local_buffer: LocalReplayBuffer = None,
actors: List[ActorHandle] = None,
num_async: int = 4) -> LocalIterator[SampleBatchType]:
"""Replay experiences from the given buffer or actors.
This should be combined with the StoreToReplayActors operation using the
Concurrently() operator.
Args:
local_buffer (LocalReplayBuffer): Local buffer to use. Only one of this
and replay_actors can be specified.
actors (list): List of replay actors. Only one of this and
local_buffer can be specified.
num_async (int): In async mode, the max number of async
requests in flight per actor.
Examples:
>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> replay_op = Replay(actors=actors)
>>> next(replay_op)
SampleBatch(...)
"""
if bool(local_buffer) == bool(actors):
raise ValueError(
"Exactly one of local_buffer and replay_actors must be given.")
if actors:
replay = from_actors(actors)
return replay.gather_async(
num_async=num_async).filter(lambda x: x is not None)
def gen_replay(_):
while True:
item = local_buffer.replay()
if item is None:
yield _NextValueNotReady()
else:
yield item
return LocalIterator(gen_replay, SharedMetrics())
ray.rllib.execution.replay_ops.SimpleReplayBuffer
Simple replay buffer that operates over batches.
__init__(self, num_slots, replay_proportion=None)
special
Initialize SimpleReplayBuffer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_slots | int | Number of batches to store in total. | required |
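A minimal usage sketch; the add_batch() and replay() method names are assumptions, as the buffer's methods are not shown on this page:
>>> buf = SimpleReplayBuffer(num_slots=100)
>>> buf.add_batch(batch)     # store one SampleBatch
>>> replayed = buf.replay()  # sample one stored batch back out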
ray.rllib.execution.replay_ops.MixInReplay
This operator adds replay to a stream of experiences.
It takes input batches, and returns a list of batches that include replayed data as well. The number of replayed batches is determined by the configured replay proportion. The max age of a batch is determined by the number of replay slots.
__init__(self, num_slots, replay_proportion)
special
Initialize MixInReplay.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_slots | int | Number of batches to store in total. | required |
replay_proportion | float | The input batch will be returned and an additional number of batches proportional to this value will be added as well. | required |
Examples:
replay proportion 2:1
>>> replay_op = MixInReplay(rollouts, 100, replay_proportion=2)
>>> print(next(replay_op))
[SampleBatch(<input>), SampleBatch(<replay>), SampleBatch(<rep.>)]
replay proportion 0:1, replay disabled
>>> replay_op = MixInReplay(rollouts, 100, replay_proportion=0)
>>> print(next(replay_op))
[SampleBatch(<input>)]
Source code in ray/rllib/execution/replay_ops.py
def __init__(self, num_slots: int, replay_proportion: float):
"""Initialize MixInReplay.
Args:
num_slots (int): Number of batches to store in total.
replay_proportion (float): The input batch will be returned
and an additional number of batches proportional to this value
will be added as well.
Examples:
# replay proportion 2:1
>>> replay_op = MixInReplay(rollouts, 100, replay_proportion=2)
>>> print(next(replay_op))
[SampleBatch(<input>), SampleBatch(<replay>), SampleBatch(<rep.>)]
# replay proportion 0:1, replay disabled
>>> replay_op = MixInReplay(rollouts, 100, replay_proportion=0)
>>> print(next(replay_op))
[SampleBatch(<input>)]
"""
if replay_proportion > 0 and num_slots == 0:
raise ValueError(
"You must set num_slots > 0 if replay_proportion > 0.")
self.replay_buffer = SimpleReplayBuffer(num_slots)
self.replay_proportion = replay_proportion
ray.rllib.execution.rollout_ops.ParallelRollouts(workers, *, mode='bulk_sync', num_async=1)
Operator to collect experiences in parallel from rollout workers.
If there are no remote workers, experiences will be collected serially from the local worker instance instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workers | WorkerSet | Set of rollout workers to use. | required |
mode | str | One of 'async', 'bulk_sync', 'raw'. In 'async' mode, batches are returned as soon as they are computed by rollout workers with no order guarantees. In 'bulk_sync' mode, we collect one batch from each worker and concatenate them together into a large batch to return. In 'raw' mode, the ParallelIterator object is returned directly and the caller is responsible for implementing gather and updating the timesteps counter. | 'bulk_sync' |
num_async | int | In async mode, the max number of async requests in flight per actor. | 1 |
Returns:
Type | Description |
---|---|
ray.util.iter.LocalIterator[ray.rllib.policy.sample_batch.SampleBatch] | A local iterator over experiences collected in parallel. |
Examples:
>>> rollouts = ParallelRollouts(workers, mode="async")
>>> batch = next(rollouts)
>>> print(batch.count)
50 # config.rollout_fragment_length
>>> rollouts = ParallelRollouts(workers, mode="bulk_sync")
>>> batch = next(rollouts)
>>> print(batch.count)
200 # config.rollout_fragment_length * config.num_workers
Updates the STEPS_SAMPLED_COUNTER counter in the local iterator context.
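In 'raw' mode the caller takes over the work the other modes perform automatically; a hedged sketch of what that contract implies (the gather call and counter update below are the caller's responsibility, not performed by this operator):
>>> rollouts = ParallelRollouts(workers, mode="raw")
>>> batches = rollouts.gather_async()  # caller implements its own gather
>>> # caller must also increment STEPS_SAMPLED_COUNTER in the shared metrics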
Source code in ray/rllib/execution/rollout_ops.py
def ParallelRollouts(workers: WorkerSet, *, mode="bulk_sync",
num_async=1) -> LocalIterator[SampleBatch]:
"""Operator to collect experiences in parallel from rollout workers.
If there are no remote workers, experiences will be collected serially from
the local worker instance instead.
Args:
workers (WorkerSet): set of rollout workers to use.
mode (str): One of 'async', 'bulk_sync', 'raw'. In 'async' mode,
batches are returned as soon as they are computed by rollout
workers with no order guarantees. In 'bulk_sync' mode, we collect
one batch from each worker and concatenate them together into a
large batch to return. In 'raw' mode, the ParallelIterator object
is returned directly and the caller is responsible for implementing
gather and updating the timesteps counter.
num_async (int): In async mode, the max number of async
requests in flight per actor.
Returns:
A local iterator over experiences collected in parallel.
Examples:
>>> rollouts = ParallelRollouts(workers, mode="async")
>>> batch = next(rollouts)
>>> print(batch.count)
50 # config.rollout_fragment_length
>>> rollouts = ParallelRollouts(workers, mode="bulk_sync")
>>> batch = next(rollouts)
>>> print(batch.count)
200 # config.rollout_fragment_length * config.num_workers
Updates the STEPS_SAMPLED_COUNTER counter in the local iterator context.
"""
# Ensure workers are initially in sync.
workers.sync_weights()
def report_timesteps(batch):
metrics = _get_shared_metrics()
metrics.counters[STEPS_SAMPLED_COUNTER] += batch.count
if isinstance(batch, MultiAgentBatch):
metrics.counters[AGENT_STEPS_SAMPLED_COUNTER] += \
batch.agent_steps()
else:
metrics.counters[AGENT_STEPS_SAMPLED_COUNTER] += batch.count
return batch
if not workers.remote_workers():
# Handle the `num_workers=0` case, in which the local worker
# has to do sampling as well.
def sampler(_):
while True:
yield workers.local_worker().sample()
return (LocalIterator(sampler,
SharedMetrics()).for_each(report_timesteps))
# Create a parallel iterator over generated experiences.
rollouts = from_actors(workers.remote_workers())
if mode == "bulk_sync":
return rollouts \
.batch_across_shards() \
.for_each(lambda batches: SampleBatch.concat_samples(batches)) \
.for_each(report_timesteps)
elif mode == "async":
return rollouts.gather_async(
num_async=num_async).for_each(report_timesteps)
elif mode == "raw":
return rollouts
else:
raise ValueError("mode must be one of 'bulk_sync', 'async', 'raw', "
"got '{}'".format(mode))
ray.rllib.execution.rollout_ops.AsyncGradients(workers)
Operator to compute gradients in parallel from rollout workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workers | WorkerSet | Set of rollout workers to use. | required |
Returns:
Type | Description |
---|---|
ray.util.iter.LocalIterator[Tuple[Union[List[Tuple[Any, Any]], List[Any]], int]] | A local iterator over policy gradients computed on rollout workers. |
Examples:
>>> grads_op = AsyncGradients(workers)
>>> print(next(grads_op))
{"var_0": ..., ...}, 50 # grads, batch count
Updates the STEPS_SAMPLED_COUNTER counter and LEARNER_INFO field in the local iterator context.
Source code in ray/rllib/execution/rollout_ops.py
def AsyncGradients(
workers: WorkerSet) -> LocalIterator[Tuple[ModelGradients, int]]:
"""Operator to compute gradients in parallel from rollout workers.
Args:
workers (WorkerSet): set of rollout workers to use.
Returns:
A local iterator over policy gradients computed on rollout workers.
Examples:
>>> grads_op = AsyncGradients(workers)
>>> print(next(grads_op))
{"var_0": ..., ...}, 50 # grads, batch count
Updates the STEPS_SAMPLED_COUNTER counter and LEARNER_INFO field in the
local iterator context.
"""
# Ensure workers are initially in sync.
workers.sync_weights()
# This function will be applied remotely on the workers.
def samples_to_grads(samples):
return get_global_worker().compute_gradients(samples), samples.count
# Record learner metrics and pass through (grads, count).
class record_metrics:
def _on_fetch_start(self):
self.fetch_start_time = time.perf_counter()
def __call__(self, item):
(grads, info), count = item
metrics = _get_shared_metrics()
metrics.counters[STEPS_SAMPLED_COUNTER] += count
metrics.info[LEARNER_INFO] = {
DEFAULT_POLICY_ID: info
} if LEARNER_STATS_KEY in info else info
metrics.timers[GRAD_WAIT_TIMER].push(time.perf_counter() -
self.fetch_start_time)
return grads, count
rollouts = from_actors(workers.remote_workers())
grads = rollouts.for_each(samples_to_grads)
return grads.gather_async().for_each(record_metrics())
ray.rllib.execution.rollout_ops.ConcatBatches
Callable used to merge batches into larger batches for training.
This should be used with the .combine() operator.
Examples:
>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.combine(ConcatBatches(
... min_batch_size=10000, count_steps_by="env_steps"))
>>> print(next(rollouts).count)
10000
ray.rllib.execution.rollout_ops.SelectExperiences
ray.rllib.execution.rollout_ops.StandardizeFields
Callable used to standardize fields of batches.
This should be used with the .for_each() operator. Note that the input may be mutated by this operator for efficiency.
Examples:
>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.for_each(StandardizeFields(["advantages"]))
>>> print(np.std(next(rollouts)["advantages"]))
1.0
ray.rllib.execution.train_ops.TrainOneStep
Callable that improves the policy and updates workers.
This should be used with the .for_each() operator. A tuple of the input and learner stats will be returned.
Examples:
>>> rollouts = ParallelRollouts(...)
>>> train_op = rollouts.for_each(TrainOneStep(workers))
>>> print(next(train_op)) # This trains the policy on one batch.
SampleBatch(...), {"learner_stats": ...}
Updates the STEPS_TRAINED_COUNTER counter and LEARNER_INFO field in the local iterator context.
ray.rllib.execution.train_ops.MultiGPUTrainOneStep
Multi-GPU version of TrainOneStep.
This should be used with the .for_each() operator. A tuple of the input and learner stats will be returned.
Examples:
>>> rollouts = ParallelRollouts(...)
>>> train_op = rollouts.for_each(MultiGPUTrainOneStep(workers, ...))
>>> print(next(train_op)) # This trains the policy on one batch.
SampleBatch(...), {"learner_stats": ...}
Updates the STEPS_TRAINED_COUNTER counter and LEARNER_INFO field in the local iterator context.
ray.rllib.execution.train_ops.ComputeGradients
Callable that computes gradients with respect to the policy loss.
This should be used with the .for_each() operator.
Examples:
>>> grads_op = rollouts.for_each(ComputeGradients(workers))
>>> print(next(grads_op))
{"var_0": ..., ...}, 50 # grads, batch count
Updates the LEARNER_INFO info field in the local iterator context.
ray.rllib.execution.train_ops.ApplyGradients
Callable that applies gradients and updates workers.
This should be used with the .for_each() operator.
Examples:
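A minimal sketch (grads_op is assumed to be a gradients iterator such as the output of ComputeGradients above):
>>> apply_op = grads_op.for_each(ApplyGradients(workers))
>>> next(apply_op)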
Updates the STEPS_TRAINED_COUNTER counter in the local iterator context.
__init__(self, workers, policies=frozenset(), update_all=True)
special
Creates an ApplyGradients instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workers | WorkerSet | Workers to apply gradients to. | required |
update_all | bool | If true, updates all workers. Otherwise, only update the worker that produced the sample batch we are currently processing (i.e., A3C style). | True |
Source code in ray/rllib/execution/train_ops.py
def __init__(self,
workers,
policies: List[PolicyID] = frozenset([]),
update_all=True):
"""Creates an ApplyGradients instance.
Args:
workers (WorkerSet): workers to apply gradients to.
update_all (bool): If true, updates all workers. Otherwise, only
update the worker that produced the sample batch we are
currently processing (i.e., A3C style).
"""
self.workers = workers
self.local_worker = workers.local_worker()
self.policies = policies
self.update_all = update_all
ray.rllib.execution.train_ops.AverageGradients
Callable that averages the gradients in a batch.
This should be used with the .for_each() operator after a set of gradients have been batched with .batch().
Examples:
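A minimal sketch (the batch size and the grads_op iterator are illustrative only):
>>> batched_grads = grads_op.batch(4)
>>> avg_grads_op = batched_grads.for_each(AverageGradients())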
ray.rllib.execution.train_ops.UpdateTargetNetwork
Periodically call policy.update_target() on all trainable policies.
This should be used with the .for_each() operator after a training step has been taken.
Examples:
>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> update_op = train_op.for_each(
... UpdateTargetNetwork(workers, target_update_freq=500))
>>> print(next(update_op))
None
Updates the LAST_TARGET_UPDATE_TS and NUM_TARGET_UPDATES counters in the local iterator context. The value of the last update counter is used to track when we should update the target next.