# ray_perf.py (8.8 KB)
  1. """This is the script for `ray microbenchmark`."""
  2. import asyncio
  3. import logging
  4. import multiprocessing
  5. import numpy as np
  6. import ray
  7. from ray._private.ray_client_microbenchmark import main as client_microbenchmark_main
  8. from ray._private.ray_microbenchmark_helpers import timeit
# Module-level logger; check_optimized_build() uses it to emit the
# unoptimized-build warning.
logger = logging.getLogger(__name__)
  10. @ray.remote(num_cpus=0)
  11. class Actor:
  12. def small_value(self):
  13. return b"ok"
  14. def small_value_arg(self, x):
  15. return b"ok"
  16. def small_value_batch(self, n):
  17. ray.get([small_value.remote() for _ in range(n)])
  18. @ray.remote
  19. class AsyncActor:
  20. async def small_value(self):
  21. return b"ok"
  22. async def small_value_with_arg(self, x):
  23. return b"ok"
  24. async def small_value_batch(self, n):
  25. await asyncio.wait([small_value.remote() for _ in range(n)])
  26. @ray.remote(num_cpus=0)
  27. class Client:
  28. def __init__(self, servers):
  29. if not isinstance(servers, list):
  30. servers = [servers]
  31. self.servers = servers
  32. def small_value_batch(self, n):
  33. results = []
  34. for s in self.servers:
  35. results.extend([s.small_value.remote() for _ in range(n)])
  36. ray.get(results)
  37. def small_value_batch_arg(self, n):
  38. x = ray.put(0)
  39. results = []
  40. for s in self.servers:
  41. results.extend([s.small_value_arg.remote(x) for _ in range(n)])
  42. ray.get(results)
  43. @ray.remote
  44. def small_value():
  45. return b"ok"
  46. @ray.remote
  47. def small_value_batch(n):
  48. submitted = [small_value.remote() for _ in range(n)]
  49. ray.get(submitted)
  50. return 0
  51. @ray.remote
  52. def create_object_containing_ref():
  53. obj_refs = []
  54. for _ in range(10000):
  55. obj_refs.append(ray.put(1))
  56. return obj_refs
  57. def check_optimized_build():
  58. if not ray._raylet.OPTIMIZED:
  59. msg = (
  60. "WARNING: Unoptimized build! "
  61. "To benchmark an optimized build, try:\n"
  62. "\tbazel run -c opt //:gen_ray_pkg\n"
  63. "You can also make this permanent by adding\n"
  64. "\tbuild --compilation_mode=opt\n"
  65. "to your user-wide ~/.bazelrc file. "
  66. "(Do not add this to the project-level .bazelrc file.)"
  67. )
  68. logger.warning(msg)
  69. def main(results=None):
  70. results = results or []
  71. check_optimized_build()
  72. print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks")
  73. ray.init()
  74. value = ray.put(0)
  75. def get_small():
  76. ray.get(value)
  77. def put_small():
  78. ray.put(0)
  79. @ray.remote
  80. def do_put_small():
  81. for _ in range(100):
  82. ray.put(0)
  83. def put_multi_small():
  84. ray.get([do_put_small.remote() for _ in range(10)])
  85. arr = np.zeros(100 * 1024 * 1024, dtype=np.int64)
  86. results += timeit("single client get calls (Plasma Store)", get_small)
  87. results += timeit("single client put calls (Plasma Store)", put_small)
  88. results += timeit("multi client put calls (Plasma Store)", put_multi_small, 1000)
  89. def put_large():
  90. ray.put(arr)
  91. results += timeit("single client put gigabytes", put_large, 8 * 0.1)
  92. def small_value_batch():
  93. submitted = [small_value.remote() for _ in range(1000)]
  94. ray.get(submitted)
  95. return 0
  96. results += timeit("single client tasks and get batch", small_value_batch)
  97. @ray.remote
  98. def do_put():
  99. for _ in range(10):
  100. ray.put(np.zeros(10 * 1024 * 1024, dtype=np.int64))
  101. def put_multi():
  102. ray.get([do_put.remote() for _ in range(10)])
  103. results += timeit("multi client put gigabytes", put_multi, 10 * 8 * 0.1)
  104. obj_containing_ref = create_object_containing_ref.remote()
  105. def get_containing_object_ref():
  106. ray.get(obj_containing_ref)
  107. results += timeit(
  108. "single client get object containing 10k refs", get_containing_object_ref
  109. )
  110. def wait_multiple_refs():
  111. num_objs = 1000
  112. not_ready = [small_value.remote() for _ in range(num_objs)]
  113. # We only need to trigger the fetch_local once for each object,
  114. # raylet will persist these fetch requests even after ray.wait returns.
  115. # See https://github.com/ray-project/ray/issues/30375.
  116. fetch_local = True
  117. for _ in range(num_objs):
  118. _ready, not_ready = ray.wait(not_ready, fetch_local=fetch_local)
  119. if fetch_local:
  120. fetch_local = False
  121. results += timeit("single client wait 1k refs", wait_multiple_refs)
  122. def small_task():
  123. ray.get(small_value.remote())
  124. results += timeit("single client tasks sync", small_task)
  125. def small_task_async():
  126. ray.get([small_value.remote() for _ in range(1000)])
  127. results += timeit("single client tasks async", small_task_async, 1000)
  128. n = 10000
  129. m = 4
  130. actors = [Actor.remote() for _ in range(m)]
  131. def multi_task():
  132. submitted = [a.small_value_batch.remote(n) for a in actors]
  133. ray.get(submitted)
  134. results += timeit("multi client tasks async", multi_task, n * m)
  135. a = Actor.remote()
  136. def actor_sync():
  137. ray.get(a.small_value.remote())
  138. results += timeit("1:1 actor calls sync", actor_sync)
  139. a = Actor.remote()
  140. def actor_async():
  141. ray.get([a.small_value.remote() for _ in range(1000)])
  142. results += timeit("1:1 actor calls async", actor_async, 1000)
  143. a = Actor.options(max_concurrency=16).remote()
  144. def actor_concurrent():
  145. ray.get([a.small_value.remote() for _ in range(1000)])
  146. results += timeit("1:1 actor calls concurrent", actor_concurrent, 1000)
  147. n = 5000
  148. n_cpu = multiprocessing.cpu_count() // 2
  149. actors = [Actor._remote() for _ in range(n_cpu)]
  150. client = Client.remote(actors)
  151. def actor_async_direct():
  152. ray.get(client.small_value_batch.remote(n))
  153. results += timeit("1:n actor calls async", actor_async_direct, n * len(actors))
  154. n_cpu = multiprocessing.cpu_count() // 2
  155. a = [Actor.remote() for _ in range(n_cpu)]
  156. @ray.remote
  157. def work(actors):
  158. ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)])
  159. def actor_multi2():
  160. ray.get([work.remote(a) for _ in range(m)])
  161. results += timeit("n:n actor calls async", actor_multi2, m * n)
  162. n = 1000
  163. actors = [Actor._remote() for _ in range(n_cpu)]
  164. clients = [Client.remote(a) for a in actors]
  165. def actor_multi2_direct_arg():
  166. ray.get([c.small_value_batch_arg.remote(n) for c in clients])
  167. results += timeit(
  168. "n:n actor calls with arg async", actor_multi2_direct_arg, n * len(clients)
  169. )
  170. a = AsyncActor.remote()
  171. def actor_sync():
  172. ray.get(a.small_value.remote())
  173. results += timeit("1:1 async-actor calls sync", actor_sync)
  174. a = AsyncActor.remote()
  175. def async_actor():
  176. ray.get([a.small_value.remote() for _ in range(1000)])
  177. results += timeit("1:1 async-actor calls async", async_actor, 1000)
  178. a = AsyncActor.remote()
  179. def async_actor():
  180. ray.get([a.small_value_with_arg.remote(i) for i in range(1000)])
  181. results += timeit("1:1 async-actor calls with args async", async_actor, 1000)
  182. n = 5000
  183. n_cpu = multiprocessing.cpu_count() // 2
  184. actors = [AsyncActor.remote() for _ in range(n_cpu)]
  185. client = Client.remote(actors)
  186. def async_actor_async():
  187. ray.get(client.small_value_batch.remote(n))
  188. results += timeit("1:n async-actor calls async", async_actor_async, n * len(actors))
  189. n = 5000
  190. m = 4
  191. n_cpu = multiprocessing.cpu_count() // 2
  192. a = [AsyncActor.remote() for _ in range(n_cpu)]
  193. @ray.remote
  194. def async_actor_work(actors):
  195. ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)])
  196. def async_actor_multi():
  197. ray.get([async_actor_work.remote(a) for _ in range(m)])
  198. results += timeit("n:n async-actor calls async", async_actor_multi, m * n)
  199. ray.shutdown()
  200. ############################
  201. # End of channel perf tests.
  202. ############################
  203. NUM_PGS = 100
  204. NUM_BUNDLES = 1
  205. ray.init(resources={"custom": 100})
  206. def placement_group_create_removal(num_pgs):
  207. pgs = [
  208. ray.util.placement_group(
  209. bundles=[{"custom": 0.001} for _ in range(NUM_BUNDLES)]
  210. )
  211. for _ in range(num_pgs)
  212. ]
  213. [pg.wait(timeout_seconds=30) for pg in pgs]
  214. # Include placement group removal here to clean up.
  215. # If we don't clean up placement groups, the whole performance
  216. # gets slower as it runs more.
  217. # Since timeit function runs multiple times without
  218. # the cleaning logic, we should have this method here.
  219. for pg in pgs:
  220. ray.util.remove_placement_group(pg)
  221. results += timeit(
  222. "placement group create/removal",
  223. lambda: placement_group_create_removal(NUM_PGS),
  224. NUM_PGS,
  225. )
  226. ray.shutdown()
  227. client_microbenchmark_main(results)
  228. return results
# Run the full benchmark suite when this file is executed as a script.
if __name__ == "__main__":
    main()