models.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. """
  2. Generic model definitions for common utilities.
  3. These models represent generic concepts that can be used by both
  4. serve and batch components.
  5. """
import asyncio
import threading
from concurrent.futures import Executor
from functools import partial, wraps
from typing import Awaitable, Callable, Optional, TypeVar
  10. T = TypeVar("T")
  11. # DiskMultiplexConfig removed - it's serve-specific and belongs in serve/configs/server_models.py
  12. class GlobalIdManager:
  13. """Thread-safe global ID manager for assigning unique IDs."""
  14. def __init__(self):
  15. self._counter = 0
  16. self._lock = threading.Lock()
  17. def next(self) -> int:
  18. """Get the next unique ID."""
  19. with self._lock:
  20. self._counter += 1
  21. return self._counter
# Shared module-level instance: code that imports this name draws its IDs
# from the same counter.
global_id_manager: GlobalIdManager = GlobalIdManager()
  24. def make_async(_func: Callable[..., T]) -> Callable[..., Awaitable[T]]:
  25. """Take a blocking function, and run it on in an executor thread.
  26. This function prevents the blocking function from blocking the asyncio event loop.
  27. The code in this function needs to be thread safe.
  28. """
  29. def _async_wrapper(*args, **kwargs) -> asyncio.Future:
  30. loop = asyncio.get_event_loop()
  31. func = partial(_func, *args, **kwargs)
  32. return loop.run_in_executor(executor=None, func=func)
  33. return _async_wrapper