ray_experimental_perf.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. """This is the script for `ray microbenchmark`."""
  2. import asyncio
  3. import logging
  4. import multiprocessing
  5. import ray
  6. import ray.experimental.channel as ray_channel
  7. from ray._common.utils import (
  8. get_or_create_event_loop,
  9. )
  10. from ray._private.ray_microbenchmark_helpers import asyncio_timeit, timeit
  11. from ray._private.test_utils import get_actor_node_id
  12. from ray.dag import InputNode, MultiOutputNode
  13. from ray.dag.compiled_dag_node import CompiledDAG
  14. from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
  15. logger = logging.getLogger(__name__)
  16. @ray.remote
  17. class DAGActor:
  18. def echo(self, x):
  19. return x
  20. def echo_multiple(self, *x):
  21. return x
  22. def check_optimized_build():
  23. if not ray._raylet.OPTIMIZED:
  24. msg = (
  25. "WARNING: Unoptimized build! "
  26. "To benchmark an optimized build, try:\n"
  27. "\tbazel run -c opt //:gen_ray_pkg\n"
  28. "You can also make this permanent by adding\n"
  29. "\tbuild --compilation_mode=opt\n"
  30. "to your user-wide ~/.bazelrc file. "
  31. "(Do not add this to the project-level .bazelrc file.)"
  32. )
  33. logger.warning(msg)
  34. def create_driver_actor():
  35. return CompiledDAG.DAGDriverProxyActor.options(
  36. scheduling_strategy=NodeAffinitySchedulingStrategy(
  37. ray.get_runtime_context().get_node_id(), soft=False
  38. )
  39. ).remote()
  40. def main(results=None):
  41. results = results or []
  42. loop = get_or_create_event_loop()
  43. check_optimized_build()
  44. print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks")
  45. #################################################
  46. # Perf tests for channels, used in compiled DAGs.
  47. #################################################
  48. ray.init()
  49. def put_channel_small(chans, do_get=False):
  50. for chan in chans:
  51. chan.write(b"0")
  52. if do_get:
  53. chan.read()
  54. @ray.remote
  55. class ChannelReader:
  56. def ready(self):
  57. return
  58. def read(self, chans):
  59. while True:
  60. for chan in chans:
  61. chan.read()
  62. driver_actor = create_driver_actor()
  63. driver_node = get_actor_node_id(driver_actor)
  64. chans = [ray_channel.Channel(None, [(driver_actor, driver_node)], 1000)]
  65. results += timeit(
  66. "[unstable] local put:local get, single channel calls",
  67. lambda: put_channel_small(chans, do_get=True),
  68. )
  69. reader = ChannelReader.remote()
  70. reader_node = get_actor_node_id(reader)
  71. chans = [ray_channel.Channel(None, [(reader, reader_node)], 1000)]
  72. ray.get(reader.ready.remote())
  73. reader.read.remote(chans)
  74. results += timeit(
  75. "[unstable] local put:1 remote get, single channel calls",
  76. lambda: put_channel_small(chans),
  77. )
  78. ray.kill(reader)
  79. n_cpu = multiprocessing.cpu_count() // 2
  80. print(f"Testing multiple readers/channels, n={n_cpu}")
  81. reader_and_node_list = []
  82. for _ in range(n_cpu):
  83. reader = ChannelReader.remote()
  84. reader_node = get_actor_node_id(reader)
  85. reader_and_node_list.append((reader, reader_node))
  86. chans = [ray_channel.Channel(None, reader_and_node_list, 1000)]
  87. ray.get([reader.ready.remote() for reader, _ in reader_and_node_list])
  88. for reader, _ in reader_and_node_list:
  89. reader.read.remote(chans)
  90. results += timeit(
  91. "[unstable] local put:n remote get, single channel calls",
  92. lambda: put_channel_small(chans),
  93. )
  94. for reader, _ in reader_and_node_list:
  95. ray.kill(reader)
  96. reader = ChannelReader.remote()
  97. reader_node = get_actor_node_id(reader)
  98. chans = [
  99. ray_channel.Channel(None, [(reader, reader_node)], 1000) for _ in range(n_cpu)
  100. ]
  101. ray.get(reader.ready.remote())
  102. reader.read.remote(chans)
  103. results += timeit(
  104. "[unstable] local put:1 remote get, n channels calls",
  105. lambda: put_channel_small(chans),
  106. )
  107. ray.kill(reader)
  108. reader_and_node_list = []
  109. for _ in range(n_cpu):
  110. reader = ChannelReader.remote()
  111. reader_node = get_actor_node_id(reader)
  112. reader_and_node_list.append((reader, reader_node))
  113. chans = [
  114. ray_channel.Channel(None, [reader_and_node_list[i]], 1000) for i in range(n_cpu)
  115. ]
  116. ray.get([reader.ready.remote() for reader, _ in reader_and_node_list])
  117. for chan, reader_node_tuple in zip(chans, reader_and_node_list):
  118. reader = reader_node_tuple[0]
  119. reader.read.remote([chan])
  120. results += timeit(
  121. "[unstable] local put:n remote get, n channels calls",
  122. lambda: put_channel_small(chans),
  123. )
  124. for reader, _ in reader_and_node_list:
  125. ray.kill(reader)
  126. # Tests for compiled DAGs.
  127. def _exec(dag, num_args=1, payload_size=1):
  128. output_ref = dag.execute(*[b"x" * payload_size for _ in range(num_args)])
  129. ray.get(output_ref)
  130. async def exec_async(tag):
  131. async def _exec_async():
  132. fut = await compiled_dag.execute_async(b"x")
  133. if not isinstance(fut, list):
  134. await fut
  135. else:
  136. await asyncio.gather(*fut)
  137. return await asyncio_timeit(
  138. tag,
  139. _exec_async,
  140. )
  141. # Single-actor DAG calls
  142. a = DAGActor.remote()
  143. with InputNode() as inp:
  144. dag = a.echo.bind(inp)
  145. results += timeit(
  146. "[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))
  147. )
  148. compiled_dag = dag.experimental_compile()
  149. results += timeit(
  150. "[unstable] compiled single-actor DAG calls", lambda: _exec(compiled_dag)
  151. )
  152. del a
  153. # Single-actor asyncio DAG calls
  154. a = DAGActor.remote()
  155. with InputNode() as inp:
  156. dag = a.echo.bind(inp)
  157. compiled_dag = dag.experimental_compile(enable_asyncio=True)
  158. results += loop.run_until_complete(
  159. exec_async(
  160. "[unstable] compiled single-actor asyncio DAG calls",
  161. )
  162. )
  163. del a
  164. # Scatter-gather DAG calls
  165. n_cpu = multiprocessing.cpu_count() // 2
  166. actors = [DAGActor.remote() for _ in range(n_cpu)]
  167. with InputNode() as inp:
  168. dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
  169. results += timeit(
  170. f"[unstable] scatter-gather DAG calls, n={n_cpu} actors",
  171. lambda: ray.get(dag.execute(b"x")),
  172. )
  173. compiled_dag = dag.experimental_compile()
  174. results += timeit(
  175. f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors",
  176. lambda: _exec(compiled_dag),
  177. )
  178. # Scatter-gather asyncio DAG calls
  179. actors = [DAGActor.remote() for _ in range(n_cpu)]
  180. with InputNode() as inp:
  181. dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
  182. compiled_dag = dag.experimental_compile(enable_asyncio=True)
  183. results += loop.run_until_complete(
  184. exec_async(
  185. f"[unstable] compiled scatter-gather asyncio DAG calls, n={n_cpu} actors",
  186. )
  187. )
  188. # Chain DAG calls
  189. actors = [DAGActor.remote() for _ in range(n_cpu)]
  190. with InputNode() as inp:
  191. dag = inp
  192. for a in actors:
  193. dag = a.echo.bind(dag)
  194. results += timeit(
  195. f"[unstable] chain DAG calls, n={n_cpu} actors",
  196. lambda: ray.get(dag.execute(b"x")),
  197. )
  198. compiled_dag = dag.experimental_compile()
  199. results += timeit(
  200. f"[unstable] compiled chain DAG calls, n={n_cpu} actors",
  201. lambda: _exec(compiled_dag),
  202. )
  203. # Chain asyncio DAG calls
  204. actors = [DAGActor.remote() for _ in range(n_cpu)]
  205. with InputNode() as inp:
  206. dag = inp
  207. for a in actors:
  208. dag = a.echo.bind(dag)
  209. compiled_dag = dag.experimental_compile(enable_asyncio=True)
  210. results += loop.run_until_complete(
  211. exec_async(f"[unstable] compiled chain asyncio DAG calls, n={n_cpu} actors")
  212. )
  213. # Multiple args with small payloads
  214. n_actors = 8
  215. assert (
  216. n_cpu > n_actors
  217. ), f"n_cpu ({n_cpu}) must be greater than n_actors ({n_actors})"
  218. actors = [DAGActor.remote() for _ in range(n_actors)]
  219. with InputNode() as inp:
  220. dag = MultiOutputNode([actors[i].echo.bind(inp[i]) for i in range(n_actors)])
  221. payload_size = 1
  222. results += timeit(
  223. f"[unstable] multiple args with small payloads DAG calls, n={n_actors} actors",
  224. lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_actors)])),
  225. )
  226. compiled_dag = dag.experimental_compile()
  227. results += timeit(
  228. f"[unstable] compiled multiple args with small payloads DAG calls, "
  229. f"n={n_actors} actors",
  230. lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size),
  231. )
  232. # Multiple args with medium payloads
  233. actors = [DAGActor.remote() for _ in range(n_actors)]
  234. with InputNode() as inp:
  235. dag = MultiOutputNode([actors[i].echo.bind(inp[i]) for i in range(n_actors)])
  236. payload_size = 1024 * 1024
  237. results += timeit(
  238. f"[unstable] multiple args with medium payloads DAG calls, n={n_actors} actors",
  239. lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_actors)])),
  240. )
  241. compiled_dag = dag.experimental_compile()
  242. results += timeit(
  243. "[unstable] compiled multiple args with medium payloads DAG calls, "
  244. f"n={n_actors} actors",
  245. lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size),
  246. )
  247. # Multiple args with large payloads
  248. actors = [DAGActor.remote() for _ in range(n_actors)]
  249. with InputNode() as inp:
  250. dag = MultiOutputNode([actors[i].echo.bind(inp[i]) for i in range(n_actors)])
  251. payload_size = 10 * 1024 * 1024
  252. results += timeit(
  253. f"[unstable] multiple args with large payloads DAG calls, n={n_actors} actors",
  254. lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_actors)])),
  255. )
  256. compiled_dag = dag.experimental_compile()
  257. results += timeit(
  258. "[unstable] compiled multiple args with large payloads DAG calls, "
  259. f"n={n_actors} actors",
  260. lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size),
  261. )
  262. # Worst case for multiple arguments: a single actor takes all the arguments
  263. # with small payloads.
  264. actor = DAGActor.remote()
  265. n_args = 8
  266. with InputNode() as inp:
  267. dag = actor.echo_multiple.bind(*[inp[i] for i in range(n_args)])
  268. payload_size = 1
  269. results += timeit(
  270. "[unstable] single-actor with all args with small payloads DAG calls, "
  271. "n=1 actors",
  272. lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_args)])),
  273. )
  274. compiled_dag = dag.experimental_compile()
  275. results += timeit(
  276. "[unstable] single-actor with all args with small payloads DAG calls, "
  277. "n=1 actors",
  278. lambda: _exec(compiled_dag, num_args=n_args, payload_size=payload_size),
  279. )
  280. ray.shutdown()
  281. return results
  282. if __name__ == "__main__":
  283. main()