env package

ray.rllib.env.base_env.BaseEnv

The lowest-level env interface used by RLlib for sampling.

BaseEnv models multiple agents executing asynchronously in multiple environments. A call to poll() returns observations from ready agents keyed by their environment and agent ids, and actions for those agents can be sent back via send_actions().

All other env types can be adapted to BaseEnv. RLlib handles these conversions internally in RolloutWorker, for example:

gym.Env => rllib.VectorEnv => rllib.BaseEnv
rllib.MultiAgentEnv (is-a gym.Env) => rllib.VectorEnv => rllib.BaseEnv
rllib.ExternalEnv => rllib.BaseEnv

Attributes:

Name Type Description
action_space gym.Space

Action space. This must be defined for single-agent envs. Multi-agent envs can set this to None.

observation_space gym.Space

Observation space. This must be defined for single-agent envs. Multi-agent envs can set this to None.

Examples:

>>> env = MyBaseEnv()
>>> obs, rewards, dones, infos, off_policy_actions = env.poll()
>>> print(obs)
{
    "env_0": {
        "car_0": [2.4, 1.6],
        "car_1": [3.4, -3.2],
    },
    "env_1": {
        "car_0": [8.0, 4.1],
    },
    "env_2": {
        "car_0": [2.3, 3.3],
        "car_1": [1.4, -0.2],
        "car_3": [1.2, 0.1],
    },
}
>>> env.send_actions(
    actions={
        "env_0": {
            "car_0": 0,
            "car_1": 1,
        }, ...
    })
>>> obs, rewards, dones, infos, off_policy_actions = env.poll()
>>> print(obs)
{
    "env_0": {
        "car_0": [4.1, 1.7],
        "car_1": [3.2, -4.2],
    }, ...
}
>>> print(dones)
{
    "env_0": {
        "__all__": False,
        "car_0": False,
        "car_1": True,
    }, ...
}

get_sub_environments(self)

Return a reference to the underlying sub environments, if any.

Returns:

Type Description
List[Any]

List of the underlying sub environments or [].

Source code in ray/rllib/env/base_env.py
@PublicAPI
def get_sub_environments(self) -> List[EnvType]:
    """Return a reference to the underlying sub environments, if any.

    Returns:
        List of the underlying sub environments or [].
    """
    return []

poll(self)

Returns observations from ready agents.

All returns are two-level dicts mapping from env_id to dicts of agent_ids to values. The number of agents and envs can vary over time.

Returns:

Type Description
Tuple[Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]]]

Tuple consisting of:
1) New observations for each ready agent.
2) Reward values for each ready agent. If the episode has just started, the value will be None.
3) Done values for each ready agent. The special key "__all__" is used to indicate env termination.
4) Info values for each ready agent.
5) Off-policy actions, if any. When an agent takes an off-policy action, there will be an entry in this dict containing that action. There is no need to call send_actions() for agents that have already chosen off-policy actions.

Source code in ray/rllib/env/base_env.py
@PublicAPI
def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,
                        MultiEnvDict, MultiEnvDict]:
    """Returns observations from ready agents.

    All returns are two-level dicts mapping from env_id to dicts of
    agent_ids to values. The number of agents and envs can vary over time.

    Returns:
        Tuple consisting of
        1) New observations for each ready agent.
        2) Reward values for each ready agent. If the episode is
        just started, the value will be None.
        3) Done values for each ready agent. The special key "__all__"
        is used to indicate env termination.
        4) Info values for each ready agent.
        5) Agents may take off-policy actions. When that
        happens, there will be an entry in this dict that contains the
        taken action. There is no need to send_actions() for agents that
        have already chosen off-policy actions.
    """
    raise NotImplementedError

send_actions(self, action_dict)

Called to send actions back to running agents in this env.

Actions should be sent for each ready agent that returned observations in the previous poll() call.

Parameters:

Name Type Description Default
action_dict Dict[Union[int, str], Dict[Any, Any]]

Actions values keyed by env_id and agent_id.

required
Source code in ray/rllib/env/base_env.py
@PublicAPI
def send_actions(self, action_dict: MultiEnvDict) -> None:
    """Called to send actions back to running agents in this env.

    Actions should be sent for each ready agent that returned observations
    in the previous poll() call.

    Args:
        action_dict: Actions values keyed by env_id and agent_id.
    """
    raise NotImplementedError

stop(self)

Releases all resources used.

Source code in ray/rllib/env/base_env.py
@PublicAPI
def stop(self) -> None:
    """Releases all resources used."""

    # Try calling `close` on all sub-environments.
    for env in self.get_sub_environments():
        if hasattr(env, "close"):
            env.close()

to_base_env(env, make_env=None, num_envs=1, remote_envs=False, remote_env_batch_wait_ms=0, policy_config=None) staticmethod

Converts an RLlib-supported env into a BaseEnv object.

Supported types for the given env arg are gym.Env, BaseEnv, VectorEnv, MultiAgentEnv, or ExternalEnv.

The resulting BaseEnv is always vectorized (contains n sub-environments) for batched forward passes, where n may also be 1. BaseEnv also supports async execution via the poll and send_actions methods and thus supports external simulators.

TODO: Support gym3 environments, which are already vectorized.

Parameters:

Name Type Description Default
env Any

An already existing environment of any supported env type to convert/wrap into a BaseEnv. Supported types are gym.Env, BaseEnv, VectorEnv, MultiAgentEnv, ExternalEnv, or ExternalMultiAgentEnv.

required
make_env Callable[[int], Any]

A callable taking an int as input (which indicates the number of individual sub-environments within the final vectorized BaseEnv) and returning one individual sub-environment.

None
num_envs int

The number of sub-environments to create in the resulting (vectorized) BaseEnv. The already existing env will be one of the num_envs.

1
remote_envs bool

Whether each sub-env should be a @ray.remote actor. You can set this behavior in your config via the remote_worker_envs=True option.

False
remote_env_batch_wait_ms int

The wait time (in ms) to poll remote sub-environments for, if applicable. Only used if remote_envs is True.

0
policy_config Optional[dict]

Optional policy config dict.

None

Returns:

Type Description
BaseEnv

The resulting BaseEnv object.

Source code in ray/rllib/env/base_env.py
@staticmethod
def to_base_env(
        env: EnvType,
        make_env: Callable[[int], EnvType] = None,
        num_envs: int = 1,
        remote_envs: bool = False,
        remote_env_batch_wait_ms: int = 0,
        policy_config: Optional[PartialTrainerConfigDict] = None,
) -> "BaseEnv":
    """Converts an RLlib-supported env into a BaseEnv object.

    Supported types for the given `env` arg are gym.Env, BaseEnv,
    VectorEnv, MultiAgentEnv, or ExternalEnv.

    The resulting BaseEnv is always vectorized (contains n
    sub-environments) for batched forward passes, where n may also be 1.
    BaseEnv also supports async execution via the `poll` and `send_actions`
    methods and thus supports external simulators.

    TODO: Support gym3 environments, which are already vectorized.

    Args:
        env: An already existing environment of any supported env type
            to convert/wrap into a BaseEnv. Supported types are gym.Env,
            BaseEnv, VectorEnv, MultiAgentEnv, ExternalEnv, or
            ExternalMultiAgentEnv.
        make_env: A callable taking an int as input (which indicates the
            number of individual sub-environments within the final
            vectorized BaseEnv) and returning one individual
            sub-environment.
        num_envs: The number of sub-environments to create in the
            resulting (vectorized) BaseEnv. The already existing `env`
            will be one of the `num_envs`.
        remote_envs: Whether each sub-env should be a @ray.remote actor.
            You can set this behavior in your config via the
            `remote_worker_envs=True` option.
        remote_env_batch_wait_ms: The wait time (in ms) to poll remote
            sub-environments for, if applicable. Only used if
            `remote_envs` is True.
        policy_config: Optional policy config dict.

    Returns:
        The resulting BaseEnv object.
    """

    from ray.rllib.env.remote_vector_env import RemoteVectorEnv
    if remote_envs and num_envs == 1:
        raise ValueError(
            "Remote envs only make sense to use if num_envs > 1 "
            "(i.e. vectorization is enabled).")

    # Given `env` is already a BaseEnv -> Return as is.
    if isinstance(env, BaseEnv):
        return env

    # `env` is not a BaseEnv yet -> Need to convert/vectorize.

    # MultiAgentEnv (which is a gym.Env).
    if isinstance(env, MultiAgentEnv):
        # Sub-environments are ray.remote actors:
        if remote_envs:
            env = RemoteVectorEnv(
                make_env,
                num_envs,
                multiagent=True,
                remote_env_batch_wait_ms=remote_env_batch_wait_ms)
        # Sub-environments are not ray.remote actors.
        else:
            env = _MultiAgentEnvToBaseEnv(
                make_env=make_env, existing_envs=[env], num_envs=num_envs)
    # ExternalEnv.
    elif isinstance(env, ExternalEnv):
        if num_envs != 1:
            raise ValueError(
                "External(MultiAgent)Env does not currently support "
                "num_envs > 1. One way of solving this would be to "
                "treat your Env as a MultiAgentEnv hosting only one "
                "type of agent but with several copies.")
        env = _ExternalEnvToBaseEnv(env)
    # VectorEnv.
    # Note that all BaseEnvs are also vectorized, but the user may want to
    # define custom vectorization logic and thus implement a custom
    # VectorEnv class.
    elif isinstance(env, VectorEnv):
        env = _VectorEnvToBaseEnv(env)
    # Anything else: This usually implies that env is a gym.Env object.
    else:
        # Sub-environments are ray.remote actors:
        if remote_envs:
            # Determine, whether the already existing sub-env (could
            # be a ray.actor) is multi-agent or not.
            multiagent = ray.get(env._is_multi_agent.remote()) if \
                hasattr(env, "_is_multi_agent") else False
            env = RemoteVectorEnv(
                make_env,
                num_envs,
                multiagent=multiagent,
                remote_env_batch_wait_ms=remote_env_batch_wait_ms,
                existing_envs=[env],
            )
        # Sub-environments are not ray.remote actors.
        else:
            env = VectorEnv.vectorize_gym_envs(
                make_env=make_env,
                existing_envs=[env],
                num_envs=num_envs,
                action_space=env.action_space,
                observation_space=env.observation_space,
            )
            env = _VectorEnvToBaseEnv(env)

    # Make sure conversion went well.
    assert isinstance(env, BaseEnv), env

    return env
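
Below is a hedged usage sketch of to_base_env (not taken from the RLlib sources): it wraps an already constructed gym.Env, plus three additional copies produced by a factory, into a vectorized BaseEnv.

import gym
from ray.rllib.env.base_env import BaseEnv

# Example sub-env and factory; CartPole is only a stand-in here.
existing_env = gym.make("CartPole-v0")
make_my_env = lambda vector_index: gym.make("CartPole-v0")

base_env = BaseEnv.to_base_env(
    env=existing_env,
    make_env=make_my_env,
    num_envs=4,  # `existing_env` becomes one of the four sub-envs
)
obs, rewards, dones, infos, off_policy_actions = base_env.poll()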

try_render(self, env_id=None)

Tries to render the environment.

Parameters:

Name Type Description Default
env_id Optional[int]

The sub-environment's ID if applicable. If None, renders the entire Env (i.e. all sub-environments).

None
Source code in ray/rllib/env/base_env.py
@PublicAPI
def try_render(self, env_id: Optional[EnvID] = None) -> None:
    """Tries to render the environment.

    Args:
        env_id (Optional[int]): The sub-environment's ID if applicable.
            If None, renders the entire Env (i.e. all sub-environments).
    """

    # By default, do nothing.
    pass

try_reset(self, env_id=None)

Attempt to reset the sub-env with the given id or all sub-envs.

If the environment does not support synchronous reset, None can be returned here.

Parameters:

Name Type Description Default
env_id Union[int, str]

The sub-environment's ID if applicable. If None, reset the entire Env (i.e. all sub-environments).

None

Returns:

Type Description
Optional[Dict[Any, Any]]

The reset (multi-agent) observation dict. None if reset is not supported.

Source code in ray/rllib/env/base_env.py
@PublicAPI
def try_reset(self,
              env_id: Optional[EnvID] = None) -> Optional[MultiAgentDict]:
    """Attempt to reset the sub-env with the given id or all sub-envs.

    If the environment does not support synchronous reset, None can be
    returned here.

    Args:
        env_id: The sub-environment's ID if applicable. If None, reset
            the entire Env (i.e. all sub-environments).

    Returns:
        The reset (multi-agent) observation dict. None if reset is not
        supported.
    """
    return None

ray.rllib.env.env_context.EnvContext (dict)

Wraps env configurations to include extra rllib metadata.

These attributes can be used to parameterize environments per process. For example, one might use worker_index to control which data file an environment reads in on initialization.

RLlib auto-sets these attributes when constructing registered envs.
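
As an illustration, here is a hedged sketch of reading these attributes inside a registered env's constructor; the env class and the "data_files" config key are hypothetical.

import gym

class FileBackedEnv(gym.Env):
    """Hypothetical env whose data source depends on the worker it runs in."""

    def __init__(self, config):
        # `config` is the EnvContext RLlib passes in: a dict holding the
        # user-defined "env_config" entries plus extra metadata attributes.
        data_files = config["data_files"]
        # Pick a different data file per (remote) worker.
        self.data_file = data_files[config.worker_index % len(data_files)]
        # Index of this env within the worker's vectorized env.
        self.vector_index = config.vector_index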

__init__(self, env_config, worker_index, vector_index=0, remote=False, num_workers=None) special

Initializes an EnvContext instance.

Parameters:

Name Type Description Default
env_config dict

The env's configuration defined under the "env_config" key in the Trainer's config.

required
worker_index int

When there are multiple workers created, this uniquely identifies the worker the env is created in. 0 for local worker, >0 for remote workers.

required
num_workers Optional[int]

The total number of (remote) workers in the set. 0 if only a local worker exists.

None
vector_index int

When there are multiple envs per worker, this uniquely identifies the env index within the worker. Starts from 0.

0
remote bool

Whether individual sub-environments (in a vectorized env) should be @ray.remote actors or not.

False
Source code in ray/rllib/env/env_context.py
def __init__(self,
             env_config: EnvConfigDict,
             worker_index: int,
             vector_index: int = 0,
             remote: bool = False,
             num_workers: Optional[int] = None):
    """Initializes an EnvContext instance.

    Args:
        env_config: The env's configuration defined under the
            "env_config" key in the Trainer's config.
        worker_index: When there are multiple workers created, this
            uniquely identifies the worker the env is created in.
            0 for local worker, >0 for remote workers.
        num_workers: The total number of (remote) workers in the set.
            0 if only a local worker exists.
        vector_index: When there are multiple envs per worker, this
            uniquely identifies the env index within the worker.
            Starts from 0.
        remote: Whether individual sub-environments (in a vectorized
            env) should be @ray.remote actors or not.
    """
    # Store the env_config in the (super) dict.
    dict.__init__(self, env_config)

    # Set some metadata attributes.
    self.worker_index = worker_index
    self.vector_index = vector_index
    self.remote = remote
    self.num_workers = num_workers

copy_with_overrides(self, env_config=None, worker_index=None, vector_index=None, remote=None, num_workers=None)

Returns a copy of this EnvContext with some attributes overridden.

Parameters:

Name Type Description Default
env_config Optional[dict]

Optional env config to use. None for not overriding the one from the source (self).

None
worker_index Optional[int]

Optional worker index to use. None for not overriding the one from the source (self).

None
vector_index Optional[int]

Optional vector index to use. None for not overriding the one from the source (self).

None
remote Optional[bool]

Optional remote setting to use. None for not overriding the one from the source (self).

None
num_workers Optional[int]

Optional num_workers to use. None for not overriding the one from the source (self).

None

Returns:

Type Description
EnvContext

A new EnvContext object as a copy of self plus the provided overrides.

Source code in ray/rllib/env/env_context.py
def copy_with_overrides(self,
                        env_config: Optional[EnvConfigDict] = None,
                        worker_index: Optional[int] = None,
                        vector_index: Optional[int] = None,
                        remote: Optional[bool] = None,
                        num_workers: Optional[int] = None) -> "EnvContext":
    """Returns a copy of this EnvContext with some attributes overridden.

    Args:
        env_config: Optional env config to use. None for not overriding
            the one from the source (self).
        worker_index: Optional worker index to use. None for not
            overriding the one from the source (self).
        vector_index: Optional vector index to use. None for not
            overriding the one from the source (self).
        remote: Optional remote setting to use. None for not overriding
            the one from the source (self).
        num_workers: Optional num_workers to use. None for not overriding
            the one from the source (self).

    Returns:
        A new EnvContext object as a copy of self plus the provided
        overrides.
    """
    return EnvContext(
        copy.deepcopy(env_config) if env_config is not None else self,
        worker_index if worker_index is not None else self.worker_index,
        vector_index if vector_index is not None else self.vector_index,
        remote if remote is not None else self.remote,
        num_workers if num_workers is not None else self.num_workers,
    )
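
A short hedged usage sketch (the config entries are placeholders):

from ray.rllib.env.env_context import EnvContext

# Build a context for worker 1, then derive a copy for its third sub-env.
base_ctx = EnvContext({"data_files": ["a.csv", "b.csv"]}, worker_index=1)
sub_ctx = base_ctx.copy_with_overrides(vector_index=3)

assert sub_ctx.worker_index == 1                    # inherited from the source
assert sub_ctx.vector_index == 3                    # overridden
assert sub_ctx["data_files"] == ["a.csv", "b.csv"]  # config entries carried over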

ray.rllib.env.external_env.ExternalEnv (Thread)

An environment that interfaces with external agents.

Unlike simulator envs, control is inverted: The environment queries the policy to obtain actions and in return logs observations and rewards for training. This is in contrast to gym.Env, where the algorithm drives the simulation through env.step() calls.

You can use ExternalEnv as the backend for policy serving (by serving HTTP requests in the run loop), for ingesting offline log data (by reading offline transitions in the run loop), or for other custom use cases not easily expressed through gym.Env.

ExternalEnv supports both on-policy actions (through self.get_action()), and off-policy actions (through self.log_action()).

This env is thread-safe, but individual episodes must be executed serially.

Examples:

>>> register_env("my_env", lambda config: YourExternalEnv(config))
>>> trainer = DQNTrainer(env="my_env")
>>> while True:
>>>     print(trainer.train())

__init__(self, action_space, observation_space, max_concurrent=100) special

Initializes an ExternalEnv instance.

Parameters:

Name Type Description Default
action_space Space

Action space of the env.

required
observation_space Space

Observation space of the env.

required
max_concurrent int

Max number of active episodes to allow at once. Exceeding this limit raises an error.

100
Source code in ray/rllib/env/external_env.py
@PublicAPI
def __init__(self,
             action_space: gym.Space,
             observation_space: gym.Space,
             max_concurrent: int = 100):
    """Initializes an ExternalEnv instance.

    Args:
        action_space: Action space of the env.
        observation_space: Observation space of the env.
        max_concurrent: Max number of active episodes to allow at
            once. Exceeding this limit raises an error.
    """

    threading.Thread.__init__(self)

    self.daemon = True
    self.action_space = action_space
    self.observation_space = observation_space
    self._episodes = {}
    self._finished = set()
    self._results_avail_condition = threading.Condition()
    self._max_concurrent_episodes = max_concurrent

end_episode(self, episode_id, observation)

Records the end of an episode.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
observation Any

Current environment observation.

required
Source code in ray/rllib/env/external_env.py
@PublicAPI
def end_episode(self, episode_id: str, observation: EnvObsType) -> None:
    """Records the end of an episode.

    Args:
        episode_id: Episode id returned from start_episode().
        observation: Current environment observation.
    """

    episode = self._get(episode_id)
    self._finished.add(episode.episode_id)
    episode.done(observation)

get_action(self, episode_id, observation)

Record an observation and get the on-policy action.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
observation Any

Current environment observation.

required

Returns:

Type Description
Any

Action from the env action space.

Source code in ray/rllib/env/external_env.py
@PublicAPI
def get_action(self, episode_id: str,
               observation: EnvObsType) -> EnvActionType:
    """Record an observation and get the on-policy action.

    Args:
        episode_id: Episode id returned from start_episode().
        observation: Current environment observation.

    Returns:
        Action from the env action space.
    """

    episode = self._get(episode_id)
    return episode.wait_for_action(observation)

log_action(self, episode_id, observation, action)

Record an observation and (off-policy) action taken.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
observation Any

Current environment observation.

required
action Any

Action for the observation.

required
Source code in ray/rllib/env/external_env.py
@PublicAPI
def log_action(self, episode_id: str, observation: EnvObsType,
               action: EnvActionType) -> None:
    """Record an observation and (off-policy) action taken.

    Args:
        episode_id: Episode id returned from start_episode().
        observation: Current environment observation.
        action: Action for the observation.
    """

    episode = self._get(episode_id)
    episode.log_action(observation, action)

log_returns(self, episode_id, reward, info=None)

Records returns (rewards and infos) from the environment.

The reward will be attributed to the previous action taken by the episode. Rewards accumulate until the next action. If no reward is logged before the next action, a reward of 0.0 is assumed.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
reward float

Reward from the environment.

required
info Optional[dict]

Optional info dict.

None
Source code in ray/rllib/env/external_env.py
@PublicAPI
def log_returns(self,
                episode_id: str,
                reward: float,
                info: Optional[EnvInfoDict] = None) -> None:
    """Records returns (rewards and infos) from the environment.

    The reward will be attributed to the previous action taken by the
    episode. Rewards accumulate until the next action. If no reward is
    logged before the next action, a reward of 0.0 is assumed.

    Args:
        episode_id: Episode id returned from start_episode().
        reward: Reward from the environment.
        info: Optional info dict.
    """

    episode = self._get(episode_id)
    episode.cur_reward += reward

    if info:
        episode.cur_info = info or {}

run(self)

Override this to implement the run loop.

Your loop should continuously:

1. Call self.start_episode(episode_id)
2. Call self.get_action(episode_id, obs) -or- self.log_action(episode_id, obs, action)
3. Call self.log_returns(episode_id, reward)
4. Call self.end_episode(episode_id, obs)
5. Wait if nothing to do.

Multiple episodes may be started at the same time.

Source code in ray/rllib/env/external_env.py
@PublicAPI
def run(self):
    """Override this to implement the run loop.

    Your loop should continuously:
        1. Call self.start_episode(episode_id)
        2. Call self.get_action(episode_id, obs)
                -or-
                self.log_action(episode_id, obs, action)
        3. Call self.log_returns(episode_id, reward)
        4. Call self.end_episode(episode_id, obs)
        5. Wait if nothing to do.

    Multiple episodes may be started at the same time.
    """
    raise NotImplementedError
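
For reference, a minimal hedged sketch of such a run loop inside an ExternalEnv subclass; `self.my_simulator` is a hypothetical external simulator with reset()/step() methods.

def run(self):
    # Feed one episode at a time to RLlib, forever.
    while True:
        episode_id = self.start_episode()
        obs = self.my_simulator.reset()
        done = False
        while not done:
            # On-policy: ask RLlib for the next action.
            # (For off-policy actions, use self.log_action() instead.)
            action = self.get_action(episode_id, obs)
            obs, reward, done, info = self.my_simulator.step(action)
            self.log_returns(episode_id, reward, info=info)
        self.end_episode(episode_id, obs)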

start_episode(self, episode_id=None, training_enabled=True)

Record the start of an episode.

Parameters:

Name Type Description Default
episode_id Optional[str]

Unique string id for the episode or None for it to be auto-assigned and returned.

None
training_enabled bool

Whether to use experiences for this episode to improve the policy.

True

Returns:

Type Description
str

Unique string id for the episode.

Source code in ray/rllib/env/external_env.py
@PublicAPI
def start_episode(self,
                  episode_id: Optional[str] = None,
                  training_enabled: bool = True) -> str:
    """Record the start of an episode.

    Args:
        episode_id: Unique string id for the episode or
            None for it to be auto-assigned and returned.
        training_enabled: Whether to use experiences for this
            episode to improve the policy.

    Returns:
        Unique string id for the episode.
    """

    if episode_id is None:
        episode_id = uuid.uuid4().hex

    if episode_id in self._finished:
        raise ValueError(
            "Episode {} has already completed.".format(episode_id))

    if episode_id in self._episodes:
        raise ValueError(
            "Episode {} is already started".format(episode_id))

    self._episodes[episode_id] = _ExternalEnvEpisode(
        episode_id, self._results_avail_condition, training_enabled)

    return episode_id

ray.rllib.env.policy_client.PolicyClient

REST client to interact with an RLlib policy server.
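
A hedged end-to-end client sketch, assuming a policy server is already listening on localhost:9900; the gym env is only a stand-in for whatever external simulator the client controls.

import gym
from ray.rllib.env.policy_client import PolicyClient

env = gym.make("CartPole-v0")
client = PolicyClient("localhost:9900", inference_mode="remote")

episode_id = client.start_episode()
obs = env.reset()
done = False
while not done:
    action = client.get_action(episode_id, obs)   # queries the server
    obs, reward, done, info = env.step(action)
    client.log_returns(episode_id, reward, info=info)
client.end_episode(episode_id, obs)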

__init__(self, address, inference_mode='local', update_interval=10.0) special

Create a PolicyClient instance.

Parameters:

Name Type Description Default
address str

Server to connect to (e.g., "localhost:9090").

required
inference_mode str

Whether to use 'local' or 'remote' policy inference for computing actions.

'local'
update_interval float or None

If using 'local' inference mode, the policy is refreshed after this many seconds have passed, or None for manual control via client.

10.0
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def __init__(self,
             address: str,
             inference_mode: str = "local",
             update_interval: float = 10.0):
    """Create a PolicyClient instance.

    Args:
        address (str): Server to connect to (e.g., "localhost:9090").
        inference_mode (str): Whether to use 'local' or 'remote' policy
            inference for computing actions.
        update_interval (float or None): If using 'local' inference mode,
            the policy is refreshed after this many seconds have passed,
            or None for manual control via client.
    """
    self.address = address
    self.env: ExternalEnv = None
    if inference_mode == "local":
        self.local = True
        self._setup_local_rollout_worker(update_interval)
    elif inference_mode == "remote":
        self.local = False
    else:
        raise ValueError(
            "inference_mode must be either 'local' or 'remote'")

end_episode(self, episode_id, observation)

Record the end of an episode.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
observation obj

Current environment observation.

required
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def end_episode(self, episode_id: str,
                observation: Union[EnvObsType, MultiAgentDict]) -> None:
    """Record the end of an episode.

    Args:
        episode_id (str): Episode id returned from start_episode().
        observation (obj): Current environment observation.
    """

    if self.local:
        self._update_local_policy()
        return self.env.end_episode(episode_id, observation)

    self._send({
        "command": PolicyClient.END_EPISODE,
        "observation": observation,
        "episode_id": episode_id,
    })

get_action(self, episode_id, observation)

Record an observation and get the on-policy action.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
observation obj

Current environment observation.

required

Returns:

Type Description
action (obj)

Action from the env action space.

Source code in ray/rllib/env/policy_client.py
@PublicAPI
def get_action(self, episode_id: str,
               observation: Union[EnvObsType, MultiAgentDict]
               ) -> Union[EnvActionType, MultiAgentDict]:
    """Record an observation and get the on-policy action.

    Args:
        episode_id (str): Episode id returned from start_episode().
        observation (obj): Current environment observation.

    Returns:
        action (obj): Action from the env action space.
    """

    if self.local:
        self._update_local_policy()
        if isinstance(episode_id, (list, tuple)):
            actions = {
                eid: self.env.get_action(eid, observation[eid])
                for eid in episode_id
            }
            return actions
        else:
            return self.env.get_action(episode_id, observation)
    else:
        return self._send({
            "command": PolicyClient.GET_ACTION,
            "observation": observation,
            "episode_id": episode_id,
        })["action"]

log_action(self, episode_id, observation, action)

Record an observation and (off-policy) action taken.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
observation obj

Current environment observation.

required
action obj

Action for the observation.

required
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def log_action(self, episode_id: str,
               observation: Union[EnvObsType, MultiAgentDict],
               action: Union[EnvActionType, MultiAgentDict]) -> None:
    """Record an observation and (off-policy) action taken.

    Args:
        episode_id (str): Episode id returned from start_episode().
        observation (obj): Current environment observation.
        action (obj): Action for the observation.
    """

    if self.local:
        self._update_local_policy()
        return self.env.log_action(episode_id, observation, action)

    self._send({
        "command": PolicyClient.LOG_ACTION,
        "observation": observation,
        "action": action,
        "episode_id": episode_id,
    })

log_returns(self, episode_id, reward, info=None, multiagent_done_dict=None)

Record returns from the environment.

The reward will be attributed to the previous action taken by the episode. Rewards accumulate until the next action. If no reward is logged before the next action, a reward of 0.0 is assumed.

Parameters:

Name Type Description Default
episode_id str

Episode id returned from start_episode().

required
reward float

Reward from the environment.

required
info dict

Extra info dict.

None
multiagent_done_dict dict

Multi-agent done information.

None
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def log_returns(
        self,
        episode_id: str,
        reward: int,
        info: Union[EnvInfoDict, MultiAgentDict] = None,
        multiagent_done_dict: Optional[MultiAgentDict] = None) -> None:
    """Record returns from the environment.

    The reward will be attributed to the previous action taken by the
    episode. Rewards accumulate until the next action. If no reward is
    logged before the next action, a reward of 0.0 is assumed.

    Args:
        episode_id (str): Episode id returned from start_episode().
        reward (float): Reward from the environment.
        info (dict): Extra info dict.
        multiagent_done_dict (dict): Multi-agent done information.
    """

    if self.local:
        self._update_local_policy()
        if multiagent_done_dict is not None:
            assert isinstance(reward, dict)
            return self.env.log_returns(episode_id, reward, info,
                                        multiagent_done_dict)
        return self.env.log_returns(episode_id, reward, info)

    self._send({
        "command": PolicyClient.LOG_RETURNS,
        "reward": reward,
        "info": info,
        "episode_id": episode_id,
        "done": multiagent_done_dict,
    })
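
For the multi-agent case, a hedged sketch building on the client loop above (agent ids and values are placeholders): when multiagent_done_dict is given in local inference mode, reward must itself be a per-agent dict.

client.log_returns(
    episode_id,
    reward={"agent_0": 1.0, "agent_1": -0.5},
    multiagent_done_dict={"agent_0": False, "agent_1": True, "__all__": False},
)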

start_episode(self, episode_id=None, training_enabled=True)

Record the start of one or more episode(s).

Parameters:

Name Type Description Default
episode_id Optional[str]

Unique string id for the episode or None for it to be auto-assigned.

None
training_enabled bool

Whether to use experiences for this episode to improve the policy.

True

Returns:

Type Description
episode_id (str)

Unique string id for the episode.

Source code in ray/rllib/env/policy_client.py
@PublicAPI
def start_episode(self,
                  episode_id: Optional[str] = None,
                  training_enabled: bool = True) -> str:
    """Record the start of one or more episode(s).

    Args:
        episode_id (Optional[str]): Unique string id for the episode or
            None for it to be auto-assigned.
        training_enabled (bool): Whether to use experiences for this
            episode to improve the policy.

    Returns:
        episode_id (str): Unique string id for the episode.
    """

    if self.local:
        self._update_local_policy()
        return self.env.start_episode(episode_id, training_enabled)

    return self._send({
        "episode_id": episode_id,
        "command": PolicyClient.START_EPISODE,
        "training_enabled": training_enabled,
    })["episode_id"]

update_policy_weights(self)

Query the server for new policy weights, if local inference is enabled.

Source code in ray/rllib/env/policy_client.py
@PublicAPI
def update_policy_weights(self) -> None:
    """Query the server for new policy weights, if local inference is enabled.
    """
    self._update_local_policy(force=True)

ray.rllib.env.policy_server_input.PolicyServerInput (ThreadingMixIn, HTTPServer, InputReader)

REST policy server that acts as an offline data source.

This launches a multi-threaded server that listens on the specified host and port to serve policy requests and forward experiences to RLlib. For high-performance experience collection, it implements InputReader.

For an example, run examples/serving/cartpole_server.py along with examples/serving/cartpole_client.py --inference-mode=local|remote.

Examples:

>>> pg = PGTrainer(
...     env="CartPole-v0", config={
...         "input": lambda ioctx:
...             PolicyServerInput(ioctx, addr, port),
...         "num_workers": 0,  # Run just 1 server, in the trainer.
...     })
>>> while True:
>>>     pg.train()
>>> client = PolicyClient("localhost:9900", inference_mode="local")
>>> eps_id = client.start_episode()
>>> action = client.get_action(eps_id, obs)
>>> ...
>>> client.log_returns(eps_id, reward)
>>> ...
>>> client.log_returns(eps_id, reward)

__init__(self, ioctx, address, port, idle_timeout=3.0) special

Create a PolicyServerInput.

This class implements rllib.offline.InputReader, and can be used with any Trainer by configuring

{"num_workers": 0,
 "input": lambda ioctx: PolicyServerInput(ioctx, addr, port)}

Note that by setting num_workers: 0, the trainer will only create one rollout worker / PolicyServerInput. Clients can connect to the launched server using rllib.env.PolicyClient.

Parameters:

Name Type Description Default
ioctx IOContext

IOContext provided by RLlib.

required
address str

Server addr (e.g., "localhost").

required
port int

Server port (e.g., 9900).

required
Source code in ray/rllib/env/policy_server_input.py
@PublicAPI
def __init__(self, ioctx, address, port, idle_timeout=3.0):
    """Create a PolicyServerInput.

    This class implements rllib.offline.InputReader, and can be used with
    any Trainer by configuring

        {"num_workers": 0,
         "input": lambda ioctx: PolicyServerInput(ioctx, addr, port)}

    Note that by setting num_workers: 0, the trainer will only create one
    rollout worker / PolicyServerInput. Clients can connect to the launched
    server using rllib.env.PolicyClient.

    Args:
        ioctx (IOContext): IOContext provided by RLlib.
        address (str): Server addr (e.g., "localhost").
        port (int): Server port (e.g., 9900).
    """

    self.rollout_worker = ioctx.worker
    self.samples_queue = queue.Queue()
    self.metrics_queue = queue.Queue()
    self.idle_timeout = idle_timeout

    def get_metrics():
        completed = []
        while True:
            try:
                completed.append(self.metrics_queue.get_nowait())
            except queue.Empty:
                break
        return completed

    # Forwards client-reported rewards directly into the local rollout
    # worker. This is a bit of a hack since it is patching the get_metrics
    # function of the sampler.
    if self.rollout_worker.sampler is not None:
        self.rollout_worker.sampler.get_metrics = get_metrics

    # Create a request handler that receives commands from the clients
    # and sends data and metrics into the queues.
    handler = _make_handler(self.rollout_worker, self.samples_queue,
                            self.metrics_queue)
    try:
        import time
        time.sleep(1)
        HTTPServer.__init__(self, (address, port), handler)
    except OSError:
        print(f"Creating a PolicyServer on {address}:{port} failed!")
        import time
        time.sleep(1)
        raise

    logger.info("Starting connector server at "
                f"{self.server_name}:{self.server_port}")

    # Start the serving thread, listening on socket and handling commands.
    serving_thread = threading.Thread(
        name="server", target=self.serve_forever)
    serving_thread.daemon = True
    serving_thread.start()

    # Start a dummy thread that puts empty SampleBatches on the queue, just
    # in case we don't receive anything from clients (or there aren't
    # any). The latter would block sample collection entirely otherwise,
    # even if other workers' PolicyServerInput receive incoming data from
    # actual clients.
    heart_beat_thread = threading.Thread(
        name="heart-beat", target=self._put_empty_sample_batch_every_n_sec)
    heart_beat_thread.daemon = True
    heart_beat_thread.start()

next(self)

Returns the next batch of read experiences.

Returns:

Type Description

The experience read (SampleBatch or MultiAgentBatch).

Source code in ray/rllib/env/policy_server_input.py
@override(InputReader)
def next(self):
    return self.samples_queue.get()

ray.rllib.env.remote_vector_env.RemoteVectorEnv (BaseEnv)

Vector env that executes envs in remote workers.

This provides dynamic batching of inference as observations are returned from the remote simulator actors. Both single and multi-agent child envs are supported, and envs can be stepped synchronously or asynchronously.

You shouldn't need to instantiate this class directly. It's automatically inserted when you use the remote_worker_envs=True option in your Trainer's config.
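
A hedged config sketch of how this class typically gets activated; "my_env" is a placeholder for a registered env name, and num_envs_per_worker is the standard Trainer option that enables vectorization.

config = {
    "env": "my_env",
    "num_envs_per_worker": 4,        # remote envs require num_envs > 1
    "remote_worker_envs": True,      # each sub-env becomes a @ray.remote actor
    "remote_env_batch_wait_ms": 10,  # how long poll() waits for remote results
}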

__init__(self, make_env, num_envs, multiagent, remote_env_batch_wait_ms, existing_envs=None) special

Initializes a RemoteVectorEnv instance.

Parameters:

Name Type Description Default
make_env Callable[[int], Any]

Callable that produces a single (non-vectorized) env, given the vector env index as only arg.

required
num_envs int

The number of sub-envs to create for the vectorization.

required
multiagent bool

Whether this is a multiagent env or not.

required
remote_env_batch_wait_ms int

Time to wait for (ray.remote) sub-environments to have new observations available when polled. When none of the sub-envs is ready, simply repeat the ray.wait call until at least one sub-env is ready.

required
existing_envs Optional[List[ray.actor.ActorHandle]]

Optional list of already created sub-envs. These will be used as-is and only as many new sub-envs as necessary (num_envs - len(existing_envs)) will be created.

None
Source code in ray/rllib/env/remote_vector_env.py
def __init__(self,
             make_env: Callable[[int], EnvType],
             num_envs: int,
             multiagent: bool,
             remote_env_batch_wait_ms: int,
             existing_envs: Optional[List[ray.actor.ActorHandle]] = None):
    """Initializes a RemoteVectorEnv instance.

    Args:
        make_env: Callable that produces a single (non-vectorized) env,
            given the vector env index as only arg.
        num_envs: The number of sub-envs to create for the vectorization.
        multiagent: Whether this is a multiagent env or not.
        remote_env_batch_wait_ms: Time to wait for (ray.remote)
            sub-environments to have new observations available when
            polled. When none of the sub-envs is ready, simply repeat the
            ray.wait call until at least one sub-env is ready.
        existing_envs: Optional list of already created sub-envs.
            These will be used as-is and only as many new sub-envs as
            necessary (`num_envs - len(existing_envs)`) will be created.
    """
    # Could be creating local or remote envs.
    self.make_env = make_env
    # Whether the given `make_env` callable already returns ray.remote
    # objects or not.
    self.make_env_creates_actors = False
    # Already existing env objects (generated by the RolloutWorker).
    self.existing_envs = existing_envs or []
    self.num_envs = num_envs
    self.multiagent = multiagent
    self.poll_timeout = remote_env_batch_wait_ms / 1000

    self.actors = None  # lazy init
    self.pending = None  # lazy init

get_sub_environments(self)

Return a reference to the underlying sub environments, if any.

Returns:

Type Description

List of the underlying sub environments or [].

Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
@PublicAPI
def get_sub_environments(self):
    return self.actors

poll(self)

Returns observations from ready agents.

All returns are two-level dicts mapping from env_id to dicts of agent_ids to values. The number of agents and envs can vary over time.

Returns:

Type Description
Tuple[Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]]]

Tuple consisting of:
1) New observations for each ready agent.
2) Reward values for each ready agent. If the episode has just started, the value will be None.
3) Done values for each ready agent. The special key "__all__" is used to indicate env termination.
4) Info values for each ready agent.
5) Off-policy actions, if any. When an agent takes an off-policy action, there will be an entry in this dict containing that action. There is no need to call send_actions() for agents that have already chosen off-policy actions.

Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,
                        MultiEnvDict, MultiEnvDict]:
    if self.actors is None:
        # `self.make_env` already produces Actors: Use it directly.
        if len(self.existing_envs) > 0 and isinstance(
                self.existing_envs[0], ray.actor.ActorHandle):
            self.make_env_creates_actors = True
            self.actors = []
            while len(self.actors) < self.num_envs:
                self.actors.append(self.make_env(len(self.actors)))
        # `self.make_env` produces gym.Envs (or children thereof, such
        # as MultiAgentEnv): Need to auto-wrap it here. The problem with
        # this is that custom methods will get lost. If you would like to
        # keep your custom methods in your envs, you should provide the
        # env class directly in your config (w/o tune.register_env()),
        # such that your class will directly be made a @ray.remote
        # (w/o the wrapping via `_Remote[Multi|Single]AgentEnv`).
        else:

            def make_remote_env(i):
                logger.info("Launching env {} in remote actor".format(i))
                if self.multiagent:
                    return _RemoteMultiAgentEnv.remote(self.make_env, i)
                else:
                    return _RemoteSingleAgentEnv.remote(self.make_env, i)

            self.actors = [
                make_remote_env(i) for i in range(self.num_envs)
            ]

    if self.pending is None:
        self.pending = {a.reset.remote(): a for a in self.actors}

    # each keyed by env_id in [0, num_remote_envs)
    obs, rewards, dones, infos = {}, {}, {}, {}
    ready = []

    # Wait for at least 1 env to be ready here
    while not ready:
        ready, _ = ray.wait(
            list(self.pending),
            num_returns=len(self.pending),
            timeout=self.poll_timeout)

    # Get and return observations for each of the ready envs
    env_ids = set()
    for obj_ref in ready:
        actor = self.pending.pop(obj_ref)
        env_id = self.actors.index(actor)
        env_ids.add(env_id)
        ret = ray.get(obj_ref)
        # Our sub-envs are simple Actor-turned gym.Envs or MultiAgentEnvs.
        if self.make_env_creates_actors:
            rew, done, info = None, None, None
            if self.multiagent:
                if isinstance(ret, tuple) and len(ret) == 4:
                    ob, rew, done, info = ret
                else:
                    ob = ret
            else:
                if isinstance(ret, tuple) and len(ret) == 4:
                    ob = {_DUMMY_AGENT_ID: ret[0]}
                    rew = {_DUMMY_AGENT_ID: ret[1]}
                    done = {_DUMMY_AGENT_ID: ret[2], "__all__": ret[2]}
                    info = {_DUMMY_AGENT_ID: ret[3]}
                else:
                    ob = {_DUMMY_AGENT_ID: ret}

            if rew is None:
                rew = {agent_id: 0 for agent_id in ob.keys()}
                done = {"__all__": False}
                info = {agent_id: {} for agent_id in ob.keys()}
        # Our sub-envs are auto-wrapped and already behave like multi-agent
        # envs.
        else:
            ob, rew, done, info = ret
        obs[env_id] = ob
        rewards[env_id] = rew
        dones[env_id] = done
        infos[env_id] = info

    logger.debug("Got obs batch for actors {}".format(env_ids))
    return obs, rewards, dones, infos, {}

send_actions(self, action_dict)

Called to send actions back to running agents in this env.

Actions should be sent for each ready agent that returned observations in the previous poll() call.

Parameters:

Name Type Description Default
action_dict Dict[Union[int, str], Dict[Any, Any]]

Actions values keyed by env_id and agent_id.

required
Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
@PublicAPI
def send_actions(self, action_dict: MultiEnvDict) -> None:
    for env_id, actions in action_dict.items():
        actor = self.actors[env_id]
        # `actor` is a simple single-agent (remote) env, e.g. a gym.Env
        # that was made a @ray.remote.
        if not self.multiagent and self.make_env_creates_actors:
            obj_ref = actor.step.remote(actions[_DUMMY_AGENT_ID])
        # `actor` is already a _RemoteSingleAgentEnv or
        # _RemoteMultiAgentEnv wrapper
        # (handles the multi-agent action_dict automatically).
        else:
            obj_ref = actor.step.remote(actions)
        self.pending[obj_ref] = actor

stop(self)

Releases all resources used.

Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
@PublicAPI
def stop(self) -> None:
    if self.actors is not None:
        for actor in self.actors:
            actor.__ray_terminate__.remote()

try_reset(self, env_id=None)

Attempt to reset the sub-env with the given id or all sub-envs.

If the environment does not support synchronous reset, None can be returned here.

Parameters:

Name Type Description Default
env_id Union[int, str]

The sub-environment's ID if applicable. If None, reset the entire Env (i.e. all sub-environments).

None

Returns:

Type Description
Optional[Dict[Any, Any]]

The reset (multi-agent) observation dict. None if reset is not supported.

Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
@PublicAPI
def try_reset(self,
              env_id: Optional[EnvID] = None) -> Optional[MultiAgentDict]:
    actor = self.actors[env_id]
    obj_ref = actor.reset.remote()
    self.pending[obj_ref] = actor
    return ASYNC_RESET_RETURN

ray.rllib.env.vector_env.VectorEnv

An environment that supports batch evaluation using clones of sub-envs.
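
A hedged sketch of a custom subclass implementing the interface documented below; the wrapped env class is a placeholder.

import gym
from ray.rllib.env.vector_env import VectorEnv

class CloneVectorEnv(VectorEnv):
    """Minimal sketch: steps `num_envs` independent copies of one gym.Env."""

    def __init__(self, env_cls, num_envs):
        self.envs = [env_cls() for _ in range(num_envs)]
        super().__init__(
            observation_space=self.envs[0].observation_space,
            action_space=self.envs[0].action_space,
            num_envs=num_envs,
        )

    def vector_reset(self):
        return [env.reset() for env in self.envs]

    def reset_at(self, index=None):
        return self.envs[index or 0].reset()

    def vector_step(self, actions):
        obs, rewards, dones, infos = [], [], [], []
        for env, action in zip(self.envs, actions):
            ob, rew, done, info = env.step(action)
            obs.append(ob)
            rewards.append(rew)
            dones.append(done)
            infos.append(info)
        return obs, rewards, dones, infos

    def get_sub_environments(self):
        return self.envs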

__init__(self, observation_space, action_space, num_envs) special

Initializes a VectorEnv instance.

Parameters:

Name Type Description Default
observation_space Space

The observation Space of a single sub-env.

required
action_space Space

The action Space of a single sub-env.

required
num_envs int

The number of clones to make of the given sub-env.

required
Source code in ray/rllib/env/vector_env.py
def __init__(self, observation_space: gym.Space, action_space: gym.Space,
             num_envs: int):
    """Initializes a VectorEnv instance.

    Args:
        observation_space: The observation Space of a single
            sub-env.
        action_space: The action Space of a single sub-env.
        num_envs: The number of clones to make of the given sub-env.
    """
    self.observation_space = observation_space
    self.action_space = action_space
    self.num_envs = num_envs

get_sub_environments(self)

Returns the underlying sub environments.

Returns:

Type Description
List[Any]

List of all underlying sub environments.

Source code in ray/rllib/env/vector_env.py
@PublicAPI
def get_sub_environments(self) -> List[EnvType]:
    """Returns the underlying sub environments.

    Returns:
        List of all underlying sub environments.
    """
    return []

reset_at(self, index=None)

Resets a single environment.

Parameters:

Name Type Description Default
index Optional[int]

An optional sub-env index to reset.

None

Returns:

Type Description
Any

Observations from the reset sub environment.

Source code in ray/rllib/env/vector_env.py
@PublicAPI
def reset_at(self, index: Optional[int] = None) -> EnvObsType:
    """Resets a single environment.

    Args:
        index: An optional sub-env index to reset.

    Returns:
        Observations from the reset sub environment.
    """
    raise NotImplementedError

try_render_at(self, index=None)

Renders a single environment.

Parameters:

Name Type Description Default
index Optional[int]

An optional sub-env index to render.

None

Returns:

Type Description
Optional[numpy.ndarray]

Either a numpy RGB image (shape=(w x h x 3) dtype=uint8) or None in case rendering is handled directly by this method.

Source code in ray/rllib/env/vector_env.py
def try_render_at(self, index: Optional[int] = None) -> \
        Optional[np.ndarray]:
    """Renders a single environment.

    Args:
        index: An optional sub-env index to render.

    Returns:
        Either a numpy RGB image (shape=(w x h x 3) dtype=uint8) or
        None in case rendering is handled directly by this method.
    """
    pass

vector_reset(self)

Resets all sub-environments.

Returns:

Type Description
List[Any]

List of observations from each environment.

Source code in ray/rllib/env/vector_env.py
@PublicAPI
def vector_reset(self) -> List[EnvObsType]:
    """Resets all sub-environments.

    Returns:
        List of observations from each environment.
    """
    raise NotImplementedError

vector_step(self, actions)

Performs a vectorized step on all sub environments using actions.

Parameters:

Name Type Description Default
actions List[Any]

List of actions (one for each sub-env).

required

Returns:

Type Description
Tuple[List[Any], List[float], List[bool], List[dict]]

A tuple consisting of 1) New observations for each sub-env. 2) Reward values for each sub-env. 3) Done values for each sub-env. 4) Info values for each sub-env.

Source code in ray/rllib/env/vector_env.py
@PublicAPI
def vector_step(
        self, actions: List[EnvActionType]
) -> Tuple[List[EnvObsType], List[float], List[bool], List[EnvInfoDict]]:
    """Performs a vectorized step on all sub environments using `actions`.

    Args:
        actions: List of actions (one for each sub-env).

    Returns:
        A tuple consisting of
        1) New observations for each sub-env.
        2) Reward values for each sub-env.
        3) Done values for each sub-env.
        4) Info values for each sub-env.
    """
    raise NotImplementedError

vectorize_gym_envs(make_env=None, existing_envs=None, num_envs=1, action_space=None, observation_space=None, env_config=None, policy_config=None) staticmethod

Translates any given gym.Env(s) into a VectorizedEnv object.

Parameters:

Name Type Description Default
make_env Optional[Callable[[int], Any]]

Factory that produces a new gym.Env taking the sub-env's vector index as only arg. Must be defined if the number of existing_envs is less than num_envs.

None
existing_envs Optional[List[gym.core.Env]]

Optional list of already instantiated sub environments.

None
num_envs int

Total number of sub environments in this VectorEnv.

1
action_space Optional[gym.spaces.space.Space]

The action space. If None, use existing_envs[0]'s action space.

None
observation_space Optional[gym.spaces.space.Space]

The observation space. If None, use existing_envs[0]'s observation space.

None

Returns:

Type Description
_VectorizedGymEnv

The resulting _VectorizedGymEnv object (subclass of VectorEnv).

Source code in ray/rllib/env/vector_env.py
@staticmethod
def vectorize_gym_envs(
        make_env: Optional[Callable[[int], EnvType]] = None,
        existing_envs: Optional[List[gym.Env]] = None,
        num_envs: int = 1,
        action_space: Optional[gym.Space] = None,
        observation_space: Optional[gym.Space] = None,
        # Deprecated. These seem to have never been used.
        env_config=None,
        policy_config=None) -> "_VectorizedGymEnv":
    """Translates any given gym.Env(s) into a VectorizedEnv object.

    Args:
        make_env: Factory that produces a new gym.Env taking the sub-env's
            vector index as only arg. Must be defined if the
            number of `existing_envs` is less than `num_envs`.
        existing_envs: Optional list of already instantiated sub
            environments.
        num_envs: Total number of sub environments in this VectorEnv.
        action_space: The action space. If None, use existing_envs[0]'s
            action space.
        observation_space: The observation space. If None, use
            existing_envs[0]'s observation space.

    Returns:
        The resulting _VectorizedGymEnv object (subclass of VectorEnv).
    """
    return _VectorizedGymEnv(
        make_env=make_env,
        existing_envs=existing_envs or [],
        num_envs=num_envs,
        observation_space=observation_space,
        action_space=action_space,
    )
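
A hedged usage sketch (CartPole is only a stand-in env):

import gym
from ray.rllib.env.vector_env import VectorEnv

existing = gym.make("CartPole-v0")
vec_env = VectorEnv.vectorize_gym_envs(
    make_env=lambda vector_index: gym.make("CartPole-v0"),
    existing_envs=[existing],
    num_envs=4,                      # `existing` counts as one of the four
)
first_obs = vec_env.vector_reset()   # list of 4 initial observations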