release_test_util.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import json
  2. import os
  3. import pickle
  4. import tempfile
  5. import time
  6. from collections import Counter
  7. import numpy as np
  8. from ray import tune
  9. from ray._private.test_utils import safe_write_to_results_json
  10. from ray.tune import Checkpoint
  11. from ray.tune.callback import Callback
  class ProgressCallback(Callback):
      """Tune callback that periodically writes run progress to a JSON file.

      On ``on_step_end``, if more than ``update_interval`` seconds have passed
      since the last write, it writes the current Tune loop iteration and a
      count of trials per status to ``output_path`` via
      ``safe_write_to_results_json``.

      Args:
          update_interval: Minimum number of seconds between two writes.
          output_path: File path handed to ``safe_write_to_results_json``.
      """

      def __init__(self, update_interval=60, output_path="/tmp/release_test_out.json"):
          # Wall-clock time of the last write. Starting at 0 makes the first
          # step always write, because time.time() is far beyond any sane interval.
          self.last_update = 0
          self.update_interval = update_interval
          self.output_path = output_path

      def on_step_end(self, iteration, trials, **kwargs):
          """Write a progress snapshot if the update interval has elapsed."""
          # Read the clock once so the interval check and the stored
          # timestamp refer to the same instant.
          now = time.time()
          if now - self.last_update <= self.update_interval:
              return
          result = {
              "last_update": now,
              "iteration": iteration,
              "trial_states": dict(Counter(trial.status for trial in trials)),
          }
          safe_write_to_results_json(result, self.output_path)
          self.last_update = now
  class TestDurableTrainable(tune.Trainable):
      """Class-based trainable that ``timed_tune_run`` picks for cloud storage paths.

      Each ``step`` reports ``score = <step counter> + config["score"]``. It
      sleeps ``config["sleep_time"]`` seconds before every step except the
      first, and adds ``done=True`` once the counter reaches
      ``config["num_iters"]``. Checkpoints are a single pickled array of
      random float64 values sized from ``config["checkpoint_size_b"]``.
      Restoring a checkpoint does nothing.
      """

      def __init__(self, *args, **kwargs):
          # Run the environment hook before the base-class constructor.
          self.setup_env()
          super().__init__(*args, **kwargs)

      def setup_env(self):
          """Hook for subclasses to prepare the environment; no-op by default."""

      def setup(self, config):
          """Read the benchmark parameters out of ``config``."""
          self._num_iters = int(config["num_iters"])
          self._sleep_time = config["sleep_time"]
          self._score = config["score"]
          # Kept on the instance but not read by this class; in durable mode
          # timed_tune_run passes the cadence to Tune as ``checkpoint_freq``.
          self._checkpoint_iters = config["checkpoint_iters"]
          self._checkpoint_size_b = config["checkpoint_size_b"]
          # np.float64 is 8 bytes per element.
          self._checkpoint_num_items = self._checkpoint_size_b // 8
          self._iter = 0

      def step(self):
          """Report one synthetic result and advance the step counter."""
          step_index = self._iter
          # Throttle reporting, but let the very first result out immediately.
          if step_index > 0:
              time.sleep(self._sleep_time)
          report = {"score": step_index + self._score}
          if step_index >= self._num_iters:
              report["done"] = True
          self._iter = step_index + 1
          return report

      def save_checkpoint(self, tmp_checkpoint_dir):
          """Pickle a random float64 array into ``tmp_checkpoint_dir``/bogus.ckpt."""
          target_path = os.path.join(tmp_checkpoint_dir, "bogus.ckpt")
          payload = np.random.uniform(0, 1, size=self._checkpoint_num_items)
          with open(target_path, "wb") as handle:
              pickle.dump(payload, handle)

      def load_checkpoint(self, checkpoint):
          """Ignore the checkpoint; the payload is random filler."""
  def function_trainable(config):
      """Function trainable that reports a synthetic score and optional checkpoints.

      Reports ``score = i + config["score"]`` for each ``i`` in
      ``range(num_iters)`` and sleeps ``sleep_time`` seconds after every
      report.

      Checkpointing is enabled only when ``checkpoint_iters > 0`` and
      ``checkpoint_size_b > 0``. In that case every ``checkpoint_iters``-th
      report, starting with the first, carries a checkpoint directory. That
      directory holds ``checkpoint_num_files`` pickled arrays of random
      float64 values, each about ``checkpoint_size_b`` bytes.

      Args:
          config: Dict with keys ``num_iters``, ``sleep_time``, ``score``,
              ``checkpoint_iters``, ``checkpoint_size_b`` and
              ``checkpoint_num_files``.
      """
      num_iters = int(config["num_iters"])
      sleep_time = config["sleep_time"]
      score = config["score"]
      checkpoint_iters = config["checkpoint_iters"]
      checkpoint_size_b = config["checkpoint_size_b"]
      checkpoint_num_items = checkpoint_size_b // 8  # np.float64 is 8 bytes
      checkpoint_num_files = config["checkpoint_num_files"]

      # checkpoint_iters can legitimately be 0: timed_tune_run computes
      # int(checkpoint_freq_s / sleep_time), which is 0 for any frequency below
      # one iteration period. The previous ``>= 0`` guard then evaluated
      # ``i % 0`` and raised ZeroDivisionError, so 0 now means "disabled".
      checkpointing = checkpoint_iters > 0 and checkpoint_size_b > 0

      for i in range(num_iters):
          metrics = {"score": i + score}
          if checkpointing and i % checkpoint_iters == 0:
              with tempfile.TemporaryDirectory() as tmpdir:
                  # Separate loop variable so the outer iteration index ``i``
                  # is not shadowed.
                  for file_index in range(checkpoint_num_files):
                      checkpoint_file = os.path.join(tmpdir, f"bogus_{file_index}.ckpt")
                      checkpoint_data = np.random.uniform(0, 1, size=checkpoint_num_items)
                      with open(checkpoint_file, "wb") as fp:
                          pickle.dump(checkpoint_data, fp)
                  # Report inside the ``with``: the temporary directory is
                  # deleted as soon as the block exits.
                  tune.report(metrics, checkpoint=Checkpoint.from_directory(tmpdir))
          else:
              tune.report(metrics)
          time.sleep(sleep_time)
  def timed_tune_run(
      name: str,
      num_samples: int,
      results_per_second: int = 1,
      trial_length_s: int = 1,
      max_runtime: int = 300,
      checkpoint_freq_s: int = -1,
      checkpoint_size_b: int = 0,
      checkpoint_num_files: int = 1,
      **tune_kwargs,
  ) -> bool:
      """Run a synthetic Tune workload and check it finishes within a time budget.

      Args:
          name: Test name used in the printed pass/fail summary.
          num_samples: Number of trials passed to ``tune.run``.
          results_per_second: Results each trial reports per second; the
              per-iteration sleep is ``1 / results_per_second``.
          trial_length_s: Target trial length in seconds, which sets the
              number of iterations.
          max_runtime: Wall-clock budget in seconds for the whole ``tune.run``.
          checkpoint_freq_s: Seconds between checkpoints. A negative value
              leaves ``checkpoint_iters`` at -1.
          checkpoint_size_b: Approximate size in bytes of each checkpoint file.
          checkpoint_num_files: Files per checkpoint (function trainable only).
          **tune_kwargs: Forwarded to ``tune.run``. They override the defaults
              ``reuse_actors=True`` and ``verbose=2``. A ``storage_path``
              starting with ``s3://`` or ``gs://`` switches to
              ``TestDurableTrainable``.

      Returns:
          True if ``tune.run`` took at most ``max_runtime`` seconds.

      Side effects:
          Writes ``time_taken``, ``trial_states`` and ``last_update`` as JSON
          to ``$TEST_OUTPUT_JSON`` (default ``/tmp/tune_test.json``) and prints
          a summary line.
      """
      storage_path = tune_kwargs.get("storage_path")
      # str() so non-str path values do not crash on ``.startswith``.
      durable = bool(storage_path) and str(storage_path).startswith(("s3://", "gs://"))

      sleep_time = 1.0 / results_per_second
      num_iters = int(trial_length_s / sleep_time)
      checkpoint_iters = -1
      if checkpoint_freq_s >= 0:
          checkpoint_iters = int(checkpoint_freq_s / sleep_time)

      config = {
          "score": tune.uniform(0.0, 1.0),
          "num_iters": num_iters,
          "sleep_time": sleep_time,
          "checkpoint_iters": checkpoint_iters,
          "checkpoint_size_b": checkpoint_size_b,
          "checkpoint_num_files": checkpoint_num_files,
      }

      print(f"Starting benchmark with config: {config}")

      run_kwargs = {"reuse_actors": True, "verbose": 2}
      run_kwargs.update(tune_kwargs)

      _train = function_trainable
      if durable:
          _train = TestDurableTrainable
          # NOTE(review): when checkpoint_freq_s < 0 this forwards -1; confirm
          # how Tune interprets a negative checkpoint_freq.
          run_kwargs["checkpoint_freq"] = checkpoint_iters

      start_time = time.monotonic()
      analysis = tune.run(
          _train,
          config=config,
          num_samples=num_samples,
          raise_on_failed_trial=False,
          **run_kwargs,
      )
      time_taken = time.monotonic() - start_time

      result = {
          "time_taken": time_taken,
          "trial_states": dict(Counter(trial.status for trial in analysis.trials)),
          "last_update": time.time(),
      }
      test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/tune_test.json")
      # Use the same results-writing helper as ProgressCallback rather than a
      # bare open()/json.dump, presumably so readers never see a partial file.
      safe_write_to_results_json(result, test_output_json)

      success = time_taken <= max_runtime

      if not success:
          print(
              f"The {name} test took {time_taken:.2f} seconds, but should not "
              f"have exceeded {max_runtime:.2f} seconds. Test failed. \n\n"
              f"--- FAILED: {name.upper()} ::: "
              f"{time_taken:.2f} > {max_runtime:.2f} ---"
          )
      else:
          print(
              f"The {name} test took {time_taken:.2f} seconds, which "
              f"is below the budget of {max_runtime:.2f} seconds. "
              f"Test successful. \n\n"
              f"--- PASSED: {name.upper()} ::: "
              f"{time_taken:.2f} <= {max_runtime:.2f} ---"
          )

      return success