
execution Package

ray.rllib.execution.concurrency_ops.Concurrently(ops, *, mode='round_robin', output_indexes=None, round_robin_weights=None)

Operator that runs the given parent iterators concurrently.

Parameters:

Name Type Description Default
mode str

One of 'round_robin', 'async'. In 'round_robin' mode, we alternate between pulling items from each parent iterator in order deterministically. In 'async' mode, we pull from each parent iterator as fast as they are produced. This is non-deterministic.

'round_robin'
output_indexes list

If specified, only output results from the given ops. For example, if output_indexes=[0], only results from the first op in ops will be returned.

None
round_robin_weights list

List of weights to use for round robin mode. For example, [2, 1] will cause the iterator to pull twice as many items from the first iterator as the second. [2, 1, *] will cause as many items to be pulled as possible from the third iterator without blocking. This is only allowed in round robin mode.

None

Examples:

>>> sim_op = ParallelRollouts(...).for_each(...)
>>> replay_op = LocalReplay(...).for_each(...)
>>> combined_op = Concurrently([sim_op, replay_op], mode="async")
Source code in ray/rllib/execution/concurrency_ops.py
def Concurrently(ops: List[LocalIterator],
                 *,
                 mode: str = "round_robin",
                 output_indexes: Optional[List[int]] = None,
                 round_robin_weights: Optional[List[int]] = None
                 ) -> LocalIterator[SampleBatchType]:
    """Operator that runs the given parent iterators concurrently.

    Args:
        mode (str): One of 'round_robin', 'async'. In 'round_robin' mode,
            we alternate between pulling items from each parent iterator in
            order deterministically. In 'async' mode, we pull from each parent
            iterator as fast as they are produced. This is non-deterministic.
        output_indexes (list): If specified, only output results from the
            given ops. For example, if ``output_indexes=[0]``, only results
            from the first op in ops will be returned.
        round_robin_weights (list): List of weights to use for round robin
            mode. For example, ``[2, 1]`` will cause the iterator to pull twice
            as many items from the first iterator as the second. ``[2, 1, *]``
            will cause as many items to be pulled as possible from the third
            iterator without blocking. This is only allowed in round robin
            mode.

    Examples:
        >>> sim_op = ParallelRollouts(...).for_each(...)
        >>> replay_op = LocalReplay(...).for_each(...)
        >>> combined_op = Concurrently([sim_op, replay_op], mode="async")
    """

    if len(ops) < 2:
        raise ValueError("Should specify at least 2 ops.")
    if mode == "round_robin":
        deterministic = True
    elif mode == "async":
        deterministic = False
        if round_robin_weights:
            raise ValueError(
                "round_robin_weights cannot be specified in async mode")
    else:
        raise ValueError("Unknown mode {}".format(mode))
    if round_robin_weights and all(r == "*" for r in round_robin_weights):
        raise ValueError("Cannot specify all round robin weights = *")

    if output_indexes:
        for i in output_indexes:
            assert i in range(len(ops)), ("Index out of range", i)

        def tag(op, i):
            return op.for_each(lambda x: (i, x))

        ops = [tag(op, i) for i, op in enumerate(ops)]

    output = ops[0].union(
        *ops[1:],
        deterministic=deterministic,
        round_robin_weights=round_robin_weights)

    if output_indexes:
        output = (output.filter(lambda tup: tup[0] in output_indexes).for_each(
            lambda tup: tup[1]))

    return output
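
The sketch below (not taken from the Ray sources) shows how the round-robin options compose; it assumes an existing WorkerSet ``workers`` and LocalReplayBuffer ``local_buffer``, and borrows the replay operators documented further down this page:

store_op = ParallelRollouts(workers).for_each(
    StoreToReplayBuffer(local_buffer=local_buffer))
replay_op = Replay(local_buffer=local_buffer)
# Pull one freshly stored batch for every four replayed batches,
# deterministically, and only emit the items coming from `replay_op`.
combined_op = Concurrently(
    [store_op, replay_op],
    mode="round_robin",
    output_indexes=[1],
    round_robin_weights=[1, 4])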

ray.rllib.execution.concurrency_ops.Enqueue

Enqueue data items into a queue.Queue instance.

Returns the input item as output.

The enqueue is non-blocking, so Enqueue operations can be executed with Dequeue via the Concurrently() operator.

Examples:

>>> queue = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(queue))
>>> read_op = Dequeue(queue)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)

ray.rllib.execution.concurrency_ops.Dequeue(input_queue, check=lambda: True)

Dequeue data items from a queue.Queue instance.

The dequeue is non-blocking, so Dequeue operations can execute with Enqueue via the Concurrently() operator.

Parameters:

Name Type Description Default
input_queue Queue

queue to pull items from.

required
check fn

liveness check. When this function returns false, Dequeue() will raise an error to halt execution.

lambda: True

Examples:

>>> queue = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(queue))
>>> read_op = Dequeue(queue)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)
Source code in ray/rllib/execution/concurrency_ops.py
def Dequeue(input_queue: queue.Queue,
            check=lambda: True) -> LocalIterator[SampleBatchType]:
    """Dequeue data items from a queue.Queue instance.

    The dequeue is non-blocking, so Dequeue operations can execute with
    Enqueue via the Concurrently() operator.

    Args:
        input_queue (Queue): queue to pull items from.
        check (fn): liveness check. When this function returns false,
            Dequeue() will raise an error to halt execution.

    Examples:
        >>> queue = queue.Queue(100)
        >>> write_op = ParallelRollouts(...).for_each(Enqueue(queue))
        >>> read_op = Dequeue(queue)
        >>> combined_op = Concurrently([write_op, read_op], mode="async")
        >>> next(combined_op)
        SampleBatch(...)
    """
    if not isinstance(input_queue, queue.Queue):
        raise ValueError("Expected queue.Queue, got {}".format(
            type(input_queue)))

    def base_iterator(timeout=None):
        while check():
            try:
                item = input_queue.get(timeout=0.001)
                yield item
            except queue.Empty:
                yield _NextValueNotReady()
        raise RuntimeError("Dequeue `check()` returned False! "
                           "Exiting with Exception from Dequeue iterator.")

    return LocalIterator(base_iterator, SharedMetrics())
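
As a hedged illustration of the check argument, the following sketch (assuming an existing WorkerSet ``workers`` and a hypothetical background ``learner_thread``) stops the Dequeue iterator once that thread dies, instead of spinning forever:

batch_queue = queue.Queue(maxsize=100)
write_op = ParallelRollouts(workers).for_each(Enqueue(batch_queue))
# Raise (and thereby halt the pipeline) as soon as the learner thread exits.
read_op = Dequeue(batch_queue, check=learner_thread.is_alive)
combined_op = Concurrently([write_op, read_op], mode="async")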

ray.rllib.execution.learner_thread.LearnerThread (Thread)

Background thread that updates the local model from sample trajectories.

The learner thread communicates with the main thread through Queues. This is needed since Ray operations can only be run on the main thread. In addition, moving the heavyweight gradient ops (session runs) off the main thread improves overall throughput.

__init__(self, local_worker, minibatch_buffer_size, num_sgd_iter, learner_queue_size, learner_queue_timeout) special

Initialize the learner thread.

Parameters:

Name Type Description Default
local_worker RolloutWorker

process-local rollout worker holding the policies this thread will call learn_on_batch() on

required
minibatch_buffer_size int

max number of train batches to store in the minibatching buffer

required
num_sgd_iter int

number of passes to learn on per train batch

required
learner_queue_size int

max size of queue of inbound train batches to this thread

required
learner_queue_timeout int

raise an exception if the queue has been empty for this many seconds

required
Source code in ray/rllib/execution/learner_thread.py
def __init__(self, local_worker: RolloutWorker, minibatch_buffer_size: int,
             num_sgd_iter: int, learner_queue_size: int,
             learner_queue_timeout: int):
    """Initialize the learner thread.

    Args:
        local_worker (RolloutWorker): process local rollout worker holding
            policies this thread will call learn_on_batch() on
        minibatch_buffer_size (int): max number of train batches to store
            in the minibatching buffer
        num_sgd_iter (int): number of passes to learn on per train batch
        learner_queue_size (int): max size of queue of inbound
            train batches to this thread
        learner_queue_timeout (int): raise an exception if the queue has
            been empty for this long in seconds
    """
    threading.Thread.__init__(self)
    self.learner_queue_size = WindowStat("size", 50)
    self.local_worker = local_worker
    self.inqueue = queue.Queue(maxsize=learner_queue_size)
    self.outqueue = queue.Queue()
    self.minibatch_buffer = MinibatchBuffer(
        inqueue=self.inqueue,
        size=minibatch_buffer_size,
        timeout=learner_queue_timeout,
        num_passes=num_sgd_iter,
        init_num_passes=num_sgd_iter)
    self.queue_timer = TimerStat()
    self.grad_timer = TimerStat()
    self.load_timer = TimerStat()
    self.load_wait_timer = TimerStat()
    self.daemon = True
    self.weights_updated = False
    self.learner_info = {}
    self.stopped = False
    self.num_steps = 0
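
A minimal sketch of how this thread is typically wired up (IMPALA-style), assuming ``workers`` is an existing WorkerSet; the constructor values below are illustrative, not defaults:

learner_thread = LearnerThread(
    workers.local_worker(),
    minibatch_buffer_size=1,
    num_sgd_iter=1,
    learner_queue_size=16,
    learner_queue_timeout=300)
learner_thread.start()
# Rollout workers feed train batches into the thread's inqueue ...
enqueue_op = ParallelRollouts(workers, mode="async").for_each(
    Enqueue(learner_thread.inqueue))
# ... while stats are drained from its outqueue as long as the thread lives.
dequeue_op = Dequeue(learner_thread.outqueue, check=learner_thread.is_alive)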

add_learner_metrics(self, result)

Add internal metrics to a trainer result dict.

Source code in ray/rllib/execution/learner_thread.py
def add_learner_metrics(self, result: Dict) -> Dict:
    """Add internal metrics to a trainer result dict."""

    def timer_to_ms(timer):
        return round(1000 * timer.mean, 3)

    result["info"].update({
        "learner_queue": self.learner_queue_size.stats(),
        LEARNER_INFO: copy.deepcopy(self.learner_info),
        "timing_breakdown": {
            "learner_grad_time_ms": timer_to_ms(self.grad_timer),
            "learner_load_time_ms": timer_to_ms(self.load_timer),
            "learner_load_wait_time_ms": timer_to_ms(self.load_wait_timer),
            "learner_dequeue_time_ms": timer_to_ms(self.queue_timer),
        }
    })
    return result
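
For example (a sketch only, assuming a metrics iterator ``results_op`` and the ``learner_thread`` from the sketch above already exist), the thread's queue and timing stats can be folded into every reported result:

results_op = results_op.for_each(learner_thread.add_learner_metrics)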

run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Source code in ray/rllib/execution/learner_thread.py
def run(self) -> None:
    # Switch on eager mode if configured.
    if self.local_worker.policy_config.get("framework") in ["tf2", "tfe"]:
        tf1.enable_eager_execution()
    while not self.stopped:
        self.step()

ray.rllib.execution.metric_ops.StandardMetricsReporting(train_op, workers, config, selected_workers=None, by_steps_trained=False)

Operator to periodically collect and report metrics.

Parameters:

Name Type Description Default
train_op LocalIterator

Operator for executing training steps. We ignore the output values.

required
workers WorkerSet

Rollout workers to collect metrics from.

required
config dict

Trainer configuration, used to determine the frequency of stats reporting.

required
selected_workers list

Override the list of remote workers to collect metrics from.

None
by_steps_trained bool

If True, uses the STEPS_TRAINED_COUNTER instead of the STEPS_SAMPLED_COUNTER in metrics.

False

Returns:

Type Description
LocalIterator[dict]

A local iterator over training results.

Examples:

>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> metrics_op = StandardMetricsReporting(train_op, workers, config)
>>> next(metrics_op)
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
Source code in ray/rllib/execution/metric_ops.py
def StandardMetricsReporting(
        train_op: LocalIterator[Any],
        workers: WorkerSet,
        config: dict,
        selected_workers: List[ActorHandle] = None,
        by_steps_trained: bool = False,
) -> LocalIterator[dict]:
    """Operator to periodically collect and report metrics.

    Args:
        train_op (LocalIterator): Operator for executing training steps.
            We ignore the output values.
        workers (WorkerSet): Rollout workers to collect metrics from.
        config (dict): Trainer configuration, used to determine the frequency
            of stats reporting.
        selected_workers (list): Override the list of remote workers
            to collect metrics from.
        by_steps_trained (bool): If True, uses the `STEPS_TRAINED_COUNTER`
            instead of the `STEPS_SAMPLED_COUNTER` in metrics.

    Returns:
        LocalIterator[dict]: A local iterator over training results.

    Examples:
        >>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
        >>> metrics_op = StandardMetricsReporting(train_op, workers, config)
        >>> next(metrics_op)
        {"episode_reward_max": ..., "episode_reward_mean": ..., ...}
    """

    output_op = train_op \
        .filter(OncePerTimestepsElapsed(config["timesteps_per_iteration"],
                                        by_steps_trained=by_steps_trained)) \
        .filter(OncePerTimeInterval(config["min_iter_time_s"])) \
        .for_each(CollectMetrics(
            workers,
            min_history=config["metrics_smoothing_episodes"],
            timeout_seconds=config["collect_metrics_timeout"],
            selected_workers=selected_workers))
    return output_op
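
A hedged end-to-end sketch of a simple synchronous pipeline built from the operators on this page (assumes ``workers`` is a WorkerSet and ``config`` a standard Trainer config dict):

rollouts = ParallelRollouts(workers, mode="bulk_sync")
train_op = rollouts.for_each(TrainOneStep(workers))
results_op = StandardMetricsReporting(train_op, workers, config)
result = next(results_op)  # a training-result dict, e.g. episode_reward_mean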

ray.rllib.execution.metric_ops.CollectMetrics

Callable that collects metrics from workers.

The metrics are smoothed over a given history window.

This should be used with the .for_each() operator. For a higher level API, consider using StandardMetricsReporting instead.

Examples:

>>> output_op = train_op.for_each(CollectMetrics(workers))
>>> print(next(output_op))
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}

ray.rllib.execution.metric_ops.OncePerTimeInterval

Callable that returns True once per given interval.

This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.

Examples:

>>> throttled_op = train_op.filter(OncePerTimeInterval(5))
>>> start = time.time()
>>> next(throttled_op)
>>> print(time.time() - start)
5.00001  # will be greater than 5 seconds

ray.rllib.execution.metric_ops.OncePerTimestepsElapsed

Callable that returns True once per given number of timesteps.

This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.

Examples:

>>> throttled_op = train_op.filter(OncePerTimestepsElapsed(1000))
>>> next(throttled_op)
# will only return after 1000 steps have elapsed

__init__(self, delay_steps, by_steps_trained=False) special

Parameters:

Name Type Description Default
delay_steps int

The number of steps (sampled or trained) that must elapse between consecutive times this op returns True.

required
by_steps_trained bool

If True, uses the STEPS_TRAINED_COUNTER instead of the STEPS_SAMPLED_COUNTER in metrics.

False
Source code in ray/rllib/execution/metric_ops.py
def __init__(self, delay_steps: int, by_steps_trained: bool = False):
    """
    Args:
        delay_steps (int): The number of steps (sampled or trained) every
            which this op returns True.
        by_steps_trained (bool): If True, uses the `STEPS_TRAINED_COUNTER`
            instead of the `STEPS_SAMPLED_COUNTER` in metrics.
    """
    self.delay_steps = delay_steps
    self.by_steps_trained = by_steps_trained
    self.last_called = 0
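
As a hedged example of the by_steps_trained flag (assuming an existing ``train_op`` iterator), the filter below only passes an item through once per 1000 trained (rather than sampled) steps:

throttled_op = train_op.filter(
    OncePerTimestepsElapsed(1000, by_steps_trained=True))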

ray.rllib.execution.multi_gpu_learner_thread.MultiGPULearnerThread (LearnerThread)

Learner that can use multiple GPUs and parallel loading.

This class is used for async sampling algorithms.

Example workflow with 2 GPUs and 3 multi-GPU tower stacks: on each GPU, there are 3 slots for batches, indexed 0, 1, and 2.

Workers collect data from the env and push it into the inqueue: Workers -> (data) -> self.inqueue

Two additional queues indicate which stacks are loaded and which are not:
- idle_tower_stacks = [0, 1, 2] <- all 3 stacks are free at first.
- ready_tower_stacks = [] <- none of the 3 stacks is loaded with data.

ready_tower_stacks is managed by ready_tower_stacks_buffer to allow multiple minibatch-SGD iterations per loaded batch (this avoids reloading from CPU to GPU for every SGD iter).

The n _MultiGPULoaderThreads: self.inqueue -get()-> policy.load_batch_into_buffer() -> ready_stacks = [0 ...]

This thread: self.ready_tower_stacks_buffer -get()-> policy.learn_on_loaded_batch() -> if SGD-iters done, put stack index back in idle_tower_stacks queue.
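
The queue cycle above can be summarized in this conceptual sketch (not the actual implementation; ``policy`` stands for one of the local worker's policies, and the exact buffer-handling details are simplified):

stack_idx = idle_tower_stacks.get()          # loader thread: claim a free stack
policy.load_batch_into_buffer(batch, buffer_index=stack_idx)
ready_tower_stacks.put(stack_idx)            # mark the stack as loaded
...
stack_idx = ready_tower_stacks_buffer.get()  # learner thread: pick a loaded stack
policy.learn_on_loaded_batch(buffer_index=stack_idx)
idle_tower_stacks.put(stack_idx)             # recycle after num_sgd_iter passes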

__init__(self, local_worker, num_gpus=1, lr=None, train_batch_size=500, num_multi_gpu_tower_stacks=1, num_sgd_iter=1, learner_queue_size=16, learner_queue_timeout=300, num_data_load_threads=16, _fake_gpus=False, minibatch_buffer_size=None) special

Initializes a MultiGPULearnerThread instance.

Parameters:

Name Type Description Default
local_worker RolloutWorker

Local RolloutWorker holding policies this thread will call load_batch_into_buffer and learn_on_loaded_batch on.

required
num_gpus int

Number of GPUs to use for data-parallel SGD.

1
train_batch_size int

Size of batches (minibatches if num_sgd_iter > 1) to learn on.

500
num_multi_gpu_tower_stacks int

Number of buffers to load data into in parallel on each device. Each buffer is of size train_batch_size and hence increases GPU memory usage accordingly.

1
num_sgd_iter int

Number of passes to learn on per train batch (minibatch if num_sgd_iter > 1).

1
learner_queue_size int

Max size of queue of inbound train batches to this thread.

16
num_data_load_threads int

Number of threads to use to load data into GPU memory in parallel.

16
Source code in ray/rllib/execution/multi_gpu_learner_thread.py
def __init__(
        self,
        local_worker: RolloutWorker,
        num_gpus: int = 1,
        lr=None,  # deprecated.
        train_batch_size: int = 500,
        num_multi_gpu_tower_stacks: int = 1,
        num_sgd_iter: int = 1,
        learner_queue_size: int = 16,
        learner_queue_timeout: int = 300,
        num_data_load_threads: int = 16,
        _fake_gpus: bool = False,
        # Deprecated arg, use
        minibatch_buffer_size=None,
):
    """Initializes a MultiGPULearnerThread instance.

    Args:
        local_worker (RolloutWorker): Local RolloutWorker holding
            policies this thread will call `load_batch_into_buffer` and
            `learn_on_loaded_batch` on.
        num_gpus (int): Number of GPUs to use for data-parallel SGD.
        train_batch_size (int): Size of batches (minibatches if
            `num_sgd_iter` > 1) to learn on.
        num_multi_gpu_tower_stacks (int): Number of buffers to parallelly
            load data into on one device. Each buffer is of size of
            `train_batch_size` and hence increases GPU memory usage
            accordingly.
        num_sgd_iter (int): Number of passes to learn on per train batch
            (minibatch if `num_sgd_iter` > 1).
        learner_queue_size (int): Max size of queue of inbound
            train batches to this thread.
        num_data_load_threads (int): Number of threads to use to load
            data into GPU memory in parallel.
    """
    # Deprecated: No need to specify as we don't need the actual
    # minibatch-buffer anyways.
    if minibatch_buffer_size:
        deprecation_warning(
            old="MultiGPULearnerThread.minibatch_buffer_size",
            error=False,
        )
    super().__init__(
        local_worker=local_worker,
        minibatch_buffer_size=0,
        num_sgd_iter=num_sgd_iter,
        learner_queue_size=learner_queue_size,
        learner_queue_timeout=learner_queue_timeout,
    )
    # Delete reference to parent's minibatch_buffer, which is not needed.
    # Instead, in multi-GPU mode, we pull tower stack indices from the
    # `self.ready_tower_stacks_buffer` buffer, whose size is exactly
    # `num_multi_gpu_tower_stacks`.
    self.minibatch_buffer = None

    self.train_batch_size = train_batch_size

    self.policy_map = self.local_worker.policy_map
    self.devices = next(iter(self.policy_map.values())).devices

    logger.info("MultiGPULearnerThread devices {}".format(self.devices))
    assert self.train_batch_size % len(self.devices) == 0
    assert self.train_batch_size >= len(self.devices),\
        "batch too small"

    self.tower_stack_indices = list(range(num_multi_gpu_tower_stacks))

    # Two queues for tower stacks:
    # a) Those that are loaded with data ("ready")
    # b) Those that are ready to be loaded with new data ("idle").
    self.idle_tower_stacks = queue.Queue()
    self.ready_tower_stacks = queue.Queue()
    # In the beginning, all stacks are idle (no loading has taken place
    # yet).
    for idx in self.tower_stack_indices:
        self.idle_tower_stacks.put(idx)
    # Start n threads that are responsible for loading data into the
    # different (idle) stacks.
    for i in range(num_data_load_threads):
        self.loader_thread = _MultiGPULoaderThread(
            self, share_stats=(i == 0))
        self.loader_thread.start()

    # Create a buffer that holds stack indices that are "ready"
    # (loaded with data). Those are stacks that we can call
    # "learn_on_loaded_batch" on.
    self.ready_tower_stacks_buffer = MinibatchBuffer(
        self.ready_tower_stacks, num_multi_gpu_tower_stacks,
        learner_queue_timeout, num_sgd_iter)

ray.rllib.execution.replay_ops.StoreToReplayBuffer

Callable that stores data into replay buffer actors.

If constructed with a local replay buffer, data will be stored into that buffer. If constructed with a list of replay actor handles, data will be stored randomly among those actors.

This should be used with the .for_each() operator on a rollouts iterator. The batch that was stored is returned.

Examples:

>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> rollouts = ParallelRollouts(...)
>>> store_op = rollouts.for_each(StoreToReplayBuffer(actors=actors))
>>> next(store_op)
SampleBatch(...)

__init__(self, *, local_buffer=None, actors=None) special

Parameters:

Name Type Description Default
local_buffer LocalReplayBuffer

The local replay buffer to store the data into.

None
actors Optional[List[ActorHandle]]

An optional list of replay actors to use instead of local_buffer.

None
Source code in ray/rllib/execution/replay_ops.py
def __init__(
        self,
        *,
        local_buffer: Optional[LocalReplayBuffer] = None,
        actors: Optional[List[ActorHandle]] = None,
):
    """
    Args:
        local_buffer (LocalReplayBuffer): The local replay buffer to store
            the data into.
        actors (Optional[List[ActorHandle]]): An optional list of replay
            actors to use instead of `local_buffer`.
    """
    if bool(local_buffer) == bool(actors):
        raise ValueError(
            "Either `local_buffer` or `replay_actors` must be given, "
            "not both!")

    if local_buffer:
        self.local_actor = local_buffer
        self.replay_actors = None
    else:
        self.local_actor = None
        self.replay_actors = actors

ray.rllib.execution.replay_ops.Replay(*, local_buffer=None, actors=None, num_async=4)

Replay experiences from the given buffer or actors.

This should be combined with the StoreToReplayBuffer operation using the Concurrently() operator.

Parameters:

Name Type Description Default
local_buffer LocalReplayBuffer

Local buffer to use. Only one of this and replay_actors can be specified.

None
actors list

List of replay actors. Only one of this and local_buffer can be specified.

None
num_async int

In async mode, the max number of async requests in flight per actor.

4

Examples:

>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> replay_op = Replay(actors=actors)
>>> next(replay_op)
SampleBatch(...)
Source code in ray/rllib/execution/replay_ops.py
def Replay(*,
           local_buffer: LocalReplayBuffer = None,
           actors: List[ActorHandle] = None,
           num_async: int = 4) -> LocalIterator[SampleBatchType]:
    """Replay experiences from the given buffer or actors.

    This should be combined with the StoreToReplayActors operation using the
    Concurrently() operator.

    Args:
        local_buffer (LocalReplayBuffer): Local buffer to use. Only one of this
            and replay_actors can be specified.
        actors (list): List of replay actors. Only one of this and
            local_buffer can be specified.
        num_async (int): In async mode, the max number of async
            requests in flight per actor.

    Examples:
        >>> actors = [ReplayActor.remote() for _ in range(4)]
        >>> replay_op = Replay(actors=actors)
        >>> next(replay_op)
        SampleBatch(...)
    """

    if bool(local_buffer) == bool(actors):
        raise ValueError(
            "Exactly one of local_buffer and replay_actors must be given.")

    if actors:
        replay = from_actors(actors)
        return replay.gather_async(
            num_async=num_async).filter(lambda x: x is not None)

    def gen_replay(_):
        while True:
            item = local_buffer.replay()
            if item is None:
                yield _NextValueNotReady()
            else:
                yield item

    return LocalIterator(gen_replay, SharedMetrics())
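
A hedged sketch of the sharded (actor-based) variant, mirroring the Examples above; ``ReplayActor`` is the hypothetical replay actor class from those examples and ``workers`` an existing WorkerSet:

replay_actors = [ReplayActor.remote() for _ in range(4)]
store_op = ParallelRollouts(workers).for_each(
    StoreToReplayBuffer(actors=replay_actors))
replay_op = Replay(actors=replay_actors, num_async=4)
combined_op = Concurrently([store_op, replay_op], mode="round_robin")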

ray.rllib.execution.replay_ops.SimpleReplayBuffer

Simple replay buffer that operates over batches.

__init__(self, num_slots, replay_proportion=None) special

Initialize SimpleReplayBuffer.

Parameters:

Name Type Description Default
num_slots int

Number of batches to store in total.

required
Source code in ray/rllib/execution/replay_ops.py
def __init__(self,
             num_slots: int,
             replay_proportion: Optional[float] = None):
    """Initialize SimpleReplayBuffer.

    Args:
        num_slots (int): Number of batches to store in total.
    """
    self.num_slots = num_slots
    self.replay_batches = []
    self.replay_index = 0

ray.rllib.execution.replay_ops.MixInReplay

This operator adds replay to a stream of experiences.

It takes input batches and returns a list of batches that include replayed data as well. The number of replayed batches is determined by the configured replay proportion. The max age of a batch is determined by the number of replay slots.

__init__(self, num_slots, replay_proportion) special

Initialize MixInReplay.

Parameters:

Name Type Description Default
num_slots int

Number of batches to store in total.

required
replay_proportion float

The input batch will be returned and an additional number of batches proportional to this value will be added as well.

required

Examples:

replay proportion 2:1

>>> replay_op = MixInReplay(rollouts, 100, replay_proportion=2)
>>> print(next(replay_op))
[SampleBatch(<input>), SampleBatch(<replay>), SampleBatch(<rep.>)]

replay proportion 0:1, replay disabled

>>> replay_op = MixInReplay(rollouts, 100, replay_proportion=0)
>>> print(next(replay_op))
[SampleBatch(<input>)]
Source code in ray/rllib/execution/replay_ops.py
def __init__(self, num_slots: int, replay_proportion: float):
    """Initialize MixInReplay.

    Args:
        num_slots (int): Number of batches to store in total.
        replay_proportion (float): The input batch will be returned
            and an additional number of batches proportional to this value
            will be added as well.

    Examples:
        # replay proportion 2:1
        >>> replay_op = MixInReplay(rollouts, 100, replay_proportion=2)
        >>> print(next(replay_op))
        [SampleBatch(<input>), SampleBatch(<replay>), SampleBatch(<rep.>)]

        # replay proportion 0:1, replay disabled
        >>> replay_op = MixInReplay(rollouts, 100, replay_proportion=0)
        >>> print(next(replay_op))
        [SampleBatch(<input>)]
    """
    if replay_proportion > 0 and num_slots == 0:
        raise ValueError(
            "You must set num_slots > 0 if replay_proportion > 0.")
    self.replay_buffer = SimpleReplayBuffer(num_slots)
    self.replay_proportion = replay_proportion

ray.rllib.execution.rollout_ops.ParallelRollouts(workers, *, mode='bulk_sync', num_async=1)

Operator to collect experiences in parallel from rollout workers.

If there are no remote workers, experiences will be collected serially from the local worker instance instead.

Parameters:

Name Type Description Default
workers WorkerSet

set of rollout workers to use.

required
mode str

One of 'async', 'bulk_sync', 'raw'. In 'async' mode, batches are returned as soon as they are computed by rollout workers with no order guarantees. In 'bulk_sync' mode, we collect one batch from each worker and concatenate them together into a large batch to return. In 'raw' mode, the ParallelIterator object is returned directly and the caller is responsible for implementing gather and updating the timesteps counter.

'bulk_sync'
num_async int

In async mode, the max number of async requests in flight per actor.

1

Returns:

Type Description
ray.util.iter.LocalIterator[ray.rllib.policy.sample_batch.SampleBatch]

A local iterator over experiences collected in parallel.

Examples:

>>> rollouts = ParallelRollouts(workers, mode="async")
>>> batch = next(rollouts)
>>> print(batch.count)
50  # config.rollout_fragment_length
>>> rollouts = ParallelRollouts(workers, mode="bulk_sync")
>>> batch = next(rollouts)
>>> print(batch.count)
200  # config.rollout_fragment_length * config.num_workers

Updates the STEPS_SAMPLED_COUNTER counter in the local iterator context.

Source code in ray/rllib/execution/rollout_ops.py
def ParallelRollouts(workers: WorkerSet, *, mode="bulk_sync",
                     num_async=1) -> LocalIterator[SampleBatch]:
    """Operator to collect experiences in parallel from rollout workers.

    If there are no remote workers, experiences will be collected serially from
    the local worker instance instead.

    Args:
        workers (WorkerSet): set of rollout workers to use.
        mode (str): One of 'async', 'bulk_sync', 'raw'. In 'async' mode,
            batches are returned as soon as they are computed by rollout
            workers with no order guarantees. In 'bulk_sync' mode, we collect
            one batch from each worker and concatenate them together into a
            large batch to return. In 'raw' mode, the ParallelIterator object
            is returned directly and the caller is responsible for implementing
            gather and updating the timesteps counter.
        num_async (int): In async mode, the max number of async
            requests in flight per actor.

    Returns:
        A local iterator over experiences collected in parallel.

    Examples:
        >>> rollouts = ParallelRollouts(workers, mode="async")
        >>> batch = next(rollouts)
        >>> print(batch.count)
        50  # config.rollout_fragment_length

        >>> rollouts = ParallelRollouts(workers, mode="bulk_sync")
        >>> batch = next(rollouts)
        >>> print(batch.count)
        200  # config.rollout_fragment_length * config.num_workers

    Updates the STEPS_SAMPLED_COUNTER counter in the local iterator context.
    """

    # Ensure workers are initially in sync.
    workers.sync_weights()

    def report_timesteps(batch):
        metrics = _get_shared_metrics()
        metrics.counters[STEPS_SAMPLED_COUNTER] += batch.count
        if isinstance(batch, MultiAgentBatch):
            metrics.counters[AGENT_STEPS_SAMPLED_COUNTER] += \
                batch.agent_steps()
        else:
            metrics.counters[AGENT_STEPS_SAMPLED_COUNTER] += batch.count
        return batch

    if not workers.remote_workers():
        # Handle the `num_workers=0` case, in which the local worker
        # has to do sampling as well.
        def sampler(_):
            while True:
                yield workers.local_worker().sample()

        return (LocalIterator(sampler,
                              SharedMetrics()).for_each(report_timesteps))

    # Create a parallel iterator over generated experiences.
    rollouts = from_actors(workers.remote_workers())

    if mode == "bulk_sync":
        return rollouts \
            .batch_across_shards() \
            .for_each(lambda batches: SampleBatch.concat_samples(batches)) \
            .for_each(report_timesteps)
    elif mode == "async":
        return rollouts.gather_async(
            num_async=num_async).for_each(report_timesteps)
    elif mode == "raw":
        return rollouts
    else:
        raise ValueError("mode must be one of 'bulk_sync', 'async', 'raw', "
                         "got '{}'".format(mode))

ray.rllib.execution.rollout_ops.AsyncGradients(workers)

Operator to compute gradients in parallel from rollout workers.

Parameters:

Name Type Description Default
workers WorkerSet

set of rollout workers to use.

required

Returns:

Type Description
ray.util.iter.LocalIterator[Tuple[Union[List[Tuple[Any, Any]], List[Any]], int]]

A local iterator over policy gradients computed on rollout workers.

Examples:

>>> grads_op = AsyncGradients(workers)
>>> print(next(grads_op))
{"var_0": ..., ...}, 50  # grads, batch count

Updates the STEPS_SAMPLED_COUNTER counter and LEARNER_INFO field in the local iterator context.

Source code in ray/rllib/execution/rollout_ops.py
def AsyncGradients(
        workers: WorkerSet) -> LocalIterator[Tuple[ModelGradients, int]]:
    """Operator to compute gradients in parallel from rollout workers.

    Args:
        workers (WorkerSet): set of rollout workers to use.

    Returns:
        A local iterator over policy gradients computed on rollout workers.

    Examples:
        >>> grads_op = AsyncGradients(workers)
        >>> print(next(grads_op))
        {"var_0": ..., ...}, 50  # grads, batch count

    Updates the STEPS_SAMPLED_COUNTER counter and LEARNER_INFO field in the
    local iterator context.
    """

    # Ensure workers are initially in sync.
    workers.sync_weights()

    # This function will be applied remotely on the workers.
    def samples_to_grads(samples):
        return get_global_worker().compute_gradients(samples), samples.count

    # Record learner metrics and pass through (grads, count).
    class record_metrics:
        def _on_fetch_start(self):
            self.fetch_start_time = time.perf_counter()

        def __call__(self, item):
            (grads, info), count = item
            metrics = _get_shared_metrics()
            metrics.counters[STEPS_SAMPLED_COUNTER] += count
            metrics.info[LEARNER_INFO] = {
                DEFAULT_POLICY_ID: info
            } if LEARNER_STATS_KEY in info else info
            metrics.timers[GRAD_WAIT_TIMER].push(time.perf_counter() -
                                                 self.fetch_start_time)
            return grads, count

    rollouts = from_actors(workers.remote_workers())
    grads = rollouts.for_each(samples_to_grads)
    return grads.gather_async().for_each(record_metrics())
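
A hedged sketch of an A3C-style pipeline built on this operator (assumes ``workers`` and ``config`` exist; ApplyGradients is documented further down this page):

grads_op = AsyncGradients(workers)
# Apply each gradient only to the worker that produced it (A3C style).
apply_op = grads_op.for_each(ApplyGradients(workers, update_all=False))
results_op = StandardMetricsReporting(apply_op, workers, config)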

ray.rllib.execution.rollout_ops.ConcatBatches

Callable used to merge batches into larger batches for training.

This should be used with the .combine() operator.

Examples:

>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.combine(ConcatBatches(
...    min_batch_size=10000, count_steps_by="env_steps"))
>>> print(next(rollouts).count)
10000

ray.rllib.execution.rollout_ops.SelectExperiences

Callable used to select experiences from a MultiAgentBatch.

This should be used with the .for_each() operator.

Examples:

>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.for_each(SelectExperiences(["pol1", "pol2"]))
>>> print(next(rollouts).policy_batches.keys())
{"pol1", "pol2"}

ray.rllib.execution.rollout_ops.StandardizeFields

Callable used to standardize fields of batches.

This should be used with the .for_each() operator. Note that the input may be mutated by this operator for efficiency.

Examples:

>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.for_each(StandardizeFields(["advantages"]))
>>> print(np.std(next(rollouts)["advantages"]))
1.0
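
The three callables above are commonly chained into a single preprocessing pipeline. A hedged sketch (PPO-like; ``workers`` is assumed to exist, and the policy ID and batch size are illustrative):

rollouts = ParallelRollouts(workers, mode="bulk_sync") \
    .for_each(SelectExperiences(["default_policy"])) \
    .combine(ConcatBatches(min_batch_size=4000, count_steps_by="env_steps")) \
    .for_each(StandardizeFields(["advantages"]))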

ray.rllib.execution.train_ops.TrainOneStep

Callable that improves the policy and updates workers.

This should be used with the .for_each() operator. A tuple of the input and learner stats will be returned.

Examples:

>>> rollouts = ParallelRollouts(...)
>>> train_op = rollouts.for_each(TrainOneStep(workers))
>>> print(next(train_op))  # This trains the policy on one batch.
SampleBatch(...), {"learner_stats": ...}

Updates the STEPS_TRAINED_COUNTER counter and LEARNER_INFO field in the local iterator context.

ray.rllib.execution.train_ops.MultiGPUTrainOneStep

Multi-GPU version of TrainOneStep.

This should be used with the .for_each() operator. A tuple of the input and learner stats will be returned.

Examples:

>>> rollouts = ParallelRollouts(...)
>>> train_op = rollouts.for_each(MultiGPUTrainOneStep(workers, ...))
>>> print(next(train_op))  # This trains the policy on one batch.
SampleBatch(...), {"learner_stats": ...}

Updates the STEPS_TRAINED_COUNTER counter and LEARNER_INFO field in the local iterator context.

ray.rllib.execution.train_ops.ComputeGradients

Callable that computes gradients with respect to the policy loss.

This should be used with the .for_each() operator.

Examples:

>>> grads_op = rollouts.for_each(ComputeGradients(workers))
>>> print(next(grads_op))
{"var_0": ..., ...}, 50  # grads, batch count

Updates the LEARNER_INFO info field in the local iterator context.

ray.rllib.execution.train_ops.ApplyGradients

Callable that applies gradients and updates workers.

This should be used with the .for_each() operator.

Examples:

>>> apply_op = grads_op.for_each(ApplyGradients(workers))
>>> print(next(apply_op))
None

Updates the STEPS_TRAINED_COUNTER counter in the local iterator context.

__init__(self, workers, policies=frozenset(), update_all=True) special

Creates an ApplyGradients instance.

Parameters:

Name Type Description Default
workers WorkerSet

workers to apply gradients to.

required
update_all bool

If true, updates all workers. Otherwise, only update the worker that produced the sample batch we are currently processing (i.e., A3C style).

True
Source code in ray/rllib/execution/train_ops.py
def __init__(self,
             workers,
             policies: List[PolicyID] = frozenset([]),
             update_all=True):
    """Creates an ApplyGradients instance.

    Args:
        workers (WorkerSet): workers to apply gradients to.
        update_all (bool): If true, updates all workers. Otherwise, only
            update the worker that produced the sample batch we are
            currently processing (i.e., A3C style).
    """
    self.workers = workers
    self.local_worker = workers.local_worker()
    self.policies = policies
    self.update_all = update_all

ray.rllib.execution.train_ops.AverageGradients

Callable that averages the gradients in a batch.

This should be used with the .for_each() operator after a set of gradients have been batched with .batch().

Examples:

>>> batched_grads = grads_op.batch(32)
>>> avg_grads = batched_grads.for_each(AverageGradients())
>>> print(next(avg_grads))
{"var_0": ..., ...}, 1600  # averaged grads, summed batch count

ray.rllib.execution.train_ops.UpdateTargetNetwork

Periodically call policy.update_target() on all trainable policies.

This should be used with the .for_each() operator after training step has been taken.

Examples:

>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> update_op = train_op.for_each(
...     UpdateTargetNetwork(workers, target_update_freq=500))
>>> print(next(update_op))
None

Updates the LAST_TARGET_UPDATE_TS and NUM_TARGET_UPDATES counters in the local iterator context. The value of the last update counter is used to track when we should update the target next.
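
A hedged DQN-style sketch (assumes an existing ``replay_op`` iterator and WorkerSet ``workers``; the target_update_freq value is illustrative):

train_op = replay_op.for_each(TrainOneStep(workers))
update_op = train_op.for_each(
    UpdateTargetNetwork(workers, target_update_freq=500))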
