handle_options.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from abc import ABC, abstractmethod
  2. from dataclasses import dataclass, fields
  3. import ray
  4. from ray.serve._private.common import DeploymentHandleSource
  5. from ray.serve._private.constants import (
  6. RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP,
  7. RAY_SERVE_USE_GRPC_BY_DEFAULT,
  8. )
  9. from ray.serve._private.utils import DEFAULT
  10. @dataclass(frozen=True)
  11. class InitHandleOptionsBase(ABC):
  12. """Init options for each ServeHandle instance.
  13. These fields can be set by calling `.init()` on a handle before
  14. sending the first request.
  15. """
  16. _prefer_local_routing: bool = False
  17. _source: DeploymentHandleSource = DeploymentHandleSource.UNKNOWN
  18. _run_router_in_separate_loop: bool = RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP
  19. @classmethod
  20. @abstractmethod
  21. def create(cls, **kwargs) -> "InitHandleOptionsBase":
  22. raise NotImplementedError
  23. @dataclass(frozen=True)
  24. class InitHandleOptions(InitHandleOptionsBase):
  25. @classmethod
  26. def create(cls, **kwargs) -> "InitHandleOptions":
  27. for k in list(kwargs.keys()):
  28. if kwargs[k] == DEFAULT.VALUE:
  29. # Use default value
  30. del kwargs[k]
  31. # Detect replica source for handles
  32. if (
  33. "_source" not in kwargs
  34. and ray.serve.context._get_internal_replica_context() is not None
  35. ):
  36. kwargs["_source"] = DeploymentHandleSource.REPLICA
  37. return cls(**kwargs)
  38. @dataclass(frozen=True)
  39. class DynamicHandleOptionsBase(ABC):
  40. """Dynamic options for each ServeHandle instance.
  41. These fields can be changed by calling `.options()` on a handle.
  42. """
  43. method_name: str = "__call__"
  44. multiplexed_model_id: str = ""
  45. stream: bool = False
  46. @abstractmethod
  47. def copy_and_update(self, **kwargs) -> "DynamicHandleOptionsBase":
  48. pass
  49. @dataclass(frozen=True)
  50. class DynamicHandleOptions(DynamicHandleOptionsBase):
  51. _by_reference: bool = not RAY_SERVE_USE_GRPC_BY_DEFAULT
  52. request_serialization: str = "cloudpickle"
  53. response_serialization: str = "cloudpickle"
  54. def copy_and_update(self, **kwargs) -> "DynamicHandleOptions":
  55. new_kwargs = {}
  56. for f in fields(self):
  57. if f.name not in kwargs or kwargs[f.name] == DEFAULT.VALUE:
  58. new_kwargs[f.name] = getattr(self, f.name)
  59. else:
  60. new_kwargs[f.name] = kwargs[f.name]
  61. return DynamicHandleOptions(**new_kwargs)