internal_api.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import warnings
  2. from typing import List, Tuple
  3. import ray
  4. import ray._private.profiling as profiling
  5. import ray._private.services as services
  6. import ray._private.worker
  7. from ray._common.network_utils import build_address
  8. from ray._private.state import GlobalState
  9. from ray._raylet import GcsClientOptions
  10. from ray.core.generated import common_pb2
# Names exported by a star-import of this module; the other functions defined
# here remain importable by name but are not listed as public.
__all__ = ["free", "global_gc"]

# gRPC message size limit, read once at import time from Ray's config. It is
# applied below as both the send and the receive limit of raylet channels.
MAX_MESSAGE_LENGTH = ray._config.max_grpc_message_size()
  13. def global_gc():
  14. """Trigger gc.collect() on all workers in the cluster."""
  15. worker = ray._private.worker.global_worker
  16. worker.core_worker.global_gc()
  17. def get_state_from_address(address=None):
  18. address = services.canonicalize_bootstrap_address_or_die(address)
  19. state = GlobalState()
  20. options = GcsClientOptions.create(
  21. address, None, allow_cluster_id_nil=True, fetch_cluster_id_if_nil=False
  22. )
  23. state._initialize_global_state(options)
  24. return state
  25. def memory_summary(
  26. address=None,
  27. group_by="NODE_ADDRESS",
  28. sort_by="OBJECT_SIZE",
  29. units="B",
  30. line_wrap=True,
  31. stats_only=False,
  32. num_entries=None,
  33. ):
  34. from ray.dashboard.memory_utils import memory_summary
  35. state = get_state_from_address(address)
  36. reply = get_memory_info_reply(state)
  37. if stats_only:
  38. return store_stats_summary(reply)
  39. return memory_summary(
  40. state, group_by, sort_by, line_wrap, units, num_entries
  41. ) + store_stats_summary(reply)
  42. def get_memory_info_reply(state, node_manager_address=None, node_manager_port=None):
  43. """Returns global memory info."""
  44. from ray._private.grpc_utils import init_grpc_channel
  45. from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
  46. # We can ask any Raylet for the global memory info, that Raylet internally
  47. # asks all nodes in the cluster for memory stats.
  48. if node_manager_address is None or node_manager_port is None:
  49. # We should ask for a raylet that is alive.
  50. raylet = None
  51. for node in state.node_table():
  52. if node["Alive"]:
  53. raylet = node
  54. break
  55. assert raylet is not None, "Every raylet is dead"
  56. raylet_address = build_address(
  57. raylet["NodeManagerAddress"], raylet["NodeManagerPort"]
  58. )
  59. else:
  60. raylet_address = build_address(node_manager_address, node_manager_port)
  61. channel = init_grpc_channel(
  62. raylet_address,
  63. options=[
  64. ("grpc.max_send_message_length", MAX_MESSAGE_LENGTH),
  65. ("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
  66. ],
  67. )
  68. stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
  69. reply = stub.FormatGlobalMemoryInfo(
  70. node_manager_pb2.FormatGlobalMemoryInfoRequest(include_memory_info=False),
  71. timeout=60.0,
  72. )
  73. return reply
  74. def node_stats(
  75. node_manager_address=None, node_manager_port=None, include_memory_info=True
  76. ):
  77. """Returns NodeStats object describing memory usage in the cluster."""
  78. from ray._private.grpc_utils import init_grpc_channel
  79. from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
  80. # We can ask any Raylet for the global memory info.
  81. assert node_manager_address is not None and node_manager_port is not None
  82. raylet_address = build_address(node_manager_address, node_manager_port)
  83. channel = init_grpc_channel(
  84. raylet_address,
  85. options=[
  86. ("grpc.max_send_message_length", MAX_MESSAGE_LENGTH),
  87. ("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
  88. ],
  89. )
  90. stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
  91. node_stats = stub.GetNodeStats(
  92. node_manager_pb2.GetNodeStatsRequest(include_memory_info=include_memory_info),
  93. timeout=30.0,
  94. )
  95. return node_stats
  96. def store_stats_summary(reply):
  97. """Returns formatted string describing object store stats in all nodes."""
  98. store_summary = "--- Aggregate object store stats across all nodes ---\n"
  99. # TODO(ekl) it would be nice if we could provide a full memory usage
  100. # breakdown by type (e.g., pinned by worker, primary, etc.)
  101. store_summary += (
  102. "Plasma memory usage {} MiB, {} objects, {}% full, {}% "
  103. "needed\n".format(
  104. int(reply.store_stats.object_store_bytes_used / (1024 * 1024)),
  105. reply.store_stats.num_local_objects,
  106. round(
  107. 100
  108. * reply.store_stats.object_store_bytes_used
  109. / reply.store_stats.object_store_bytes_avail,
  110. 2,
  111. ),
  112. round(
  113. 100
  114. * reply.store_stats.object_store_bytes_primary_copy
  115. / reply.store_stats.object_store_bytes_avail,
  116. 2,
  117. ),
  118. )
  119. )
  120. if reply.store_stats.object_store_bytes_fallback > 0:
  121. store_summary += "Plasma filesystem mmap usage: {} MiB\n".format(
  122. int(reply.store_stats.object_store_bytes_fallback / (1024 * 1024))
  123. )
  124. if reply.store_stats.spill_time_total_s > 0:
  125. store_summary += (
  126. "Spilled {} MiB, {} objects, avg write throughput {} MiB/s\n".format(
  127. int(reply.store_stats.spilled_bytes_total / (1024 * 1024)),
  128. reply.store_stats.spilled_objects_total,
  129. int(
  130. reply.store_stats.spilled_bytes_total
  131. / (1024 * 1024)
  132. / reply.store_stats.spill_time_total_s
  133. ),
  134. )
  135. )
  136. if reply.store_stats.restore_time_total_s > 0:
  137. store_summary += (
  138. "Restored {} MiB, {} objects, avg read throughput {} MiB/s\n".format(
  139. int(reply.store_stats.restored_bytes_total / (1024 * 1024)),
  140. reply.store_stats.restored_objects_total,
  141. int(
  142. reply.store_stats.restored_bytes_total
  143. / (1024 * 1024)
  144. / reply.store_stats.restore_time_total_s
  145. ),
  146. )
  147. )
  148. if reply.store_stats.object_pulls_queued:
  149. store_summary += "Object fetches queued, waiting for available memory."
  150. return store_summary
  151. def free(object_refs: list, local_only: bool = False):
  152. """
  153. DeprecationWarning: `free` is a deprecated API and will be
  154. removed in a future version of Ray. If you have a use case
  155. for this API, please open an issue on GitHub.
  156. Free a list of IDs from the in-process and plasma object stores.
  157. This function is a low-level API which should be used in restricted
  158. scenarios.
  159. If local_only is false, the request will be send to all object stores.
  160. This method will not return any value to indicate whether the deletion is
  161. successful or not. This function is an instruction to the object store. If
  162. some of the objects are in use, the object stores will delete them later
  163. when the ref count is down to 0.
  164. Examples:
  165. .. testcode::
  166. import ray
  167. @ray.remote
  168. def f():
  169. return 0
  170. obj_ref = f.remote()
  171. ray.get(obj_ref) # wait for object to be created first
  172. free([obj_ref]) # unpin & delete object globally
  173. Args:
  174. object_refs (List[ObjectRef]): List of object refs to delete.
  175. local_only: Whether only deleting the list of objects in local
  176. object store or all object stores.
  177. """
  178. warnings.warn(
  179. "`free` is a deprecated API and will be removed in a future version of Ray. "
  180. "If you have a use case for this API, please open an issue on GitHub.",
  181. DeprecationWarning,
  182. )
  183. worker = ray._private.worker.global_worker
  184. if isinstance(object_refs, ray.ObjectRef):
  185. object_refs = [object_refs]
  186. if not isinstance(object_refs, list):
  187. raise TypeError(
  188. "free() expects a list of ObjectRef, got {}".format(type(object_refs))
  189. )
  190. # Make sure that the values are object refs.
  191. for object_ref in object_refs:
  192. if not isinstance(object_ref, ray.ObjectRef):
  193. raise TypeError(
  194. "Attempting to call `free` on the value {}, "
  195. "which is not an ray.ObjectRef.".format(object_ref)
  196. )
  197. worker.check_connected()
  198. with profiling.profile("ray.free"):
  199. if len(object_refs) == 0:
  200. return
  201. worker.core_worker.free_objects(object_refs, local_only)
  202. def get_local_ongoing_lineage_reconstruction_tasks() -> List[
  203. Tuple[common_pb2.LineageReconstructionTask, int]
  204. ]:
  205. """Return the locally submitted ongoing retry tasks
  206. triggered by lineage reconstruction.
  207. NOTE: for the lineage reconstruction task status,
  208. this method only returns the status known to the submitter
  209. (i.e. it returns SUBMITTED_TO_WORKER instead of RUNNING).
  210. The return type is a list of pairs where pair.first is the
  211. lineage reconstruction task info and pair.second is the number
  212. of ongoing lineage reconstruction tasks of this type.
  213. """
  214. worker = ray._private.worker.global_worker
  215. worker.check_connected()
  216. return worker.core_worker.get_local_ongoing_lineage_reconstruction_tasks()