util.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from concurrent.futures import ThreadPoolExecutor
  2. from typing import TYPE_CHECKING, Any, List
  3. import ray
  4. from ray.data.block import BlockAccessor, CallableClass
  5. if TYPE_CHECKING:
  6. from ray.data._internal.execution.interfaces import RefBundle
  7. def make_ref_bundles(simple_data: List[List[Any]]) -> List["RefBundle"]:
  8. """Create ref bundles from a list of block data.
  9. One bundle is created for each input block.
  10. """
  11. import pandas as pd
  12. import pyarrow as pa
  13. from ray.data._internal.execution.interfaces import RefBundle
  14. output = []
  15. for block in simple_data:
  16. block = pd.DataFrame({"id": block})
  17. output.append(
  18. RefBundle(
  19. [
  20. (
  21. ray.put(block),
  22. BlockAccessor.for_block(block).get_metadata(),
  23. )
  24. ],
  25. owns_blocks=True,
  26. schema=pa.lib.Schema.from_pandas(block, preserve_index=False),
  27. )
  28. )
  29. return output
  30. memory_units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
  31. def memory_string(num_bytes: float) -> str:
  32. """Return a human-readable memory string for the given amount of bytes."""
  33. k = 0
  34. while num_bytes >= 1024 and k < len(memory_units) - 1:
  35. num_bytes /= 1024
  36. k += 1
  37. return f"{num_bytes:.1f}{memory_units[k]}"
  38. def locality_string(locality_hits: int, locality_misses) -> str:
  39. """Return a human-readable string for object locality stats."""
  40. if not locality_misses:
  41. return "[all objects local]"
  42. return f"[{locality_hits}/{locality_hits + locality_misses} objects local]"
  43. def make_callable_class_single_threaded(callable_cls: CallableClass) -> CallableClass:
  44. """Returns a thread-safe CallableClass with the same logic as the provided
  45. `callable_cls`.
  46. This function allows the usage of concurrent actors by safeguarding user logic
  47. behind a separate thread.
  48. This allows batch slicing and formatting to occur concurrently, to overlap with the
  49. user provided UDF.
  50. """
  51. class _SingleThreadedWrapper(callable_cls):
  52. def __init__(self, *args, **kwargs):
  53. self.thread_pool_executor = ThreadPoolExecutor(max_workers=1)
  54. super().__init__(*args, **kwargs)
  55. def __repr__(self):
  56. return super().__repr__()
  57. def __call__(self, *args, **kwargs):
  58. # ThreadPoolExecutor will reuse the same thread for every submit call.
  59. future = self.thread_pool_executor.submit(super().__call__, *args, **kwargs)
  60. return future.result()
  61. return _SingleThreadedWrapper