env package
ray.rllib.env.base_env.BaseEnv
The lowest-level env interface used by RLlib for sampling.
BaseEnv models multiple agents executing asynchronously in multiple environments. A call to poll() returns observations from ready agents keyed by their environment and agent ids, and actions for those agents can be sent back via send_actions().
All other env types can be adapted to BaseEnv. RLlib handles these conversions internally in RolloutWorker, for example:
gym.Env => rllib.VectorEnv => rllib.BaseEnv
rllib.MultiAgentEnv (is-a gym.Env) => rllib.VectorEnv => rllib.BaseEnv
rllib.ExternalEnv => rllib.BaseEnv
Attributes:
Name | Type | Description |
---|---|---|
action_space | gym.Space | Action space. This must be defined for single-agent envs. Multi-agent envs can set this to None. |
observation_space | gym.Space | Observation space. This must be defined for single-agent envs. Multi-agent envs can set this to None. |
Examples:
>>> env = MyBaseEnv()
>>> obs, rewards, dones, infos, off_policy_actions = env.poll()
>>> print(obs)
{
"env_0": {
"car_0": [2.4, 1.6],
"car_1": [3.4, -3.2],
},
"env_1": {
"car_0": [8.0, 4.1],
},
"env_2": {
"car_0": [2.3, 3.3],
"car_1": [1.4, -0.2],
"car_3": [1.2, 0.1],
},
}
>>> env.send_actions(
actions={
"env_0": {
"car_0": 0,
"car_1": 1,
}, ...
})
>>> obs, rewards, dones, infos, off_policy_actions = env.poll()
>>> print(obs)
{
"env_0": {
"car_0": [4.1, 1.7],
"car_1": [3.2, -4.2],
}, ...
}
>>> print(dones)
{
"env_0": {
"__all__": False,
"car_0": False,
"car_1": True,
}, ...
}
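For concreteness, here is a minimal sketch of a BaseEnv subclass that wraps a single gym.Env with one agent. The class name, the agent id "agent_0", and the internal buffering are illustrative assumptions, not part of RLlib's API; RLlib normally builds BaseEnvs for you via to_base_env().

```python
import gym

from ray.rllib.env.base_env import BaseEnv


class SingleAgentGymBaseEnv(BaseEnv):
    """Illustrative BaseEnv wrapping one gym.Env with a single agent."""

    def __init__(self, gym_env: gym.Env):
        self.env = gym_env
        self.action_space = gym_env.action_space
        self.observation_space = gym_env.observation_space
        # Buffer the latest transition, keyed by env_id=0 and agent "agent_0".
        self._obs = {0: {"agent_0": self.env.reset()}}
        self._rewards = {0: {"agent_0": 0.0}}
        self._dones = {0: {"agent_0": False, "__all__": False}}
        self._infos = {0: {"agent_0": {}}}

    def poll(self):
        # Return the most recent transition for the single sub-env/agent.
        return self._obs, self._rewards, self._dones, self._infos, {}

    def send_actions(self, action_dict):
        # Step the wrapped env with the action for our only agent.
        obs, rew, done, info = self.env.step(action_dict[0]["agent_0"])
        self._obs = {0: {"agent_0": obs}}
        self._rewards = {0: {"agent_0": rew}}
        self._dones = {0: {"agent_0": done, "__all__": done}}
        self._infos = {0: {"agent_0": info}}

    def try_reset(self, env_id=None):
        # Called once an episode has terminated ("__all__" is True).
        obs = self.env.reset()
        self._obs = {0: {"agent_0": obs}}
        return {"agent_0": obs}
```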
get_sub_environments(self)
Return a reference to the underlying sub environments, if any.
Returns:
Type | Description |
---|---|
List[Any] | List of the underlying sub environments or []. |
poll(self)
Returns observations from ready agents.
All returns are two-level dicts mapping from env_id to dicts of agent_ids to values. The number of agents and envs can vary over time.
Returns:
Type | Description |
---|---|
Tuple[Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]]] | Tuple consisting of 1) New observations for each ready agent. 2) Reward values for each ready agent. If the episode is just started, the value will be None. 3) Done values for each ready agent. The special key "__all__" is used to indicate env termination. 4) Info values for each ready agent. 5) Agents may take off-policy actions. When that happens, there will be an entry in this dict that contains the taken action. There is no need to send_actions() for agents that have already chosen off-policy actions. |
Source code in ray/rllib/env/base_env.py
@PublicAPI
def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,
MultiEnvDict, MultiEnvDict]:
"""Returns observations from ready agents.
All returns are two-level dicts mapping from env_id to dicts of
agent_ids to values. The number of agents and envs can vary over time.
Returns:
Tuple consisting of
1) New observations for each ready agent.
2) Reward values for each ready agent. If the episode is
just started, the value will be None.
3) Done values for each ready agent. The special key "__all__"
is used to indicate env termination.
4) Info values for each ready agent.
5) Agents may take off-policy actions. When that
happens, there will be an entry in this dict that contains the
taken action. There is no need to send_actions() for agents that
have already chosen off-policy actions.
"""
raise NotImplementedError
send_actions(self, action_dict)
Called to send actions back to running agents in this env.
Actions should be sent for each ready agent that returned observations in the previous poll() call.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_dict | Dict[Union[int, str], Dict[Any, Any]] | Action values keyed by env_id and agent_id. | required |
Source code in ray/rllib/env/base_env.py
@PublicAPI
def send_actions(self, action_dict: MultiEnvDict) -> None:
"""Called to send actions back to running agents in this env.
Actions should be sent for each ready agent that returned observations
in the previous poll() call.
Args:
action_dict: Actions values keyed by env_id and agent_id.
"""
raise NotImplementedError
stop(self)
to_base_env(env, make_env=None, num_envs=1, remote_envs=False, remote_env_batch_wait_ms=0, policy_config=None)
staticmethod
Converts an RLlib-supported env into a BaseEnv object.
Supported types for the given env arg are gym.Env, BaseEnv, VectorEnv, MultiAgentEnv, or ExternalEnv.
The resulting BaseEnv is always vectorized (contains n sub-environments) for batched forward passes, where n may also be 1. BaseEnv also supports async execution via the poll and send_actions methods and thus supports external simulators.
TODO: Support gym3 environments, which are already vectorized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
env | Any | An already existing environment of any supported env type to convert/wrap into a BaseEnv. Supported types are gym.Env, BaseEnv, VectorEnv, MultiAgentEnv, ExternalEnv, or ExternalMultiAgentEnv. | required |
make_env | Callable[[int], Any] | A callable taking an int as input (which indicates the number of individual sub-environments within the final vectorized BaseEnv) and returning one individual sub-environment. | None |
num_envs | int | The number of sub-environments to create in the resulting (vectorized) BaseEnv. The already existing env will be one of the num_envs. | 1 |
remote_envs | bool | Whether each sub-env should be a @ray.remote actor. You can set this behavior in your config via the remote_worker_envs=True option. | False |
remote_env_batch_wait_ms | int | The wait time (in ms) to poll remote sub-environments for, if applicable. Only used if remote_envs is True. | 0 |
policy_config | Optional[dict] | Optional policy config dict. | None |
Returns:
Type | Description |
---|---|
BaseEnv | The resulting BaseEnv object. |
Source code in ray/rllib/env/base_env.py
@staticmethod
def to_base_env(
env: EnvType,
make_env: Callable[[int], EnvType] = None,
num_envs: int = 1,
remote_envs: bool = False,
remote_env_batch_wait_ms: int = 0,
policy_config: Optional[PartialTrainerConfigDict] = None,
) -> "BaseEnv":
"""Converts an RLlib-supported env into a BaseEnv object.
Supported types for the given `env` arg are gym.Env, BaseEnv,
VectorEnv, MultiAgentEnv, or ExternalEnv.
The resulting BaseEnv is always vectorized (contains n
sub-environments) for batched forward passes, where n may also be 1.
BaseEnv also supports async execution via the `poll` and `send_actions`
methods and thus supports external simulators.
TODO: Support gym3 environments, which are already vectorized.
Args:
env: An already existing environment of any supported env type
to convert/wrap into a BaseEnv. Supported types are gym.Env,
BaseEnv, VectorEnv, MultiAgentEnv, ExternalEnv, or
ExternalMultiAgentEnv.
make_env: A callable taking an int as input (which indicates the
number of individual sub-environments within the final
vectorized BaseEnv) and returning one individual
sub-environment.
num_envs: The number of sub-environments to create in the
resulting (vectorized) BaseEnv. The already existing `env`
will be one of the `num_envs`.
remote_envs: Whether each sub-env should be a @ray.remote actor.
You can set this behavior in your config via the
`remote_worker_envs=True` option.
remote_env_batch_wait_ms: The wait time (in ms) to poll remote
sub-environments for, if applicable. Only used if
`remote_envs` is True.
policy_config: Optional policy config dict.
Returns:
The resulting BaseEnv object.
"""
from ray.rllib.env.remote_vector_env import RemoteVectorEnv
if remote_envs and num_envs == 1:
raise ValueError(
"Remote envs only make sense to use if num_envs > 1 "
"(i.e. vectorization is enabled).")
# Given `env` is already a BaseEnv -> Return as is.
if isinstance(env, BaseEnv):
return env
# `env` is not a BaseEnv yet -> Need to convert/vectorize.
# MultiAgentEnv (which is a gym.Env).
if isinstance(env, MultiAgentEnv):
# Sub-environments are ray.remote actors:
if remote_envs:
env = RemoteVectorEnv(
make_env,
num_envs,
multiagent=True,
remote_env_batch_wait_ms=remote_env_batch_wait_ms)
# Sub-environments are not ray.remote actors.
else:
env = _MultiAgentEnvToBaseEnv(
make_env=make_env, existing_envs=[env], num_envs=num_envs)
# ExternalEnv.
elif isinstance(env, ExternalEnv):
if num_envs != 1:
raise ValueError(
"External(MultiAgent)Env does not currently support "
"num_envs > 1. One way of solving this would be to "
"treat your Env as a MultiAgentEnv hosting only one "
"type of agent but with several copies.")
env = _ExternalEnvToBaseEnv(env)
# VectorEnv.
# Note that all BaseEnvs are also vectorized, but the user may want to
# define custom vectorization logic and thus implement a custom
# VectorEnv class.
elif isinstance(env, VectorEnv):
env = _VectorEnvToBaseEnv(env)
# Anything else: This usually implies that env is a gym.Env object.
else:
# Sub-environments are ray.remote actors:
if remote_envs:
# Determine, whether the already existing sub-env (could
# be a ray.actor) is multi-agent or not.
multiagent = ray.get(env._is_multi_agent.remote()) if \
hasattr(env, "_is_multi_agent") else False
env = RemoteVectorEnv(
make_env,
num_envs,
multiagent=multiagent,
remote_env_batch_wait_ms=remote_env_batch_wait_ms,
existing_envs=[env],
)
# Sub-environments are not ray.remote actors.
else:
env = VectorEnv.vectorize_gym_envs(
make_env=make_env,
existing_envs=[env],
num_envs=num_envs,
action_space=env.action_space,
observation_space=env.observation_space,
)
env = _VectorEnvToBaseEnv(env)
# Make sure conversion went well.
assert isinstance(env, BaseEnv), env
return env
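As an illustration (not taken from the RLlib sources), an existing single-agent gym.Env can be vectorized into a BaseEnv like this; the CartPole env, the num_envs choice, and the random-action loop are assumptions for the sketch:

```python
import gym

from ray.rllib.env.base_env import BaseEnv

env = gym.make("CartPole-v0")
base_env = BaseEnv.to_base_env(
    env,
    make_env=lambda vector_index: gym.make("CartPole-v0"),
    num_envs=4)
# The first poll() resets the sub-envs and returns their initial observations.
obs, rewards, dones, infos, off_policy_actions = base_env.poll()
# Answer every ready agent with a (random) action.
base_env.send_actions({
    env_id: {agent_id: base_env.action_space.sample()
             for agent_id in agent_obs}
    for env_id, agent_obs in obs.items()
})
```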
try_render(self, env_id=None)
Tries to render the environment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
env_id | Optional[int] | The sub-environment's ID if applicable. If None, renders the entire Env (i.e. all sub-environments). | None |
try_reset(self, env_id=None)
Attempt to reset the sub-env with the given id or all sub-envs.
If the environment does not support synchronous reset, None can be returned here.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
env_id | Union[int, str] | The sub-environment's ID if applicable. If None, reset the entire Env (i.e. all sub-environments). | None |
Returns:
Type | Description |
---|---|
Optional[Dict[Any, Any]] | The reset (multi-agent) observation dict. None if reset is not supported. |
Source code in ray/rllib/env/base_env.py
@PublicAPI
def try_reset(self,
env_id: Optional[EnvID] = None) -> Optional[MultiAgentDict]:
"""Attempt to reset the sub-env with the given id or all sub-envs.
If the environment does not support synchronous reset, None can be
returned here.
Args:
env_id: The sub-environment's ID if applicable. If None, reset
the entire Env (i.e. all sub-environments).
Returns:
The reset (multi-agent) observation dict. None if reset is not
supported.
"""
return None
ray.rllib.env.env_context.EnvContext (dict)
Wraps env configurations to include extra rllib metadata.
These attributes can be used to parameterize environments per process.
For example, one might use worker_index to control which data file an environment reads in on initialization.
RLlib auto-sets these attributes when constructing registered envs.
__init__(self, env_config, worker_index, vector_index=0, remote=False, num_workers=None)
special
Initializes an EnvContext instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
env_config | dict | The env's configuration defined under the "env_config" key in the Trainer's config. | required |
worker_index | int | When there are multiple workers created, this uniquely identifies the worker the env is created in. 0 for local worker, >0 for remote workers. | required |
num_workers | Optional[int] | The total number of (remote) workers in the set. 0 if only a local worker exists. | None |
vector_index | int | When there are multiple envs per worker, this uniquely identifies the env index within the worker. Starts from 0. | 0 |
remote | bool | Whether individual sub-environments (in a vectorized env) should be @ray.remote actors or not. | False |
Source code in ray/rllib/env/env_context.py
def __init__(self,
env_config: EnvConfigDict,
worker_index: int,
vector_index: int = 0,
remote: bool = False,
num_workers: Optional[int] = None):
"""Initializes an EnvContext instance.
Args:
env_config: The env's configuration defined under the
"env_config" key in the Trainer's config.
worker_index: When there are multiple workers created, this
uniquely identifies the worker the env is created in.
0 for local worker, >0 for remote workers.
num_workers: The total number of (remote) workers in the set.
0 if only a local worker exists.
vector_index: When there are multiple envs per worker, this
uniquely identifies the env index within the worker.
Starts from 0.
remote: Whether individual sub-environments (in a vectorized
env) should be @ray.remote actors or not.
"""
# Store the env_config in the (super) dict.
dict.__init__(self, env_config)
# Set some metadata attributes.
self.worker_index = worker_index
self.vector_index = vector_index
self.remote = remote
self.num_workers = num_workers
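The sketch below shows the data-file pattern mentioned above: an env class reading EnvContext metadata in its constructor. The class name, the "file_pattern" config key, and the trivial spaces are hypothetical and only serve to illustrate the API.

```python
import gym
from gym.spaces import Discrete

from ray.rllib.env.env_context import EnvContext


class FileBackedEnv(gym.Env):
    """Hypothetical env that loads a different data file per worker/sub-env."""

    def __init__(self, config: EnvContext):
        # EnvContext behaves like the "env_config" dict plus metadata fields.
        pattern = config.get("file_pattern", "data_{}_{}.csv")  # assumed key
        self.data_file = pattern.format(config.worker_index, config.vector_index)
        self.observation_space = Discrete(2)
        self.action_space = Discrete(2)

    def reset(self):
        return 0

    def step(self, action):
        return 0, 0.0, True, {}
```

When such an env is registered (e.g. via tune.register_env("file_env", lambda config: FileBackedEnv(config))), RLlib passes a fully populated EnvContext to each copy it constructs.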
copy_with_overrides(self, env_config=None, worker_index=None, vector_index=None, remote=None, num_workers=None)
Returns a copy of this EnvContext with some attributes overridden.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
env_config | Optional[dict] | Optional env config to use. None for not overriding the one from the source (self). | None |
worker_index | Optional[int] | Optional worker index to use. None for not overriding the one from the source (self). | None |
vector_index | Optional[int] | Optional vector index to use. None for not overriding the one from the source (self). | None |
remote | Optional[bool] | Optional remote setting to use. None for not overriding the one from the source (self). | None |
num_workers | Optional[int] | Optional num_workers to use. None for not overriding the one from the source (self). | None |
Returns:
Type | Description |
---|---|
EnvContext | A new EnvContext object as a copy of self plus the provided overrides. |
Source code in ray/rllib/env/env_context.py
def copy_with_overrides(self,
env_config: Optional[EnvConfigDict] = None,
worker_index: Optional[int] = None,
vector_index: Optional[int] = None,
remote: Optional[bool] = None,
num_workers: Optional[int] = None) -> "EnvContext":
"""Returns a copy of this EnvContext with some attributes overridden.
Args:
env_config: Optional env config to use. None for not overriding
the one from the source (self).
worker_index: Optional worker index to use. None for not
overriding the one from the source (self).
vector_index: Optional vector index to use. None for not
overriding the one from the source (self).
remote: Optional remote setting to use. None for not overriding
the one from the source (self).
num_workers: Optional num_workers to use. None for not overriding
the one from the source (self).
Returns:
A new EnvContext object as a copy of self plus the provided
overrides.
"""
return EnvContext(
copy.deepcopy(env_config) if env_config is not None else self,
worker_index if worker_index is not None else self.worker_index,
vector_index if vector_index is not None else self.vector_index,
remote if remote is not None else self.remote,
num_workers if num_workers is not None else self.num_workers,
)
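A short usage example (the config contents are arbitrary):

```python
from ray.rllib.env.env_context import EnvContext

base_ctx = EnvContext({"difficulty": 1}, worker_index=2, num_workers=4)
# Derive a per-sub-env context, keeping everything else from `base_ctx`.
sub_ctx = base_ctx.copy_with_overrides(vector_index=3)
assert sub_ctx["difficulty"] == 1
assert sub_ctx.worker_index == 2 and sub_ctx.vector_index == 3
```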
ray.rllib.env.external_env.ExternalEnv (Thread)
An environment that interfaces with external agents.
Unlike simulator envs, control is inverted: The environment queries the policy to obtain actions and in return logs observations and rewards for training. This is in contrast to gym.Env, where the algorithm drives the simulation through env.step() calls.
You can use ExternalEnv as the backend for policy serving (by serving HTTP requests in the run loop), for ingesting offline logs data (by reading offline transitions in the run loop), or other custom use cases not easily expressed through gym.Env.
ExternalEnv supports both on-policy actions (through self.get_action()), and off-policy actions (through self.log_action()).
This env is thread-safe, but individual episodes must be executed serially.
Examples:
>>> register_env("my_env", lambda config: YourExternalEnv(config))
>>> trainer = DQNTrainer(env="my_env")
>>> while True:
>>> print(trainer.train())
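The YourExternalEnv referenced above is user-defined. A minimal sketch of such a subclass is shown below, assuming the external "simulator" is just a local CartPole env; in a real deployment the run loop would instead serve HTTP requests or read offline logs.

```python
import gym

from ray.rllib.env.external_env import ExternalEnv


class YourExternalEnv(ExternalEnv):
    """Illustrative ExternalEnv driving its own CartPole simulator."""

    def __init__(self, env_config):
        self.sim = gym.make("CartPole-v0")
        super().__init__(
            action_space=self.sim.action_space,
            observation_space=self.sim.observation_space)

    def run(self):
        # Inverted control: the env queries the policy for actions and
        # logs observations/rewards back for training.
        while True:
            episode_id = self.start_episode()
            obs = self.sim.reset()
            done = False
            while not done:
                action = self.get_action(episode_id, obs)
                obs, reward, done, info = self.sim.step(action)
                self.log_returns(episode_id, reward, info=info)
            self.end_episode(episode_id, obs)
```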
__init__(self, action_space, observation_space, max_concurrent=100)
special
Initializes an ExternalEnv instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_space | Space | Action space of the env. | required |
observation_space | Space | Observation space of the env. | required |
max_concurrent | int | Max number of active episodes to allow at once. Exceeding this limit raises an error. | 100 |
Source code in ray/rllib/env/external_env.py
@PublicAPI
def __init__(self,
action_space: gym.Space,
observation_space: gym.Space,
max_concurrent: int = 100):
"""Initializes an ExternalEnv instance.
Args:
action_space: Action space of the env.
observation_space: Observation space of the env.
max_concurrent: Max number of active episodes to allow at
once. Exceeding this limit raises an error.
"""
threading.Thread.__init__(self)
self.daemon = True
self.action_space = action_space
self.observation_space = observation_space
self._episodes = {}
self._finished = set()
self._results_avail_condition = threading.Condition()
self._max_concurrent_episodes = max_concurrent
end_episode(self, episode_id, observation)
Records the end of an episode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
observation | Any | Current environment observation. | required |
Source code in ray/rllib/env/external_env.py
@PublicAPI
def end_episode(self, episode_id: str, observation: EnvObsType) -> None:
"""Records the end of an episode.
Args:
episode_id: Episode id returned from start_episode().
observation: Current environment observation.
"""
episode = self._get(episode_id)
self._finished.add(episode.episode_id)
episode.done(observation)
get_action(self, episode_id, observation)
Record an observation and get the on-policy action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
observation | Any | Current environment observation. | required |
Returns:
Type | Description |
---|---|
Any | Action from the env action space. |
Source code in ray/rllib/env/external_env.py
@PublicAPI
def get_action(self, episode_id: str,
observation: EnvObsType) -> EnvActionType:
"""Record an observation and get the on-policy action.
Args:
episode_id: Episode id returned from start_episode().
observation: Current environment observation.
Returns:
Action from the env action space.
"""
episode = self._get(episode_id)
return episode.wait_for_action(observation)
log_action(self, episode_id, observation, action)
Record an observation and (off-policy) action taken.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
observation | Any | Current environment observation. | required |
action | Any | Action for the observation. | required |
Source code in ray/rllib/env/external_env.py
@PublicAPI
def log_action(self, episode_id: str, observation: EnvObsType,
action: EnvActionType) -> None:
"""Record an observation and (off-policy) action taken.
Args:
episode_id: Episode id returned from start_episode().
observation: Current environment observation.
action: Action for the observation.
"""
episode = self._get(episode_id)
episode.log_action(observation, action)
log_returns(self, episode_id, reward, info=None)
Records returns (rewards and infos) from the environment.
The reward will be attributed to the previous action taken by the episode. Rewards accumulate until the next action. If no reward is logged before the next action, a reward of 0.0 is assumed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
reward | float | Reward from the environment. | required |
info | Optional[dict] | Optional info dict. | None |
Source code in ray/rllib/env/external_env.py
@PublicAPI
def log_returns(self,
episode_id: str,
reward: float,
info: Optional[EnvInfoDict] = None) -> None:
"""Records returns (rewards and infos) from the environment.
The reward will be attributed to the previous action taken by the
episode. Rewards accumulate until the next action. If no reward is
logged before the next action, a reward of 0.0 is assumed.
Args:
episode_id: Episode id returned from start_episode().
reward: Reward from the environment.
info: Optional info dict.
"""
episode = self._get(episode_id)
episode.cur_reward += reward
if info:
episode.cur_info = info or {}
run(self)
Override this to implement the run loop.
Your loop should continuously:
1. Call self.start_episode(episode_id)
2. Call self.get_action(episode_id, obs) -or- self.log_action(episode_id, obs, action)
3. Call self.log_returns(episode_id, reward)
4. Call self.end_episode(episode_id, obs)
5. Wait if nothing to do.
Multiple episodes may be started at the same time.
Source code in ray/rllib/env/external_env.py
@PublicAPI
def run(self):
"""Override this to implement the run loop.
Your loop should continuously:
1. Call self.start_episode(episode_id)
2. Call self.get_action(episode_id, obs)
-or-
self.log_action(episode_id, obs, action)
3. Call self.log_returns(episode_id, reward)
4. Call self.end_episode(episode_id, obs)
5. Wait if nothing to do.
Multiple episodes may be started at the same time.
"""
raise NotImplementedError
start_episode(self, episode_id=None, training_enabled=True)
Record the start of an episode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | Optional[str] | Unique string id for the episode or None for it to be auto-assigned and returned. | None |
training_enabled | bool | Whether to use experiences for this episode to improve the policy. | True |
Returns:
Type | Description |
---|---|
str | Unique string id for the episode. |
Source code in ray/rllib/env/external_env.py
@PublicAPI
def start_episode(self,
episode_id: Optional[str] = None,
training_enabled: bool = True) -> str:
"""Record the start of an episode.
Args:
episode_id: Unique string id for the episode or
None for it to be auto-assigned and returned.
training_enabled: Whether to use experiences for this
episode to improve the policy.
Returns:
Unique string id for the episode.
"""
if episode_id is None:
episode_id = uuid.uuid4().hex
if episode_id in self._finished:
raise ValueError(
"Episode {} has already completed.".format(episode_id))
if episode_id in self._episodes:
raise ValueError(
"Episode {} is already started".format(episode_id))
self._episodes[episode_id] = _ExternalEnvEpisode(
episode_id, self._results_avail_condition, training_enabled)
return episode_id
ray.rllib.env.policy_client.PolicyClient
REST client to interact with a RLlib policy server.
__init__(self, address, inference_mode='local', update_interval=10.0)
special
Create a PolicyClient instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address | str | Server to connect to (e.g., "localhost:9090"). | required |
inference_mode | str | Whether to use 'local' or 'remote' policy inference for computing actions. | 'local' |
update_interval | float or None | If using 'local' inference mode, the policy is refreshed after this many seconds have passed, or None for manual control via client. | 10.0 |
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def __init__(self,
address: str,
inference_mode: str = "local",
update_interval: float = 10.0):
"""Create a PolicyClient instance.
Args:
address (str): Server to connect to (e.g., "localhost:9090").
inference_mode (str): Whether to use 'local' or 'remote' policy
inference for computing actions.
update_interval (float or None): If using 'local' inference mode,
the policy is refreshed after this many seconds have passed,
or None for manual control via client.
"""
self.address = address
self.env: ExternalEnv = None
if inference_mode == "local":
self.local = True
self._setup_local_rollout_worker(update_interval)
elif inference_mode == "remote":
self.local = False
else:
raise ValueError(
"inference_mode must be either 'local' or 'remote'")
end_episode(self, episode_id, observation)
Record the end of an episode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
observation | obj | Current environment observation. | required |
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def end_episode(self, episode_id: str,
observation: Union[EnvObsType, MultiAgentDict]) -> None:
"""Record the end of an episode.
Args:
episode_id (str): Episode id returned from start_episode().
observation (obj): Current environment observation.
"""
if self.local:
self._update_local_policy()
return self.env.end_episode(episode_id, observation)
self._send({
"command": PolicyClient.END_EPISODE,
"observation": observation,
"episode_id": episode_id,
})
get_action(self, episode_id, observation)
Record an observation and get the on-policy action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
observation | obj | Current environment observation. | required |
Returns:
Type | Description |
---|---|
action (obj) | Action from the env action space. |
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def get_action(self, episode_id: str,
observation: Union[EnvObsType, MultiAgentDict]
) -> Union[EnvActionType, MultiAgentDict]:
"""Record an observation and get the on-policy action.
Args:
episode_id (str): Episode id returned from start_episode().
observation (obj): Current environment observation.
Returns:
action (obj): Action from the env action space.
"""
if self.local:
self._update_local_policy()
if isinstance(episode_id, (list, tuple)):
actions = {
eid: self.env.get_action(eid, observation[eid])
for eid in episode_id
}
return actions
else:
return self.env.get_action(episode_id, observation)
else:
return self._send({
"command": PolicyClient.GET_ACTION,
"observation": observation,
"episode_id": episode_id,
})["action"]
log_action(self, episode_id, observation, action)
Record an observation and (off-policy) action taken.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
observation | obj | Current environment observation. | required |
action | obj | Action for the observation. | required |
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def log_action(self, episode_id: str,
observation: Union[EnvObsType, MultiAgentDict],
action: Union[EnvActionType, MultiAgentDict]) -> None:
"""Record an observation and (off-policy) action taken.
Args:
episode_id (str): Episode id returned from start_episode().
observation (obj): Current environment observation.
action (obj): Action for the observation.
"""
if self.local:
self._update_local_policy()
return self.env.log_action(episode_id, observation, action)
self._send({
"command": PolicyClient.LOG_ACTION,
"observation": observation,
"action": action,
"episode_id": episode_id,
})
log_returns(self, episode_id, reward, info=None, multiagent_done_dict=None)
Record returns from the environment.
The reward will be attributed to the previous action taken by the episode. Rewards accumulate until the next action. If no reward is logged before the next action, a reward of 0.0 is assumed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | str | Episode id returned from start_episode(). | required |
reward | float | Reward from the environment. | required |
info | dict | Extra info dict. | None |
multiagent_done_dict | dict | Multi-agent done information. | None |
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def log_returns(
self,
episode_id: str,
reward: int,
info: Union[EnvInfoDict, MultiAgentDict] = None,
multiagent_done_dict: Optional[MultiAgentDict] = None) -> None:
"""Record returns from the environment.
The reward will be attributed to the previous action taken by the
episode. Rewards accumulate until the next action. If no reward is
logged before the next action, a reward of 0.0 is assumed.
Args:
episode_id (str): Episode id returned from start_episode().
reward (float): Reward from the environment.
info (dict): Extra info dict.
multiagent_done_dict (dict): Multi-agent done information.
"""
if self.local:
self._update_local_policy()
if multiagent_done_dict is not None:
assert isinstance(reward, dict)
return self.env.log_returns(episode_id, reward, info,
multiagent_done_dict)
return self.env.log_returns(episode_id, reward, info)
self._send({
"command": PolicyClient.LOG_RETURNS,
"reward": reward,
"info": info,
"episode_id": episode_id,
"done": multiagent_done_dict,
})
start_episode(self, episode_id=None, training_enabled=True)
Record the start of one or more episode(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
episode_id | Optional[str] | Unique string id for the episode or None for it to be auto-assigned. | None |
training_enabled | bool | Whether to use experiences for this episode to improve the policy. | True |
Returns:
Type | Description |
---|---|
episode_id (str) | Unique string id for the episode. |
Source code in ray/rllib/env/policy_client.py
@PublicAPI
def start_episode(self,
episode_id: Optional[str] = None,
training_enabled: bool = True) -> str:
"""Record the start of one or more episode(s).
Args:
episode_id (Optional[str]): Unique string id for the episode or
None for it to be auto-assigned.
training_enabled (bool): Whether to use experiences for this
episode to improve the policy.
Returns:
episode_id (str): Unique string id for the episode.
"""
if self.local:
self._update_local_policy()
return self.env.start_episode(episode_id, training_enabled)
return self._send({
"episode_id": episode_id,
"command": PolicyClient.START_EPISODE,
"training_enabled": training_enabled,
})["episode_id"]
update_policy_weights(self)
ray.rllib.env.policy_server_input.PolicyServerInput (ThreadingMixIn, HTTPServer, InputReader)
REST policy server that acts as an offline data source.
This launches a multi-threaded server that listens on the specified host and port to serve policy requests and forward experiences to RLlib. For high performance experience collection, it implements InputReader.
For an example, run examples/serving/cartpole_server.py along with examples/serving/cartpole_client.py --inference-mode=local|remote.
Examples:
>>> pg = PGTrainer(
... env="CartPole-v0", config={
... "input": lambda ioctx:
... PolicyServerInput(ioctx, addr, port),
... "num_workers": 0, # Run just 1 server, in the trainer.
... })
>>> while True:
>>> pg.train()
>>> client = PolicyClient("localhost:9900", inference_mode="local")
>>> eps_id = client.start_episode()
>>> action = client.get_action(eps_id, obs)
>>> ...
>>> client.log_returns(eps_id, reward)
>>> ...
>>> client.log_returns(eps_id, reward)
__init__(self, ioctx, address, port, idle_timeout=3.0)
special
Create a PolicyServerInput.
This class implements rllib.offline.InputReader, and can be used with any Trainer by configuring
{"num_workers": 0,
"input": lambda ioctx: PolicyServerInput(ioctx, addr, port)}
Note that by setting num_workers: 0, the trainer will only create one rollout worker / PolicyServerInput. Clients can connect to the launched server using rllib.env.PolicyClient.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ioctx | IOContext | IOContext provided by RLlib. | required |
address | str | Server addr (e.g., "localhost"). | required |
port | int | Server port (e.g., 9900). | required |
Source code in ray/rllib/env/policy_server_input.py
@PublicAPI
def __init__(self, ioctx, address, port, idle_timeout=3.0):
"""Create a PolicyServerInput.
This class implements rllib.offline.InputReader, and can be used with
any Trainer by configuring
{"num_workers": 0,
"input": lambda ioctx: PolicyServerInput(ioctx, addr, port)}
Note that by setting num_workers: 0, the trainer will only create one
rollout worker / PolicyServerInput. Clients can connect to the launched
server using rllib.env.PolicyClient.
Args:
ioctx (IOContext): IOContext provided by RLlib.
address (str): Server addr (e.g., "localhost").
port (int): Server port (e.g., 9900).
"""
self.rollout_worker = ioctx.worker
self.samples_queue = queue.Queue()
self.metrics_queue = queue.Queue()
self.idle_timeout = idle_timeout
def get_metrics():
completed = []
while True:
try:
completed.append(self.metrics_queue.get_nowait())
except queue.Empty:
break
return completed
# Forwards client-reported rewards directly into the local rollout
# worker. This is a bit of a hack since it is patching the get_metrics
# function of the sampler.
if self.rollout_worker.sampler is not None:
self.rollout_worker.sampler.get_metrics = get_metrics
# Create a request handler that receives commands from the clients
# and sends data and metrics into the queues.
handler = _make_handler(self.rollout_worker, self.samples_queue,
self.metrics_queue)
try:
import time
time.sleep(1)
HTTPServer.__init__(self, (address, port), handler)
except OSError:
print(f"Creating a PolicyServer on {address}:{port} failed!")
import time
time.sleep(1)
raise
logger.info("Starting connector server at "
f"{self.server_name}:{self.server_port}")
# Start the serving thread, listening on socket and handling commands.
serving_thread = threading.Thread(
name="server", target=self.serve_forever)
serving_thread.daemon = True
serving_thread.start()
# Start a dummy thread that puts empty SampleBatches on the queue, just
# in case we don't receive anything from clients (or there aren't
# any). The latter would block sample collection entirely otherwise,
# even if other workers' PolicyServerInput receive incoming data from
# actual clients.
heart_beat_thread = threading.Thread(
name="heart-beat", target=self._put_empty_sample_batch_every_n_sec)
heart_beat_thread.daemon = True
heart_beat_thread.start()
ray.rllib.env.remote_vector_env.RemoteVectorEnv (BaseEnv)
Vector env that executes envs in remote workers.
This provides dynamic batching of inference as observations are returned from the remote simulator actors. Both single and multi-agent child envs are supported, and envs can be stepped synchronously or async.
You shouldn't need to instantiate this class directly. It's automatically inserted when you use the remote_worker_envs=True option in your Trainer's config.
__init__(self, make_env, num_envs, multiagent, remote_env_batch_wait_ms, existing_envs=None)
special
Initializes a RemoteVectorEnv instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
make_env | Callable[[int], Any] | Callable that produces a single (non-vectorized) env, given the vector env index as only arg. | required |
num_envs | int | The number of sub-envs to create for the vectorization. | required |
multiagent | bool | Whether this is a multiagent env or not. | required |
remote_env_batch_wait_ms | int | Time to wait for (ray.remote) sub-environments to have new observations available when polled. When none of the sub-envs is ready, simply repeat the ray.wait call until at least one sub-env is ready. | required |
existing_envs | Optional[List[ray.actor.ActorHandle]] | Optional list of already created sub-envs. These will be used as-is and only as many new sub-envs as necessary (num_envs - len(existing_envs)) will be created. | None |
Source code in ray/rllib/env/remote_vector_env.py
def __init__(self,
make_env: Callable[[int], EnvType],
num_envs: int,
multiagent: bool,
remote_env_batch_wait_ms: int,
existing_envs: Optional[List[ray.actor.ActorHandle]] = None):
"""Initializes a RemoteVectorEnv instance.
Args:
make_env: Callable that produces a single (non-vectorized) env,
given the vector env index as only arg.
num_envs: The number of sub-envs to create for the vectorization.
multiagent: Whether this is a multiagent env or not.
remote_env_batch_wait_ms: Time to wait for (ray.remote)
sub-environments to have new observations available when
polled. When none of the sub-envs is ready, simply repeat the
ray.wait call until at least one sub-env is ready.
existing_envs: Optional list of already created sub-envs.
These will be used as-is and only as many new sub-envs as
necessary (`num_envs - len(existing_envs)`) will be created.
"""
# Could be creating local or remote envs.
self.make_env = make_env
# Whether the given `make_env` callable already returns ray.remote
# objects or not.
self.make_env_creates_actors = False
# Already existing env objects (generated by the RolloutWorker).
self.existing_envs = existing_envs or []
self.num_envs = num_envs
self.multiagent = multiagent
self.poll_timeout = remote_env_batch_wait_ms / 1000
self.actors = None # lazy init
self.pending = None # lazy init
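In practice you enable this through the Trainer config rather than by constructing RemoteVectorEnv yourself. A hedged example (PPO and CartPole are arbitrary choices; the timeout value is illustrative):

```python
import ray
from ray.rllib.agents.ppo import PPOTrainer

ray.init()
trainer = PPOTrainer(
    env="CartPole-v0",
    config={
        # Vectorization must be enabled (num_envs_per_worker > 1) for
        # remote sub-envs to make sense.
        "num_envs_per_worker": 4,
        # Turn each sub-env into a @ray.remote actor (RemoteVectorEnv).
        "remote_worker_envs": True,
        # Poll timeout used by RemoteVectorEnv.poll().
        "remote_env_batch_wait_ms": 10,
    })
```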
get_sub_environments(self)
poll(self)
Returns observations from ready agents.
All returns are two-level dicts mapping from env_id to dicts of agent_ids to values. The number of agents and envs can vary over time.
Returns:
Type | Description |
---|---|
Tuple[Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]], Dict[Union[int, str], Dict[Any, Any]]] | Tuple consisting of 1) New observations for each ready agent. 2) Reward values for each ready agent. If the episode is just started, the value will be None. 3) Done values for each ready agent. The special key "__all__" is used to indicate env termination. 4) Info values for each ready agent. 5) Agents may take off-policy actions. When that happens, there will be an entry in this dict that contains the taken action. There is no need to send_actions() for agents that have already chosen off-policy actions. |
Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,
MultiEnvDict, MultiEnvDict]:
if self.actors is None:
# `self.make_env` already produces Actors: Use it directly.
if len(self.existing_envs) > 0 and isinstance(
self.existing_envs[0], ray.actor.ActorHandle):
self.make_env_creates_actors = True
self.actors = []
while len(self.actors) < self.num_envs:
self.actors.append(self.make_env(len(self.actors)))
# `self.make_env` produces gym.Envs (or children thereof, such
# as MultiAgentEnv): Need to auto-wrap it here. The problem with
# this is that custom methods wil get lost. If you would like to
# keep your custom methods in your envs, you should provide the
# env class directly in your config (w/o tune.register_env()),
# such that your class will directly be made a @ray.remote
# (w/o the wrapping via `_Remote[Multi|Single]AgentEnv`).
else:
def make_remote_env(i):
logger.info("Launching env {} in remote actor".format(i))
if self.multiagent:
return _RemoteMultiAgentEnv.remote(self.make_env, i)
else:
return _RemoteSingleAgentEnv.remote(self.make_env, i)
self.actors = [
make_remote_env(i) for i in range(self.num_envs)
]
if self.pending is None:
self.pending = {a.reset.remote(): a for a in self.actors}
# each keyed by env_id in [0, num_remote_envs)
obs, rewards, dones, infos = {}, {}, {}, {}
ready = []
# Wait for at least 1 env to be ready here
while not ready:
ready, _ = ray.wait(
list(self.pending),
num_returns=len(self.pending),
timeout=self.poll_timeout)
# Get and return observations for each of the ready envs
env_ids = set()
for obj_ref in ready:
actor = self.pending.pop(obj_ref)
env_id = self.actors.index(actor)
env_ids.add(env_id)
ret = ray.get(obj_ref)
# Our sub-envs are simple Actor-turned gym.Envs or MultiAgentEnvs.
if self.make_env_creates_actors:
rew, done, info = None, None, None
if self.multiagent:
if isinstance(ret, tuple) and len(ret) == 4:
ob, rew, done, info = ret
else:
ob = ret
else:
if isinstance(ret, tuple) and len(ret) == 4:
ob = {_DUMMY_AGENT_ID: ret[0]}
rew = {_DUMMY_AGENT_ID: ret[1]}
done = {_DUMMY_AGENT_ID: ret[2], "__all__": ret[2]}
info = {_DUMMY_AGENT_ID: ret[3]}
else:
ob = {_DUMMY_AGENT_ID: ret}
if rew is None:
rew = {agent_id: 0 for agent_id in ob.keys()}
done = {"__all__": False}
info = {agent_id: {} for agent_id in ob.keys()}
# Our sub-envs are auto-wrapped and already behave like multi-agent
# envs.
else:
ob, rew, done, info = ret
obs[env_id] = ob
rewards[env_id] = rew
dones[env_id] = done
infos[env_id] = info
logger.debug("Got obs batch for actors {}".format(env_ids))
return obs, rewards, dones, infos, {}
send_actions(self, action_dict)
Called to send actions back to running agents in this env.
Actions should be sent for each ready agent that returned observations in the previous poll() call.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_dict | Dict[Union[int, str], Dict[Any, Any]] | Action values keyed by env_id and agent_id. | required |
Source code in ray/rllib/env/remote_vector_env.py
@override(BaseEnv)
@PublicAPI
def send_actions(self, action_dict: MultiEnvDict) -> None:
for env_id, actions in action_dict.items():
actor = self.actors[env_id]
# `actor` is a simple single-agent (remote) env, e.g. a gym.Env
# that was made a @ray.remote.
if not self.multiagent and self.make_env_creates_actors:
obj_ref = actor.step.remote(actions[_DUMMY_AGENT_ID])
# `actor` is already a _RemoteSingleAgentEnv or
# _RemoteMultiAgentEnv wrapper
# (handles the multi-agent action_dict automatically).
else:
obj_ref = actor.step.remote(actions)
self.pending[obj_ref] = actor
stop(self)
try_reset(self, env_id=None)
Attempt to reset the sub-env with the given id or all sub-envs.
If the environment does not support synchronous reset, None can be returned here.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
env_id | Union[int, str] | The sub-environment's ID if applicable. If None, reset the entire Env (i.e. all sub-environments). | None |
Returns:
Type | Description |
---|---|
Optional[Dict[Any, Any]] | The reset (multi-agent) observation dict. None if reset is not supported. |
ray.rllib.env.vector_env.VectorEnv
An environment that supports batch evaluation using clones of sub-envs.
__init__(self, observation_space, action_space, num_envs)
special
Initializes a VectorEnv instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
observation_space | Space | The observation Space of a single sub-env. | required |
action_space | Space | The action Space of a single sub-env. | required |
num_envs | int | The number of clones to make of the given sub-env. | required |
Source code in ray/rllib/env/vector_env.py
def __init__(self, observation_space: gym.Space, action_space: gym.Space,
num_envs: int):
"""Initializes a VectorEnv instance.
Args:
observation_space: The observation Space of a single
sub-env.
action_space: The action Space of a single sub-env.
num_envs: The number of clones to make of the given sub-env.
"""
self.observation_space = observation_space
self.action_space = action_space
self.num_envs = num_envs
get_sub_environments(self)
Returns the underlying sub environments.
Returns:
Type | Description |
---|---|
List[Any] | List of all underlying sub environments. |
reset_at(self, index=None)
Resets a single environment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index | Optional[int] | An optional sub-env index to reset. | None |
Returns:
Type | Description |
---|---|
Any | Observations from the reset sub environment. |
try_render_at(self, index=None)
Renders a single environment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index | Optional[int] | An optional sub-env index to render. | None |
Returns:
Type | Description |
---|---|
Optional[numpy.ndarray] | Either a numpy RGB image (shape=(w x h x 3), dtype=uint8) or None in case rendering is handled directly by this method. |
vector_reset(self)
Resets all sub-environments.
Returns:
Type | Description |
---|---|
List[Any] | List of observations from each environment. |
vector_step(self, actions)
Performs a vectorized step on all sub environments using actions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions | List[Any] | List of actions (one for each sub-env). | required |
Returns:
Type | Description |
---|---|
Tuple[List[Any], List[float], List[bool], List[dict]] | A tuple consisting of 1) New observations for each sub-env. 2) Reward values for each sub-env. 3) Done values for each sub-env. 4) Info values for each sub-env. |
Source code in ray/rllib/env/vector_env.py
@PublicAPI
def vector_step(
self, actions: List[EnvActionType]
) -> Tuple[List[EnvObsType], List[float], List[bool], List[EnvInfoDict]]:
"""Performs a vectorized step on all sub environments using `actions`.
Args:
actions: List of actions (one for each sub-env).
Returns:
A tuple consisting of
1) New observations for each sub-env.
2) Reward values for each sub-env.
3) Done values for each sub-env.
4) Info values for each sub-env.
"""
raise NotImplementedError
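For custom vectorization logic, VectorEnv can be subclassed directly. The sketch below batches CartPole copies in a plain Python loop; the class name and env choice are illustrative, and a real implementation might instead step the copies in parallel.

```python
import gym

from ray.rllib.env.vector_env import VectorEnv


class CartPoleVectorEnv(VectorEnv):
    """Illustrative VectorEnv stepping n CartPole copies in a Python loop."""

    def __init__(self, num_envs: int = 4):
        self.envs = [gym.make("CartPole-v0") for _ in range(num_envs)]
        super().__init__(
            observation_space=self.envs[0].observation_space,
            action_space=self.envs[0].action_space,
            num_envs=num_envs)

    def vector_reset(self):
        return [env.reset() for env in self.envs]

    def reset_at(self, index=None):
        return self.envs[index or 0].reset()

    def vector_step(self, actions):
        obs_batch, rew_batch, done_batch, info_batch = [], [], [], []
        for env, action in zip(self.envs, actions):
            obs, rew, done, info = env.step(action)
            obs_batch.append(obs)
            rew_batch.append(rew)
            done_batch.append(done)
            info_batch.append(info)
        return obs_batch, rew_batch, done_batch, info_batch

    def get_sub_environments(self):
        return self.envs
```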
vectorize_gym_envs(make_env=None, existing_envs=None, num_envs=1, action_space=None, observation_space=None, env_config=None, policy_config=None)
staticmethod
Translates any given gym.Env(s) into a VectorizedEnv object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
make_env | Optional[Callable[[int], Any]] | Factory that produces a new gym.Env taking the sub-env's vector index as only arg. Must be defined if the number of existing_envs is less than num_envs. | None |
existing_envs | Optional[List[gym.core.Env]] | Optional list of already instantiated sub environments. | None |
num_envs | int | Total number of sub environments in this VectorEnv. | 1 |
action_space | Optional[gym.spaces.space.Space] | The action space. If None, use existing_envs[0]'s action space. | None |
observation_space | Optional[gym.spaces.space.Space] | The observation space. If None, use existing_envs[0]'s observation space. | None |
Returns:
Type | Description |
---|---|
_VectorizedGymEnv | The resulting _VectorizedGymEnv object (subclass of VectorEnv). |
Source code in ray/rllib/env/vector_env.py
@staticmethod
def vectorize_gym_envs(
make_env: Optional[Callable[[int], EnvType]] = None,
existing_envs: Optional[List[gym.Env]] = None,
num_envs: int = 1,
action_space: Optional[gym.Space] = None,
observation_space: Optional[gym.Space] = None,
# Deprecated. These seem to have never been used.
env_config=None,
policy_config=None) -> "_VectorizedGymEnv":
"""Translates any given gym.Env(s) into a VectorizedEnv object.
Args:
make_env: Factory that produces a new gym.Env taking the sub-env's
vector index as only arg. Must be defined if the
number of `existing_envs` is less than `num_envs`.
existing_envs: Optional list of already instantiated sub
environments.
num_envs: Total number of sub environments in this VectorEnv.
action_space: The action space. If None, use existing_envs[0]'s
action space.
observation_space: The observation space. If None, use
existing_envs[0]'s action space.
Returns:
The resulting _VectorizedGymEnv object (subclass of VectorEnv).
"""
return _VectorizedGymEnv(
make_env=make_env,
existing_envs=existing_envs or [],
num_envs=num_envs,
observation_space=observation_space,
action_space=action_space,
)
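A short usage example (CartPole and num_envs=4 are arbitrary choices for illustration):

```python
import gym

from ray.rllib.env.vector_env import VectorEnv

vec_env = VectorEnv.vectorize_gym_envs(
    make_env=lambda vector_index: gym.make("CartPole-v0"),
    num_envs=4)
obs_batch = vec_env.vector_reset()
actions = [vec_env.action_space.sample() for _ in range(vec_env.num_envs)]
obs_batch, rewards, dones, infos = vec_env.vector_step(actions)
```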