| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- """This is the script for `ray microbenchmark`."""
- import asyncio
- import logging
- import multiprocessing
- import ray
- import ray.experimental.channel as ray_channel
- from ray._common.utils import (
- get_or_create_event_loop,
- )
- from ray._private.ray_microbenchmark_helpers import asyncio_timeit, timeit
- from ray._private.test_utils import get_actor_node_id
- from ray.dag import InputNode, MultiOutputNode
- from ray.dag.compiled_dag_node import CompiledDAG
- from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
@ray.remote
class DAGActor:
    """Trivial actor used as DAG nodes: each method hands back its input(s)."""

    def echo(self, x):
        """Return ``x`` unchanged."""
        return x

    def echo_multiple(self, *args):
        """Return all positional arguments, as the tuple they arrived in."""
        return args
def check_optimized_build():
    """Log a warning if the running Ray build is not an optimized build.

    Reads ``ray._raylet.OPTIMIZED``; when it is falsy, emits one warning via
    the module logger that explains how to produce an optimized build.
    Returns ``None`` in every case.
    """
    if ray._raylet.OPTIMIZED:
        return
    warning_lines = [
        "WARNING: Unoptimized build! To benchmark an optimized build, try:",
        "\tbazel run -c opt //:gen_ray_pkg",
        "You can also make this permanent by adding",
        "\tbuild --compilation_mode=opt",
        "to your user-wide ~/.bazelrc file. "
        "(Do not add this to the project-level .bazelrc file.)",
    ]
    logger.warning("\n".join(warning_lines))
def create_driver_actor():
    """Start a ``CompiledDAG.DAGDriverProxyActor`` on the driver's own node.

    The actor is scheduled with a hard (``soft=False``) node-affinity
    constraint on the node ID reported by the current runtime context.

    Returns:
        The handle returned by ``.remote()`` for the new proxy actor.
    """
    this_node_id = ray.get_runtime_context().get_node_id()
    pin_here = NodeAffinitySchedulingStrategy(this_node_id, soft=False)
    proxy_actor_cls = CompiledDAG.DAGDriverProxyActor.options(
        scheduling_strategy=pin_here
    )
    return proxy_actor_cls.remote()
def _run_channel_benchmarks(results, n_cpu):
    """Benchmark raw channel write/read calls, appending results in place.

    Channels are the transport used by compiled DAGs. Every reader actor
    started here is stopped with ``ray.kill`` once its benchmark finishes.

    Args:
        results: List that each ``timeit`` result is appended to.
        n_cpu: Number of readers / channels used by the "n" variants.
    """

    def put_channel_small(chans, do_get=False):
        # Write a 1-byte payload to every channel; with do_get=True the
        # caller reads it back itself (used when the driver is the reader).
        for chan in chans:
            chan.write(b"0")
            if do_get:
                chan.read()

    @ray.remote
    class ChannelReader:
        def ready(self):
            return

        def read(self, chans):
            # Reads from `chans` forever; the benchmark stops this actor
            # with ray.kill() rather than signalling it to exit.
            while True:
                for chan in chans:
                    chan.read()

    def spawn_readers(count):
        # Start `count` ChannelReader actors; return (actor, node_id) pairs.
        pairs = []
        for _ in range(count):
            reader = ChannelReader.remote()
            pairs.append((reader, get_actor_node_id(reader)))
        return pairs

    # One channel, written and read back by the driver itself.
    driver_actor = create_driver_actor()
    driver_node = get_actor_node_id(driver_actor)
    chans = [ray_channel.Channel(None, [(driver_actor, driver_node)], 1000)]
    results.extend(
        timeit(
            "[unstable] local put:local get, single channel calls",
            lambda: put_channel_small(chans, do_get=True),
        )
    )

    # One channel, one remote reader.
    reader, reader_node = spawn_readers(1)[0]
    chans = [ray_channel.Channel(None, [(reader, reader_node)], 1000)]
    ray.get(reader.ready.remote())
    reader.read.remote(chans)
    results.extend(
        timeit(
            "[unstable] local put:1 remote get, single channel calls",
            lambda: put_channel_small(chans),
        )
    )
    ray.kill(reader)

    print(f"Testing multiple readers/channels, n={n_cpu}")

    # One channel shared by n remote readers.
    readers = spawn_readers(n_cpu)
    chans = [ray_channel.Channel(None, readers, 1000)]
    ray.get([reader.ready.remote() for reader, _ in readers])
    for reader, _ in readers:
        reader.read.remote(chans)
    results.extend(
        timeit(
            "[unstable] local put:n remote get, single channel calls",
            lambda: put_channel_small(chans),
        )
    )
    for reader, _ in readers:
        ray.kill(reader)

    # n channels, all read by a single remote reader.
    reader, reader_node = spawn_readers(1)[0]
    chans = [
        ray_channel.Channel(None, [(reader, reader_node)], 1000) for _ in range(n_cpu)
    ]
    ray.get(reader.ready.remote())
    reader.read.remote(chans)
    results.extend(
        timeit(
            "[unstable] local put:1 remote get, n channels calls",
            lambda: put_channel_small(chans),
        )
    )
    ray.kill(reader)

    # n channels, each with its own remote reader.
    readers = spawn_readers(n_cpu)
    chans = [ray_channel.Channel(None, [pair], 1000) for pair in readers]
    ray.get([reader.ready.remote() for reader, _ in readers])
    for chan, (reader, _) in zip(chans, readers):
        reader.read.remote([chan])
    results.extend(
        timeit(
            "[unstable] local put:n remote get, n channels calls",
            lambda: put_channel_small(chans),
        )
    )
    for reader, _ in readers:
        ray.kill(reader)


