| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
- import numpy as np
- import ray
- from ray.data.block import Block, BlockAccessor, BlockMetadata
- from ray.exceptions import RayError
- from ray.types import ObjectRef
- if TYPE_CHECKING:
- from ray.data.dataset import Dataset, Schema
- _DATASET_REPR_ELLIPSIS = "…" # Ellipsis marker for truncated cells/rows.
- _DATASET_REPR_MAX_ROWS = 10 # Total preview row budget when materialized.
- _DATASET_REPR_HEAD_ROWS = 5 # Number of head rows to show before the gap.
- _DATASET_REPR_MAX_COLUMN_WIDTH = 40 # Max width per column cell in the table.
- _DATASET_REPR_GET_TIMEOUT_S = 30.0 # Timeout for fetching preview blocks.
- __all__ = [
- "_build_dataset_ascii_repr",
- ]
- def _build_dataset_ascii_repr(
- dataset: "Dataset",
- schema: "Schema",
- is_materialized: bool,
- ) -> str:
- """Render the dataset as a multi-line tabular string."""
- columns = list(schema.names)
- if not columns:
- return dataset._plan.get_plan_as_string(dataset.__class__)
- num_rows = dataset._meta_count()
- head_rows: List[List[str]] = []
- tail_rows: List[List[str]] = []
- if is_materialized:
- try:
- head_data, tail_data, _ = _collect_materialized_rows_for_repr(
- dataset, num_rows
- )
- head_rows = _format_rows_for_repr(head_data, columns)
- tail_rows = _format_rows_for_repr(tail_data, columns)
- except RayError:
- head_rows = []
- tail_rows = []
- return _build_dataset_ascii_repr_from_rows(
- schema=schema,
- num_rows=num_rows,
- dataset_name=dataset.name,
- is_materialized=is_materialized,
- head_rows=head_rows,
- tail_rows=tail_rows,
- )
- def _build_dataset_ascii_repr_from_rows(
- *,
- schema: "Schema",
- num_rows: Optional[int],
- dataset_name: Optional[str],
- is_materialized: bool,
- head_rows: List[List[str]],
- tail_rows: List[List[str]],
- ) -> str:
- """Render the dataset repr given schema metadata and preview rows."""
- columns = list(schema.names)
- num_cols = len(columns)
- shape_line = f"shape: ({num_rows if num_rows is not None else '?'}, {num_cols})"
- # Build header rows from schema.
- dtype_strings = [_repr_format_dtype(t) for t in schema.types]
- column_headers = [
- _truncate_to_cell_width(str(col), _DATASET_REPR_MAX_COLUMN_WIDTH)
- for col in columns
- ]
- dtype_headers = [
- _truncate_to_cell_width(dtype, _DATASET_REPR_MAX_COLUMN_WIDTH)
- for dtype in dtype_strings
- ]
- separator_row = ["---"] * len(columns)
- # Assemble rows, including an ellipsis gap if needed.
- show_gap = bool(head_rows) and bool(tail_rows)
- display_rows: List[List[str]] = []
- display_rows.extend(head_rows)
- if show_gap:
- display_rows.append([_DATASET_REPR_ELLIPSIS] * len(columns))
- display_rows.extend(tail_rows)
- # Render the table with computed column widths.
- column_widths = _compute_column_widths(
- column_headers, dtype_headers, separator_row, display_rows
- )
- table_lines = _render_table_lines(
- column_headers,
- dtype_headers,
- separator_row,
- display_rows,
- column_widths,
- )
- # Append a summary line describing row coverage.
- num_rows_shown = len(head_rows) + len(tail_rows)
- summary_line = (
- f"(Showing {num_rows_shown} of {num_rows} rows)"
- if is_materialized
- else "(Dataset isn't materialized)"
- )
- if is_materialized and num_rows is None:
- summary_line = f"(Showing {num_rows_shown} of ? rows)"
- components = []
- if dataset_name is not None:
- components.append(f"name: {dataset_name}")
- components.extend([shape_line, "\n".join(table_lines), summary_line])
- return "\n".join(components)
- def _repr_format_dtype(dtype: object) -> str:
- """Format a dtype into a compact string for the schema row.
- Dtypes may come from PyArrow, pandas/NumPy, or be plain Python types.
- """
- if isinstance(dtype, type):
- return dtype.__name__
- name = getattr(dtype, "name", None)
- if isinstance(name, str):
- return name
- return str(dtype)
- def _collect_materialized_rows_for_repr(
- dataset: "Dataset",
- num_rows: Optional[int],
- ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], bool]:
- """Collect head/tail rows for preview and whether to show a gap row."""
- block_entries: List[Tuple[ObjectRef, BlockMetadata]] = []
- for ref_bundle in dataset.iter_internal_ref_bundles():
- block_entries.extend(zip(ref_bundle.block_refs, ref_bundle.metadata))
- if not block_entries:
- return [], [], False
- # Compute how many head/tail rows to show within the preview budget.
- head_row_limit, tail_row_limit = _determine_preview_row_targets(num_rows)
- block_cache: Dict[ObjectRef, Block] = {}
- def _resolve_block(block_ref: ObjectRef) -> Block:
- if block_ref not in block_cache:
- block_cache[block_ref] = ray.get(
- block_ref, timeout=_DATASET_REPR_GET_TIMEOUT_S
- )
- return block_cache[block_ref]
- head_rows: List[Dict[str, Any]] = []
- head_remaining = head_row_limit
- for block_ref, _ in block_entries:
- if head_remaining <= 0:
- break
- block = _resolve_block(block_ref)
- accessor = BlockAccessor.for_block(block)
- for row in accessor.iter_rows(public_row_format=True):
- head_rows.append(row)
- head_remaining -= 1
- if head_remaining <= 0:
- break
- tail_rows: List[Dict[str, Any]] = []
- tail_remaining = tail_row_limit
- tail_parts: List[List[Dict[str, Any]]] = []
- if tail_remaining > 0:
- for block_ref, metadata in reversed(block_entries):
- if tail_remaining <= 0:
- break
- block = _resolve_block(block_ref)
- accessor = BlockAccessor.for_block(block)
- total_rows = metadata.num_rows
- if total_rows is None:
- total_rows = accessor.num_rows()
- if total_rows == 0:
- continue
- start = max(0, total_rows - tail_remaining)
- sliced_block = accessor.slice(start, total_rows, copy=False)
- slice_accessor = BlockAccessor.for_block(sliced_block)
- block_rows = list(slice_accessor.iter_rows(public_row_format=True))
- tail_parts.append(block_rows)
- tail_remaining -= len(block_rows)
- if tail_remaining <= 0:
- break
- for part in reversed(tail_parts):
- tail_rows.extend(part)
- show_gap = bool(head_rows) and bool(tail_rows)
- return head_rows, tail_rows, show_gap
- def _determine_preview_row_targets(num_rows: Optional[int]) -> Tuple[int, int]:
- """Compute how many head and tail rows to preview."""
- max_rows = _DATASET_REPR_MAX_ROWS
- if num_rows is None or num_rows <= max_rows:
- head = num_rows if num_rows is not None else max_rows
- return head, 0
- head = min(_DATASET_REPR_HEAD_ROWS, max_rows)
- tail = max_rows - head
- return head, tail
- def _format_rows_for_repr(
- rows: List[Dict[str, Any]],
- column_names: List[str],
- ) -> List[List[str]]:
- """Format row dicts into string cell rows for table rendering."""
- formatted_rows: List[List[str]] = []
- for row in rows:
- formatted_row = []
- for column in column_names:
- value = row.get(column)
- formatted_value = _format_value(value)
- formatted_row.append(
- _truncate_to_cell_width(formatted_value, _DATASET_REPR_MAX_COLUMN_WIDTH)
- )
- formatted_rows.append(formatted_row)
- return formatted_rows
- def _format_value(value: Any) -> str:
- if isinstance(value, np.generic):
- value = value.item()
- return str(value).replace("\n", " ").replace("\r", " ")
- def _truncate_to_cell_width(value: str, max_width: int) -> str:
- """Truncate a single cell to the configured max width."""
- if max_width is None:
- return value
- if max_width <= 0:
- return _DATASET_REPR_ELLIPSIS if value else ""
- if len(value) <= max_width:
- return value
- if max_width == 1:
- return _DATASET_REPR_ELLIPSIS
- return value[: max_width - 1] + _DATASET_REPR_ELLIPSIS
- def _compute_column_widths(
- headers: List[str],
- dtype_headers: List[str],
- separator_row: List[str],
- data_rows: List[List[str]],
- ) -> List[int]:
- """Compute per-column widths for table rendering."""
- column_widths: List[int] = []
- for idx in range(len(headers)):
- widths = [
- len(headers[idx]),
- len(dtype_headers[idx]),
- len(separator_row[idx]),
- ]
- for row in data_rows:
- widths.append(len(row[idx]))
- column_widths.append(max(widths))
- return column_widths
- def _render_table_lines(
- headers: List[str],
- dtype_headers: List[str],
- separator_row: List[str],
- data_rows: List[List[str]],
- column_widths: List[int],
- ) -> List[str]:
- """Render the full table (borders, headers, data) as lines."""
- lines: List[str] = []
- top = _render_border("╭", "┬", "╮", "─", column_widths)
- header_row = _render_row(headers, column_widths)
- separator_line = _render_row(separator_row, column_widths)
- dtype_row = _render_row(dtype_headers, column_widths)
- lines.extend([top, header_row, separator_line, dtype_row])
- if data_rows:
- middle = _render_border("╞", "╪", "╡", "═", column_widths)
- lines.append(middle)
- for row in data_rows:
- lines.append(_render_row(row, column_widths))
- bottom = _render_border("╰", "┴", "╯", "─", column_widths)
- lines.append(bottom)
- return lines
- def _render_border(
- left: str, middle: str, right: str, fill: str, column_widths: List[int]
- ) -> str:
- """Render a table border line given column widths."""
- segments = [fill * (width + 2) for width in column_widths]
- return f"{left}{middle.join(segments)}{right}"
- def _render_row(values: List[str], column_widths: List[int]) -> str:
- """Render a single table row with padding."""
- cells = []
- for idx, value in enumerate(values):
- padded = value.ljust(column_widths[idx])
- cells.append(f" {padded} ")
- return f"│{'┆'.join(cells)}│"
|