stats.py 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871
import collections
import copy
import logging
import time
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, fields
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Union,
)
from uuid import uuid4

import numpy as np

import ray
from ray.actor import ActorHandle
from ray.data._internal.block_list import BlockList
from ray.data._internal.execution.dataset_state import DatasetState
from ray.data._internal.execution.interfaces.common import RuntimeMetricsHistogram
from ray.data._internal.execution.interfaces.op_runtime_metrics import (
    NODE_UNKNOWN,
    MetricsGroup,
    MetricsType,
    NodeMetrics,
    OpRuntimeMetrics,
)
from ray.data._internal.metadata_exporter import (
    DataContextMetadata,
    DatasetMetadata,
    Topology,
    get_dataset_metadata_exporter,
)
from ray.data._internal.util import capfirst
from ray.data.block import BlockStats
from ray.data.context import DataContext
from ray.util.annotations import DeveloperAPI
from ray.util.metrics import Counter, Gauge, Histogram, Metric
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
# Module-level logger.
logger = logging.getLogger(__name__)

# Name and namespace for the shared stats actor; presumably used elsewhere in
# this module to create/look up the `_StatsActor` singleton — TODO confirm.
STATS_ACTOR_NAME = "datasets_stats_actor"
STATS_ACTOR_NAMESPACE = "_dataset_stats_actor"

# Placeholder string for unknown values (usage not visible in this chunk).
UNKNOWN = "unknown"

