dataset_repr.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
  2. import numpy as np
  3. import ray
  4. from ray.data.block import Block, BlockAccessor, BlockMetadata
  5. from ray.exceptions import RayError
  6. from ray.types import ObjectRef
  7. if TYPE_CHECKING:
  8. from ray.data.dataset import Dataset, Schema
# Tunables shared by the helpers that render the tabular dataset repr.
_DATASET_REPR_ELLIPSIS = "…"  # Marker for truncated cells and for the gap row.
_DATASET_REPR_MAX_ROWS = 10  # Total preview row budget when materialized.
_DATASET_REPR_HEAD_ROWS = 5  # Head rows shown before the gap; the rest is tail.
_DATASET_REPR_MAX_COLUMN_WIDTH = 40  # Max characters per rendered table cell.
_DATASET_REPR_GET_TIMEOUT_S = 30.0  # Timeout passed to ray.get per preview block.

# Only the top-level builder is listed for export from this module.
__all__ = [
    "_build_dataset_ascii_repr",
]
  17. def _build_dataset_ascii_repr(
  18. dataset: "Dataset",
  19. schema: "Schema",
  20. is_materialized: bool,
  21. ) -> str:
  22. """Render the dataset as a multi-line tabular string."""
  23. columns = list(schema.names)
  24. if not columns:
  25. return dataset._plan.get_plan_as_string(dataset.__class__)
  26. num_rows = dataset._meta_count()
  27. head_rows: List[List[str]] = []
  28. tail_rows: List[List[str]] = []
  29. if is_materialized:
  30. try:
  31. head_data, tail_data, _ = _collect_materialized_rows_for_repr(
  32. dataset, num_rows
  33. )
  34. head_rows = _format_rows_for_repr(head_data, columns)
  35. tail_rows = _format_rows_for_repr(tail_data, columns)
  36. except RayError:
  37. head_rows = []
  38. tail_rows = []
  39. return _build_dataset_ascii_repr_from_rows(
  40. schema=schema,
  41. num_rows=num_rows,
  42. dataset_name=dataset.name,
  43. is_materialized=is_materialized,
  44. head_rows=head_rows,
  45. tail_rows=tail_rows,
  46. )
  47. def _build_dataset_ascii_repr_from_rows(
  48. *,
  49. schema: "Schema",
  50. num_rows: Optional[int],
  51. dataset_name: Optional[str],
  52. is_materialized: bool,
  53. head_rows: List[List[str]],
  54. tail_rows: List[List[str]],
  55. ) -> str:
  56. """Render the dataset repr given schema metadata and preview rows."""
  57. columns = list(schema.names)
  58. num_cols = len(columns)
  59. shape_line = f"shape: ({num_rows if num_rows is not None else '?'}, {num_cols})"
  60. # Build header rows from schema.
  61. dtype_strings = [_repr_format_dtype(t) for t in schema.types]
  62. column_headers = [
  63. _truncate_to_cell_width(str(col), _DATASET_REPR_MAX_COLUMN_WIDTH)
  64. for col in columns
  65. ]
  66. dtype_headers = [
  67. _truncate_to_cell_width(dtype, _DATASET_REPR_MAX_COLUMN_WIDTH)
  68. for dtype in dtype_strings
  69. ]
  70. separator_row = ["---"] * len(columns)
  71. # Assemble rows, including an ellipsis gap if needed.
  72. show_gap = bool(head_rows) and bool(tail_rows)
  73. display_rows: List[List[str]] = []
  74. display_rows.extend(head_rows)
  75. if show_gap:
  76. display_rows.append([_DATASET_REPR_ELLIPSIS] * len(columns))
  77. display_rows.extend(tail_rows)
  78. # Render the table with computed column widths.
  79. column_widths = _compute_column_widths(
  80. column_headers, dtype_headers, separator_row, display_rows
  81. )
  82. table_lines = _render_table_lines(
  83. column_headers,
  84. dtype_headers,
  85. separator_row,
  86. display_rows,
  87. column_widths,
  88. )
  89. # Append a summary line describing row coverage.
  90. num_rows_shown = len(head_rows) + len(tail_rows)
  91. summary_line = (
  92. f"(Showing {num_rows_shown} of {num_rows} rows)"
  93. if is_materialized
  94. else "(Dataset isn't materialized)"
  95. )
  96. if is_materialized and num_rows is None:
  97. summary_line = f"(Showing {num_rows_shown} of ? rows)"
  98. components = []
  99. if dataset_name is not None:
  100. components.append(f"name: {dataset_name}")
  101. components.extend([shape_line, "\n".join(table_lines), summary_line])
  102. return "\n".join(components)
  103. def _repr_format_dtype(dtype: object) -> str:
  104. """Format a dtype into a compact string for the schema row.
  105. Dtypes may come from PyArrow, pandas/NumPy, or be plain Python types.
  106. """
  107. if isinstance(dtype, type):
  108. return dtype.__name__
  109. name = getattr(dtype, "name", None)
  110. if isinstance(name, str):
  111. return name
  112. return str(dtype)
  113. def _collect_materialized_rows_for_repr(
  114. dataset: "Dataset",
  115. num_rows: Optional[int],
  116. ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], bool]:
  117. """Collect head/tail rows for preview and whether to show a gap row."""
  118. block_entries: List[Tuple[ObjectRef, BlockMetadata]] = []
  119. for ref_bundle in dataset.iter_internal_ref_bundles():
  120. block_entries.extend(zip(ref_bundle.block_refs, ref_bundle.metadata))
  121. if not block_entries:
  122. return [], [], False
  123. # Compute how many head/tail rows to show within the preview budget.
  124. head_row_limit, tail_row_limit = _determine_preview_row_targets(num_rows)
  125. block_cache: Dict[ObjectRef, Block] = {}
  126. def _resolve_block(block_ref: ObjectRef) -> Block:
  127. if block_ref not in block_cache:
  128. block_cache[block_ref] = ray.get(
  129. block_ref, timeout=_DATASET_REPR_GET_TIMEOUT_S
  130. )
  131. return block_cache[block_ref]
  132. head_rows: List[Dict[str, Any]] = []
  133. head_remaining = head_row_limit
  134. for block_ref, _ in block_entries:
  135. if head_remaining <= 0:
  136. break
  137. block = _resolve_block(block_ref)
  138. accessor = BlockAccessor.for_block(block)
  139. for row in accessor.iter_rows(public_row_format=True):
  140. head_rows.append(row)
  141. head_remaining -= 1
  142. if head_remaining <= 0:
  143. break
  144. tail_rows: List[Dict[str, Any]] = []
  145. tail_remaining = tail_row_limit
  146. tail_parts: List[List[Dict[str, Any]]] = []
  147. if tail_remaining > 0:
  148. for block_ref, metadata in reversed(block_entries):
  149. if tail_remaining <= 0:
  150. break
  151. block = _resolve_block(block_ref)
  152. accessor = BlockAccessor.for_block(block)
  153. total_rows = metadata.num_rows
  154. if total_rows is None:
  155. total_rows = accessor.num_rows()
  156. if total_rows == 0:
  157. continue
  158. start = max(0, total_rows - tail_remaining)
  159. sliced_block = accessor.slice(start, total_rows, copy=False)
  160. slice_accessor = BlockAccessor.for_block(sliced_block)
  161. block_rows = list(slice_accessor.iter_rows(public_row_format=True))
  162. tail_parts.append(block_rows)
  163. tail_remaining -= len(block_rows)
  164. if tail_remaining <= 0:
  165. break
  166. for part in reversed(tail_parts):
  167. tail_rows.extend(part)
  168. show_gap = bool(head_rows) and bool(tail_rows)
  169. return head_rows, tail_rows, show_gap
  170. def _determine_preview_row_targets(num_rows: Optional[int]) -> Tuple[int, int]:
  171. """Compute how many head and tail rows to preview."""
  172. max_rows = _DATASET_REPR_MAX_ROWS
  173. if num_rows is None or num_rows <= max_rows:
  174. head = num_rows if num_rows is not None else max_rows
  175. return head, 0
  176. head = min(_DATASET_REPR_HEAD_ROWS, max_rows)
  177. tail = max_rows - head
  178. return head, tail
  179. def _format_rows_for_repr(
  180. rows: List[Dict[str, Any]],
  181. column_names: List[str],
  182. ) -> List[List[str]]:
  183. """Format row dicts into string cell rows for table rendering."""
  184. formatted_rows: List[List[str]] = []
  185. for row in rows:
  186. formatted_row = []
  187. for column in column_names:
  188. value = row.get(column)
  189. formatted_value = _format_value(value)
  190. formatted_row.append(
  191. _truncate_to_cell_width(formatted_value, _DATASET_REPR_MAX_COLUMN_WIDTH)
  192. )
  193. formatted_rows.append(formatted_row)
  194. return formatted_rows
  195. def _format_value(value: Any) -> str:
  196. if isinstance(value, np.generic):
  197. value = value.item()
  198. return str(value).replace("\n", " ").replace("\r", " ")
  199. def _truncate_to_cell_width(value: str, max_width: int) -> str:
  200. """Truncate a single cell to the configured max width."""
  201. if max_width is None:
  202. return value
  203. if max_width <= 0:
  204. return _DATASET_REPR_ELLIPSIS if value else ""
  205. if len(value) <= max_width:
  206. return value
  207. if max_width == 1:
  208. return _DATASET_REPR_ELLIPSIS
  209. return value[: max_width - 1] + _DATASET_REPR_ELLIPSIS
  210. def _compute_column_widths(
  211. headers: List[str],
  212. dtype_headers: List[str],
  213. separator_row: List[str],
  214. data_rows: List[List[str]],
  215. ) -> List[int]:
  216. """Compute per-column widths for table rendering."""
  217. column_widths: List[int] = []
  218. for idx in range(len(headers)):
  219. widths = [
  220. len(headers[idx]),
  221. len(dtype_headers[idx]),
  222. len(separator_row[idx]),
  223. ]
  224. for row in data_rows:
  225. widths.append(len(row[idx]))
  226. column_widths.append(max(widths))
  227. return column_widths
  228. def _render_table_lines(
  229. headers: List[str],
  230. dtype_headers: List[str],
  231. separator_row: List[str],
  232. data_rows: List[List[str]],
  233. column_widths: List[int],
  234. ) -> List[str]:
  235. """Render the full table (borders, headers, data) as lines."""
  236. lines: List[str] = []
  237. top = _render_border("╭", "┬", "╮", "─", column_widths)
  238. header_row = _render_row(headers, column_widths)
  239. separator_line = _render_row(separator_row, column_widths)
  240. dtype_row = _render_row(dtype_headers, column_widths)
  241. lines.extend([top, header_row, separator_line, dtype_row])
  242. if data_rows:
  243. middle = _render_border("╞", "╪", "╡", "═", column_widths)
  244. lines.append(middle)
  245. for row in data_rows:
  246. lines.append(_render_row(row, column_widths))
  247. bottom = _render_border("╰", "┴", "╯", "─", column_widths)
  248. lines.append(bottom)
  249. return lines
  250. def _render_border(
  251. left: str, middle: str, right: str, fill: str, column_widths: List[int]
  252. ) -> str:
  253. """Render a table border line given column widths."""
  254. segments = [fill * (width + 2) for width in column_widths]
  255. return f"{left}{middle.join(segments)}{right}"
  256. def _render_row(values: List[str], column_widths: List[int]) -> str:
  257. """Render a single table row with padding."""
  258. cells = []
  259. for idx, value in enumerate(values):
  260. padded = value.ljust(column_widths[idx])
  261. cells.append(f" {padded} ")
  262. return f"│{'┆'.join(cells)}│"