| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871 |
- import collections
- import copy
- import logging
- import time
- from collections import defaultdict
- from contextlib import contextmanager
- from dataclasses import dataclass, fields
- from typing import (
- Any,
- Dict,
- List,
- Mapping,
- Optional,
- Set,
- Tuple,
- Union,
- )
- from uuid import uuid4
- import numpy as np
- import ray
- from ray.actor import ActorHandle
- from ray.data._internal.block_list import BlockList
- from ray.data._internal.execution.dataset_state import DatasetState
- from ray.data._internal.execution.interfaces.common import RuntimeMetricsHistogram
- from ray.data._internal.execution.interfaces.op_runtime_metrics import (
- NODE_UNKNOWN,
- MetricsGroup,
- MetricsType,
- NodeMetrics,
- OpRuntimeMetrics,
- )
- from ray.data._internal.metadata_exporter import (
- DataContextMetadata,
- DatasetMetadata,
- Topology,
- get_dataset_metadata_exporter,
- )
- from ray.data._internal.util import capfirst
- from ray.data.block import BlockStats
- from ray.data.context import DataContext
- from ray.util.annotations import DeveloperAPI
- from ray.util.metrics import Counter, Gauge, Histogram, Metric
- from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
- logger = logging.getLogger(__name__)
- STATS_ACTOR_NAME = "datasets_stats_actor"
- STATS_ACTOR_NAMESPACE = "_dataset_stats_actor"
- UNKNOWN = "unknown"
- StatsDict = Dict[str, List[BlockStats]]
- def fmt(seconds: float) -> str:
- if seconds > 1:
- return str(round(seconds, 2)) + "s"
- elif seconds > 0.001:
- return str(round(seconds * 1000, 2)) + "ms"
- else:
- return str(round(seconds * 1000 * 1000, 2)) + "us"
- def leveled_indent(lvl: int = 0, spaces_per_indent: int = 3) -> str:
- """Returns a string of spaces which contains `level` indents,
- each indent containing `spaces_per_indent` spaces. For example:
- >>> leveled_indent(2, 3)
- ' '
- """
- return (" " * spaces_per_indent) * lvl
- class Timer:
- """Helper class for tracking accumulated time (in seconds)."""
- def __init__(self):
- self._total: float = 0
- self._min: float = float("inf")
- self._max: float = 0
- self._total_count: float = 0
- @contextmanager
- def timer(self) -> None:
- time_start = time.perf_counter()
- try:
- yield
- finally:
- self.add(time.perf_counter() - time_start)
- def add(self, value: float) -> None:
- self._total += value
- if value < self._min:
- self._min = value
- if value > self._max:
- self._max = value
- self._total_count += 1
- def get(self) -> float:
- return self._total
- def min(self) -> float:
- return self._min
- def max(self) -> float:
- return self._max
- def avg(self) -> float:
- return self._total / self._total_count if self._total_count else float("inf")
- class _DatasetStatsBuilder:
- """Helper class for building dataset stats.
- When this class is created, we record the start time. When build() is
- called with the final blocks of the new dataset, the time delta is
- saved as part of the stats."""
- def __init__(
- self,
- operator_name: str,
- parent: "DatasetStats",
- override_start_time: Optional[float],
- ):
- self.operator_name = operator_name
- self.parent = parent
- self.start_time = override_start_time or time.perf_counter()
- def build_multioperator(self, metadata: StatsDict) -> "DatasetStats":
- op_metadata = {}
- for i, (k, v) in enumerate(metadata.items()):
- capped_k = capfirst(k)
- if len(metadata) > 1:
- if i == 0:
- op_metadata[self.operator_name + capped_k] = v
- else:
- op_metadata[self.operator_name.split("->")[-1] + capped_k] = v
- else:
- op_metadata[self.operator_name] = v
- stats = DatasetStats(
- metadata=op_metadata,
- parent=self.parent,
- base_name=self.operator_name,
- )
- stats.time_total_s = time.perf_counter() - self.start_time
- return stats
- def build(self, final_blocks: BlockList) -> "DatasetStats":
- stats = DatasetStats(
- metadata={self.operator_name: final_blocks.get_metadata()},
- parent=self.parent,
- )
- stats.time_total_s = time.perf_counter() - self.start_time
- return stats
- @ray.remote(num_cpus=0)
- class _StatsActor:
- """Actor holding stats for blocks created by LazyBlockList.
- This actor is shared across all datasets created in the same cluster.
- In order to cap memory usage, we set a max number of stats to keep
- in the actor. When this limit is exceeded, the stats will be garbage
- collected in FIFO order.
- TODO(ekl) we should consider refactoring LazyBlockList so stats can be
- extracted without using an out-of-band actor."""
- def __init__(self, max_stats=1000):
- # Mapping from uuid -> (task_id -> list of blocks statistics).
- self.metadata = collections.defaultdict(dict)
- self.last_time = {}
- self.start_time = {}
- self.max_stats = max_stats
- # Assign dataset uuids with a global counter.
- self.next_dataset_id = 0
- # Dataset metadata to be queried directly by DashboardHead api.
- self.datasets: Dict[str, Any] = {}
- # Cache of calls to ray.nodes() to prevent unnecessary network calls
- self._ray_nodes_cache: Dict[str, str] = {}
- # Initialize the metadata exporter
- self._metadata_exporter = get_dataset_metadata_exporter()
- self.dataset_metadatas: Dict[str, DatasetMetadata] = {}
- # A FIFO queue of dataset_tags for finished datasets. This is used to
- # efficiently evict the oldest finished datasets when max_stats is reached.
- self.finished_datasets_queue = collections.deque()
- # Ray Data dashboard metrics
- # Everything is a gauge because we need to reset all of
- # a dataset's metrics to 0 after each finishes execution.
- op_tags_keys = ("dataset", "operator")
- # TODO(scottjlee): move these overvie metrics as fields in a
- # separate dataclass, similar to OpRuntimeMetrics.
- self.spilled_bytes = Gauge(
- "data_spilled_bytes",
- description="""Bytes spilled by dataset operators.
- DataContext.enable_get_object_locations_for_metrics
- must be set to True to report this metric""",
- tag_keys=op_tags_keys,
- )
- self.freed_bytes = Gauge(
- "data_freed_bytes",
- description="Bytes freed by dataset operators",
- tag_keys=op_tags_keys,
- )
- self.current_bytes = Gauge(
- "data_current_bytes",
- description="Bytes currently in memory store used by dataset operators",
- tag_keys=op_tags_keys,
- )
- self.cpu_usage_cores = Gauge(
- "data_cpu_usage_cores",
- description="CPUs allocated to dataset operators",
- tag_keys=op_tags_keys,
- )
- self.gpu_usage_cores = Gauge(
- "data_gpu_usage_cores",
- description="GPUs allocated to dataset operators",
- tag_keys=op_tags_keys,
- )
- self.output_bytes = Gauge(
- "data_output_bytes",
- description="Bytes outputted by dataset operators",
- tag_keys=op_tags_keys,
- )
- self.output_rows = Gauge(
- "data_output_rows",
- description="Rows outputted by dataset operators",
- tag_keys=op_tags_keys,
- )
- # === Metrics from OpRuntimeMetrics ===
- # Inputs-related metrics
- self.execution_metrics_inputs = (
- self._create_prometheus_metrics_for_execution_metrics(
- metrics_group=MetricsGroup.INPUTS,
- tag_keys=op_tags_keys,
- )
- )
- # Outputs-related metrics
- self.execution_metrics_outputs = (
- self._create_prometheus_metrics_for_execution_metrics(
- metrics_group=MetricsGroup.OUTPUTS,
- tag_keys=op_tags_keys,
- )
- )
- # Task-related metrics
- self.execution_metrics_tasks = (
- self._create_prometheus_metrics_for_execution_metrics(
- metrics_group=MetricsGroup.TASKS,
- tag_keys=op_tags_keys,
- )
- )
- # Object store memory-related metrics
- self.execution_metrics_obj_store_memory = (
- self._create_prometheus_metrics_for_execution_metrics(
- metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
- tag_keys=op_tags_keys,
- )
- )
- # Actor related metrics
- self.execution_metrics_actors = (
- self._create_prometheus_metrics_for_execution_metrics(
- metrics_group=MetricsGroup.ACTORS,
- tag_keys=op_tags_keys,
- )
- )
- # Miscellaneous metrics
- self.execution_metrics_misc = (
- self._create_prometheus_metrics_for_execution_metrics(
- metrics_group=MetricsGroup.MISC,
- tag_keys=op_tags_keys,
- )
- )
- # Per Node metrics
- self.per_node_metrics = self._create_prometheus_metrics_for_per_node_metrics()
- iter_tag_keys = ("dataset",)
- self.time_to_first_batch_s = Gauge(
- "data_iter_time_to_first_batch_seconds",
- description="Total time spent waiting for the first batch after starting iteration. "
- "This includes the dataset pipeline warmup time. This metric is accumulated across different epochs.",
- tag_keys=iter_tag_keys,
- )
- self.iter_block_fetching_s = Gauge(
- "data_iter_block_fetching_seconds",
- description="Seconds taken to fetch (with ray.get) blocks by iter_batches()",
- tag_keys=iter_tag_keys,
- )
- self.iter_batch_shaping_s = Gauge(
- "data_iter_batch_shaping_seconds",
- description="Seconds taken to shape batch from incoming blocks by iter_batches()",
- tag_keys=iter_tag_keys,
- )
- self.iter_batch_formatting_s = Gauge(
- "data_iter_batch_formatting_seconds",
- description="Seconds taken to format batches by iter_batches()",
- tag_keys=iter_tag_keys,
- )
- self.iter_batch_collating_s = Gauge(
- "data_iter_batch_collating_seconds",
- description="Seconds taken to collate batches by iter_batches()",
- tag_keys=iter_tag_keys,
- )
- self.iter_batch_finalizing_s = Gauge(
- "data_iter_batch_finalizing_seconds",
- description="Seconds taken to collate batches by iter_batches()",
- tag_keys=iter_tag_keys,
- )
- self.iter_total_blocked_s = Gauge(
- "data_iter_total_blocked_seconds",
- description="Seconds user thread is blocked by iter_batches()",
- tag_keys=iter_tag_keys,
- )
- self.iter_user_s = Gauge(
- "data_iter_user_seconds",
- description="Seconds spent in user code",
- tag_keys=iter_tag_keys,
- )
- self.iter_initialize_s = Gauge(
- "data_iter_initialize_seconds",
- description="Seconds spent in iterator initialization code",
- tag_keys=iter_tag_keys,
- )
- self.iter_get_ref_bundles_s = Gauge(
- "data_iter_get_ref_bundles_seconds",
- description="Seconds spent getting RefBundles from the dataset iterator",
- tag_keys=iter_tag_keys,
- )
- self.iter_get_s = Gauge(
- "data_iter_get_seconds",
- description="Seconds spent in ray.get() while resolving block references",
- tag_keys=iter_tag_keys,
- )
- self.iter_next_batch_s = Gauge(
- "data_iter_next_batch_seconds",
- description="Seconds spent getting the next batch from the block buffer",
- tag_keys=iter_tag_keys,
- )
- self.iter_format_batch_s = Gauge(
- "data_iter_format_batch_seconds",
- description="Seconds spent formatting the batch",
- tag_keys=iter_tag_keys,
- )
- self.iter_collate_batch_s = Gauge(
- "data_iter_collate_batch_seconds",
- description="Seconds spent collating the batch",
- tag_keys=iter_tag_keys,
- )
- self.iter_finalize_batch_s = Gauge(
- "data_iter_finalize_batch_seconds",
- description="Seconds spent finalizing the batch",
- tag_keys=iter_tag_keys,
- )
- self.iter_blocks_local = Gauge(
- "data_iter_blocks_local",
- description="Number of blocks already on the local node",
- tag_keys=iter_tag_keys,
- )
- self.iter_blocks_remote = Gauge(
- "data_iter_blocks_remote",
- description="Number of blocks that require fetching from another node",
- tag_keys=iter_tag_keys,
- )
- self.iter_unknown_location = Gauge(
- "data_iter_unknown_location",
- description="Number of blocks that have unknown locations",
- tag_keys=iter_tag_keys,
- )
- self.iter_prefetched_bytes = Gauge(
- "data_iter_prefetched_bytes",
- description="Current bytes of prefetched blocks in the iterator",
- tag_keys=iter_tag_keys,
- )
- # === Dataset and Operator Metadata Metrics ===
- dataset_tags = ("dataset", "job_id", "start_time")
- self.data_dataset_estimated_total_blocks = Gauge(
- "data_dataset_estimated_total_blocks",
- description="Total work units in blocks for dataset",
- tag_keys=dataset_tags,
- )
- self.data_dataset_estimated_total_rows = Gauge(
- "data_dataset_estimated_total_rows",
- description="Total work units in rows for dataset",
- tag_keys=dataset_tags,
- )
- self.data_dataset_state = Gauge(
- "data_dataset_state",
- description=f"State of dataset ({', '.join([f'{s.value}={s.name}' for s in DatasetState])})",
- tag_keys=dataset_tags,
- )
- operator_tags = ("dataset", "operator")
- self.data_operator_estimated_total_blocks = Gauge(
- "data_operator_estimated_total_blocks",
- description="Total work units in blocks for operator",
- tag_keys=operator_tags,
- )
- self.data_operator_estimated_total_rows = Gauge(
- "data_operator_estimated_total_rows",
- description="Total work units in rows for operator",
- tag_keys=operator_tags,
- )
- self.data_operator_queued_blocks = Gauge(
- "data_operator_queued_blocks",
- description="Number of queued blocks for operator",
- tag_keys=operator_tags,
- )
- self.data_operator_state = Gauge(
- "data_operator_state",
- description=f"State of operator ({', '.join([f'{s.value}={s.name}' for s in DatasetState])})",
- tag_keys=operator_tags,
- )
- def _create_prometheus_metrics_for_execution_metrics(
- self, metrics_group: MetricsGroup, tag_keys: Tuple[str, ...]
- ) -> Dict[str, Metric]:
- metrics = {}
- for metric in OpRuntimeMetrics.get_metrics():
- if not metric.metrics_group == metrics_group:
- continue
- metric_name = f"data_{metric.name}"
- metric_description = metric.description
- if metric.metrics_type == MetricsType.Gauge:
- metrics[metric.name] = Gauge(
- metric_name,
- description=metric_description,
- tag_keys=tag_keys,
- )
- elif metric.metrics_type == MetricsType.Histogram:
- metrics[metric.name] = Histogram(
- metric_name,
- description=metric_description,
- tag_keys=tag_keys,
- **metric.metrics_args,
- )
- elif metric.metrics_type == MetricsType.Counter:
- metrics[metric.name] = Counter(
- metric_name,
- description=metric_description,
- tag_keys=tag_keys,
- )
- return metrics
- def _create_prometheus_metrics_for_per_node_metrics(self) -> Dict[str, Gauge]:
- metrics = {}
- for field in fields(NodeMetrics):
- metric_name = f"data_{field.name}_per_node"
- metrics[field.name] = Gauge(
- metric_name,
- description="",
- tag_keys=("dataset", "node_ip"),
- )
- return metrics
- def gen_dataset_id(self) -> str:
- """Generate a unique dataset_id for tracking datasets."""
- dataset_id = str(self.next_dataset_id)
- self.next_dataset_id += 1
- return dataset_id
- def update_execution_metrics(
- self,
- dataset_tag: str,
- op_metrics: List[Dict[str, Union[int, float]]],
- operator_tags: List[str],
- state: Dict[str, Any],
- per_node_metrics: Optional[Dict[str, Dict[str, Union[int, float]]]] = None,
- ):
- def _record(
- prom_metric: Metric,
- value: Union[int, float, List[int]],
- tags: Dict[str, str] = None,
- ):
- if isinstance(prom_metric, Gauge):
- prom_metric.set(value, tags)
- elif isinstance(prom_metric, Counter):
- prom_metric.inc(value, tags)
- elif isinstance(prom_metric, Histogram):
- if isinstance(value, RuntimeMetricsHistogram):
- value.export_to(prom_metric, tags)
- for stats, operator_tag in zip(op_metrics, operator_tags):
- tags = self._create_tags(dataset_tag, operator_tag)
- self.spilled_bytes.set(stats.get("obj_store_mem_spilled", 0), tags)
- self.freed_bytes.set(stats.get("obj_store_mem_freed", 0), tags)
- self.current_bytes.set(stats.get("obj_store_mem_used", 0), tags)
- self.output_bytes.set(stats.get("bytes_task_outputs_generated", 0), tags)
- self.output_rows.set(stats.get("row_outputs_taken", 0), tags)
- self.cpu_usage_cores.set(stats.get("cpu_usage", 0), tags)
- self.gpu_usage_cores.set(stats.get("gpu_usage", 0), tags)
- for field_name, prom_metric in self.execution_metrics_inputs.items():
- _record(prom_metric, stats.get(field_name, 0), tags)
- for field_name, prom_metric in self.execution_metrics_outputs.items():
- _record(prom_metric, stats.get(field_name, 0), tags)
- for field_name, prom_metric in self.execution_metrics_tasks.items():
- _record(prom_metric, stats.get(field_name, 0), tags)
- for (
- field_name,
- prom_metric,
- ) in self.execution_metrics_obj_store_memory.items():
- _record(prom_metric, stats.get(field_name, 0), tags)
- for field_name, prom_metric in self.execution_metrics_actors.items():
- _record(prom_metric, stats.get(field_name, 0), tags)
- for field_name, prom_metric in self.execution_metrics_misc.items():
- _record(prom_metric, stats.get(field_name, 0), tags)
- # Update per node metrics if they exist, the creation of these metrics is controlled
- # by the _data_context.enable_per_node_metrics flag in the streaming executor but
- # that is not exposed in the _StatsActor so here we simply check if the metrics exist
- # and if so, update them
- if per_node_metrics is not None:
- for node_id, node_metrics in per_node_metrics.items():
- # Translate node_id into node_name (the node ip), cache node info
- if node_id not in self._ray_nodes_cache:
- # Rebuilding this cache will fetch all nodes, this
- # only needs to be done up to once per loop
- self._rebuild_ray_nodes_cache()
- node_ip = self._ray_nodes_cache.get(node_id, NODE_UNKNOWN)
- tags = self._create_tags(dataset_tag=dataset_tag, node_ip_tag=node_ip)
- for metric_name, metric_value in node_metrics.items():
- prom_metric = self.per_node_metrics[metric_name]
- _record(prom_metric, metric_value, tags)
- # This update is called from a dataset's executor,
- # so all tags should contain the same dataset
- self.update_dataset(dataset_tag, state)
- def _rebuild_ray_nodes_cache(self) -> None:
- current_nodes = ray.nodes()
- for node in current_nodes:
- node_id = node.get("NodeID", None)
- node_name = node.get("NodeName", None)
- if node_id is not None and node_name is not None:
- self._ray_nodes_cache[node_id] = node_name
- def update_iteration_metrics(
- self,
- stats: "DatasetStats",
- dataset_tag,
- ):
- tags = self._create_tags(dataset_tag)
- self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags)
- self.iter_get_ref_bundles_s.set(stats.iter_get_ref_bundles_s.get(), tags)
- self.iter_get_s.set(stats.iter_get_s.get(), tags)
- self.iter_next_batch_s.set(stats.iter_next_batch_s.get(), tags)
- self.iter_format_batch_s.set(stats.iter_format_batch_s.get(), tags)
- self.iter_collate_batch_s.set(stats.iter_collate_batch_s.get(), tags)
- self.iter_finalize_batch_s.set(stats.iter_finalize_batch_s.get(), tags)
- self.iter_blocks_local.set(stats.iter_blocks_local, tags)
- self.iter_blocks_remote.set(stats.iter_blocks_remote, tags)
- self.iter_unknown_location.set(stats.iter_unknown_location, tags)
- self.iter_prefetched_bytes.set(stats.iter_prefetched_bytes, tags)
- self.iter_block_fetching_s.set(stats.iter_get_s.get(), tags)
- self.iter_batch_shaping_s.set(stats.iter_next_batch_s.get(), tags)
- self.iter_batch_formatting_s.set(stats.iter_format_batch_s.get(), tags)
- self.iter_batch_collating_s.set(stats.iter_collate_batch_s.get(), tags)
- self.iter_batch_finalizing_s.set(stats.iter_finalize_batch_s.get(), tags)
- self.time_to_first_batch_s.set(stats.iter_time_to_first_batch_s.get(), tags)
- self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags)
- self.iter_user_s.set(stats.iter_user_s.get(), tags)
- def register_dataset(
- self,
- job_id: str,
- dataset_tag: str,
- operator_tags: List[str],
- topology: Topology,
- data_context: DataContextMetadata,
- ):
- start_time = time.time()
- self.datasets[dataset_tag] = {
- "job_id": job_id,
- "state": DatasetState.PENDING.name,
- "progress": 0,
- "total": 0,
- "total_rows": 0,
- "start_time": start_time,
- "end_time": None,
- "operators": {
- operator: {
- "state": DatasetState.PENDING.name,
- "progress": 0,
- "total": 0,
- "queued_blocks": 0,
- }
- for operator in operator_tags
- },
- }
- if self._metadata_exporter is not None:
- self.dataset_metadatas[dataset_tag] = DatasetMetadata(
- job_id=job_id,
- topology=topology,
- dataset_id=dataset_tag,
- start_time=start_time,
- data_context=data_context,
- execution_start_time=None,
- execution_end_time=None,
- state=DatasetState.PENDING.name,
- )
- self._metadata_exporter.export_dataset_metadata(
- self.dataset_metadatas[dataset_tag]
- )
- def update_dataset(self, dataset_tag: str, state: Dict[str, Any]):
- self.datasets[dataset_tag].update(state)
- state = self.datasets[dataset_tag]
- job_id = self.datasets[dataset_tag].get("job_id", "None")
- start_time = str(int(self.datasets[dataset_tag].get("start_time", 0)))
- # Update dataset-level metrics
- dataset_tags = {
- "dataset": dataset_tag,
- "job_id": job_id,
- "start_time": start_time,
- }
- self.data_dataset_estimated_total_blocks.set(
- state.get("total", 0), dataset_tags
- )
- self.data_dataset_estimated_total_rows.set(
- state.get("total_rows", 0), dataset_tags
- )
- state_string = state.get("state", DatasetState.UNKNOWN.name)
- state_enum = DatasetState.from_string(state_string)
- self.data_dataset_state.set(state_enum.value, dataset_tags)
- self.update_dataset_metadata_state(dataset_tag, state_string)
- # Update operator-level metrics
- operator_states: Dict[str, str] = {}
- for operator, op_state in state.get("operators", {}).items():
- operator_tags = {
- "dataset": dataset_tag,
- "operator": operator,
- }
- self.data_operator_estimated_total_blocks.set(
- op_state.get("total", 0), operator_tags
- )
- self.data_operator_estimated_total_rows.set(
- op_state.get("total_rows", 0), operator_tags
- )
- self.data_operator_queued_blocks.set(
- op_state.get("queued_blocks", 0), operator_tags
- )
- # Get state code directly from enum
- state_string = op_state.get("state", DatasetState.UNKNOWN.name)
- state_enum = DatasetState.from_string(state_string)
- self.data_operator_state.set(state_enum.value, operator_tags)
- operator_states[operator] = state_string
- self.update_dataset_metadata_operator_states(dataset_tag, operator_states)
- # Evict the oldest finished datasets to ensure the `max_stats` limit is enforced.
- if state["state"] in {DatasetState.FINISHED.name, DatasetState.FAILED.name}:
- self.finished_datasets_queue.append(dataset_tag)
- while len(self.datasets) > self.max_stats and self.finished_datasets_queue:
- tag_to_evict = self.finished_datasets_queue.popleft()
- self.datasets.pop(tag_to_evict, None)
- self.dataset_metadatas.pop(tag_to_evict, None)
- def get_datasets(self, job_id: Optional[str] = None):
- if not job_id:
- return self.datasets
- return {k: v for k, v in self.datasets.items() if v["job_id"] == job_id}
- def update_dataset_metadata_state(self, dataset_id: str, new_state: str):
- if dataset_id not in self.dataset_metadatas:
- return
- update_time = time.time()
- dataset_metadata = self.dataset_metadatas[dataset_id]
- if dataset_metadata.state == new_state:
- return
- updated_dataset_metadata = copy.deepcopy(dataset_metadata)
- updated_dataset_metadata.state = new_state
- if new_state == DatasetState.RUNNING.name:
- updated_dataset_metadata.execution_start_time = update_time
- elif new_state in (DatasetState.FINISHED.name, DatasetState.FAILED.name):
- updated_dataset_metadata.execution_end_time = update_time
- # Update metadata of running operators
- for operator in updated_dataset_metadata.topology.operators:
- if operator.state == DatasetState.RUNNING.name:
- operator.state = new_state
- operator.execution_end_time = update_time
- self.dataset_metadatas[dataset_id] = updated_dataset_metadata
- if self._metadata_exporter is not None:
- self._metadata_exporter.export_dataset_metadata(
- updated_dataset_metadata,
- include_data_context=False,
- include_op_args=False,
- )
- def update_dataset_metadata_operator_states(
- self, dataset_id: str, operator_states: Dict[str, str]
- ):
- if dataset_id not in self.dataset_metadatas:
- return
- dataset_metadata = self.dataset_metadatas[dataset_id]
- update_needed = False
- for operator in dataset_metadata.topology.operators:
- if (
- operator.id in operator_states
- and operator.state != operator_states[operator.id]
- ):
- update_needed = True
- break
- if not update_needed:
- return
- updated_dataset_metadata = copy.deepcopy(dataset_metadata)
- update_time = time.time()
- for operator in updated_dataset_metadata.topology.operators:
- if operator.id in operator_states:
- new_state = operator_states[operator.id]
- if operator.state == new_state:
- continue
- operator.state = new_state
- if new_state == DatasetState.RUNNING.name:
- operator.execution_start_time = update_time
- elif new_state in (
- DatasetState.FINISHED.name,
- DatasetState.FAILED.name,
- ):
- operator.execution_end_time = update_time
- # Handle outlier case for InputDataBuffer, which is marked as finished immediately and does not have a RUNNING state.
- # Set the execution time the same as its end time
- if not operator.execution_start_time:
- operator.execution_start_time = update_time
- self.dataset_metadatas[dataset_id] = updated_dataset_metadata
- if self._metadata_exporter is not None:
- self._metadata_exporter.export_dataset_metadata(
- updated_dataset_metadata,
- include_data_context=False,
- include_op_args=False,
- )
- def _create_tags(
- self,
- dataset_tag: str,
- operator_tag: Optional[str] = None,
- node_ip_tag: Optional[str] = None,
- ):
- tags = {"dataset": dataset_tag}
- if operator_tag is not None:
- tags["operator"] = operator_tag
- if node_ip_tag is not None:
- tags["node_ip"] = node_ip_tag
- return tags
- def get_or_create_stats_actor() -> ActorHandle[_StatsActor]:
- """Each cluster will contain exactly 1 _StatsActor. This function
- returns the current _StatsActor handle, or create a new one if one
- does not exist in the connected cluster. The _StatsActor is pinned on
- on driver process' node.
- """
- if ray._private.worker._global_node is None:
- raise RuntimeError(
- "Global node is not initialized. Driver might be not connected to Ray."
- )
- current_cluster_id = ray._private.worker._global_node.cluster_id
- logger.debug(f"Stats Actor located on cluster_id={current_cluster_id}")
- # so it fate-shares with the driver.
- scheduling_strategy = NodeAffinitySchedulingStrategy(
- ray.get_runtime_context().get_node_id(),
- soft=False,
- )
- return _StatsActor.options(
- name=STATS_ACTOR_NAME,
- namespace=STATS_ACTOR_NAMESPACE,
- get_if_exists=True,
- lifetime="detached",
- scheduling_strategy=scheduling_strategy,
- ).remote()
- class _StatsManager:
- """A Class containing util functions that manage remote calls to _StatsActor.
- Ray Data updates metrics through the _StatsManager, and direct remote calls
- to the _StatsActor is discouraged. Some functionalities provided by
- _StatsManager:
- - Format and update iteration metrics
- - Format and update execution metrics
- - Aggregate per node metrics
- - Dataset registration
- """
- @staticmethod
- def _aggregate_per_node_metrics(
- op_metrics: List[OpRuntimeMetrics],
- ) -> Optional[Mapping[str, Mapping[str, Union[int, float]]]]:
- """
- Aggregate per-node metrics from a list of OpRuntimeMetrics objects.
- If per-node metrics are disabled in the current DataContext, returns None.
- Otherwise, it sums up all NodeMetrics fields across the provided metrics and
- returns a nested dictionary mapping each node ID to a dict of field values.
- """
- if not DataContext.get_current().enable_per_node_metrics:
- return None
- aggregated_by_node = defaultdict(lambda: defaultdict(int))
- for metrics in op_metrics:
- for node_id, node_metrics in metrics._per_node_metrics.items():
- agg_node_metrics = aggregated_by_node[node_id]
- for f in fields(NodeMetrics):
- agg_node_metrics[f.name] += getattr(node_metrics, f.name)
- return aggregated_by_node
- @staticmethod
- def update_execution_metrics(
- dataset_tag: str,
- op_metrics: List[OpRuntimeMetrics],
- operator_tags: List[str],
- state: Dict[str, Any],
- ):
- per_node_metrics = _StatsManager._aggregate_per_node_metrics(op_metrics)
- op_metrics_dicts = [metric.as_dict() for metric in op_metrics]
- args = (
- dataset_tag,
- op_metrics_dicts,
- operator_tags,
- state,
- per_node_metrics,
- )
- try:
- get_or_create_stats_actor().update_execution_metrics.remote(*args)
- except Exception as e:
- logger.warning(
- f"Error occurred during update_execution_metrics.remote call to _StatsActor: {e}",
- exc_info=True,
- )
- return
- @staticmethod
- def update_iteration_metrics(stats: "DatasetStats", dataset_tag: str):
- args = (stats, dataset_tag)
- try:
- get_or_create_stats_actor().update_iteration_metrics.remote(*args)
- except Exception as e:
- logger.warning(
- f"Error occurred during update_iteration_metrics.remote call to _StatsActor: {e}",
- exc_info=True,
- )
- @staticmethod
- def register_dataset_to_stats_actor(
- dataset_tag: str,
- operator_tags: List[str],
- topology: Topology,
- data_context: DataContext,
- ):
- """Register a dataset with the stats actor.
- Args:
- dataset_tag: Tag for the dataset
- operator_tags: List of operator tags
- topology: Optional Topology representing the DAG structure to export
- data_context: The DataContext attached to the dataset
- """
- # Convert DataContext to DataContextMetadata before serialization to avoid
- # module dependency issues during Ray's cloudpickle serialization.
- data_context = DataContextMetadata.from_data_context(data_context)
- get_or_create_stats_actor().register_dataset.remote(
- ray.get_runtime_context().get_job_id(),
- dataset_tag,
- operator_tags,
- topology,
- data_context,
- )
- @staticmethod
- def gen_dataset_id_from_stats_actor() -> str:
- try:
- stats_actor = get_or_create_stats_actor()
- return ray.get(stats_actor.gen_dataset_id.remote())
- except Exception as e:
- logger.warning(
- f"Failed to generate dataset_id, falling back to random uuid_v4: {e}"
- )
- # Getting dataset id from _StatsActor may fail, in this case
- # fall back to uuid4
- return uuid4().hex
- class DatasetStats:
- """Holds the execution times for a given Dataset.
- This object contains a reference to the parent Dataset's stats as well,
- but not the Dataset object itself, to allow its blocks to be dropped from
- memory."""
- def __init__(
- self,
- *,
- metadata: StatsDict,
- parent: Union[Optional["DatasetStats"], List["DatasetStats"]],
- base_name: str = None,
- ):
- """Create dataset stats.
- Args:
- metadata: Dict of operators used to create this Dataset from the
- previous one. Typically one entry, e.g., {"map": [...]}.
- parent: Reference to parent Dataset's stats, or a list of parents
- if there are multiple.
- base_name: The name of the base operation for a multi-operator operation.
- """
- self.metadata: StatsDict = metadata
- if parent is not None and not isinstance(parent, list):
- parent = [parent]
- self.parents: List["DatasetStats"] = parent or []
- self.number: int = (
- 0 if not self.parents else max(p.number for p in self.parents) + 1
- )
- self.base_name = base_name
- # TODO(ekl) deprecate and remove the notion of dataset UUID once we move
- # fully to streaming execution.
- self.dataset_uuid: str = "unknown_uuid"
- self.time_total_s: float = 0
- # Streaming executor stats
- self.streaming_exec_schedule_s: Timer = Timer()
- # Iteration stats, filled out if the user iterates over the dataset.
- self.iter_wait_s: Timer = Timer()
- self.iter_get_ref_bundles_s: Timer = Timer()
- self.iter_get_s: Timer = Timer()
- self.iter_next_batch_s: Timer = Timer()
- self.iter_format_batch_s: Timer = Timer()
- self.iter_collate_batch_s: Timer = Timer()
- self.iter_finalize_batch_s: Timer = Timer()
- self.iter_time_to_first_batch_s: Timer = Timer()
- self.iter_total_blocked_s: Timer = Timer()
- self.iter_user_s: Timer = Timer()
- self.iter_initialize_s: Timer = Timer()
- self.iter_total_s: Timer = Timer()
- self.extra_metrics = {}
- # Block fetch stats during iteration.
- # These are stats about locations of blocks when the iterator is trying to
- # consume them. The iteration performance will be affected depending on
- # whether the block is in the local object store of the node where the
- # iterator is running.
- # This serves as an indicator of block prefetching effectiveness.
- self.iter_blocks_local: int = 0
- self.iter_blocks_remote: int = 0
- self.iter_unknown_location: int = 0
- self.iter_prefetched_bytes: int = 0
- # Memory usage stats
- self.global_bytes_spilled: int = 0
- self.global_bytes_restored: int = 0
- self.dataset_bytes_spilled: int = 0
- # Streaming split coordinator stats (dataset level)
- self.streaming_split_coordinator_s: Timer = Timer()
- @property
- def stats_actor(self):
- return get_or_create_stats_actor()
- def child_builder(
- self, name: str, override_start_time: Optional[float] = None
- ) -> _DatasetStatsBuilder:
- """Start recording stats for an op of the given name (e.g., map)."""
- return _DatasetStatsBuilder(name, self, override_start_time)
- def to_summary(self) -> "DatasetStatsSummary":
- """Generate a `DatasetStatsSummary` object from the given `DatasetStats`
- object, which can be used to generate a summary string."""
- operators_stats = []
- is_sub_operator = len(self.metadata) > 1
- iter_stats = IterStatsSummary(
- self.iter_wait_s,
- self.iter_get_ref_bundles_s,
- self.iter_get_s,
- self.iter_next_batch_s,
- self.iter_format_batch_s,
- self.iter_collate_batch_s,
- self.iter_finalize_batch_s,
- self.iter_time_to_first_batch_s,
- self.iter_total_blocked_s,
- self.iter_user_s,
- self.iter_initialize_s,
- self.iter_total_s,
- self.streaming_split_coordinator_s,
- self.iter_blocks_local,
- self.iter_blocks_remote,
- self.iter_unknown_location,
- self.iter_prefetched_bytes,
- )
- stats_summary_parents = []
- if self.parents is not None:
- stats_summary_parents = [p.to_summary() for p in self.parents]
- # Collect the sum of the final output row counts from all parent nodes
- parent_total_output = 0
- for i, parent_summary in enumerate(stats_summary_parents):
- if parent_summary.operators_stats:
- # Get the last operator stats from the current parent summary
- last_parent_op = parent_summary.operators_stats[-1]
- # Extract output row count (handle dict type with "sum" key)
- op_output = (
- last_parent_op.output_num_rows.get("sum", 0)
- if isinstance(last_parent_op.output_num_rows, dict)
- else 0
- )
- logger.debug(
- f"Parent {i + 1} (operator: {last_parent_op.operator_name}) contributes {op_output} rows to input"
- )
- parent_total_output += op_output
- # Create temporary operator stats objects from block metadata
- op_stats = [
- OperatorStatsSummary.from_block_metadata(
- name, stats, is_sub_operator=is_sub_operator
- )
- for name, stats in self.metadata.items()
- ]
- for i, op_stat in enumerate(op_stats):
- # For sub-operators: inherit input based on the order in the current list
- if is_sub_operator:
- if i == 0:
- # Input of the first sub-operator is the total output from parent nodes
- op_stat.total_input_num_rows = parent_total_output
- else:
- # Input of subsequent sub-operators is the output of the previous sub-operator
- prev_op = op_stats[i - 1]
- op_stat.total_input_num_rows = (
- prev_op.output_num_rows["sum"]
- if (
- prev_op.output_num_rows and "sum" in prev_op.output_num_rows
- )
- else 0
- )
- else:
- # Single operator scenario: input rows = total output from all parent nodes
- op_stat.total_input_num_rows = parent_total_output
- operators_stats.append(op_stat)
- streaming_exec_schedule_s = (
- self.streaming_exec_schedule_s.get()
- if self.streaming_exec_schedule_s
- else 0
- )
- return DatasetStatsSummary(
- operators_stats,
- iter_stats,
- stats_summary_parents,
- self.number,
- self.dataset_uuid,
- self.time_total_s,
- self.base_name,
- self.extra_metrics,
- self.global_bytes_spilled,
- self.global_bytes_restored,
- self.dataset_bytes_spilled,
- streaming_exec_schedule_s,
- )
- def runtime_metrics(self) -> str:
- """Generate a string representing the runtime metrics of a Dataset. This is
- a high level summary of the time spent in Ray Data code broken down by operator.
- It also includes the time spent in the scheduler. Times are shown as the total
- time for each operator and percentages of time are shown as a fraction of the
- total time for the whole dataset."""
- return self.to_summary().runtime_metrics()
- @DeveloperAPI
- @dataclass
- class DatasetStatsSummary:
- operators_stats: List["OperatorStatsSummary"]
- iter_stats: "IterStatsSummary"
- parents: List["DatasetStatsSummary"]
- number: int
- dataset_uuid: str
- time_total_s: float
- base_name: str
- extra_metrics: Dict[str, Any]
- global_bytes_spilled: int
- global_bytes_restored: int
- dataset_bytes_spilled: int
- streaming_exec_schedule_s: float
- def to_string(
- self,
- already_printed: Optional[Set[str]] = None,
- include_parent: bool = True,
- add_global_stats=True,
- ) -> str:
- """Return a human-readable summary of this Dataset's stats.
- Args:
- already_printed: Set of operator IDs that have already had its stats printed
- out.
- include_parent: If true, also include parent stats summary; otherwise, only
- log stats of the latest operator.
- add_global_stats: If true, includes global stats to this summary.
- Returns:
- String with summary statistics for executing the Dataset.
- """
- if already_printed is None:
- already_printed = set()
- out = ""
- if self.parents and include_parent:
- for p in self.parents:
- parent_sum = p.to_string(already_printed, add_global_stats=False)
- if parent_sum:
- out += parent_sum
- out += "\n"
- operators_stats_summary = None
- if len(self.operators_stats) == 1:
- operators_stats_summary = self.operators_stats[0]
- operator_name = operators_stats_summary.operator_name
- operator_uuid = self.dataset_uuid + operator_name
- out += "Operator {} {}: ".format(self.number, operator_name)
- if operator_uuid in already_printed:
- out += "[execution cached]\n"
- else:
- already_printed.add(operator_uuid)
- out += str(operators_stats_summary)
- elif len(self.operators_stats) > 1:
- rounded_total = round(self.time_total_s, 2)
- if rounded_total <= 0:
- # Handle -0.0 case.
- rounded_total = 0
- out += "Operator {} {}: executed in {}s\n".format(
- self.number, self.base_name, rounded_total
- )
- for n, operators_stats_summary in enumerate(self.operators_stats):
- operator_name = operators_stats_summary.operator_name
- operator_uuid = self.dataset_uuid + operator_name
- out += "\n"
- out += "\tSuboperator {} {}: ".format(n, operator_name)
- if operator_uuid in already_printed:
- out += "\t[execution cached]\n"
- else:
- already_printed.add(operator_uuid)
- out += str(operators_stats_summary)
- verbose_stats_logs = DataContext.get_current().verbose_stats_logs
- if verbose_stats_logs and self.extra_metrics:
- indent = (
- "\t"
- if operators_stats_summary and operators_stats_summary.is_sub_operator
- else ""
- )
- out += indent
- out += "* Extra metrics: " + str(self.extra_metrics) + "\n"
- out += str(self.iter_stats)
- if len(self.operators_stats) > 0 and add_global_stats:
- mb_spilled = round(self.global_bytes_spilled / 1e6)
- mb_restored = round(self.global_bytes_restored / 1e6)
- if mb_spilled or mb_restored:
- out += "\nCluster memory:\n"
- out += "* Spilled to disk: {}MB\n".format(mb_spilled)
- out += "* Restored from disk: {}MB\n".format(mb_restored)
- dataset_mb_spilled = round(self.dataset_bytes_spilled / 1e6)
- if dataset_mb_spilled:
- out += "\nDataset memory:\n"
- out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)
- if self.num_rows_per_s:
- out += "\n"
- out += "Dataset throughput:\n"
- out += f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"
- if verbose_stats_logs and add_global_stats:
- out += "\n" + self.runtime_metrics()
- return out
- @property
- def num_rows_per_s(self) -> float:
- """Calculates the throughput in rows per second for the entire dataset."""
- # The observed dataset throughput is computed by dividing the total number
- # of rows produced by the total wall time of the dataset (i.e. from start to
- # finish how long did the dataset take to be processed). With the recursive
- # nature of the DatasetStatsSummary, we use get_total_wall_time to determine
- # the total wall time (this finds the difference between the earliest start
- # and latest end for any block in any operator).
- output_num_rows = (
- self.operators_stats[-1].output_num_rows if self.operators_stats else 0
- )
- total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
- wall_time = self.get_total_wall_time()
- if not total_num_out_rows or not wall_time:
- return 0.0
- return total_num_out_rows / wall_time
- @staticmethod
- def _collect_dataset_stats_summaries(
- curr: "DatasetStatsSummary",
- ) -> List["DatasetStatsSummary"]:
- summs = []
- # TODO: Do operators ever have multiple parents? Do we need to deduplicate?
- for p in curr.parents:
- if p and p.parents:
- summs.extend(DatasetStatsSummary._collect_dataset_stats_summaries(p))
- return summs + [curr]
- @staticmethod
- def _find_start_and_end(summ: "DatasetStatsSummary") -> Tuple[float, float]:
- earliest_start = min(ops.earliest_start_time for ops in summ.operators_stats)
- latest_end = max(ops.latest_end_time for ops in summ.operators_stats)
- return earliest_start, latest_end
- def runtime_metrics(self) -> str:
- total_wall_time = self.get_total_wall_time()
- def fmt_line(name: str, time: float) -> str:
- fraction = time / total_wall_time if total_wall_time > 0 else 0
- return f"* {name}: {fmt(time)} ({fraction * 100:.3f}%)\n"
- summaries = DatasetStatsSummary._collect_dataset_stats_summaries(self)
- out = "Runtime Metrics:\n"
- for summ in summaries:
- if len(summ.operators_stats) > 0:
- earliest_start, latest_end = DatasetStatsSummary._find_start_and_end(
- summ
- )
- op_total_time = latest_end - earliest_start
- out += fmt_line(summ.base_name, op_total_time)
- out += fmt_line("Scheduling", self.streaming_exec_schedule_s)
- out += fmt_line("Total", total_wall_time)
- return out
- def __repr__(self, level=0) -> str:
- indent = leveled_indent(level)
- operators_stats = "\n".join(
- [ss.__repr__(level + 2) for ss in self.operators_stats]
- )
- parent_stats = "\n".join([ps.__repr__(level + 2) for ps in self.parents])
- extra_metrics = "\n".join(
- f"{leveled_indent(level + 2)}{k}: {v},"
- for k, v in self.extra_metrics.items()
- )
- # Handle formatting case for empty outputs.
- operators_stats = (
- f"\n{operators_stats},\n{indent} " if operators_stats else ""
- )
- parent_stats = f"\n{parent_stats},\n{indent} " if parent_stats else ""
- extra_metrics = f"\n{extra_metrics}\n{indent} " if extra_metrics else ""
- return (
- f"{indent}DatasetStatsSummary(\n"
- f"{indent} dataset_uuid={self.dataset_uuid},\n"
- f"{indent} base_name={self.base_name},\n"
- f"{indent} number={self.number},\n"
- f"{indent} extra_metrics={{{extra_metrics}}},\n"
- f"{indent} operators_stats=[{operators_stats}],\n"
- f"{indent} iter_stats={self.iter_stats.__repr__(level+1)},\n"
- f"{indent} global_bytes_spilled={self.global_bytes_spilled / 1e6}MB,\n"
- f"{indent} global_bytes_restored={self.global_bytes_restored / 1e6}MB,\n"
- f"{indent} dataset_bytes_spilled={self.dataset_bytes_spilled / 1e6}MB,\n"
- f"{indent} parents=[{parent_stats}],\n"
- f"{indent})"
- )
- def get_total_wall_time(self) -> float:
- """Calculate the total wall time for the dataset, this is done by finding
- the earliest start time and latest end time for any block in any operator.
- The wall time is the difference of these two times.
- """
- start_ends = [
- DatasetStatsSummary._find_start_and_end(summ)
- for summ in DatasetStatsSummary._collect_dataset_stats_summaries(self)
- if len(summ.operators_stats) > 0
- ]
- if len(start_ends) == 0:
- return 0
- else:
- earliest_start = min(start_end[0] for start_end in start_ends)
- latest_end = max(start_end[1] for start_end in start_ends)
- return latest_end - earliest_start
- def get_total_time_all_blocks(self) -> float:
- """Calculate the sum of the wall times across all blocks of all operators."""
- summaries = DatasetStatsSummary._collect_dataset_stats_summaries(self)
- return sum(
- (
- sum(
- ops.wall_time.get("sum", 0) if ops.wall_time else 0
- for ops in summ.operators_stats
- )
- )
- for summ in summaries
- )
- def get_total_cpu_time(self) -> float:
- parent_sum = sum(p.get_total_cpu_time() for p in self.parents)
- return parent_sum + sum(
- ss.cpu_time.get("sum", 0) for ss in self.operators_stats
- )
- def get_max_heap_memory(self) -> float:
- parent_memory = [p.get_max_heap_memory() for p in self.parents]
- parent_max = max(parent_memory) if parent_memory else 0
- if not self.operators_stats:
- return parent_max
- return max(
- parent_max,
- *[ss.memory.get("max", 0) for ss in self.operators_stats],
- )
- @dataclass
- class OperatorStatsSummary:
- operator_name: str
- # Whether the operator associated with this OperatorStatsSummary object
- # is a suboperator
- is_sub_operator: bool
- # This is the total walltime of the entire operator, typically obtained from
- # `DatasetStats.time_total_s`. An important distinction is that this is the
- # overall runtime of the operator, pulled from the stats actor, whereas the
- # computed walltimes in `self.wall_time` are calculated on a operator level.
- time_total_s: float
- earliest_start_time: float
- latest_end_time: float
- # String summarizing high-level statistics from executing the operator
- block_execution_summary_str: str
- # The fields below are dicts with stats aggregated across blocks
- # processed in this operator. For example:
- # {"min": ..., "max": ..., "mean": ..., "sum": ...}
- wall_time: Optional[Dict[str, float]] = None
- cpu_time: Optional[Dict[str, float]] = None
- udf_time: Optional[Dict[str, float]] = None
- # memory: no "sum" stat
- memory: Optional[Dict[str, float]] = None
- # Use the output_num_rows of the parent Operator as output_num_rows
- total_input_num_rows: Optional[int] = None
- output_num_rows: Optional[Dict[str, float]] = None
- output_size_bytes: Optional[Dict[str, float]] = None
- # node_count: "count" stat instead of "sum"
- node_count: Optional[Dict[str, float]] = None
- task_rows: Optional[Dict[str, float]] = None
- @property
- def num_rows_per_s(self) -> float:
- # The observed Ray Data operator throughput is computed by dividing the
- # total number of rows produced by the wall time of the operator,
- # time_total_s.
- if not self.output_num_rows or not self.time_total_s:
- return 0.0
- return self.output_num_rows["sum"] / self.time_total_s
- @property
- def num_rows_per_task_s(self) -> float:
- """Calculates the estimated single-task throughput in rows per second."""
- # The estimated single task operator throughput is computed by dividing the
- # total number of rows produced by the sum of the wall times across all
- # blocks of the operator. This assumes that on a single task the work done
- # would be equivalent, with no concurrency.
- if not self.output_num_rows or not self.wall_time or not self.wall_time["sum"]:
- return 0.0
- return self.output_num_rows["sum"] / self.wall_time["sum"]
- @classmethod
- def from_block_metadata(
- cls,
- operator_name: str,
- block_stats: List[BlockStats],
- is_sub_operator: bool,
- ) -> "OperatorStatsSummary":
- """Calculate the stats for a operator from a given list of blocks,
- and generates a `OperatorStatsSummary` object with the results.
- Args:
- block_stats: List of `BlockStats` to calculate stats of
- operator_name: Name of operator associated with `blocks`
- is_sub_operator: Whether this set of blocks belongs to a sub operator.
- Returns:
- A `OperatorStatsSummary` object initialized with the calculated statistics
- """
- exec_stats = [m.exec_stats for m in block_stats if m.exec_stats is not None]
- rounded_total = 0
- time_total_s = 0
- earliest_start_time, latest_end_time = 0, 0
- if exec_stats:
- # Calculate the total execution time of operator as
- # the difference between the latest end time and
- # the earliest start time of all blocks in the operator.
- earliest_start_time = min(s.start_time_s for s in exec_stats)
- latest_end_time = max(s.end_time_s for s in exec_stats)
- time_total_s = latest_end_time - earliest_start_time
- if is_sub_operator:
- exec_summary_str = "{} blocks produced\n".format(len(exec_stats))
- else:
- if exec_stats:
- rounded_total = round(time_total_s, 2)
- if rounded_total <= 0:
- # Handle -0.0 case.
- rounded_total = 0
- exec_summary_str = "{} blocks produced in {}s".format(
- len(exec_stats), rounded_total
- )
- else:
- exec_summary_str = ""
- exec_summary_str += "\n"
- task_rows = collections.defaultdict(int)
- for meta in block_stats:
- if meta.num_rows is not None and meta.exec_stats is not None:
- task_rows[meta.exec_stats.task_idx] += meta.num_rows
- task_rows_stats = None
- if len(task_rows) > 0:
- task_rows_stats = {
- "min": min(task_rows.values()),
- "max": max(task_rows.values()),
- "mean": int(np.mean(list(task_rows.values()))),
- "count": len(task_rows),
- }
- exec_summary_str = "{} tasks executed, {}".format(
- len(task_rows), exec_summary_str
- )
- wall_time_stats, cpu_stats, memory_stats, udf_stats = None, None, None, None
- if exec_stats:
- wall_time_stats = {
- "min": min([e.wall_time_s for e in exec_stats]),
- "max": max([e.wall_time_s for e in exec_stats]),
- "mean": np.mean([e.wall_time_s for e in exec_stats]),
- "sum": sum([e.wall_time_s for e in exec_stats]),
- }
- cpu_stats = {
- "min": min([e.cpu_time_s for e in exec_stats]),
- "max": max([e.cpu_time_s for e in exec_stats]),
- "mean": np.mean([e.cpu_time_s for e in exec_stats]),
- "sum": sum([e.cpu_time_s for e in exec_stats]),
- }
- memory_stats_mb = [
- round((e.max_uss_bytes or 0) / (1024 * 1024), 2) for e in exec_stats
- ]
- memory_stats = {
- "min": min(memory_stats_mb),
- "max": max(memory_stats_mb),
- "mean": int(np.mean(memory_stats_mb)),
- }
- udf_stats = {
- "min": min([e.udf_time_s for e in exec_stats]),
- "max": max([e.udf_time_s for e in exec_stats]),
- "mean": np.mean([e.udf_time_s for e in exec_stats]),
- "sum": sum([e.udf_time_s for e in exec_stats]),
- }
- output_num_rows_stats = None
- output_num_rows = [m.num_rows for m in block_stats if m.num_rows is not None]
- if output_num_rows:
- output_num_rows_stats = {
- "min": min(output_num_rows),
- "max": max(output_num_rows),
- "mean": int(np.mean(output_num_rows)),
- "sum": sum(output_num_rows),
- }
- output_size_bytes_stats = None
- output_size_bytes = [
- m.size_bytes for m in block_stats if m.size_bytes is not None
- ]
- if output_size_bytes:
- output_size_bytes_stats = {
- "min": min(output_size_bytes),
- "max": max(output_size_bytes),
- "mean": int(np.mean(output_size_bytes)),
- "sum": sum(output_size_bytes),
- }
- node_counts_stats = None
- if exec_stats:
- node_tasks = collections.defaultdict(set)
- for s in exec_stats:
- node_tasks[s.node_id].add(s.task_idx)
- node_counts = {node: len(tasks) for node, tasks in node_tasks.items()}
- node_counts_stats = {
- "min": min(node_counts.values()),
- "max": max(node_counts.values()),
- "mean": int(np.mean(list(node_counts.values()))),
- "count": len(node_counts),
- }
- # Assign a value in to_summary and initialize it as None.
- total_input_num_rows = None
- return OperatorStatsSummary(
- operator_name=operator_name,
- is_sub_operator=is_sub_operator,
- time_total_s=time_total_s,
- earliest_start_time=earliest_start_time,
- latest_end_time=latest_end_time,
- block_execution_summary_str=exec_summary_str,
- wall_time=wall_time_stats,
- cpu_time=cpu_stats,
- udf_time=udf_stats,
- memory=memory_stats,
- total_input_num_rows=total_input_num_rows,
- output_num_rows=output_num_rows_stats,
- output_size_bytes=output_size_bytes_stats,
- node_count=node_counts_stats,
- task_rows=task_rows_stats,
- )
- def __str__(self) -> str:
- """For a given (pre-calculated) `OperatorStatsSummary` object (e.g. generated from
- `OperatorStatsSummary.from_block_metadata()`), returns a human-friendly string
- that summarizes operator execution statistics.
- Returns:
- String with summary statistics for executing the given operator.
- """
- indent = "\t" if self.is_sub_operator else ""
- out = self.block_execution_summary_str
- wall_time_stats = self.wall_time
- if wall_time_stats:
- out += indent
- out += "* Remote wall time: {} min, {} max, {} mean, {} total\n".format(
- fmt(wall_time_stats["min"]),
- fmt(wall_time_stats["max"]),
- fmt(wall_time_stats["mean"]),
- fmt(wall_time_stats["sum"]),
- )
- cpu_stats = self.cpu_time
- if cpu_stats:
- out += indent
- out += "* Remote cpu time: {} min, {} max, {} mean, {} total\n".format(
- fmt(cpu_stats["min"]),
- fmt(cpu_stats["max"]),
- fmt(cpu_stats["mean"]),
- fmt(cpu_stats["sum"]),
- )
- udf_stats = self.udf_time
- if udf_stats:
- out += indent
- out += "* UDF time: {} min, {} max, {} mean, {} total\n".format(
- fmt(udf_stats["min"]),
- fmt(udf_stats["max"]),
- fmt(udf_stats["mean"]),
- fmt(udf_stats["sum"]),
- )
- memory_stats = self.memory
- if memory_stats:
- out += indent
- out += "* Peak heap memory usage (MiB): {} min, {} max, {} mean\n".format(
- memory_stats["min"],
- memory_stats["max"],
- memory_stats["mean"],
- )
- output_num_rows_stats = self.output_num_rows
- if output_num_rows_stats:
- out += indent
- out += (
- "* Output num rows per block: {} min, {} max, {} mean, {} total\n"
- ).format(
- output_num_rows_stats["min"],
- output_num_rows_stats["max"],
- output_num_rows_stats["mean"],
- output_num_rows_stats["sum"],
- )
- output_size_bytes_stats = self.output_size_bytes
- if output_size_bytes_stats:
- out += indent
- out += (
- "* Output size bytes per block: {} min, {} max, {} mean, {} total\n"
- ).format(
- output_size_bytes_stats["min"],
- output_size_bytes_stats["max"],
- output_size_bytes_stats["mean"],
- output_size_bytes_stats["sum"],
- )
- task_rows = self.task_rows
- if task_rows:
- out += indent
- out += (
- "* Output rows per task: {} min, {} max, {} mean, {} tasks used\n"
- ).format(
- task_rows["min"],
- task_rows["max"],
- task_rows["mean"],
- task_rows["count"],
- )
- node_count_stats = self.node_count
- if node_count_stats:
- out += indent
- out += "* Tasks per node: {} min, {} max, {} mean; {} nodes used\n".format(
- node_count_stats["min"],
- node_count_stats["max"],
- node_count_stats["mean"],
- node_count_stats["count"],
- )
- if self.num_rows_per_s and self.num_rows_per_task_s:
- total_num_in_rows = (
- self.total_input_num_rows if self.total_input_num_rows else 0
- )
- total_num_out_rows = output_num_rows_stats["sum"]
- out += indent
- out += "* Operator throughput:\n"
- out += (
- indent + "\t* Total input num rows:" f" {total_num_in_rows} " "rows\n"
- )
- out += (
- indent + "\t* Total output num rows:" f" {total_num_out_rows} " "rows\n"
- )
- out += (
- indent + "\t* Ray Data throughput:"
- f" {self.num_rows_per_s} "
- "rows/s\n"
- )
- out += (
- indent + "\t* Estimated single task throughput:"
- f" {self.num_rows_per_task_s} "
- "rows/s\n"
- )
- return out
- def __repr__(self, level=0) -> str:
- """For a given (pre-calculated) `OperatorStatsSummary` object (e.g. generated from
- `OperatorStatsSummary.from_block_metadata()`), returns a human-friendly string
- that summarizes operator execution statistics.
- Returns:
- String with summary statistics for executing the given operator.
- """
- indent = leveled_indent(level)
- indent += leveled_indent(1) if self.is_sub_operator else ""
- wall_time_stats = {k: fmt(v) for k, v in (self.wall_time or {}).items()}
- cpu_stats = {k: fmt(v) for k, v in (self.cpu_time or {}).items()}
- memory_stats = {k: fmt(v) for k, v in (self.memory or {}).items()}
- output_num_rows_stats = {
- k: fmt(v) for k, v in (self.output_num_rows or {}).items()
- }
- output_size_bytes_stats = {
- k: fmt(v) for k, v in (self.output_size_bytes or {}).items()
- }
- node_conut_stats = {k: fmt(v) for k, v in (self.node_count or {}).items()}
- out = (
- f"{indent}OperatorStatsSummary(\n"
- f"{indent} operator_name='{self.operator_name}',\n"
- f"{indent} is_suboperator={self.is_sub_operator},\n"
- f"{indent} time_total_s={fmt(self.time_total_s)},\n"
- # block_execution_summary_str already ends with \n
- f"{indent} block_execution_summary_str={self.block_execution_summary_str}"
- f"{indent} wall_time={wall_time_stats or None},\n"
- f"{indent} cpu_time={cpu_stats or None},\n"
- f"{indent} memory={memory_stats or None},\n"
- f"{indent} output_num_rows={output_num_rows_stats or None},\n"
- f"{indent} output_size_bytes={output_size_bytes_stats or None},\n"
- f"{indent} node_count={node_conut_stats or None},\n"
- f"{indent})"
- )
- return out
- @dataclass
- class IterStatsSummary:
- # Time spent in actor based prefetching, in seconds.
- wait_time: Timer
- # Time spent getting RefBundles from the dataset iterator, in seconds
- get_ref_bundles_time: Timer
- # Time spent in `ray.get()`, in seconds
- get_time: Timer
- # Time spent in batch building, in seconds
- next_time: Timer
- # Time spent in `_format_batch_()`, in seconds
- format_time: Timer
- # Time spent in collate fn, in seconds
- collate_time: Timer
- # Time spent in finalize_fn, in seconds
- finalize_batch_time: Timer
- # Time user thread is blocked waiting for first batch
- time_to_first_batch: Timer
- # Total time user thread is blocked by iter_batches
- block_time: Timer
- # Time spent in user code, in seconds
- user_time: Timer
- initialize_time: Timer
- # Total time taken by Dataset iterator, in seconds
- total_time: Timer
- # Time spent in streaming split coordinator
- streaming_split_coord_time: Timer
- # Num of blocks that are in local object store
- iter_blocks_local: int
- # Num of blocks that are in remote node and have to fetch locally
- iter_blocks_remote: int
- # Num of blocks with unknown locations
- iter_unknown_location: int
- # Current bytes of prefetched blocks in the iterator
- iter_prefetched_bytes: int
- def __str__(self) -> str:
- return self.to_string()
- def to_string(self) -> str:
- out = ""
- if (
- self.block_time.get()
- or self.time_to_first_batch.get()
- or self.total_time.get()
- or self.get_ref_bundles_time.get()
- or self.get_time.get()
- or self.next_time.get()
- or self.format_time.get()
- or self.collate_time.get()
- or self.finalize_batch_time.get()
- ):
- out += "\nDataset iterator time breakdown:\n"
- if self.total_time.get():
- out += "* Total time overall: {}\n".format(fmt(self.total_time.get()))
- if self.initialize_time.get():
- out += (
- " * Total time in Ray Data iterator initialization code: "
- "{}\n".format(fmt(self.initialize_time.get()))
- )
- if self.block_time.get():
- out += (
- " * Total time user thread is blocked by Ray Data iter_batches: "
- "{}\n".format(fmt(self.block_time.get()))
- )
- if self.time_to_first_batch.get():
- out += (
- " * Total time spent waiting for the first batch after starting iteration: "
- "{}\n".format(fmt(self.time_to_first_batch.get()))
- )
- if self.user_time.get():
- out += " * Total execution time for user thread: {}\n".format(
- fmt(self.user_time.get())
- )
- out += (
- "* Batch iteration time breakdown (summed across prefetch threads):\n"
- )
- if self.get_ref_bundles_time.get():
- out += " * In get RefBundles: {} min, {} max, {} avg, {} total\n".format(
- fmt(self.get_ref_bundles_time.min()),
- fmt(self.get_ref_bundles_time.max()),
- fmt(self.get_ref_bundles_time.avg()),
- fmt(self.get_ref_bundles_time.get()),
- )
- if self.get_time.get():
- out += " * In ray.get(): {} min, {} max, {} avg, {} total\n".format(
- fmt(self.get_time.min()),
- fmt(self.get_time.max()),
- fmt(self.get_time.avg()),
- fmt(self.get_time.get()),
- )
- if self.next_time.get():
- batch_creation_str = (
- " * In batch creation: {} min, {} max, {} avg, {} total\n"
- )
- out += batch_creation_str.format(
- fmt(self.next_time.min()),
- fmt(self.next_time.max()),
- fmt(self.next_time.avg()),
- fmt(self.next_time.get()),
- )
- if self.format_time.get():
- format_str = (
- " * In batch formatting: {} min, {} max, {} avg, {} total\n"
- )
- out += format_str.format(
- fmt(self.format_time.min()),
- fmt(self.format_time.max()),
- fmt(self.format_time.avg()),
- fmt(self.format_time.get()),
- )
- if self.collate_time.get():
- out += " * In collate_fn: {} min, {} max, {} avg, {} total\n".format(
- fmt(self.collate_time.min()),
- fmt(self.collate_time.max()),
- fmt(self.collate_time.avg()),
- fmt(self.collate_time.get()),
- )
- if self.finalize_batch_time.get():
- format_str = (
- " * In host->device transfer: {} min, {} max, {} avg, {} total\n"
- )
- out += format_str.format(
- fmt(self.finalize_batch_time.min()),
- fmt(self.finalize_batch_time.max()),
- fmt(self.finalize_batch_time.avg()),
- fmt(self.finalize_batch_time.get()),
- )
- if DataContext.get_current().enable_get_object_locations_for_metrics:
- out += "Block locations:\n"
- out += " * Num blocks local: {}\n".format(self.iter_blocks_local)
- out += " * Num blocks remote: {}\n".format(self.iter_blocks_remote)
- out += " * Num blocks unknown location: {}\n".format(
- self.iter_unknown_location
- )
- if self.iter_prefetched_bytes:
- out += " * Prefetched bytes: {}\n".format(self.iter_prefetched_bytes)
- if self.streaming_split_coord_time.get() != 0:
- out += "Streaming split coordinator overhead time: "
- out += f"{fmt(self.streaming_split_coord_time.get())}\n"
- return out
- def __repr__(self, level=0) -> str:
- indent = leveled_indent(level)
- return (
- f"IterStatsSummary(\n"
- f"{indent} wait_time={fmt(self.wait_time.get()) or None},\n"
- f"{indent} get_ref_bundles_time={fmt(self.get_ref_bundles_time.get()) or None},\n"
- f"{indent} get_time={fmt(self.get_time.get()) or None},\n"
- f"{indent} iter_blocks_local={self.iter_blocks_local or None},\n"
- f"{indent} iter_blocks_remote={self.iter_blocks_remote or None},\n"
- f"{indent} iter_unknown_location={self.iter_unknown_location or None},\n"
- f"{indent} iter_prefetched_bytes={self.iter_prefetched_bytes or None},\n"
- f"{indent} next_time={fmt(self.next_time.get()) or None},\n"
- f"{indent} format_time={fmt(self.format_time.get()) or None},\n"
- f"{indent} user_time={fmt(self.user_time.get()) or None},\n"
- f"{indent} total_time={fmt(self.total_time.get()) or None},\n"
- f"{indent})"
- )
|