| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import os
- import time
- from contextlib import contextmanager
- from typing import List, Optional, Tuple
- import numpy as np
- import ray
- # Only run tests matching this filter pattern.
- filter_pattern = os.environ.get("TESTS_TO_RUN", "")
- skip_pattern = os.environ.get("TESTS_TO_SKIP", "")
- def timeit(
- name, fn, multiplier=1, warmup_time_sec=10
- ) -> List[Optional[Tuple[str, float, float]]]:
- if filter_pattern and filter_pattern not in name:
- return [None]
- if skip_pattern and skip_pattern in name:
- return [None]
- # sleep for a while to avoid noisy neigbhors.
- # related issue: https://github.com/ray-project/ray/issues/22045
- time.sleep(warmup_time_sec)
- # warmup
- start = time.perf_counter()
- count = 0
- while time.perf_counter() - start < 1:
- fn()
- count += 1
- # real run
- step = count // 10 + 1
- stats = []
- for _ in range(4):
- start = time.perf_counter()
- count = 0
- while time.perf_counter() - start < 2:
- for _ in range(step):
- fn()
- count += step
- end = time.perf_counter()
- stats.append(multiplier * count / (end - start))
- mean = np.mean(stats)
- sd = np.std(stats)
- print(name, "per second", round(mean, 2), "+-", round(sd, 2))
- return [(name, mean, sd)]
- async def asyncio_timeit(
- name, async_fn, multiplier=1, warmup_time_sec=10
- ) -> List[Optional[Tuple[str, float, float]]]:
- if filter_pattern and filter_pattern not in name:
- return [None]
- if skip_pattern and skip_pattern in name:
- return [None]
- # sleep for a while to avoid noisy neigbhors.
- # related issue: https://github.com/ray-project/ray/issues/22045
- time.sleep(warmup_time_sec)
- # warmup
- start = time.perf_counter()
- count = 0
- while time.perf_counter() - start < 1:
- await async_fn()
- count += 1
- # real run
- step = count // 10 + 1
- stats = []
- for _ in range(4):
- start = time.perf_counter()
- count = 0
- while time.perf_counter() - start < 2:
- for _ in range(step):
- await async_fn()
- count += step
- end = time.perf_counter()
- stats.append(multiplier * count / (end - start))
- mean = np.mean(stats)
- sd = np.std(stats)
- print(name, "per second", round(mean, 2), "+-", round(sd, 2))
- return [(name, mean, sd)]
- @contextmanager
- def ray_setup_and_teardown(**init_args):
- ray.init(**init_args)
- try:
- yield None
- finally:
- ray.shutdown()
|