# Block statistics grouped by operator name (the key shape built by
# `_DatasetStatsBuilder`).
StatsDict = Dict[str, List[BlockStats]]
  49. def fmt(seconds: float) -> str:
  50. if seconds > 1:
  51. return str(round(seconds, 2)) + "s"
  52. elif seconds > 0.001:
  53. return str(round(seconds * 1000, 2)) + "ms"
  54. else:
  55. return str(round(seconds * 1000 * 1000, 2)) + "us"
  56. def leveled_indent(lvl: int = 0, spaces_per_indent: int = 3) -> str:
  57. """Returns a string of spaces which contains `level` indents,
  58. each indent containing `spaces_per_indent` spaces. For example:
  59. >>> leveled_indent(2, 3)
  60. ' '
  61. """
  62. return (" " * spaces_per_indent) * lvl
  63. class Timer:
  64. """Helper class for tracking accumulated time (in seconds)."""
  65. def __init__(self):
  66. self._total: float = 0
  67. self._min: float = float("inf")
  68. self._max: float = 0
  69. self._total_count: float = 0
  70. @contextmanager
  71. def timer(self) -> None:
  72. time_start = time.perf_counter()
  73. try:
  74. yield
  75. finally:
  76. self.add(time.perf_counter() - time_start)
  77. def add(self, value: float) -> None:
  78. self._total += value
  79. if value < self._min:
  80. self._min = value
  81. if value > self._max:
  82. self._max = value
  83. self._total_count += 1
  84. def get(self) -> float:
  85. return self._total
  86. def min(self) -> float:
  87. return self._min
  88. def max(self) -> float:
  89. return self._max
  90. def avg(self) -> float:
  91. return self._total / self._total_count if self._total_count else float("inf")
  92. class _DatasetStatsBuilder:
  93. """Helper class for building dataset stats.
  94. When this class is created, we record the start time. When build() is
  95. called with the final blocks of the new dataset, the time delta is
  96. saved as part of the stats."""
  97. def __init__(
  98. self,
  99. operator_name: str,
  100. parent: "DatasetStats",
  101. override_start_time: Optional[float],
  102. ):
  103. self.operator_name = operator_name
  104. self.parent = parent
  105. self.start_time = override_start_time or time.perf_counter()
  106. def build_multioperator(self, metadata: StatsDict) -> "DatasetStats":
  107. op_metadata = {}
  108. for i, (k, v) in enumerate(metadata.items()):
  109. capped_k = capfirst(k)
  110. if len(metadata) > 1:
  111. if i == 0:
  112. op_metadata[self.operator_name + capped_k] = v
  113. else:
  114. op_metadata[self.operator_name.split("->")[-1] + capped_k] = v
  115. else:
  116. op_metadata[self.operator_name] = v
  117. stats = DatasetStats(
  118. metadata=op_metadata,
  119. parent=self.parent,
  120. base_name=self.operator_name,
  121. )
  122. stats.time_total_s = time.perf_counter() - self.start_time
  123. return stats
  124. def build(self, final_blocks: BlockList) -> "DatasetStats":
  125. stats = DatasetStats(
  126. metadata={self.operator_name: final_blocks.get_metadata()},
  127. parent=self.parent,
  128. )
  129. stats.time_total_s = time.perf_counter() - self.start_time
  130. return stats
  131. @ray.remote(num_cpus=0)
  132. class _StatsActor:
  133. """Actor holding stats for blocks created by LazyBlockList.
  134. This actor is shared across all datasets created in the same cluster.
  135. In order to cap memory usage, we set a max number of stats to keep
  136. in the actor. When this limit is exceeded, the stats will be garbage
  137. collected in FIFO order.
  138. TODO(ekl) we should consider refactoring LazyBlockList so stats can be
  139. extracted without using an out-of-band actor."""
  140. def __init__(self, max_stats=1000):
  141. # Mapping from uuid -> (task_id -> list of blocks statistics).
  142. self.metadata = collections.defaultdict(dict)
  143. self.last_time = {}
  144. self.start_time = {}
  145. self.max_stats = max_stats
  146. # Assign dataset uuids with a global counter.
  147. self.next_dataset_id = 0
  148. # Dataset metadata to be queried directly by DashboardHead api.
  149. self.datasets: Dict[str, Any] = {}
  150. # Cache of calls to ray.nodes() to prevent unnecessary network calls
  151. self._ray_nodes_cache: Dict[str, str] = {}
  152. # Initialize the metadata exporter
  153. self._metadata_exporter = get_dataset_metadata_exporter()
  154. self.dataset_metadatas: Dict[str, DatasetMetadata] = {}
  155. # A FIFO queue of dataset_tags for finished datasets. This is used to
  156. # efficiently evict the oldest finished datasets when max_stats is reached.
  157. self.finished_datasets_queue = collections.deque()
  158. # Ray Data dashboard metrics
  159. # Everything is a gauge because we need to reset all of
  160. # a dataset's metrics to 0 after each finishes execution.
  161. op_tags_keys = ("dataset", "operator")
  162. # TODO(scottjlee): move these overvie metrics as fields in a
  163. # separate dataclass, similar to OpRuntimeMetrics.
  164. self.spilled_bytes = Gauge(
  165. "data_spilled_bytes",
  166. description="""Bytes spilled by dataset operators.
  167. DataContext.enable_get_object_locations_for_metrics
  168. must be set to True to report this metric""",
  169. tag_keys=op_tags_keys,
  170. )
  171. self.freed_bytes = Gauge(
  172. "data_freed_bytes",
  173. description="Bytes freed by dataset operators",
  174. tag_keys=op_tags_keys,
  175. )
  176. self.current_bytes = Gauge(
  177. "data_current_bytes",
  178. description="Bytes currently in memory store used by dataset operators",
  179. tag_keys=op_tags_keys,
  180. )
  181. self.cpu_usage_cores = Gauge(
  182. "data_cpu_usage_cores",
  183. description="CPUs allocated to dataset operators",
  184. tag_keys=op_tags_keys,
  185. )
  186. self.gpu_usage_cores = Gauge(
  187. "data_gpu_usage_cores",
  188. description="GPUs allocated to dataset operators",
  189. tag_keys=op_tags_keys,
  190. )
  191. self.output_bytes = Gauge(
  192. "data_output_bytes",
  193. description="Bytes outputted by dataset operators",
  194. tag_keys=op_tags_keys,
  195. )
  196. self.output_rows = Gauge(
  197. "data_output_rows",
  198. description="Rows outputted by dataset operators",
  199. tag_keys=op_tags_keys,
  200. )
  201. # === Metrics from OpRuntimeMetrics ===
  202. # Inputs-related metrics
  203. self.execution_metrics_inputs = (
  204. self._create_prometheus_metrics_for_execution_metrics(
  205. metrics_group=MetricsGroup.INPUTS,
  206. tag_keys=op_tags_keys,
  207. )
  208. )
  209. # Outputs-related metrics
  210. self.execution_metrics_outputs = (
  211. self._create_prometheus_metrics_for_execution_metrics(
  212. metrics_group=MetricsGroup.OUTPUTS,
  213. tag_keys=op_tags_keys,
  214. )
  215. )
  216. # Task-related metrics
  217. self.execution_metrics_tasks = (
  218. self._create_prometheus_metrics_for_execution_metrics(
  219. metrics_group=MetricsGroup.TASKS,
  220. tag_keys=op_tags_keys,
  221. )
  222. )
  223. # Object store memory-related metrics
  224. self.execution_metrics_obj_store_memory = (
  225. self._create_prometheus_metrics_for_execution_metrics(
  226. metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
  227. tag_keys=op_tags_keys,
  228. )
  229. )
  230. # Actor related metrics
  231. self.execution_metrics_actors = (
  232. self._create_prometheus_metrics_for_execution_metrics(
  233. metrics_group=MetricsGroup.ACTORS,
  234. tag_keys=op_tags_keys,
  235. )
  236. )
  237. # Miscellaneous metrics
  238. self.execution_metrics_misc = (
  239. self._create_prometheus_metrics_for_execution_metrics(
  240. metrics_group=MetricsGroup.MISC,
  241. tag_keys=op_tags_keys,
  242. )
  243. )
  244. # Per Node metrics
  245. self.per_node_metrics = self._create_prometheus_metrics_for_per_node_metrics()
  246. iter_tag_keys = ("dataset",)
  247. self.time_to_first_batch_s = Gauge(
  248. "data_iter_time_to_first_batch_seconds",
  249. description="Total time spent waiting for the first batch after starting iteration. "
  250. "This includes the dataset pipeline warmup time. This metric is accumulated across different epochs.",
  251. tag_keys=iter_tag_keys,
  252. )
  253. self.iter_block_fetching_s = Gauge(
  254. "data_iter_block_fetching_seconds",
  255. description="Seconds taken to fetch (with ray.get) blocks by iter_batches()",
  256. tag_keys=iter_tag_keys,
  257. )
  258. self.iter_batch_shaping_s = Gauge(
  259. "data_iter_batch_shaping_seconds",
  260. description="Seconds taken to shape batch from incoming blocks by iter_batches()",
  261. tag_keys=iter_tag_keys,
  262. )
  263. self.iter_batch_formatting_s = Gauge(
  264. "data_iter_batch_formatting_seconds",
  265. description="Seconds taken to format batches by iter_batches()",
  266. tag_keys=iter_tag_keys,
  267. )
  268. self.iter_batch_collating_s = Gauge(
  269. "data_iter_batch_collating_seconds",
  270. description="Seconds taken to collate batches by iter_batches()",
  271. tag_keys=iter_tag_keys,
  272. )
  273. self.iter_batch_finalizing_s = Gauge(
  274. "data_iter_batch_finalizing_seconds",
  275. description="Seconds taken to collate batches by iter_batches()",
  276. tag_keys=iter_tag_keys,
  277. )
  278. self.iter_total_blocked_s = Gauge(
  279. "data_iter_total_blocked_seconds",
  280. description="Seconds user thread is blocked by iter_batches()",
  281. tag_keys=iter_tag_keys,
  282. )
  283. self.iter_user_s = Gauge(
  284. "data_iter_user_seconds",
  285. description="Seconds spent in user code",
  286. tag_keys=iter_tag_keys,
  287. )
  288. self.iter_initialize_s = Gauge(
  289. "data_iter_initialize_seconds",
  290. description="Seconds spent in iterator initialization code",
  291. tag_keys=iter_tag_keys,
  292. )
  293. self.iter_get_ref_bundles_s = Gauge(
  294. "data_iter_get_ref_bundles_seconds",
  295. description="Seconds spent getting RefBundles from the dataset iterator",
  296. tag_keys=iter_tag_keys,
  297. )
  298. self.iter_get_s = Gauge(
  299. "data_iter_get_seconds",
  300. description="Seconds spent in ray.get() while resolving block references",
  301. tag_keys=iter_tag_keys,
  302. )
  303. self.iter_next_batch_s = Gauge(
  304. "data_iter_next_batch_seconds",
  305. description="Seconds spent getting the next batch from the block buffer",
  306. tag_keys=iter_tag_keys,
  307. )
  308. self.iter_format_batch_s = Gauge(
  309. "data_iter_format_batch_seconds",
  310. description="Seconds spent formatting the batch",
  311. tag_keys=iter_tag_keys,
  312. )
  313. self.iter_collate_batch_s = Gauge(
  314. "data_iter_collate_batch_seconds",
  315. description="Seconds spent collating the batch",
  316. tag_keys=iter_tag_keys,
  317. )
  318. self.iter_finalize_batch_s = Gauge(
  319. "data_iter_finalize_batch_seconds",
  320. description="Seconds spent finalizing the batch",
  321. tag_keys=iter_tag_keys,
  322. )
  323. self.iter_blocks_local = Gauge(
  324. "data_iter_blocks_local",
  325. description="Number of blocks already on the local node",
  326. tag_keys=iter_tag_keys,
  327. )
  328. self.iter_blocks_remote = Gauge(
  329. "data_iter_blocks_remote",
  330. description="Number of blocks that require fetching from another node",
  331. tag_keys=iter_tag_keys,
  332. )
  333. self.iter_unknown_location = Gauge(
  334. "data_iter_unknown_location",
  335. description="Number of blocks that have unknown locations",
  336. tag_keys=iter_tag_keys,
  337. )
  338. self.iter_prefetched_bytes = Gauge(
  339. "data_iter_prefetched_bytes",
  340. description="Current bytes of prefetched blocks in the iterator",
  341. tag_keys=iter_tag_keys,
  342. )
  343. # === Dataset and Operator Metadata Metrics ===
  344. dataset_tags = ("dataset", "job_id", "start_time")
  345. self.data_dataset_estimated_total_blocks = Gauge(
  346. "data_dataset_estimated_total_blocks",
  347. description="Total work units in blocks for dataset",
  348. tag_keys=dataset_tags,
  349. )
  350. self.data_dataset_estimated_total_rows = Gauge(
  351. "data_dataset_estimated_total_rows",
  352. description="Total work units in rows for dataset",
  353. tag_keys=dataset_tags,
  354. )
  355. self.data_dataset_state = Gauge(
  356. "data_dataset_state",
  357. description=f"State of dataset ({', '.join([f'{s.value}={s.name}' for s in DatasetState])})",
  358. tag_keys=dataset_tags,
  359. )
  360. operator_tags = ("dataset", "operator")
  361. self.data_operator_estimated_total_blocks = Gauge(
  362. "data_operator_estimated_total_blocks",
  363. description="Total work units in blocks for operator",
  364. tag_keys=operator_tags,
  365. )
  366. self.data_operator_estimated_total_rows = Gauge(
  367. "data_operator_estimated_total_rows",
  368. description="Total work units in rows for operator",
  369. tag_keys=operator_tags,
  370. )
  371. self.data_operator_queued_blocks = Gauge(
  372. "data_operator_queued_blocks",
  373. description="Number of queued blocks for operator",
  374. tag_keys=operator_tags,
  375. )
  376. self.data_operator_state = Gauge(
  377. "data_operator_state",
  378. description=f"State of operator ({', '.join([f'{s.value}={s.name}' for s in DatasetState])})",
  379. tag_keys=operator_tags,
  380. )
  381. def _create_prometheus_metrics_for_execution_metrics(
  382. self, metrics_group: MetricsGroup, tag_keys: Tuple[str, ...]
  383. ) -> Dict[str, Metric]:
  384. metrics = {}
  385. for metric in OpRuntimeMetrics.get_metrics():
  386. if not metric.metrics_group == metrics_group:
  387. continue
  388. metric_name = f"data_{metric.name}"
  389. metric_description = metric.description
  390. if metric.metrics_type == MetricsType.Gauge:
  391. metrics[metric.name] = Gauge(
  392. metric_name,
  393. description=metric_description,
  394. tag_keys=tag_keys,
  395. )
  396. elif metric.metrics_type == MetricsType.Histogram:
  397. metrics[metric.name] = Histogram(
  398. metric_name,
  399. description=metric_description,
  400. tag_keys=tag_keys,
  401. **metric.metrics_args,
  402. )
  403. elif metric.metrics_type == MetricsType.Counter:
  404. metrics[metric.name] = Counter(
  405. metric_name,
  406. description=metric_description,
  407. tag_keys=tag_keys,
  408. )
  409. return metrics
  410. def _create_prometheus_metrics_for_per_node_metrics(self) -> Dict[str, Gauge]:
  411. metrics = {}
  412. for field in fields(NodeMetrics):
  413. metric_name = f"data_{field.name}_per_node"
  414. metrics[field.name] = Gauge(
  415. metric_name,
  416. description="",
  417. tag_keys=("dataset", "node_ip"),
  418. )
  419. return metrics
  420. def gen_dataset_id(self) -> str:
  421. """Generate a unique dataset_id for tracking datasets."""
  422. dataset_id = str(self.next_dataset_id)
  423. self.next_dataset_id += 1
  424. return dataset_id
  425. def update_execution_metrics(
  426. self,
  427. dataset_tag: str,
  428. op_metrics: List[Dict[str, Union[int, float]]],
  429. operator_tags: List[str],
  430. state: Dict[str, Any],
  431. per_node_metrics: Optional[Dict[str, Dict[str, Union[int, float]]]] = None,
  432. ):
  433. def _record(
  434. prom_metric: Metric,
  435. value: Union[int, float, List[int]],
  436. tags: Dict[str, str] = None,
  437. ):
  438. if isinstance(prom_metric, Gauge):
  439. prom_metric.set(value, tags)
  440. elif isinstance(prom_metric, Counter):
  441. prom_metric.inc(value, tags)
  442. elif isinstance(prom_metric, Histogram):
  443. if isinstance(value, RuntimeMetricsHistogram):
  444. value.export_to(prom_metric, tags)
  445. for stats, operator_tag in zip(op_metrics, operator_tags):
  446. tags = self._create_tags(dataset_tag, operator_tag)
  447. self.spilled_bytes.set(stats.get("obj_store_mem_spilled", 0), tags)
  448. self.freed_bytes.set(stats.get("obj_store_mem_freed", 0), tags)
  449. self.current_bytes.set(stats.get("obj_store_mem_used", 0), tags)
  450. self.output_bytes.set(stats.get("bytes_task_outputs_generated", 0), tags)
  451. self.output_rows.set(stats.get("row_outputs_taken", 0), tags)
  452. self.cpu_usage_cores.set(stats.get("cpu_usage", 0), tags)
  453. self.gpu_usage_cores.set(stats.get("gpu_usage", 0), tags)
  454. for field_name, prom_metric in self.execution_metrics_inputs.items():
  455. _record(prom_metric, stats.get(field_name, 0), tags)
  456. for field_name, prom_metric in self.execution_metrics_outputs.items():
  457. _record(prom_metric, stats.get(field_name, 0), tags)
  458. for field_name, prom_metric in self.execution_metrics_tasks.items():
  459. _record(prom_metric, stats.get(field_name, 0), tags)
  460. for (
  461. field_name,
  462. prom_metric,
  463. ) in self.execution_metrics_obj_store_memory.items():
  464. _record(prom_metric, stats.get(field_name, 0), tags)
  465. for field_name, prom_metric in self.execution_metrics_actors.items():
  466. _record(prom_metric, stats.get(field_name, 0), tags)
  467. for field_name, prom_metric in self.execution_metrics_misc.items():
  468. _record(prom_metric, stats.get(field_name, 0), tags)
  469. # Update per node metrics if they exist, the creation of these metrics is controlled
  470. # by the _data_context.enable_per_node_metrics flag in the streaming executor but
  471. # that is not exposed in the _StatsActor so here we simply check if the metrics exist
  472. # and if so, update them
  473. if per_node_metrics is not None:
  474. for node_id, node_metrics in per_node_metrics.items():
  475. # Translate node_id into node_name (the node ip), cache node info
  476. if node_id not in self._ray_nodes_cache:
  477. # Rebuilding this cache will fetch all nodes, this
  478. # only needs to be done up to once per loop
  479. self._rebuild_ray_nodes_cache()
  480. node_ip = self._ray_nodes_cache.get(node_id, NODE_UNKNOWN)
  481. tags = self._create_tags(dataset_tag=dataset_tag, node_ip_tag=node_ip)
  482. for metric_name, metric_value in node_metrics.items():
  483. prom_metric = self.per_node_metrics[metric_name]
  484. _record(prom_metric, metric_value, tags)
  485. # This update is called from a dataset's executor,
  486. # so all tags should contain the same dataset
  487. self.update_dataset(dataset_tag, state)
  488. def _rebuild_ray_nodes_cache(self) -> None:
  489. current_nodes = ray.nodes()
  490. for node in current_nodes:
  491. node_id = node.get("NodeID", None)
  492. node_name = node.get("NodeName", None)
  493. if node_id is not None and node_name is not None:
  494. self._ray_nodes_cache[node_id] = node_name
  495. def update_iteration_metrics(
  496. self,
  497. stats: "DatasetStats",
  498. dataset_tag,
  499. ):
  500. tags = self._create_tags(dataset_tag)
  501. self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags)
  502. self.iter_get_ref_bundles_s.set(stats.iter_get_ref_bundles_s.get(), tags)
  503. self.iter_get_s.set(stats.iter_get_s.get(), tags)
  504. self.iter_next_batch_s.set(stats.iter_next_batch_s.get(), tags)
  505. self.iter_format_batch_s.set(stats.iter_format_batch_s.get(), tags)
  506. self.iter_collate_batch_s.set(stats.iter_collate_batch_s.get(), tags)
  507. self.iter_finalize_batch_s.set(stats.iter_finalize_batch_s.get(), tags)
  508. self.iter_blocks_local.set(stats.iter_blocks_local, tags)
  509. self.iter_blocks_remote.set(stats.iter_blocks_remote, tags)
  510. self.iter_unknown_location.set(stats.iter_unknown_location, tags)
  511. self.iter_prefetched_bytes.set(stats.iter_prefetched_bytes, tags)
  512. self.iter_block_fetching_s.set(stats.iter_get_s.get(), tags)
  513. self.iter_batch_shaping_s.set(stats.iter_next_batch_s.get(), tags)
  514. self.iter_batch_formatting_s.set(stats.iter_format_batch_s.get(), tags)
  515. self.iter_batch_collating_s.set(stats.iter_collate_batch_s.get(), tags)
  516. self.iter_batch_finalizing_s.set(stats.iter_finalize_batch_s.get(), tags)
  517. self.time_to_first_batch_s.set(stats.iter_time_to_first_batch_s.get(), tags)
  518. self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags)
  519. self.iter_user_s.set(stats.iter_user_s.get(), tags)
  520. def register_dataset(
  521. self,
  522. job_id: str,
  523. dataset_tag: str,
  524. operator_tags: List[str],
  525. topology: Topology,
  526. data_context: DataContextMetadata,
  527. ):
  528. start_time = time.time()
  529. self.datasets[dataset_tag] = {
  530. "job_id": job_id,
  531. "state": DatasetState.PENDING.name,
  532. "progress": 0,
  533. "total": 0,
  534. "total_rows": 0,
  535. "start_time": start_time,
  536. "end_time": None,
  537. "operators": {
  538. operator: {
  539. "state": DatasetState.PENDING.name,
  540. "progress": 0,
  541. "total": 0,
  542. "queued_blocks": 0,
  543. }
  544. for operator in operator_tags
  545. },
  546. }
  547. if self._metadata_exporter is not None:
  548. self.dataset_metadatas[dataset_tag] = DatasetMetadata(
  549. job_id=job_id,
  550. topology=topology,
  551. dataset_id=dataset_tag,
  552. start_time=start_time,
  553. data_context=data_context,
  554. execution_start_time=None,
  555. execution_end_time=None,
  556. state=DatasetState.PENDING.name,
  557. )
  558. self._metadata_exporter.export_dataset_metadata(
  559. self.dataset_metadatas[dataset_tag]
  560. )
  561. def update_dataset(self, dataset_tag: str, state: Dict[str, Any]):
  562. self.datasets[dataset_tag].update(state)
  563. state = self.datasets[dataset_tag]
  564. job_id = self.datasets[dataset_tag].get("job_id", "None")
  565. start_time = str(int(self.datasets[dataset_tag].get("start_time", 0)))
  566. # Update dataset-level metrics
  567. dataset_tags = {
  568. "dataset": dataset_tag,
  569. "job_id": job_id,
  570. "start_time": start_time,
  571. }
  572. self.data_dataset_estimated_total_blocks.set(
  573. state.get("total", 0), dataset_tags
  574. )
  575. self.data_dataset_estimated_total_rows.set(
  576. state.get("total_rows", 0), dataset_tags
  577. )
  578. state_string = state.get("state", DatasetState.UNKNOWN.name)
  579. state_enum = DatasetState.from_string(state_string)
  580. self.data_dataset_state.set(state_enum.value, dataset_tags)
  581. self.update_dataset_metadata_state(dataset_tag, state_string)
  582. # Update operator-level metrics
  583. operator_states: Dict[str, str] = {}
  584. for operator, op_state in state.get("operators", {}).items():
  585. operator_tags = {
  586. "dataset": dataset_tag,
  587. "operator": operator,
  588. }
  589. self.data_operator_estimated_total_blocks.set(
  590. op_state.get("total", 0), operator_tags
  591. )
  592. self.data_operator_estimated_total_rows.set(
  593. op_state.get("total_rows", 0), operator_tags
  594. )
  595. self.data_operator_queued_blocks.set(
  596. op_state.get("queued_blocks", 0), operator_tags
  597. )
  598. # Get state code directly from enum
  599. state_string = op_state.get("state", DatasetState.UNKNOWN.name)
  600. state_enum = DatasetState.from_string(state_string)
  601. self.data_operator_state.set(state_enum.value, operator_tags)
  602. operator_states[operator] = state_string
  603. self.update_dataset_metadata_operator_states(dataset_tag, operator_states)
  604. # Evict the oldest finished datasets to ensure the `max_stats` limit is enforced.
  605. if state["state"] in {DatasetState.FINISHED.name, DatasetState.FAILED.name}:
  606. self.finished_datasets_queue.append(dataset_tag)
  607. while len(self.datasets) > self.max_stats and self.finished_datasets_queue:
  608. tag_to_evict = self.finished_datasets_queue.popleft()
  609. self.datasets.pop(tag_to_evict, None)
  610. self.dataset_metadatas.pop(tag_to_evict, None)
  611. def get_datasets(self, job_id: Optional[str] = None):
  612. if not job_id:
  613. return self.datasets
  614. return {k: v for k, v in self.datasets.items() if v["job_id"] == job_id}
  615. def update_dataset_metadata_state(self, dataset_id: str, new_state: str):
  616. if dataset_id not in self.dataset_metadatas:
  617. return
  618. update_time = time.time()
  619. dataset_metadata = self.dataset_metadatas[dataset_id]
  620. if dataset_metadata.state == new_state:
  621. return
  622. updated_dataset_metadata = copy.deepcopy(dataset_metadata)
  623. updated_dataset_metadata.state = new_state
  624. if new_state == DatasetState.RUNNING.name:
  625. updated_dataset_metadata.execution_start_time = update_time
  626. elif new_state in (DatasetState.FINISHED.name, DatasetState.FAILED.name):
  627. updated_dataset_metadata.execution_end_time = update_time
  628. # Update metadata of running operators
  629. for operator in updated_dataset_metadata.topology.operators:
  630. if operator.state == DatasetState.RUNNING.name:
  631. operator.state = new_state
  632. operator.execution_end_time = update_time
  633. self.dataset_metadatas[dataset_id] = updated_dataset_metadata
  634. if self._metadata_exporter is not None:
  635. self._metadata_exporter.export_dataset_metadata(
  636. updated_dataset_metadata,
  637. include_data_context=False,
  638. include_op_args=False,
  639. )
  640. def update_dataset_metadata_operator_states(
  641. self, dataset_id: str, operator_states: Dict[str, str]
  642. ):
  643. if dataset_id not in self.dataset_metadatas:
  644. return
  645. dataset_metadata = self.dataset_metadatas[dataset_id]
  646. update_needed = False
  647. for operator in dataset_metadata.topology.operators:
  648. if (
  649. operator.id in operator_states
  650. and operator.state != operator_states[operator.id]
  651. ):
  652. update_needed = True
  653. break
  654. if not update_needed:
  655. return
  656. updated_dataset_metadata = copy.deepcopy(dataset_metadata)
  657. update_time = time.time()
  658. for operator in updated_dataset_metadata.topology.operators:
  659. if operator.id in operator_states:
  660. new_state = operator_states[operator.id]
  661. if operator.state == new_state:
  662. continue
  663. operator.state = new_state
  664. if new_state == DatasetState.RUNNING.name:
  665. operator.execution_start_time = update_time
  666. elif new_state in (
  667. DatasetState.FINISHED.name,
  668. DatasetState.FAILED.name,
  669. ):
  670. operator.execution_end_time = update_time
  671. # Handle outlier case for InputDataBuffer, which is marked as finished immediately and does not have a RUNNING state.
  672. # Set the execution time the same as its end time
  673. if not operator.execution_start_time:
  674. operator.execution_start_time = update_time
  675. self.dataset_metadatas[dataset_id] = updated_dataset_metadata
  676. if self._metadata_exporter is not None:
  677. self._metadata_exporter.export_dataset_metadata(
  678. updated_dataset_metadata,
  679. include_data_context=False,
  680. include_op_args=False,
  681. )
  682. def _create_tags(
  683. self,
  684. dataset_tag: str,
  685. operator_tag: Optional[str] = None,
  686. node_ip_tag: Optional[str] = None,
  687. ):
  688. tags = {"dataset": dataset_tag}
  689. if operator_tag is not None:
  690. tags["operator"] = operator_tag
  691. if node_ip_tag is not None:
  692. tags["node_ip"] = node_ip_tag
  693. return tags
  694. def get_or_create_stats_actor() -> ActorHandle[_StatsActor]:
  695. """Each cluster will contain exactly 1 _StatsActor. This function
  696. returns the current _StatsActor handle, or create a new one if one
  697. does not exist in the connected cluster. The _StatsActor is pinned on
  698. on driver process' node.
  699. """
  700. if ray._private.worker._global_node is None:
  701. raise RuntimeError(
  702. "Global node is not initialized. Driver might be not connected to Ray."
  703. )
  704. current_cluster_id = ray._private.worker._global_node.cluster_id
  705. logger.debug(f"Stats Actor located on cluster_id={current_cluster_id}")
  706. # so it fate-shares with the driver.
  707. scheduling_strategy = NodeAffinitySchedulingStrategy(
  708. ray.get_runtime_context().get_node_id(),
  709. soft=False,
  710. )
  711. return _StatsActor.options(
  712. name=STATS_ACTOR_NAME,
  713. namespace=STATS_ACTOR_NAMESPACE,
  714. get_if_exists=True,
  715. lifetime="detached",
  716. scheduling_strategy=scheduling_strategy,
  717. ).remote()
  718. class _StatsManager:
  719. """A Class containing util functions that manage remote calls to _StatsActor.
  720. Ray Data updates metrics through the _StatsManager, and direct remote calls
  721. to the _StatsActor is discouraged. Some functionalities provided by
  722. _StatsManager:
  723. - Format and update iteration metrics
  724. - Format and update execution metrics
  725. - Aggregate per node metrics
  726. - Dataset registration
  727. """
  728. @staticmethod
  729. def _aggregate_per_node_metrics(
  730. op_metrics: List[OpRuntimeMetrics],
  731. ) -> Optional[Mapping[str, Mapping[str, Union[int, float]]]]:
  732. """
  733. Aggregate per-node metrics from a list of OpRuntimeMetrics objects.
  734. If per-node metrics are disabled in the current DataContext, returns None.
  735. Otherwise, it sums up all NodeMetrics fields across the provided metrics and
  736. returns a nested dictionary mapping each node ID to a dict of field values.
  737. """
  738. if not DataContext.get_current().enable_per_node_metrics:
  739. return None
  740. aggregated_by_node = defaultdict(lambda: defaultdict(int))
  741. for metrics in op_metrics:
  742. for node_id, node_metrics in metrics._per_node_metrics.items():
  743. agg_node_metrics = aggregated_by_node[node_id]
  744. for f in fields(NodeMetrics):
  745. agg_node_metrics[f.name] += getattr(node_metrics, f.name)
  746. return aggregated_by_node
  747. @staticmethod
  748. def update_execution_metrics(
  749. dataset_tag: str,
  750. op_metrics: List[OpRuntimeMetrics],
  751. operator_tags: List[str],
  752. state: Dict[str, Any],
  753. ):
  754. per_node_metrics = _StatsManager._aggregate_per_node_metrics(op_metrics)
  755. op_metrics_dicts = [metric.as_dict() for metric in op_metrics]
  756. args = (
  757. dataset_tag,
  758. op_metrics_dicts,
  759. operator_tags,
  760. state,
  761. per_node_metrics,
  762. )
  763. try:
  764. get_or_create_stats_actor().update_execution_metrics.remote(*args)
  765. except Exception as e:
  766. logger.warning(
  767. f"Error occurred during update_execution_metrics.remote call to _StatsActor: {e}",
  768. exc_info=True,
  769. )
  770. return
  771. @staticmethod
  772. def update_iteration_metrics(stats: "DatasetStats", dataset_tag: str):
  773. args = (stats, dataset_tag)
  774. try:
  775. get_or_create_stats_actor().update_iteration_metrics.remote(*args)
  776. except Exception as e:
  777. logger.warning(
  778. f"Error occurred during update_iteration_metrics.remote call to _StatsActor: {e}",
  779. exc_info=True,
  780. )
  781. @staticmethod
  782. def register_dataset_to_stats_actor(
  783. dataset_tag: str,
  784. operator_tags: List[str],
  785. topology: Topology,
  786. data_context: DataContext,
  787. ):
  788. """Register a dataset with the stats actor.
  789. Args:
  790. dataset_tag: Tag for the dataset
  791. operator_tags: List of operator tags
  792. topology: Optional Topology representing the DAG structure to export
  793. data_context: The DataContext attached to the dataset
  794. """
  795. # Convert DataContext to DataContextMetadata before serialization to avoid
  796. # module dependency issues during Ray's cloudpickle serialization.
  797. data_context = DataContextMetadata.from_data_context(data_context)
  798. get_or_create_stats_actor().register_dataset.remote(
  799. ray.get_runtime_context().get_job_id(),
  800. dataset_tag,
  801. operator_tags,
  802. topology,
  803. data_context,
  804. )
  805. @staticmethod
  806. def gen_dataset_id_from_stats_actor() -> str:
  807. try:
  808. stats_actor = get_or_create_stats_actor()
  809. return ray.get(stats_actor.gen_dataset_id.remote())
  810. except Exception as e:
  811. logger.warning(
  812. f"Failed to generate dataset_id, falling back to random uuid_v4: {e}"
  813. )
  814. # Getting dataset id from _StatsActor may fail, in this case
  815. # fall back to uuid4
  816. return uuid4().hex
  817. class DatasetStats:
  818. """Holds the execution times for a given Dataset.
  819. This object contains a reference to the parent Dataset's stats as well,
  820. but not the Dataset object itself, to allow its blocks to be dropped from
  821. memory."""
  822. def __init__(
  823. self,
  824. *,
  825. metadata: StatsDict,
  826. parent: Union[Optional["DatasetStats"], List["DatasetStats"]],
  827. base_name: str = None,
  828. ):
  829. """Create dataset stats.
  830. Args:
  831. metadata: Dict of operators used to create this Dataset from the
  832. previous one. Typically one entry, e.g., {"map": [...]}.
  833. parent: Reference to parent Dataset's stats, or a list of parents
  834. if there are multiple.
  835. base_name: The name of the base operation for a multi-operator operation.
  836. """
  837. self.metadata: StatsDict = metadata
  838. if parent is not None and not isinstance(parent, list):
  839. parent = [parent]
  840. self.parents: List["DatasetStats"] = parent or []
  841. self.number: int = (
  842. 0 if not self.parents else max(p.number for p in self.parents) + 1
  843. )
  844. self.base_name = base_name
  845. # TODO(ekl) deprecate and remove the notion of dataset UUID once we move
  846. # fully to streaming execution.
  847. self.dataset_uuid: str = "unknown_uuid"
  848. self.time_total_s: float = 0
  849. # Streaming executor stats
  850. self.streaming_exec_schedule_s: Timer = Timer()
  851. # Iteration stats, filled out if the user iterates over the dataset.
  852. self.iter_wait_s: Timer = Timer()
  853. self.iter_get_ref_bundles_s: Timer = Timer()
  854. self.iter_get_s: Timer = Timer()
  855. self.iter_next_batch_s: Timer = Timer()
  856. self.iter_format_batch_s: Timer = Timer()
  857. self.iter_collate_batch_s: Timer = Timer()
  858. self.iter_finalize_batch_s: Timer = Timer()
  859. self.iter_time_to_first_batch_s: Timer = Timer()
  860. self.iter_total_blocked_s: Timer = Timer()
  861. self.iter_user_s: Timer = Timer()
  862. self.iter_initialize_s: Timer = Timer()
  863. self.iter_total_s: Timer = Timer()
  864. self.extra_metrics = {}
  865. # Block fetch stats during iteration.
  866. # These are stats about locations of blocks when the iterator is trying to
  867. # consume them. The iteration performance will be affected depending on
  868. # whether the block is in the local object store of the node where the
  869. # iterator is running.
  870. # This serves as an indicator of block prefetching effectiveness.
  871. self.iter_blocks_local: int = 0
  872. self.iter_blocks_remote: int = 0
  873. self.iter_unknown_location: int = 0
  874. self.iter_prefetched_bytes: int = 0
  875. # Memory usage stats
  876. self.global_bytes_spilled: int = 0
  877. self.global_bytes_restored: int = 0
  878. self.dataset_bytes_spilled: int = 0
  879. # Streaming split coordinator stats (dataset level)
  880. self.streaming_split_coordinator_s: Timer = Timer()
  881. @property
  882. def stats_actor(self):
  883. return get_or_create_stats_actor()
  884. def child_builder(
  885. self, name: str, override_start_time: Optional[float] = None
  886. ) -> _DatasetStatsBuilder:
  887. """Start recording stats for an op of the given name (e.g., map)."""
  888. return _DatasetStatsBuilder(name, self, override_start_time)
  889. def to_summary(self) -> "DatasetStatsSummary":
  890. """Generate a `DatasetStatsSummary` object from the given `DatasetStats`
  891. object, which can be used to generate a summary string."""
  892. operators_stats = []
  893. is_sub_operator = len(self.metadata) > 1
  894. iter_stats = IterStatsSummary(
  895. self.iter_wait_s,
  896. self.iter_get_ref_bundles_s,
  897. self.iter_get_s,
  898. self.iter_next_batch_s,
  899. self.iter_format_batch_s,
  900. self.iter_collate_batch_s,
  901. self.iter_finalize_batch_s,
  902. self.iter_time_to_first_batch_s,
  903. self.iter_total_blocked_s,
  904. self.iter_user_s,
  905. self.iter_initialize_s,
  906. self.iter_total_s,
  907. self.streaming_split_coordinator_s,
  908. self.iter_blocks_local,
  909. self.iter_blocks_remote,
  910. self.iter_unknown_location,
  911. self.iter_prefetched_bytes,
  912. )
  913. stats_summary_parents = []
  914. if self.parents is not None:
  915. stats_summary_parents = [p.to_summary() for p in self.parents]
  916. # Collect the sum of the final output row counts from all parent nodes
  917. parent_total_output = 0
  918. for i, parent_summary in enumerate(stats_summary_parents):
  919. if parent_summary.operators_stats:
  920. # Get the last operator stats from the current parent summary
  921. last_parent_op = parent_summary.operators_stats[-1]
  922. # Extract output row count (handle dict type with "sum" key)
  923. op_output = (
  924. last_parent_op.output_num_rows.get("sum", 0)
  925. if isinstance(last_parent_op.output_num_rows, dict)
  926. else 0
  927. )
  928. logger.debug(
  929. f"Parent {i + 1} (operator: {last_parent_op.operator_name}) contributes {op_output} rows to input"
  930. )
  931. parent_total_output += op_output
  932. # Create temporary operator stats objects from block metadata
  933. op_stats = [
  934. OperatorStatsSummary.from_block_metadata(
  935. name, stats, is_sub_operator=is_sub_operator
  936. )
  937. for name, stats in self.metadata.items()
  938. ]
  939. for i, op_stat in enumerate(op_stats):
  940. # For sub-operators: inherit input based on the order in the current list
  941. if is_sub_operator:
  942. if i == 0:
  943. # Input of the first sub-operator is the total output from parent nodes
  944. op_stat.total_input_num_rows = parent_total_output
  945. else:
  946. # Input of subsequent sub-operators is the output of the previous sub-operator
  947. prev_op = op_stats[i - 1]
  948. op_stat.total_input_num_rows = (
  949. prev_op.output_num_rows["sum"]
  950. if (
  951. prev_op.output_num_rows and "sum" in prev_op.output_num_rows
  952. )
  953. else 0
  954. )
  955. else:
  956. # Single operator scenario: input rows = total output from all parent nodes
  957. op_stat.total_input_num_rows = parent_total_output
  958. operators_stats.append(op_stat)
  959. streaming_exec_schedule_s = (
  960. self.streaming_exec_schedule_s.get()
  961. if self.streaming_exec_schedule_s
  962. else 0
  963. )
  964. return DatasetStatsSummary(
  965. operators_stats,
  966. iter_stats,
  967. stats_summary_parents,
  968. self.number,
  969. self.dataset_uuid,
  970. self.time_total_s,
  971. self.base_name,
  972. self.extra_metrics,
  973. self.global_bytes_spilled,
  974. self.global_bytes_restored,
  975. self.dataset_bytes_spilled,
  976. streaming_exec_schedule_s,
  977. )
  978. def runtime_metrics(self) -> str:
  979. """Generate a string representing the runtime metrics of a Dataset. This is
  980. a high level summary of the time spent in Ray Data code broken down by operator.
  981. It also includes the time spent in the scheduler. Times are shown as the total
  982. time for each operator and percentages of time are shown as a fraction of the
  983. total time for the whole dataset."""
  984. return self.to_summary().runtime_metrics()
  985. @DeveloperAPI
  986. @dataclass
  987. class DatasetStatsSummary:
  988. operators_stats: List["OperatorStatsSummary"]
  989. iter_stats: "IterStatsSummary"
  990. parents: List["DatasetStatsSummary"]
  991. number: int
  992. dataset_uuid: str
  993. time_total_s: float
  994. base_name: str
  995. extra_metrics: Dict[str, Any]
  996. global_bytes_spilled: int
  997. global_bytes_restored: int
  998. dataset_bytes_spilled: int
  999. streaming_exec_schedule_s: float
  1000. def to_string(
  1001. self,
  1002. already_printed: Optional[Set[str]] = None,
  1003. include_parent: bool = True,
  1004. add_global_stats=True,
  1005. ) -> str:
  1006. """Return a human-readable summary of this Dataset's stats.
  1007. Args:
  1008. already_printed: Set of operator IDs that have already had its stats printed
  1009. out.
  1010. include_parent: If true, also include parent stats summary; otherwise, only
  1011. log stats of the latest operator.
  1012. add_global_stats: If true, includes global stats to this summary.
  1013. Returns:
  1014. String with summary statistics for executing the Dataset.
  1015. """
  1016. if already_printed is None:
  1017. already_printed = set()
  1018. out = ""
  1019. if self.parents and include_parent:
  1020. for p in self.parents:
  1021. parent_sum = p.to_string(already_printed, add_global_stats=False)
  1022. if parent_sum:
  1023. out += parent_sum
  1024. out += "\n"
  1025. operators_stats_summary = None
  1026. if len(self.operators_stats) == 1:
  1027. operators_stats_summary = self.operators_stats[0]
  1028. operator_name = operators_stats_summary.operator_name
  1029. operator_uuid = self.dataset_uuid + operator_name
  1030. out += "Operator {} {}: ".format(self.number, operator_name)
  1031. if operator_uuid in already_printed:
  1032. out += "[execution cached]\n"
  1033. else:
  1034. already_printed.add(operator_uuid)
  1035. out += str(operators_stats_summary)
  1036. elif len(self.operators_stats) > 1:
  1037. rounded_total = round(self.time_total_s, 2)
  1038. if rounded_total <= 0:
  1039. # Handle -0.0 case.
  1040. rounded_total = 0
  1041. out += "Operator {} {}: executed in {}s\n".format(
  1042. self.number, self.base_name, rounded_total
  1043. )
  1044. for n, operators_stats_summary in enumerate(self.operators_stats):
  1045. operator_name = operators_stats_summary.operator_name
  1046. operator_uuid = self.dataset_uuid + operator_name
  1047. out += "\n"
  1048. out += "\tSuboperator {} {}: ".format(n, operator_name)
  1049. if operator_uuid in already_printed:
  1050. out += "\t[execution cached]\n"
  1051. else:
  1052. already_printed.add(operator_uuid)
  1053. out += str(operators_stats_summary)
  1054. verbose_stats_logs = DataContext.get_current().verbose_stats_logs
  1055. if verbose_stats_logs and self.extra_metrics:
  1056. indent = (
  1057. "\t"
  1058. if operators_stats_summary and operators_stats_summary.is_sub_operator
  1059. else ""
  1060. )
  1061. out += indent
  1062. out += "* Extra metrics: " + str(self.extra_metrics) + "\n"
  1063. out += str(self.iter_stats)
  1064. if len(self.operators_stats) > 0 and add_global_stats:
  1065. mb_spilled = round(self.global_bytes_spilled / 1e6)
  1066. mb_restored = round(self.global_bytes_restored / 1e6)
  1067. if mb_spilled or mb_restored:
  1068. out += "\nCluster memory:\n"
  1069. out += "* Spilled to disk: {}MB\n".format(mb_spilled)
  1070. out += "* Restored from disk: {}MB\n".format(mb_restored)
  1071. dataset_mb_spilled = round(self.dataset_bytes_spilled / 1e6)
  1072. if dataset_mb_spilled:
  1073. out += "\nDataset memory:\n"
  1074. out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)
  1075. if self.num_rows_per_s:
  1076. out += "\n"
  1077. out += "Dataset throughput:\n"
  1078. out += f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"
  1079. if verbose_stats_logs and add_global_stats:
  1080. out += "\n" + self.runtime_metrics()
  1081. return out
  1082. @property
  1083. def num_rows_per_s(self) -> float:
  1084. """Calculates the throughput in rows per second for the entire dataset."""
  1085. # The observed dataset throughput is computed by dividing the total number
  1086. # of rows produced by the total wall time of the dataset (i.e. from start to
  1087. # finish how long did the dataset take to be processed). With the recursive
  1088. # nature of the DatasetStatsSummary, we use get_total_wall_time to determine
  1089. # the total wall time (this finds the difference between the earliest start
  1090. # and latest end for any block in any operator).
  1091. output_num_rows = (
  1092. self.operators_stats[-1].output_num_rows if self.operators_stats else 0
  1093. )
  1094. total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
  1095. wall_time = self.get_total_wall_time()
  1096. if not total_num_out_rows or not wall_time:
  1097. return 0.0
  1098. return total_num_out_rows / wall_time
  1099. @staticmethod
  1100. def _collect_dataset_stats_summaries(
  1101. curr: "DatasetStatsSummary",
  1102. ) -> List["DatasetStatsSummary"]:
  1103. summs = []
  1104. # TODO: Do operators ever have multiple parents? Do we need to deduplicate?
  1105. for p in curr.parents:
  1106. if p and p.parents:
  1107. summs.extend(DatasetStatsSummary._collect_dataset_stats_summaries(p))
  1108. return summs + [curr]
  1109. @staticmethod
  1110. def _find_start_and_end(summ: "DatasetStatsSummary") -> Tuple[float, float]:
  1111. earliest_start = min(ops.earliest_start_time for ops in summ.operators_stats)
  1112. latest_end = max(ops.latest_end_time for ops in summ.operators_stats)
  1113. return earliest_start, latest_end
  1114. def runtime_metrics(self) -> str:
  1115. total_wall_time = self.get_total_wall_time()
  1116. def fmt_line(name: str, time: float) -> str:
  1117. fraction = time / total_wall_time if total_wall_time > 0 else 0
  1118. return f"* {name}: {fmt(time)} ({fraction * 100:.3f}%)\n"
  1119. summaries = DatasetStatsSummary._collect_dataset_stats_summaries(self)
  1120. out = "Runtime Metrics:\n"
  1121. for summ in summaries:
  1122. if len(summ.operators_stats) > 0:
  1123. earliest_start, latest_end = DatasetStatsSummary._find_start_and_end(
  1124. summ
  1125. )
  1126. op_total_time = latest_end - earliest_start
  1127. out += fmt_line(summ.base_name, op_total_time)
  1128. out += fmt_line("Scheduling", self.streaming_exec_schedule_s)
  1129. out += fmt_line("Total", total_wall_time)
  1130. return out
  1131. def __repr__(self, level=0) -> str:
  1132. indent = leveled_indent(level)
  1133. operators_stats = "\n".join(
  1134. [ss.__repr__(level + 2) for ss in self.operators_stats]
  1135. )
  1136. parent_stats = "\n".join([ps.__repr__(level + 2) for ps in self.parents])
  1137. extra_metrics = "\n".join(
  1138. f"{leveled_indent(level + 2)}{k}: {v},"
  1139. for k, v in self.extra_metrics.items()
  1140. )
  1141. # Handle formatting case for empty outputs.
  1142. operators_stats = (
  1143. f"\n{operators_stats},\n{indent} " if operators_stats else ""
  1144. )
  1145. parent_stats = f"\n{parent_stats},\n{indent} " if parent_stats else ""
  1146. extra_metrics = f"\n{extra_metrics}\n{indent} " if extra_metrics else ""
  1147. return (
  1148. f"{indent}DatasetStatsSummary(\n"
  1149. f"{indent} dataset_uuid={self.dataset_uuid},\n"
  1150. f"{indent} base_name={self.base_name},\n"
  1151. f"{indent} number={self.number},\n"
  1152. f"{indent} extra_metrics={{{extra_metrics}}},\n"
  1153. f"{indent} operators_stats=[{operators_stats}],\n"
  1154. f"{indent} iter_stats={self.iter_stats.__repr__(level+1)},\n"
  1155. f"{indent} global_bytes_spilled={self.global_bytes_spilled / 1e6}MB,\n"
  1156. f"{indent} global_bytes_restored={self.global_bytes_restored / 1e6}MB,\n"
  1157. f"{indent} dataset_bytes_spilled={self.dataset_bytes_spilled / 1e6}MB,\n"
  1158. f"{indent} parents=[{parent_stats}],\n"
  1159. f"{indent})"
  1160. )
  1161. def get_total_wall_time(self) -> float:
  1162. """Calculate the total wall time for the dataset, this is done by finding
  1163. the earliest start time and latest end time for any block in any operator.
  1164. The wall time is the difference of these two times.
  1165. """
  1166. start_ends = [
  1167. DatasetStatsSummary._find_start_and_end(summ)
  1168. for summ in DatasetStatsSummary._collect_dataset_stats_summaries(self)
  1169. if len(summ.operators_stats) > 0
  1170. ]
  1171. if len(start_ends) == 0:
  1172. return 0
  1173. else:
  1174. earliest_start = min(start_end[0] for start_end in start_ends)
  1175. latest_end = max(start_end[1] for start_end in start_ends)
  1176. return latest_end - earliest_start
  1177. def get_total_time_all_blocks(self) -> float:
  1178. """Calculate the sum of the wall times across all blocks of all operators."""
  1179. summaries = DatasetStatsSummary._collect_dataset_stats_summaries(self)
  1180. return sum(
  1181. (
  1182. sum(
  1183. ops.wall_time.get("sum", 0) if ops.wall_time else 0
  1184. for ops in summ.operators_stats
  1185. )
  1186. )
  1187. for summ in summaries
  1188. )
  1189. def get_total_cpu_time(self) -> float:
  1190. parent_sum = sum(p.get_total_cpu_time() for p in self.parents)
  1191. return parent_sum + sum(
  1192. ss.cpu_time.get("sum", 0) for ss in self.operators_stats
  1193. )
  1194. def get_max_heap_memory(self) -> float:
  1195. parent_memory = [p.get_max_heap_memory() for p in self.parents]
  1196. parent_max = max(parent_memory) if parent_memory else 0
  1197. if not self.operators_stats:
  1198. return parent_max
  1199. return max(
  1200. parent_max,
  1201. *[ss.memory.get("max", 0) for ss in self.operators_stats],
  1202. )
  1203. @dataclass
  1204. class OperatorStatsSummary:
  1205. operator_name: str
  1206. # Whether the operator associated with this OperatorStatsSummary object
  1207. # is a suboperator
  1208. is_sub_operator: bool
  1209. # This is the total walltime of the entire operator, typically obtained from
  1210. # `DatasetStats.time_total_s`. An important distinction is that this is the
  1211. # overall runtime of the operator, pulled from the stats actor, whereas the
  1212. # computed walltimes in `self.wall_time` are calculated on a operator level.
  1213. time_total_s: float
  1214. earliest_start_time: float
  1215. latest_end_time: float
  1216. # String summarizing high-level statistics from executing the operator
  1217. block_execution_summary_str: str
  1218. # The fields below are dicts with stats aggregated across blocks
  1219. # processed in this operator. For example:
  1220. # {"min": ..., "max": ..., "mean": ..., "sum": ...}
  1221. wall_time: Optional[Dict[str, float]] = None
  1222. cpu_time: Optional[Dict[str, float]] = None
  1223. udf_time: Optional[Dict[str, float]] = None
  1224. # memory: no "sum" stat
  1225. memory: Optional[Dict[str, float]] = None
  1226. # Use the output_num_rows of the parent Operator as output_num_rows
  1227. total_input_num_rows: Optional[int] = None
  1228. output_num_rows: Optional[Dict[str, float]] = None
  1229. output_size_bytes: Optional[Dict[str, float]] = None
  1230. # node_count: "count" stat instead of "sum"
  1231. node_count: Optional[Dict[str, float]] = None
  1232. task_rows: Optional[Dict[str, float]] = None
  1233. @property
  1234. def num_rows_per_s(self) -> float:
  1235. # The observed Ray Data operator throughput is computed by dividing the
  1236. # total number of rows produced by the wall time of the operator,
  1237. # time_total_s.
  1238. if not self.output_num_rows or not self.time_total_s:
  1239. return 0.0
  1240. return self.output_num_rows["sum"] / self.time_total_s
  1241. @property
  1242. def num_rows_per_task_s(self) -> float:
  1243. """Calculates the estimated single-task throughput in rows per second."""
  1244. # The estimated single task operator throughput is computed by dividing the
  1245. # total number of rows produced by the sum of the wall times across all
  1246. # blocks of the operator. This assumes that on a single task the work done
  1247. # would be equivalent, with no concurrency.
  1248. if not self.output_num_rows or not self.wall_time or not self.wall_time["sum"]:
  1249. return 0.0
  1250. return self.output_num_rows["sum"] / self.wall_time["sum"]
  1251. @classmethod
  1252. def from_block_metadata(
  1253. cls,
  1254. operator_name: str,
  1255. block_stats: List[BlockStats],
  1256. is_sub_operator: bool,
  1257. ) -> "OperatorStatsSummary":
  1258. """Calculate the stats for a operator from a given list of blocks,
  1259. and generates a `OperatorStatsSummary` object with the results.
  1260. Args:
  1261. block_stats: List of `BlockStats` to calculate stats of
  1262. operator_name: Name of operator associated with `blocks`
  1263. is_sub_operator: Whether this set of blocks belongs to a sub operator.
  1264. Returns:
  1265. A `OperatorStatsSummary` object initialized with the calculated statistics
  1266. """
  1267. exec_stats = [m.exec_stats for m in block_stats if m.exec_stats is not None]
  1268. rounded_total = 0
  1269. time_total_s = 0
  1270. earliest_start_time, latest_end_time = 0, 0
  1271. if exec_stats:
  1272. # Calculate the total execution time of operator as
  1273. # the difference between the latest end time and
  1274. # the earliest start time of all blocks in the operator.
  1275. earliest_start_time = min(s.start_time_s for s in exec_stats)
  1276. latest_end_time = max(s.end_time_s for s in exec_stats)
  1277. time_total_s = latest_end_time - earliest_start_time
  1278. if is_sub_operator:
  1279. exec_summary_str = "{} blocks produced\n".format(len(exec_stats))
  1280. else:
  1281. if exec_stats:
  1282. rounded_total = round(time_total_s, 2)
  1283. if rounded_total <= 0:
  1284. # Handle -0.0 case.
  1285. rounded_total = 0
  1286. exec_summary_str = "{} blocks produced in {}s".format(
  1287. len(exec_stats), rounded_total
  1288. )
  1289. else:
  1290. exec_summary_str = ""
  1291. exec_summary_str += "\n"
  1292. task_rows = collections.defaultdict(int)
  1293. for meta in block_stats:
  1294. if meta.num_rows is not None and meta.exec_stats is not None:
  1295. task_rows[meta.exec_stats.task_idx] += meta.num_rows
  1296. task_rows_stats = None
  1297. if len(task_rows) > 0:
  1298. task_rows_stats = {
  1299. "min": min(task_rows.values()),
  1300. "max": max(task_rows.values()),
  1301. "mean": int(np.mean(list(task_rows.values()))),
  1302. "count": len(task_rows),
  1303. }
  1304. exec_summary_str = "{} tasks executed, {}".format(
  1305. len(task_rows), exec_summary_str
  1306. )
  1307. wall_time_stats, cpu_stats, memory_stats, udf_stats = None, None, None, None
  1308. if exec_stats:
  1309. wall_time_stats = {
  1310. "min": min([e.wall_time_s for e in exec_stats]),
  1311. "max": max([e.wall_time_s for e in exec_stats]),
  1312. "mean": np.mean([e.wall_time_s for e in exec_stats]),
  1313. "sum": sum([e.wall_time_s for e in exec_stats]),
  1314. }
  1315. cpu_stats = {
  1316. "min": min([e.cpu_time_s for e in exec_stats]),
  1317. "max": max([e.cpu_time_s for e in exec_stats]),
  1318. "mean": np.mean([e.cpu_time_s for e in exec_stats]),
  1319. "sum": sum([e.cpu_time_s for e in exec_stats]),
  1320. }
  1321. memory_stats_mb = [
  1322. round((e.max_uss_bytes or 0) / (1024 * 1024), 2) for e in exec_stats
  1323. ]
  1324. memory_stats = {
  1325. "min": min(memory_stats_mb),
  1326. "max": max(memory_stats_mb),
  1327. "mean": int(np.mean(memory_stats_mb)),
  1328. }
  1329. udf_stats = {
  1330. "min": min([e.udf_time_s for e in exec_stats]),
  1331. "max": max([e.udf_time_s for e in exec_stats]),
  1332. "mean": np.mean([e.udf_time_s for e in exec_stats]),
  1333. "sum": sum([e.udf_time_s for e in exec_stats]),
  1334. }
  1335. output_num_rows_stats = None
  1336. output_num_rows = [m.num_rows for m in block_stats if m.num_rows is not None]
  1337. if output_num_rows:
  1338. output_num_rows_stats = {
  1339. "min": min(output_num_rows),
  1340. "max": max(output_num_rows),
  1341. "mean": int(np.mean(output_num_rows)),
  1342. "sum": sum(output_num_rows),
  1343. }
  1344. output_size_bytes_stats = None
  1345. output_size_bytes = [
  1346. m.size_bytes for m in block_stats if m.size_bytes is not None
  1347. ]
  1348. if output_size_bytes:
  1349. output_size_bytes_stats = {
  1350. "min": min(output_size_bytes),
  1351. "max": max(output_size_bytes),
  1352. "mean": int(np.mean(output_size_bytes)),
  1353. "sum": sum(output_size_bytes),
  1354. }
  1355. node_counts_stats = None
  1356. if exec_stats:
  1357. node_tasks = collections.defaultdict(set)
  1358. for s in exec_stats:
  1359. node_tasks[s.node_id].add(s.task_idx)
  1360. node_counts = {node: len(tasks) for node, tasks in node_tasks.items()}
  1361. node_counts_stats = {
  1362. "min": min(node_counts.values()),
  1363. "max": max(node_counts.values()),
  1364. "mean": int(np.mean(list(node_counts.values()))),
  1365. "count": len(node_counts),
  1366. }
  1367. # Assign a value in to_summary and initialize it as None.
  1368. total_input_num_rows = None
  1369. return OperatorStatsSummary(
  1370. operator_name=operator_name,
  1371. is_sub_operator=is_sub_operator,
  1372. time_total_s=time_total_s,
  1373. earliest_start_time=earliest_start_time,
  1374. latest_end_time=latest_end_time,
  1375. block_execution_summary_str=exec_summary_str,
  1376. wall_time=wall_time_stats,
  1377. cpu_time=cpu_stats,
  1378. udf_time=udf_stats,
  1379. memory=memory_stats,
  1380. total_input_num_rows=total_input_num_rows,
  1381. output_num_rows=output_num_rows_stats,
  1382. output_size_bytes=output_size_bytes_stats,
  1383. node_count=node_counts_stats,
  1384. task_rows=task_rows_stats,
  1385. )
  def __str__(self) -> str:
      """Render this pre-computed summary as a human-friendly report.

      The summary is typically built by
      `OperatorStatsSummary.from_block_metadata()`. Each section is appended
      only when the corresponding statistics are present (truthy), and every
      section is prefixed with a tab when this is a sub-operator.

      Returns:
          String with summary statistics for executing the given operator.
      """
      indent = "\t" if self.is_sub_operator else ""
      out = self.block_execution_summary_str

      # Timing statistics are passed through `fmt`; every other section below
      # prints its raw values.
      for label, time_stats in (
          ("Remote wall time", self.wall_time),
          ("Remote cpu time", self.cpu_time),
          ("UDF time", self.udf_time),
      ):
          if time_stats:
              out += indent
              out += "* {}: {} min, {} max, {} mean, {} total\n".format(
                  label,
                  fmt(time_stats["min"]),
                  fmt(time_stats["max"]),
                  fmt(time_stats["mean"]),
                  fmt(time_stats["sum"]),
              )

      memory_stats = self.memory
      if memory_stats:
          out += indent
          out += "* Peak heap memory usage (MiB): {} min, {} max, {} mean\n".format(
              memory_stats["min"],
              memory_stats["max"],
              memory_stats["mean"],
          )

      # Per-output-block statistics share the same min/max/mean/total layout.
      for label, per_block_stats in (
          ("Output num rows per block", self.output_num_rows),
          ("Output size bytes per block", self.output_size_bytes),
      ):
          if per_block_stats:
              out += indent
              out += "* {}: {} min, {} max, {} mean, {} total\n".format(
                  label,
                  per_block_stats["min"],
                  per_block_stats["max"],
                  per_block_stats["mean"],
                  per_block_stats["sum"],
              )

      task_rows = self.task_rows
      if task_rows:
          out += indent
          out += (
              "* Output rows per task: {} min, {} max, {} mean, {} tasks used\n"
          ).format(
              task_rows["min"],
              task_rows["max"],
              task_rows["mean"],
              task_rows["count"],
          )

      node_count_stats = self.node_count
      if node_count_stats:
          out += indent
          out += "* Tasks per node: {} min, {} max, {} mean; {} nodes used\n".format(
              node_count_stats["min"],
              node_count_stats["max"],
              node_count_stats["mean"],
              node_count_stats["count"],
          )

      if self.num_rows_per_s and self.num_rows_per_task_s:
          total_num_in_rows = self.total_input_num_rows or 0
          # `output_num_rows` is left as None when no block reported a row
          # count, so fall back to 0 instead of raising TypeError on
          # `None["sum"]`.
          output_num_rows_stats = self.output_num_rows
          total_num_out_rows = (
              output_num_rows_stats["sum"] if output_num_rows_stats else 0
          )
          out += indent
          out += "* Operator throughput:\n"
          out += indent + f"\t* Total input num rows: {total_num_in_rows} rows\n"
          out += indent + f"\t* Total output num rows: {total_num_out_rows} rows\n"
          out += indent + f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"
          out += (
              indent
              + "\t* Estimated single task throughput: "
              + f"{self.num_rows_per_task_s} rows/s\n"
          )
      return out
  def __repr__(self, level=0) -> str:
      """Return a field-by-field, indented representation for debugging.

      Args:
          level: Nesting depth passed to `leveled_indent`; sub-operators are
              indented by one additional level.

      Returns:
          A multi-line "OperatorStatsSummary(...)" string.
      """
      indent = leveled_indent(level)
      if self.is_sub_operator:
          indent += leveled_indent(1)

      def _fmt_values(stats):
          # Apply `fmt` to every value; a missing (None) stats dict yields {}.
          return {key: fmt(value) for key, value in (stats or {}).items()}

      # NOTE(review): every stats dict goes through `fmt` here, including
      # memory (MiB) and row/byte/node counts that `__str__` prints
      # unformatted, and `udf_time` is not included. Confirm both are intended.
      wall_time = _fmt_values(self.wall_time)
      cpu_time = _fmt_values(self.cpu_time)
      memory = _fmt_values(self.memory)
      output_num_rows = _fmt_values(self.output_num_rows)
      output_size_bytes = _fmt_values(self.output_size_bytes)
      node_count = _fmt_values(self.node_count)

      parts = [
          f"{indent}OperatorStatsSummary(\n",
          f"{indent} operator_name='{self.operator_name}',\n",
          f"{indent} is_suboperator={self.is_sub_operator},\n",
          f"{indent} time_total_s={fmt(self.time_total_s)},\n",
          # block_execution_summary_str already ends with a newline.
          f"{indent} block_execution_summary_str={self.block_execution_summary_str}",
          f"{indent} wall_time={wall_time or None},\n",
          f"{indent} cpu_time={cpu_time or None},\n",
          f"{indent} memory={memory or None},\n",
          f"{indent} output_num_rows={output_num_rows or None},\n",
          f"{indent} output_size_bytes={output_size_bytes or None},\n",
          f"{indent} node_count={node_count or None},\n",
          f"{indent})",
      ]
      return "".join(parts)
  @dataclass
  class IterStatsSummary:
      """Timing and block-location statistics collected while iterating a Dataset.

      Rendered for users by `to_string()`, which `__str__` delegates to.
      """

      # Time spent in actor based prefetching, in seconds.
      wait_time: Timer
      # Time spent getting RefBundles from the dataset iterator, in seconds
      get_ref_bundles_time: Timer
      # Time spent in `ray.get()`, in seconds
      get_time: Timer
      # Time spent in batch building, in seconds
      next_time: Timer
      # Time spent in `_format_batch_()`, in seconds
      format_time: Timer
      # Time spent in collate fn, in seconds
      collate_time: Timer
      # Time spent in finalize_fn, in seconds (reported by `to_string()` as
      # "host->device transfer")
      finalize_batch_time: Timer
      # Time user thread is blocked waiting for first batch
      time_to_first_batch: Timer
      # Total time user thread is blocked by iter_batches
      block_time: Timer
      # Time spent in user code, in seconds
      user_time: Timer
      # Time spent in Ray Data iterator initialization code
      initialize_time: Timer
      # Total time taken by Dataset iterator, in seconds
      total_time: Timer
      # Time spent in streaming split coordinator
      streaming_split_coord_time: Timer
      # Num of blocks that are in local object store
      iter_blocks_local: int
      # Num of blocks that are in remote node and have to fetch locally
      iter_blocks_remote: int
      # Num of blocks with unknown locations
      iter_unknown_location: int
      # Current bytes of prefetched blocks in the iterator
      iter_prefetched_bytes: int
  def __str__(self) -> str:
      """Return the human-readable report built by `to_string()`."""
      rendered = self.to_string()
      return rendered
  def to_string(self) -> str:
      """Render the iterator statistics as a human-readable report.

      The "Dataset iterator time breakdown" section is produced only when at
      least one timer in the opening condition reports a truthy total.
      Within it, each per-stage line appears only for timers with a truthy
      total. Block-location counts appear only when
      `DataContext.get_current().enable_get_object_locations_for_metrics`
      is set.

      Returns:
          The formatted report, or "" when nothing was emitted.
      """
      out = ""
      # NOTE(review): `initialize_time` and `user_time` are not part of this
      # condition, so on their own they never trigger any output. Confirm
      # this is intended.
      if (
          self.block_time.get()
          or self.time_to_first_batch.get()
          or self.total_time.get()
          or self.get_ref_bundles_time.get()
          or self.get_time.get()
          or self.next_time.get()
          or self.format_time.get()
          or self.collate_time.get()
          or self.finalize_batch_time.get()
      ):
          out += "\nDataset iterator time breakdown:\n"
          if self.total_time.get():
              out += "* Total time overall: {}\n".format(fmt(self.total_time.get()))
          if self.initialize_time.get():
              out += (
                  " * Total time in Ray Data iterator initialization code: "
                  "{}\n".format(fmt(self.initialize_time.get()))
              )
          if self.block_time.get():
              out += (
                  " * Total time user thread is blocked by Ray Data iter_batches: "
                  "{}\n".format(fmt(self.block_time.get()))
              )
          if self.time_to_first_batch.get():
              out += (
                  " * Total time spent waiting for the first batch after starting iteration: "
                  "{}\n".format(fmt(self.time_to_first_batch.get()))
              )
          if self.user_time.get():
              out += " * Total execution time for user thread: {}\n".format(
                  fmt(self.user_time.get())
              )
          # Per-stage timers below report min/max/avg/total for each stage.
          out += (
              "* Batch iteration time breakdown (summed across prefetch threads):\n"
          )
          if self.get_ref_bundles_time.get():
              out += " * In get RefBundles: {} min, {} max, {} avg, {} total\n".format(
                  fmt(self.get_ref_bundles_time.min()),
                  fmt(self.get_ref_bundles_time.max()),
                  fmt(self.get_ref_bundles_time.avg()),
                  fmt(self.get_ref_bundles_time.get()),
              )
          if self.get_time.get():
              out += " * In ray.get(): {} min, {} max, {} avg, {} total\n".format(
                  fmt(self.get_time.min()),
                  fmt(self.get_time.max()),
                  fmt(self.get_time.avg()),
                  fmt(self.get_time.get()),
              )
          if self.next_time.get():
              batch_creation_str = (
                  " * In batch creation: {} min, {} max, {} avg, {} total\n"
              )
              out += batch_creation_str.format(
                  fmt(self.next_time.min()),
                  fmt(self.next_time.max()),
                  fmt(self.next_time.avg()),
                  fmt(self.next_time.get()),
              )
          if self.format_time.get():
              format_str = (
                  " * In batch formatting: {} min, {} max, {} avg, {} total\n"
              )
              out += format_str.format(
                  fmt(self.format_time.min()),
                  fmt(self.format_time.max()),
                  fmt(self.format_time.avg()),
                  fmt(self.format_time.get()),
              )
          if self.collate_time.get():
              out += " * In collate_fn: {} min, {} max, {} avg, {} total\n".format(
                  fmt(self.collate_time.min()),
                  fmt(self.collate_time.max()),
                  fmt(self.collate_time.avg()),
                  fmt(self.collate_time.get()),
              )
          # `finalize_batch_time` is reported under the "host->device
          # transfer" label.
          if self.finalize_batch_time.get():
              format_str = (
                  " * In host->device transfer: {} min, {} max, {} avg, {} total\n"
              )
              out += format_str.format(
                  fmt(self.finalize_batch_time.min()),
                  fmt(self.finalize_batch_time.max()),
                  fmt(self.finalize_batch_time.avg()),
                  fmt(self.finalize_batch_time.get()),
              )
          # Block-location counts are shown only when the DataContext flag
          # enabling object-location metrics is set.
          if DataContext.get_current().enable_get_object_locations_for_metrics:
              out += "Block locations:\n"
              out += " * Num blocks local: {}\n".format(self.iter_blocks_local)
              out += " * Num blocks remote: {}\n".format(self.iter_blocks_remote)
              out += " * Num blocks unknown location: {}\n".format(
                  self.iter_unknown_location
              )
          # NOTE(review): confirm whether this check belongs inside the
          # `enable_get_object_locations_for_metrics` branch above. Its bullet
          # style matches the block-location lines.
          if self.iter_prefetched_bytes:
              out += " * Prefetched bytes: {}\n".format(self.iter_prefetched_bytes)
          if self.streaming_split_coord_time.get() != 0:
              out += "Streaming split coordinator overhead time: "
              out += f"{fmt(self.streaming_split_coord_time.get())}\n"
      return out
  def __repr__(self, level=0) -> str:
      """Return a field-by-field debug representation of a subset of the stats.

      Args:
          level: Nesting depth passed to `leveled_indent` to indent each
              field line.

      Returns:
          A multi-line "IterStatsSummary(...)" string.
      """
      indent = leveled_indent(level)
      # Ordered (field name, rendered value) pairs. Timers are rendered through
      # `fmt`; the `or None` fallbacks turn falsy values into None.
      entries = [
          ("wait_time", fmt(self.wait_time.get()) or None),
          ("get_ref_bundles_time", fmt(self.get_ref_bundles_time.get()) or None),
          ("get_time", fmt(self.get_time.get()) or None),
          ("iter_blocks_local", self.iter_blocks_local or None),
          ("iter_blocks_remote", self.iter_blocks_remote or None),
          ("iter_unknown_location", self.iter_unknown_location or None),
          ("iter_prefetched_bytes", self.iter_prefetched_bytes or None),
          ("next_time", fmt(self.next_time.get()) or None),
          ("format_time", fmt(self.format_time.get()) or None),
          ("user_time", fmt(self.user_time.get()) or None),
          ("total_time", fmt(self.total_time.get()) or None),
      ]
      body = "".join(f"{indent} {name}={value},\n" for name, value in entries)
      return "IterStatsSummary(\n" + body + f"{indent})"