episode_v2.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. import random
  2. from collections import defaultdict
  3. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
  4. import numpy as np
  5. from ray.rllib.env.base_env import _DUMMY_AGENT_ID
  6. from ray.rllib.evaluation.collectors.agent_collector import AgentCollector
  7. from ray.rllib.evaluation.collectors.simple_list_collector import (
  8. _PolicyCollector,
  9. _PolicyCollectorGroup,
  10. )
  11. from ray.rllib.policy.policy_map import PolicyMap
  12. from ray.rllib.policy.sample_batch import SampleBatch
  13. from ray.rllib.utils.annotations import OldAPIStack
  14. from ray.rllib.utils.typing import AgentID, EnvID, EnvInfoDict, PolicyID, TensorType
  15. if TYPE_CHECKING:
  16. from ray.rllib.callbacks.callbacks import RLlibCallback
  17. from ray.rllib.evaluation.rollout_worker import RolloutWorker
  18. @OldAPIStack
  19. class EpisodeV2:
  20. """Tracks the current state of a (possibly multi-agent) episode."""
  21. def __init__(
  22. self,
  23. env_id: EnvID,
  24. policies: PolicyMap,
  25. policy_mapping_fn: Callable[[AgentID, "EpisodeV2", "RolloutWorker"], PolicyID],
  26. *,
  27. worker: Optional["RolloutWorker"] = None,
  28. callbacks: Optional["RLlibCallback"] = None,
  29. ):
  30. """Initializes an Episode instance.
  31. Args:
  32. env_id: The environment's ID in which this episode runs.
  33. policies: The PolicyMap object (mapping PolicyIDs to Policy
  34. objects) to use for determining, which policy is used for
  35. which agent.
  36. policy_mapping_fn: The mapping function mapping AgentIDs to
  37. PolicyIDs.
  38. worker: The RolloutWorker instance, in which this episode runs.
  39. """
  40. # Unique id identifying this trajectory.
  41. self.episode_id: int = random.randrange(int(1e18))
  42. # ID of the environment this episode is tracking.
  43. self.env_id = env_id
  44. # Summed reward across all agents in this episode.
  45. self.total_reward: float = 0.0
  46. # Active (uncollected) # of env steps taken by this episode.
  47. # Start from -1. After add_init_obs(), we will be at 0 step.
  48. self.active_env_steps: int = -1
  49. # Total # of env steps taken by this episode.
  50. # Start from -1, After add_init_obs(), we will be at 0 step.
  51. self.total_env_steps: int = -1
  52. # Active (uncollected) agent steps.
  53. self.active_agent_steps: int = 0
  54. # Total # of steps take by all agents in this env.
  55. self.total_agent_steps: int = 0
  56. # Dict for user to add custom metrics.
  57. # TODO (sven): We should probably unify custom_metrics, user_data,
  58. # and hist_data into a single data container for user to track per-step.
  59. # metrics and states.
  60. self.custom_metrics: Dict[str, float] = {}
  61. # Temporary storage. E.g. storing data in between two custom
  62. # callbacks referring to the same episode.
  63. self.user_data: Dict[str, Any] = {}
  64. # Dict mapping str keys to List[float] for storage of
  65. # per-timestep float data throughout the episode.
  66. self.hist_data: Dict[str, List[float]] = {}
  67. self.media: Dict[str, Any] = {}
  68. self.worker = worker
  69. self.callbacks = callbacks
  70. self.policy_map: PolicyMap = policies
  71. self.policy_mapping_fn: Callable[
  72. [AgentID, "EpisodeV2", "RolloutWorker"], PolicyID
  73. ] = policy_mapping_fn
  74. # Per-agent data collectors.
  75. self._agent_to_policy: Dict[AgentID, PolicyID] = {}
  76. self._agent_collectors: Dict[AgentID, AgentCollector] = {}
  77. self._next_agent_index: int = 0
  78. self._agent_to_index: Dict[AgentID, int] = {}
  79. # Summed rewards broken down by agent.
  80. self.agent_rewards: Dict[Tuple[AgentID, PolicyID], float] = defaultdict(float)
  81. self._agent_reward_history: Dict[AgentID, List[int]] = defaultdict(list)
  82. self._has_init_obs: Dict[AgentID, bool] = {}
  83. self._last_terminateds: Dict[AgentID, bool] = {}
  84. self._last_truncateds: Dict[AgentID, bool] = {}
  85. # Keep last info dict around, in case an environment tries to signal
  86. # us something.
  87. self._last_infos: Dict[AgentID, Dict] = {}
  88. def policy_for(
  89. self, agent_id: AgentID = _DUMMY_AGENT_ID, refresh: bool = False
  90. ) -> PolicyID:
  91. """Returns and stores the policy ID for the specified agent.
  92. If the agent is new, the policy mapping fn will be called to bind the
  93. agent to a policy for the duration of the entire episode (even if the
  94. policy_mapping_fn is changed in the meantime!).
  95. Args:
  96. agent_id: The agent ID to lookup the policy ID for.
  97. Returns:
  98. The policy ID for the specified agent.
  99. """
  100. # Perform a new policy_mapping_fn lookup and bind AgentID for the
  101. # duration of this episode to the returned PolicyID.
  102. if agent_id not in self._agent_to_policy or refresh:
  103. policy_id = self._agent_to_policy[agent_id] = self.policy_mapping_fn(
  104. agent_id, # agent_id
  105. self, # episode
  106. worker=self.worker,
  107. )
  108. # Use already determined PolicyID.
  109. else:
  110. policy_id = self._agent_to_policy[agent_id]
  111. # PolicyID not found in policy map -> Error.
  112. if policy_id not in self.policy_map:
  113. raise KeyError(
  114. "policy_mapping_fn returned invalid policy id " f"'{policy_id}'!"
  115. )
  116. return policy_id
  117. def get_agents(self) -> List[AgentID]:
  118. """Returns list of agent IDs that have appeared in this episode.
  119. Returns:
  120. The list of all agent IDs that have appeared so far in this
  121. episode.
  122. """
  123. return list(self._agent_to_index.keys())
  124. def agent_index(self, agent_id: AgentID) -> int:
  125. """Get the index of an agent among its environment.
  126. A new index will be created if an agent is seen for the first time.
  127. Args:
  128. agent_id: ID of an agent.
  129. Returns:
  130. The index of this agent.
  131. """
  132. if agent_id not in self._agent_to_index:
  133. self._agent_to_index[agent_id] = self._next_agent_index
  134. self._next_agent_index += 1
  135. return self._agent_to_index[agent_id]
  136. def step(self) -> None:
  137. """Advance the episode forward by one step."""
  138. self.active_env_steps += 1
  139. self.total_env_steps += 1
  140. def add_init_obs(
  141. self,
  142. *,
  143. agent_id: AgentID,
  144. init_obs: TensorType,
  145. init_infos: Dict[str, TensorType],
  146. t: int = -1,
  147. ) -> None:
  148. """Add initial env obs at the start of a new episode
  149. Args:
  150. agent_id: Agent ID.
  151. init_obs: Initial observations.
  152. init_infos: Initial infos dicts.
  153. t: timestamp.
  154. """
  155. policy = self.policy_map[self.policy_for(agent_id)]
  156. # Add initial obs to Trajectory.
  157. assert agent_id not in self._agent_collectors
  158. self._agent_collectors[agent_id] = AgentCollector(
  159. policy.view_requirements,
  160. max_seq_len=policy.config["model"]["max_seq_len"],
  161. disable_action_flattening=policy.config.get(
  162. "_disable_action_flattening", False
  163. ),
  164. is_policy_recurrent=policy.is_recurrent(),
  165. intial_states=policy.get_initial_state(),
  166. _enable_new_api_stack=False,
  167. )
  168. self._agent_collectors[agent_id].add_init_obs(
  169. episode_id=self.episode_id,
  170. agent_index=self.agent_index(agent_id),
  171. env_id=self.env_id,
  172. init_obs=init_obs,
  173. init_infos=init_infos,
  174. t=t,
  175. )
  176. self._has_init_obs[agent_id] = True
  177. def add_action_reward_done_next_obs(
  178. self,
  179. agent_id: AgentID,
  180. values: Dict[str, TensorType],
  181. ) -> None:
  182. """Add action, reward, info, and next_obs as a new step.
  183. Args:
  184. agent_id: Agent ID.
  185. values: Dict of action, reward, info, and next_obs.
  186. """
  187. # Make sure, agent already has some (at least init) data.
  188. assert agent_id in self._agent_collectors
  189. self.active_agent_steps += 1
  190. self.total_agent_steps += 1
  191. # Include the current agent id for multi-agent algorithms.
  192. if agent_id != _DUMMY_AGENT_ID:
  193. values["agent_id"] = agent_id
  194. # Add action/reward/next-obs (and other data) to Trajectory.
  195. self._agent_collectors[agent_id].add_action_reward_next_obs(values)
  196. # Keep track of agent reward history.
  197. reward = values[SampleBatch.REWARDS]
  198. self.total_reward += reward
  199. self.agent_rewards[(agent_id, self.policy_for(agent_id))] += reward
  200. self._agent_reward_history[agent_id].append(reward)
  201. # Keep track of last terminated info for agent.
  202. if SampleBatch.TERMINATEDS in values:
  203. self._last_terminateds[agent_id] = values[SampleBatch.TERMINATEDS]
  204. # Keep track of last truncated info for agent.
  205. if SampleBatch.TRUNCATEDS in values:
  206. self._last_truncateds[agent_id] = values[SampleBatch.TRUNCATEDS]
  207. # Keep track of last info dict if available.
  208. if SampleBatch.INFOS in values:
  209. self.set_last_info(agent_id, values[SampleBatch.INFOS])
  210. def postprocess_episode(
  211. self,
  212. batch_builder: _PolicyCollectorGroup,
  213. is_done: bool = False,
  214. check_dones: bool = False,
  215. ) -> None:
  216. """Build and return currently collected training samples by policies.
  217. Clear agent collector states if this episode is done.
  218. Args:
  219. batch_builder: _PolicyCollectorGroup for saving the collected per-agent
  220. sample batches.
  221. is_done: If this episode is done (terminated or truncated).
  222. check_dones: Whether to make sure per-agent trajectories are actually done.
  223. """
  224. # TODO: (sven) Once we implement multi-agent communication channels,
  225. # we have to resolve the restriction of only sending other agent
  226. # batches from the same policy to the postprocess methods.
  227. # Build SampleBatches for the given episode.
  228. pre_batches = {}
  229. for agent_id, collector in self._agent_collectors.items():
  230. # Build only if there is data and agent is part of given episode.
  231. if collector.agent_steps == 0:
  232. continue
  233. pid = self.policy_for(agent_id)
  234. policy = self.policy_map[pid]
  235. pre_batch = collector.build_for_training(policy.view_requirements)
  236. pre_batches[agent_id] = (pid, policy, pre_batch)
  237. for agent_id, (pid, policy, pre_batch) in pre_batches.items():
  238. # Entire episode is said to be done.
  239. # Error if no DONE at end of this agent's trajectory.
  240. if is_done and check_dones and not pre_batch.is_terminated_or_truncated():
  241. raise ValueError(
  242. "Episode {} terminated for all agents, but we still "
  243. "don't have a last observation for agent {} (policy "
  244. "{}). ".format(self.episode_id, agent_id, self.policy_for(agent_id))
  245. + "Please ensure that you include the last observations "
  246. "of all live agents when setting done[__all__] to "
  247. "True."
  248. )
  249. # Skip a trajectory's postprocessing (and thus using it for training),
  250. # if its agent's info exists and contains the training_enabled=False
  251. # setting (used by our PolicyClients).
  252. if not self._last_infos.get(agent_id, {}).get("training_enabled", True):
  253. continue
  254. if (
  255. not pre_batch.is_single_trajectory()
  256. or len(np.unique(pre_batch[SampleBatch.EPS_ID])) > 1
  257. ):
  258. raise ValueError(
  259. "Batches sent to postprocessing must only contain steps "
  260. "from a single trajectory.",
  261. pre_batch,
  262. )
  263. if len(pre_batches) > 1:
  264. other_batches = pre_batches.copy()
  265. del other_batches[agent_id]
  266. else:
  267. other_batches = {}
  268. # Call the Policy's Exploration's postprocess method.
  269. post_batch = pre_batch
  270. if getattr(policy, "exploration", None) is not None:
  271. policy.exploration.postprocess_trajectory(
  272. policy, post_batch, policy.get_session()
  273. )
  274. post_batch.set_get_interceptor(None)
  275. post_batch = policy.postprocess_trajectory(post_batch, other_batches, self)
  276. from ray.rllib.evaluation.rollout_worker import get_global_worker
  277. self.callbacks.on_postprocess_trajectory(
  278. worker=get_global_worker(),
  279. episode=self,
  280. agent_id=agent_id,
  281. policy_id=pid,
  282. policies=self.policy_map,
  283. postprocessed_batch=post_batch,
  284. original_batches=pre_batches,
  285. )
  286. # Append post_batch for return.
  287. if pid not in batch_builder.policy_collectors:
  288. batch_builder.policy_collectors[pid] = _PolicyCollector(policy)
  289. batch_builder.policy_collectors[pid].add_postprocessed_batch_for_training(
  290. post_batch, policy.view_requirements
  291. )
  292. batch_builder.agent_steps += self.active_agent_steps
  293. batch_builder.env_steps += self.active_env_steps
  294. # AgentCollector cleared.
  295. self.active_agent_steps = 0
  296. self.active_env_steps = 0
  297. def has_init_obs(self, agent_id: AgentID = None) -> bool:
  298. """Returns whether this episode has initial obs for an agent.
  299. If agent_id is None, return whether we have received any initial obs,
  300. in other words, whether this episode is completely fresh.
  301. """
  302. if agent_id is not None:
  303. return agent_id in self._has_init_obs and self._has_init_obs[agent_id]
  304. else:
  305. return any(list(self._has_init_obs.values()))
  306. def is_done(self, agent_id: AgentID) -> bool:
  307. return self.is_terminated(agent_id) or self.is_truncated(agent_id)
  308. def is_terminated(self, agent_id: AgentID) -> bool:
  309. return self._last_terminateds.get(agent_id, False)
  310. def is_truncated(self, agent_id: AgentID) -> bool:
  311. return self._last_truncateds.get(agent_id, False)
  312. def set_last_info(self, agent_id: AgentID, info: Dict):
  313. self._last_infos[agent_id] = info
  314. def last_info_for(
  315. self, agent_id: AgentID = _DUMMY_AGENT_ID
  316. ) -> Optional[EnvInfoDict]:
  317. return self._last_infos.get(agent_id)
  318. @property
  319. def length(self):
  320. return self.total_env_steps