def _exec_compiled_dag(compiled_dag, num_args=1, payload_size=1):
    """Run one ``execute`` on ``compiled_dag`` and block on its result.

    Each of the ``num_args`` positional inputs is ``b"x" * payload_size``.
    """
    output_ref = compiled_dag.execute(
        *[b"x" * payload_size for _ in range(num_args)]
    )
    ray.get(output_ref)


async def _time_compiled_dag_async(compiled_dag, tag):
    """Time ``execute_async`` round trips on an asyncio-enabled compiled DAG.

    The DAG is passed in explicitly (not captured from an enclosing scope)
    so the timed coroutine can never pick up a different DAG by accident.

    Returns:
        Whatever ``asyncio_timeit`` returns for ``tag``.
    """

    async def run_once():
        fut = await compiled_dag.execute_async(b"x")
        # execute_async may return one future or a list of them
        # (presumably one per output of a MultiOutputNode DAG).
        if isinstance(fut, list):
            await asyncio.gather(*fut)
        else:
            await fut

    return await asyncio_timeit(tag, run_once)


def _run_dag_benchmarks(results, loop, n_cpu):
    """Benchmark regular vs. compiled DAG calls, appending results in place.

    Each compiled DAG is torn down as soon as its benchmark finishes rather
    than staying alive (and referencing its actors) until ``ray.shutdown()``.

    Args:
        results: List that each benchmark result is appended to.
        loop: Event loop that drives the asyncio benchmarks.
        n_cpu: Number of actors in the scatter-gather and chain DAGs.

    Raises:
        AssertionError: If ``n_cpu`` is not greater than the 8 actors used by
            the multiple-args benchmarks (checked after the chain benchmarks).
    """

    def time_async(compiled_dag, tag):
        return loop.run_until_complete(_time_compiled_dag_async(compiled_dag, tag))

    # Single-actor DAG calls
    a = DAGActor.remote()
    with InputNode() as inp:
        dag = a.echo.bind(inp)
    results.extend(
        timeit(
            "[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))
        )
    )
    compiled_dag = dag.experimental_compile()
    results.extend(
        timeit(
            "[unstable] compiled single-actor DAG calls",
            lambda: _exec_compiled_dag(compiled_dag),
        )
    )
    compiled_dag.teardown()
    del a

    # Single-actor asyncio DAG calls
    a = DAGActor.remote()
    with InputNode() as inp:
        dag = a.echo.bind(inp)
    compiled_dag = dag.experimental_compile(enable_asyncio=True)
    results.extend(
        time_async(compiled_dag, "[unstable] compiled single-actor asyncio DAG calls")
    )
    compiled_dag.teardown()
    del a

    # Scatter-gather DAG calls
    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
    results.extend(
        timeit(
            f"[unstable] scatter-gather DAG calls, n={n_cpu} actors",
            lambda: ray.get(dag.execute(b"x")),
        )
    )
    compiled_dag = dag.experimental_compile()
    results.extend(
        timeit(
            f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors",
            lambda: _exec_compiled_dag(compiled_dag),
        )
    )
    compiled_dag.teardown()

    # Scatter-gather asyncio DAG calls
    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
    compiled_dag = dag.experimental_compile(enable_asyncio=True)
    results.extend(
        time_async(
            compiled_dag,
            f"[unstable] compiled scatter-gather asyncio DAG calls, n={n_cpu} actors",
        )
    )
    compiled_dag.teardown()

    # Chain DAG calls
    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = inp
        for a in actors:
            dag = a.echo.bind(dag)
    results.extend(
        timeit(
            f"[unstable] chain DAG calls, n={n_cpu} actors",
            lambda: ray.get(dag.execute(b"x")),
        )
    )
    compiled_dag = dag.experimental_compile()
    results.extend(
        timeit(
            f"[unstable] compiled chain DAG calls, n={n_cpu} actors",
            lambda: _exec_compiled_dag(compiled_dag),
        )
    )
    compiled_dag.teardown()

    # Chain asyncio DAG calls
    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = inp
        for a in actors:
            dag = a.echo.bind(dag)
    compiled_dag = dag.experimental_compile(enable_asyncio=True)
    results.extend(
        time_async(
            compiled_dag, f"[unstable] compiled chain asyncio DAG calls, n={n_cpu} actors"
        )
    )
    compiled_dag.teardown()

    # Multiple args (one per actor) with small, medium and large payloads.
    n_actors = 8
    assert (
        n_cpu > n_actors
    ), f"n_cpu ({n_cpu}) must be greater than n_actors ({n_actors})"
    payload_cases = (
        ("small", 1),
        ("medium", 1024 * 1024),
        ("large", 10 * 1024 * 1024),
    )
    for size_name, payload_size in payload_cases:
        actors = [DAGActor.remote() for _ in range(n_actors)]
        with InputNode() as inp:
            dag = MultiOutputNode(
                [actors[i].echo.bind(inp[i]) for i in range(n_actors)]
            )
        # Loop variables are bound as lambda defaults so each lambda is
        # pinned to this iteration's DAG and payload size.
        results.extend(
            timeit(
                f"[unstable] multiple args with {size_name} payloads DAG calls, "
                f"n={n_actors} actors",
                lambda dag=dag, size=payload_size: ray.get(
                    dag.execute(*[b"x" * size for _ in range(n_actors)])
                ),
            )
        )
        compiled_dag = dag.experimental_compile()
        results.extend(
            timeit(
                f"[unstable] compiled multiple args with {size_name} payloads "
                f"DAG calls, n={n_actors} actors",
                lambda cdag=compiled_dag, size=payload_size: _exec_compiled_dag(
                    cdag, num_args=n_actors, payload_size=size
                ),
            )
        )
        compiled_dag.teardown()

    # Worst case for multiple arguments: a single actor takes all the
    # arguments with small payloads.
    actor = DAGActor.remote()
    n_args = 8
    with InputNode() as inp:
        dag = actor.echo_multiple.bind(*[inp[i] for i in range(n_args)])
    payload_size = 1
    results.extend(
        timeit(
            "[unstable] single-actor with all args with small payloads DAG calls, "
            "n=1 actors",
            lambda: ray.get(
                dag.execute(*[b"x" * payload_size for _ in range(n_args)])
            ),
        )
    )
    compiled_dag = dag.experimental_compile()
    # The compiled variant gets its own "compiled ..." label; it previously
    # reused the non-compiled label, so the two results collided.
    results.extend(
        timeit(
            "[unstable] compiled single-actor with all args with small payloads "
            "DAG calls, n=1 actors",
            lambda: _exec_compiled_dag(
                compiled_dag, num_args=n_args, payload_size=payload_size
            ),
        )
    )
    compiled_dag.teardown()


