| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- from collections import deque
- from typing import Optional, Union
- import gymnasium as gym
- import numpy as np
- from gymnasium import spaces
- from ray.rllib.utils.annotations import PublicAPI
- from ray.rllib.utils.images import resize, rgb2gray
- @PublicAPI
- def is_atari(env: Union[gym.Env, str]) -> bool:
- """Returns, whether a given env object or env descriptor (str) is an Atari env.
- Args:
- env: The gym.Env object or a string descriptor of the env (for example,
- "ale_py:ALE/Pong-v5").
- Returns:
- Whether `env` is an Atari environment.
- """
- # If a gym.Env, check proper spaces as well as occurrence of the "Atari<ALE" string
- # in the class name.
- if not isinstance(env, str):
- if (
- hasattr(env.observation_space, "shape")
- and env.observation_space.shape is not None
- and len(env.observation_space.shape) <= 2
- ):
- return False
- return "AtariEnv<ALE" in str(env)
- # If string, check for "ale_py:ALE/" prefix.
- else:
- return env.startswith("ALE/") or env.startswith("ale_py:")
- @PublicAPI
- def get_wrapper_by_cls(env, cls):
- """Returns the gym env wrapper of the given class, or None."""
- currentenv = env
- while True:
- if isinstance(currentenv, cls):
- return currentenv
- elif isinstance(currentenv, gym.Wrapper):
- currentenv = currentenv.env
- else:
- return None
- @PublicAPI
- class ClipRewardEnv(gym.RewardWrapper):
- def __init__(self, env):
- gym.RewardWrapper.__init__(self, env)
- def reward(self, reward):
- """Bin reward to {+1, 0, -1} by its sign."""
- return np.sign(reward)
- @PublicAPI
- class EpisodicLifeEnv(gym.Wrapper):
- def __init__(self, env):
- """Make end-of-life == end-of-episode, but only reset on true game over.
- Done by DeepMind for the DQN and co. since it helps value estimation.
- """
- gym.Wrapper.__init__(self, env)
- self.lives = 0
- self.was_real_terminated = True
- def step(self, action):
- obs, reward, terminated, truncated, info = self.env.step(action)
- self.was_real_terminated = terminated
- # check current lives, make loss of life terminal,
- # then update lives to handle bonus lives
- lives = self.env.unwrapped.ale.lives()
- if lives < self.lives and lives > 0:
- # for Qbert sometimes we stay in lives == 0 condtion for a few fr
- # so its important to keep lives > 0, so that we only reset once
- # the environment advertises `terminated`.
- terminated = True
- self.lives = lives
- return obs, reward, terminated, truncated, info
- def reset(self, **kwargs):
- """Reset only when lives are exhausted.
- This way all states are still reachable even though lives are episodic,
- and the learner need not know about any of this behind-the-scenes.
- """
- if self.was_real_terminated:
- obs, info = self.env.reset(**kwargs)
- else:
- # no-op step to advance from terminal/lost life state
- obs, _, _, _, info = self.env.step(0)
- self.lives = self.env.unwrapped.ale.lives()
- return obs, info
- @PublicAPI
- class FireResetEnv(gym.Wrapper):
- def __init__(self, env):
- """Take action on reset.
- For environments that are fixed until firing."""
- gym.Wrapper.__init__(self, env)
- assert env.unwrapped.get_action_meanings()[1] == "FIRE"
- assert len(env.unwrapped.get_action_meanings()) >= 3
- def reset(self, **kwargs):
- self.env.reset(**kwargs)
- obs, _, terminated, truncated, _ = self.env.step(1)
- if terminated or truncated:
- self.env.reset(**kwargs)
- obs, _, terminated, truncated, info = self.env.step(2)
- if terminated or truncated:
- self.env.reset(**kwargs)
- return obs, info
- def step(self, ac):
- return self.env.step(ac)
- @PublicAPI
- class FrameStack(gym.Wrapper):
- def __init__(self, env, k):
- """Stack k last frames."""
- gym.Wrapper.__init__(self, env)
- self.k = k
- self.frames = deque([], maxlen=k)
- shp = env.observation_space.shape
- self.observation_space = spaces.Box(
- low=np.repeat(env.observation_space.low, repeats=k, axis=-1),
- high=np.repeat(env.observation_space.high, repeats=k, axis=-1),
- shape=(shp[0], shp[1], shp[2] * k),
- dtype=env.observation_space.dtype,
- )
- def reset(self, *, seed=None, options=None):
- ob, infos = self.env.reset(seed=seed, options=options)
- for _ in range(self.k):
- self.frames.append(ob)
- return self._get_ob(), infos
- def step(self, action):
- ob, reward, terminated, truncated, info = self.env.step(action)
- self.frames.append(ob)
- return self._get_ob(), reward, terminated, truncated, info
- def _get_ob(self):
- assert len(self.frames) == self.k
- return np.concatenate(self.frames, axis=2)
- @PublicAPI
- class FrameStackTrajectoryView(gym.ObservationWrapper):
- def __init__(self, env):
- """No stacking. Trajectory View API takes care of this."""
- gym.Wrapper.__init__(self, env)
- shp = env.observation_space.shape
- assert shp[2] == 1
- self.observation_space = spaces.Box(
- low=0, high=255, shape=(shp[0], shp[1]), dtype=env.observation_space.dtype
- )
- def observation(self, observation):
- return np.squeeze(observation, axis=-1)
- @PublicAPI
- class MaxAndSkipEnv(gym.Wrapper):
- def __init__(self, env, skip=4):
- """Return only every `skip`-th frame"""
- gym.Wrapper.__init__(self, env)
- # most recent raw observations (for max pooling across time steps)
- self._obs_buffer = np.zeros(
- (2,) + env.observation_space.shape, dtype=env.observation_space.dtype
- )
- self._skip = skip
- def step(self, action):
- """Repeat action, sum reward, and max over last observations."""
- total_reward = 0.0
- terminated = truncated = info = None
- for i in range(self._skip):
- obs, reward, terminated, truncated, info = self.env.step(action)
- if i == self._skip - 2:
- self._obs_buffer[0] = obs
- if i == self._skip - 1:
- self._obs_buffer[1] = obs
- total_reward += reward
- if terminated or truncated:
- break
- # Note that the observation on the terminated|truncated=True frame
- # doesn't matter
- max_frame = self._obs_buffer.max(axis=0)
- return max_frame, total_reward, terminated, truncated, info
- def reset(self, **kwargs):
- return self.env.reset(**kwargs)
- @PublicAPI
- class MonitorEnv(gym.Wrapper):
- def __init__(self, env=None):
- """Record episodes stats prior to EpisodicLifeEnv, etc."""
- gym.Wrapper.__init__(self, env)
- self._current_reward = None
- self._num_steps = None
- self._total_steps = None
- self._episode_rewards = []
- self._episode_lengths = []
- self._num_episodes = 0
- self._num_returned = 0
- def reset(self, **kwargs):
- obs, info = self.env.reset(**kwargs)
- if self._total_steps is None:
- self._total_steps = sum(self._episode_lengths)
- if self._current_reward is not None:
- self._episode_rewards.append(self._current_reward)
- self._episode_lengths.append(self._num_steps)
- self._num_episodes += 1
- self._current_reward = 0
- self._num_steps = 0
- return obs, info
- def step(self, action):
- obs, rew, terminated, truncated, info = self.env.step(action)
- self._current_reward += rew
- self._num_steps += 1
- self._total_steps += 1
- return obs, rew, terminated, truncated, info
- def get_episode_rewards(self):
- return self._episode_rewards
- def get_episode_lengths(self):
- return self._episode_lengths
- def get_total_steps(self):
- return self._total_steps
- def next_episode_results(self):
- for i in range(self._num_returned, len(self._episode_rewards)):
- yield (self._episode_rewards[i], self._episode_lengths[i])
- self._num_returned = len(self._episode_rewards)
- @PublicAPI
- class NoopResetEnv(gym.Wrapper):
- def __init__(self, env, noop_max=30):
- """Sample initial states by taking random number of no-ops on reset.
- No-op is assumed to be action 0.
- """
- gym.Wrapper.__init__(self, env)
- self.noop_max = noop_max
- self.override_num_noops = None
- self.noop_action = 0
- assert env.unwrapped.get_action_meanings()[0] == "NOOP"
- def reset(self, **kwargs):
- """Do no-op action for a number of steps in [1, noop_max]."""
- self.env.reset(**kwargs)
- if self.override_num_noops is not None:
- noops = self.override_num_noops
- else:
- # This environment now uses the pcg64 random number generator which
- # does not have `randint` as an attribute only has `integers`.
- try:
- noops = self.unwrapped.np_random.integers(1, self.noop_max + 1)
- # Also still support older versions.
- except AttributeError:
- noops = self.unwrapped.np_random.randint(1, self.noop_max + 1)
- assert noops > 0
- obs = None
- for _ in range(noops):
- obs, _, terminated, truncated, info = self.env.step(self.noop_action)
- if terminated or truncated:
- obs, info = self.env.reset(**kwargs)
- return obs, info
- def step(self, ac):
- return self.env.step(ac)
- @PublicAPI
- class NormalizedImageEnv(gym.ObservationWrapper):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.observation_space = gym.spaces.Box(
- -1.0,
- 1.0,
- shape=self.observation_space.shape,
- dtype=np.float32,
- )
- # Divide by scale and center around 0.0, such that observations are in the range
- # of -1.0 and 1.0.
- def observation(self, observation):
- return (observation.astype(np.float32) / 128.0) - 1.0
- @PublicAPI
- class GrayScaleAndResize(gym.ObservationWrapper):
- def __init__(self, env, dim, grayscale: bool = True):
- """Warp frames to the specified size (dim x dim)."""
- gym.ObservationWrapper.__init__(self, env)
- self.width = dim
- self.height = dim
- self.grayscale = grayscale
- self.observation_space = spaces.Box(
- low=0,
- high=255,
- shape=(self.height, self.width, 1 if grayscale else 3),
- dtype=np.uint8,
- )
- def observation(self, frame):
- if self.grayscale:
- frame = rgb2gray(frame)
- frame = resize(frame, height=self.height, width=self.width)
- return frame[:, :, None]
- else:
- return resize(frame, height=self.height, width=self.width)
- WarpFrame = GrayScaleAndResize
- @PublicAPI
- def wrap_atari_for_new_api_stack(
- env: gym.Env,
- dim: int = 64,
- frameskip: int = 4,
- framestack: Optional[int] = None,
- grayscale: bool = True,
- # TODO (sven): Add option to NOT grayscale, in which case framestack must be None
- # (b/c we are using the 3 color channels already as stacking frames).
- ) -> gym.Env:
- """Wraps `env` for new-API-stack-friendly RLlib Atari experiments.
- Note that we assume reward clipping is done outside the wrapper.
- Args:
- env: The env object to wrap.
- dim: Dimension to resize observations to (dim x dim).
- frameskip: Whether to skip n frames and max over them (keep brightest pixels).
- framestack: Whether to stack the last n (grayscaled) frames. Note that this
- step happens after(!) a possible frameskip step, meaning that if
- frameskip=4 and framestack=2, we would perform the following over this
- trajectory:
- actual env timesteps: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 -> ...
- frameskip: ( max ) ( max ) ( max ) ( max )
- framestack: ( stack ) (stack )
- Returns:
- The wrapped gym.Env.
- """
- # Time limit.
- env = gym.wrappers.TimeLimit(env, max_episode_steps=108000)
- # Grayscale + resize.
- env = WarpFrame(env, dim=dim, grayscale=grayscale)
- # Normalize the image.
- env = NormalizedImageEnv(env)
- # Frameskip: Take max over these n frames.
- if frameskip > 1:
- assert env.spec is not None
- env = MaxAndSkipEnv(env, skip=frameskip)
- # Send n noop actions into env after reset to increase variance in the
- # "start states" of the trajectories. These dummy steps are NOT included in the
- # sampled data used for learning.
- env = NoopResetEnv(env, noop_max=30)
- # Each life is one episode.
- env = EpisodicLifeEnv(env)
- # Some envs only start playing after pressing fire. Unblock those.
- if "FIRE" in env.unwrapped.get_action_meanings():
- env = FireResetEnv(env)
- # Framestack.
- if framestack:
- env = FrameStack(env, k=framestack)
- return env
- @PublicAPI
- def wrap_deepmind(env, dim=84, framestack=True, noframeskip=False):
- """Configure environment for DeepMind-style Atari.
- Note that we assume reward clipping is done outside the wrapper.
- Args:
- env: The env object to wrap.
- dim: Dimension to resize observations to (dim x dim).
- framestack: Whether to framestack observations.
- """
- env = MonitorEnv(env)
- env = NoopResetEnv(env, noop_max=30)
- if env.spec is not None and noframeskip is True:
- env = MaxAndSkipEnv(env, skip=4)
- env = EpisodicLifeEnv(env)
- if "FIRE" in env.unwrapped.get_action_meanings():
- env = FireResetEnv(env)
- env = WarpFrame(env, dim)
- # env = ClipRewardEnv(env) # reward clipping is handled by policy eval
- # 4x image framestacking.
- if framestack is True:
- env = FrameStack(env, 4)
- return env
|