"""This is the script for `ray microbenchmark`.""" import asyncio import logging import multiprocessing import numpy as np import ray from ray._private.ray_client_microbenchmark import main as client_microbenchmark_main from ray._private.ray_microbenchmark_helpers import timeit logger = logging.getLogger(__name__) @ray.remote(num_cpus=0) class Actor: def small_value(self): return b"ok" def small_value_arg(self, x): return b"ok" def small_value_batch(self, n): ray.get([small_value.remote() for _ in range(n)]) @ray.remote class AsyncActor: async def small_value(self): return b"ok" async def small_value_with_arg(self, x): return b"ok" async def small_value_batch(self, n): await asyncio.wait([small_value.remote() for _ in range(n)]) @ray.remote(num_cpus=0) class Client: def __init__(self, servers): if not isinstance(servers, list): servers = [servers] self.servers = servers def small_value_batch(self, n): results = [] for s in self.servers: results.extend([s.small_value.remote() for _ in range(n)]) ray.get(results) def small_value_batch_arg(self, n): x = ray.put(0) results = [] for s in self.servers: results.extend([s.small_value_arg.remote(x) for _ in range(n)]) ray.get(results) @ray.remote def small_value(): return b"ok" @ray.remote def small_value_batch(n): submitted = [small_value.remote() for _ in range(n)] ray.get(submitted) return 0 @ray.remote def create_object_containing_ref(): obj_refs = [] for _ in range(10000): obj_refs.append(ray.put(1)) return obj_refs def check_optimized_build(): if not ray._raylet.OPTIMIZED: msg = ( "WARNING: Unoptimized build! " "To benchmark an optimized build, try:\n" "\tbazel run -c opt //:gen_ray_pkg\n" "You can also make this permanent by adding\n" "\tbuild --compilation_mode=opt\n" "to your user-wide ~/.bazelrc file. " "(Do not add this to the project-level .bazelrc file.)" ) logger.warning(msg) def main(results=None): results = results or [] check_optimized_build() print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks") ray.init() value = ray.put(0) def get_small(): ray.get(value) def put_small(): ray.put(0) @ray.remote def do_put_small(): for _ in range(100): ray.put(0) def put_multi_small(): ray.get([do_put_small.remote() for _ in range(10)]) arr = np.zeros(100 * 1024 * 1024, dtype=np.int64) results += timeit("single client get calls (Plasma Store)", get_small) results += timeit("single client put calls (Plasma Store)", put_small) results += timeit("multi client put calls (Plasma Store)", put_multi_small, 1000) def put_large(): ray.put(arr) results += timeit("single client put gigabytes", put_large, 8 * 0.1) def small_value_batch(): submitted = [small_value.remote() for _ in range(1000)] ray.get(submitted) return 0 results += timeit("single client tasks and get batch", small_value_batch) @ray.remote def do_put(): for _ in range(10): ray.put(np.zeros(10 * 1024 * 1024, dtype=np.int64)) def put_multi(): ray.get([do_put.remote() for _ in range(10)]) results += timeit("multi client put gigabytes", put_multi, 10 * 8 * 0.1) obj_containing_ref = create_object_containing_ref.remote() def get_containing_object_ref(): ray.get(obj_containing_ref) results += timeit( "single client get object containing 10k refs", get_containing_object_ref ) def wait_multiple_refs(): num_objs = 1000 not_ready = [small_value.remote() for _ in range(num_objs)] # We only need to trigger the fetch_local once for each object, # raylet will persist these fetch requests even after ray.wait returns. # See https://github.com/ray-project/ray/issues/30375. fetch_local = True for _ in range(num_objs): _ready, not_ready = ray.wait(not_ready, fetch_local=fetch_local) if fetch_local: fetch_local = False results += timeit("single client wait 1k refs", wait_multiple_refs) def small_task(): ray.get(small_value.remote()) results += timeit("single client tasks sync", small_task) def small_task_async(): ray.get([small_value.remote() for _ in range(1000)]) results += timeit("single client tasks async", small_task_async, 1000) n = 10000 m = 4 actors = [Actor.remote() for _ in range(m)] def multi_task(): submitted = [a.small_value_batch.remote(n) for a in actors] ray.get(submitted) results += timeit("multi client tasks async", multi_task, n * m) a = Actor.remote() def actor_sync(): ray.get(a.small_value.remote()) results += timeit("1:1 actor calls sync", actor_sync) a = Actor.remote() def actor_async(): ray.get([a.small_value.remote() for _ in range(1000)]) results += timeit("1:1 actor calls async", actor_async, 1000) a = Actor.options(max_concurrency=16).remote() def actor_concurrent(): ray.get([a.small_value.remote() for _ in range(1000)]) results += timeit("1:1 actor calls concurrent", actor_concurrent, 1000) n = 5000 n_cpu = multiprocessing.cpu_count() // 2 actors = [Actor._remote() for _ in range(n_cpu)] client = Client.remote(actors) def actor_async_direct(): ray.get(client.small_value_batch.remote(n)) results += timeit("1:n actor calls async", actor_async_direct, n * len(actors)) n_cpu = multiprocessing.cpu_count() // 2 a = [Actor.remote() for _ in range(n_cpu)] @ray.remote def work(actors): ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)]) def actor_multi2(): ray.get([work.remote(a) for _ in range(m)]) results += timeit("n:n actor calls async", actor_multi2, m * n) n = 1000 actors = [Actor._remote() for _ in range(n_cpu)] clients = [Client.remote(a) for a in actors] def actor_multi2_direct_arg(): ray.get([c.small_value_batch_arg.remote(n) for c in clients]) results += timeit( "n:n actor calls with arg async", actor_multi2_direct_arg, n * len(clients) ) a = AsyncActor.remote() def actor_sync(): ray.get(a.small_value.remote()) results += timeit("1:1 async-actor calls sync", actor_sync) a = AsyncActor.remote() def async_actor(): ray.get([a.small_value.remote() for _ in range(1000)]) results += timeit("1:1 async-actor calls async", async_actor, 1000) a = AsyncActor.remote() def async_actor(): ray.get([a.small_value_with_arg.remote(i) for i in range(1000)]) results += timeit("1:1 async-actor calls with args async", async_actor, 1000) n = 5000 n_cpu = multiprocessing.cpu_count() // 2 actors = [AsyncActor.remote() for _ in range(n_cpu)] client = Client.remote(actors) def async_actor_async(): ray.get(client.small_value_batch.remote(n)) results += timeit("1:n async-actor calls async", async_actor_async, n * len(actors)) n = 5000 m = 4 n_cpu = multiprocessing.cpu_count() // 2 a = [AsyncActor.remote() for _ in range(n_cpu)] @ray.remote def async_actor_work(actors): ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)]) def async_actor_multi(): ray.get([async_actor_work.remote(a) for _ in range(m)]) results += timeit("n:n async-actor calls async", async_actor_multi, m * n) ray.shutdown() ############################ # End of channel perf tests. ############################ NUM_PGS = 100 NUM_BUNDLES = 1 ray.init(resources={"custom": 100}) def placement_group_create_removal(num_pgs): pgs = [ ray.util.placement_group( bundles=[{"custom": 0.001} for _ in range(NUM_BUNDLES)] ) for _ in range(num_pgs) ] [pg.wait(timeout_seconds=30) for pg in pgs] # Include placement group removal here to clean up. # If we don't clean up placement groups, the whole performance # gets slower as it runs more. # Since timeit function runs multiple times without # the cleaning logic, we should have this method here. for pg in pgs: ray.util.remove_placement_group(pg) results += timeit( "placement group create/removal", lambda: placement_group_create_removal(NUM_PGS), NUM_PGS, ) ray.shutdown() client_microbenchmark_main(results) return results if __name__ == "__main__": main()