ray_client_microbenchmark.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import inspect
  2. import logging
  3. import sys
  4. import numpy as np
  5. from ray._private.ray_microbenchmark_helpers import timeit
  6. from ray.util.client.ray_client_helpers import ray_start_client_server
  7. def benchmark_get_calls(ray, results):
  8. value = ray.put(0)
  9. def get_small():
  10. ray.get(value)
  11. results += timeit("client: get calls", get_small)
  12. def benchmark_tasks_and_get_batch(ray, results):
  13. @ray.remote
  14. def small_value():
  15. return b"ok"
  16. def small_value_batch():
  17. submitted = [small_value.remote() for _ in range(1000)]
  18. ray.get(submitted)
  19. return 0
  20. results += timeit("client: tasks and get batch", small_value_batch)
  21. def benchmark_put_calls(ray, results):
  22. def put_small():
  23. ray.put(0)
  24. results += timeit("client: put calls", put_small)
  25. def benchmark_remote_put_calls(ray, results):
  26. @ray.remote
  27. def do_put_small():
  28. for _ in range(100):
  29. ray.put(0)
  30. def put_multi_small():
  31. ray.get([do_put_small.remote() for _ in range(10)])
  32. results += timeit("client: tasks and put batch", put_multi_small, 1000)
  33. def benchmark_put_large(ray, results):
  34. arr = np.zeros(100 * 1024 * 1024, dtype=np.int64)
  35. def put_large():
  36. ray.put(arr)
  37. results += timeit("client: put gigabytes", put_large, 8 * 0.1)
  38. def benchmark_simple_actor(ray, results):
  39. @ray.remote(num_cpus=0)
  40. class Actor:
  41. def small_value(self):
  42. return b"ok"
  43. def small_value_arg(self, x):
  44. return b"ok"
  45. def small_value_batch(self, n):
  46. ray.get([self.small_value.remote() for _ in range(n)])
  47. a = Actor.remote()
  48. def actor_sync():
  49. ray.get(a.small_value.remote())
  50. results += timeit("client: 1:1 actor calls sync", actor_sync)
  51. def actor_async():
  52. ray.get([a.small_value.remote() for _ in range(1000)])
  53. results += timeit("client: 1:1 actor calls async", actor_async, 1000)
  54. a = Actor.options(max_concurrency=16).remote()
  55. def actor_concurrent():
  56. ray.get([a.small_value.remote() for _ in range(1000)])
  57. results += timeit("client: 1:1 actor calls concurrent", actor_concurrent, 1000)
  58. def main(results=None):
  59. results = results or []
  60. ray_config = {"logging_level": logging.WARNING}
  61. def ray_connect_handler(job_config=None, **ray_init_kwargs):
  62. from ray._private.client_mode_hook import disable_client_hook
  63. with disable_client_hook():
  64. import ray as real_ray
  65. if not real_ray.is_initialized():
  66. real_ray.init(**ray_config)
  67. for name, obj in inspect.getmembers(sys.modules[__name__]):
  68. if not name.startswith("benchmark_"):
  69. continue
  70. with ray_start_client_server(ray_connect_handler=ray_connect_handler) as ray:
  71. obj(ray, results)
  72. return results
# Run the full benchmark suite when this file is executed as a script.
if __name__ == "__main__":
    main()