def main(results=None):
    """Run the channel and compiled-DAG microbenchmarks.

    Calls ``ray.init()``, runs every benchmark through ``timeit`` (or
    ``asyncio_timeit`` for the asyncio variants), and always calls
    ``ray.shutdown()``, even when a benchmark raises.

    Args:
        results: Optional list that benchmark results are appended to. The
            same list object is extended in place (including when it is
            empty) and returned. When ``None``, a new list is used.

    Returns:
        The list of accumulated benchmark results.

    Raises:
        AssertionError: If ``multiprocessing.cpu_count() // 2`` is not greater
            than 8 (the actor count of the multiple-args benchmarks).
    """
    if results is None:
        results = []
    loop = get_or_create_event_loop()

    check_optimized_build()

    print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks")

    # Fan-out used by the multi-reader / multi-actor benchmarks.
    n_cpu = multiprocessing.cpu_count() // 2

    ray.init()
    try:
        #################################################
        # Perf tests for channels, used in compiled DAGs.
        #################################################
        _run_channel_benchmarks(results, n_cpu)

        # Tests for compiled DAGs.
        _run_dag_benchmarks(results, loop, n_cpu)
    finally:
        # Release the Ray session even if a benchmark or assertion fails.
        ray.shutdown()
    return results
if __name__ == "__main__":
    # Script entry point: run every benchmark with a fresh results list.
    main(results=None)
|