common.py 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030
  1. import json
  2. from dataclasses import asdict, dataclass, field
  3. from enum import Enum
  4. from typing import Any, Awaitable, Callable, Dict, List, Optional
  5. from starlette.types import Scope
  6. import ray
  7. from ray._common.pydantic_compat import BaseModel
  8. from ray.actor import ActorHandle
  9. from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE
  10. from ray.serve._private.thirdparty.get_asgi_route_name import RoutePattern
  11. from ray.serve.generated.serve_pb2 import (
  12. DeploymentStatus as DeploymentStatusProto,
  13. DeploymentStatusInfo as DeploymentStatusInfoProto,
  14. DeploymentStatusTrigger as DeploymentStatusTriggerProto,
  15. )
  16. from ray.serve.grpc_util import RayServegRPCContext
  17. from ray.util.annotations import PublicAPI
  18. REPLICA_ID_FULL_ID_STR_PREFIX = "SERVE_REPLICA::"
  19. @dataclass(frozen=True)
  20. class DeploymentID:
  21. name: str
  22. app_name: str = SERVE_DEFAULT_APP_NAME
  23. def __hash__(self):
  24. # Lazy hash caching: compute on first access, cache for subsequent calls.
  25. # The _hash attribute is excluded from pickling via __getstate__, so after
  26. # deserialization it gets recomputed with the correct per-process hash seed.
  27. try:
  28. return self._hash
  29. except AttributeError:
  30. h = hash((self.name, self.app_name))
  31. object.__setattr__(self, "_hash", h)
  32. return h
  33. def __getstate__(self):
  34. # Exclude _hash from pickling - it must be recomputed per-process
  35. return {"name": self.name, "app_name": self.app_name}
  36. def __setstate__(self, state):
  37. object.__setattr__(self, "name", state["name"])
  38. object.__setattr__(self, "app_name", state["app_name"])
  39. def to_replica_actor_class_name(self):
  40. return f"ServeReplica:{self.app_name}:{self.name}"
  41. def __str__(self):
  42. return f"Deployment(name='{self.name}', app='{self.app_name}')"
  43. def __repr__(self):
  44. return str(self)
  45. @PublicAPI(stability="alpha")
  46. @dataclass(frozen=True)
  47. class ReplicaID:
  48. """A unique identifier for a replica."""
  49. unique_id: str
  50. """A unique identifier for the replica within the deployment."""
  51. deployment_id: DeploymentID
  52. """The deployment this replica belongs to."""
  53. def __hash__(self):
  54. # Lazy hash caching: compute on first access, cache for subsequent calls.
  55. # The _hash attribute is excluded from pickling via __getstate__, so after
  56. # deserialization it gets recomputed with the correct per-process hash seed.
  57. try:
  58. return self._hash
  59. except AttributeError:
  60. h = hash((self.unique_id, self.deployment_id))
  61. object.__setattr__(self, "_hash", h)
  62. return h
  63. def __getstate__(self):
  64. # Exclude _hash from pickling - it must be recomputed per-process
  65. return {"unique_id": self.unique_id, "deployment_id": self.deployment_id}
  66. def __setstate__(self, state):
  67. object.__setattr__(self, "unique_id", state["unique_id"])
  68. object.__setattr__(self, "deployment_id", state["deployment_id"])
  69. def to_full_id_str(self) -> str:
  70. s = f"{self.deployment_id.name}#{self.unique_id}"
  71. if self.deployment_id.app_name:
  72. s = f"{self.deployment_id.app_name}#{s}"
  73. return f"{REPLICA_ID_FULL_ID_STR_PREFIX}{s}"
  74. @staticmethod
  75. def is_full_id_str(s: str) -> bool:
  76. return s.startswith(REPLICA_ID_FULL_ID_STR_PREFIX)
  77. @classmethod
  78. def from_full_id_str(cls, s: str):
  79. assert cls.is_full_id_str(s)
  80. parsed = s[len(REPLICA_ID_FULL_ID_STR_PREFIX) :].split("#")
  81. if len(parsed) == 3:
  82. app_name, deployment_name, unique_id = parsed
  83. elif len(parsed) == 2:
  84. app_name = ""
  85. deployment_name, unique_id = parsed
  86. else:
  87. raise ValueError(
  88. f"Given replica ID string {s} didn't match expected pattern, "
  89. "ensure it has either two or three fields with delimiter '#'."
  90. )
  91. return cls(
  92. unique_id,
  93. deployment_id=DeploymentID(name=deployment_name, app_name=app_name),
  94. )
  95. def __repr__(self) -> str:
  96. return str(self)
  97. def __str__(self) -> str:
  98. """Returns a human-readable string.
  99. This is used in user-facing log messages, so take care when updating it.
  100. """
  101. return (
  102. f"Replica("
  103. f"id='{self.unique_id}', "
  104. f"deployment='{self.deployment_id.name}', "
  105. f"app='{self.deployment_id.app_name}'"
  106. ")"
  107. )
  108. NodeId = str
  109. Duration = float
  110. ApplicationName = str
  111. @dataclass
  112. class EndpointInfo:
  113. """Metadata about a deployment's HTTP/gRPC endpoint.
  114. This represents the public routing interface for a deployment. It's created when
  115. a deployment is registered with a route prefix and broadcast to all proxies via
  116. the long poll mechanism (ROUTE_TABLE namespace).
  117. Flow:
  118. 1. Created in ApplicationState when deployment is applied
  119. 2. Stored in EndpointState (controller's source of truth)
  120. 3. Broadcast to all ProxyActors via long poll (ROUTE_TABLE)
  121. 4. Cached in ProxyRouter for request routing
  122. 5. Used to route incoming HTTP/gRPC requests to correct deployments
  123. 6. Used to determine route patterns for accurate metrics tagging
  124. Key Difference from DeploymentInfo:
  125. - EndpointInfo: Just HTTP/gRPC routing metadata (shared with proxies)
  126. - DeploymentInfo: Complete deployment config (replicas, resources, etc.)
  127. Attributes:
  128. route: The route prefix for this deployment (e.g., "/api").
  129. app_is_cross_language: Whether the deployment uses a different language
  130. than the proxy (e.g., Java deployment with Python proxy). This affects
  131. how the proxy serializes/deserializes requests.
  132. route_patterns: List of RoutePattern objects for ASGI route patterns.
  133. Each RoutePattern has methods (list of HTTP methods or None) and path.
  134. Examples: [RoutePattern(methods=["GET", "POST"], path="/"),
  135. RoutePattern(methods=["PUT"], path="/users/{id}"),
  136. RoutePattern(methods=None, path="/websocket")]
  137. Used by proxies to match incoming requests to specific route patterns
  138. for accurate metrics tagging. This avoids high cardinality by using
  139. parameterized patterns instead of individual request paths.
  140. Only populated for deployments with ASGI apps (FastAPI/Starlette).
  141. """
  142. route: str
  143. app_is_cross_language: bool = False
  144. route_patterns: Optional[List["RoutePattern"]] = None
  145. # Keep in sync with ServeReplicaState in dashboard/client/src/type/serve.ts
  146. class ReplicaState(str, Enum):
  147. STARTING = "STARTING"
  148. UPDATING = "UPDATING"
  149. RECOVERING = "RECOVERING"
  150. RUNNING = "RUNNING"
  151. STOPPING = "STOPPING"
  152. PENDING_MIGRATION = "PENDING_MIGRATION"
  153. class DeploymentStatus(str, Enum):
  154. UPDATING = "UPDATING"
  155. HEALTHY = "HEALTHY"
  156. UNHEALTHY = "UNHEALTHY"
  157. DEPLOY_FAILED = "DEPLOY_FAILED"
  158. UPSCALING = "UPSCALING"
  159. DOWNSCALING = "DOWNSCALING"
  160. def to_numeric(self) -> int:
  161. """Convert status to numeric value for metrics, it serves state
  162. progression order on the dashboard.
  163. 0 is reserved for UNKNOWN. Values are ordered by severity/state progression:
  164. 0=UNKNOWN, 1=DEPLOY_FAILED, 2=UNHEALTHY, 3=UPDATING,
  165. 4=UPSCALING, 5=DOWNSCALING, 6=HEALTHY
  166. """
  167. mapping = {
  168. DeploymentStatus.DEPLOY_FAILED: 1,
  169. DeploymentStatus.UNHEALTHY: 2,
  170. DeploymentStatus.UPDATING: 3,
  171. DeploymentStatus.UPSCALING: 4,
  172. DeploymentStatus.DOWNSCALING: 5,
  173. DeploymentStatus.HEALTHY: 6,
  174. }
  175. return mapping.get(self, 0)
  176. class DeploymentStatusTrigger(str, Enum):
  177. """Explains how a deployment reached its current DeploymentStatus."""
  178. UNSPECIFIED = "UNSPECIFIED"
  179. CONFIG_UPDATE_STARTED = "CONFIG_UPDATE_STARTED"
  180. CONFIG_UPDATE_COMPLETED = "CONFIG_UPDATE_COMPLETED"
  181. UPSCALE_COMPLETED = "UPSCALE_COMPLETED"
  182. DOWNSCALE_COMPLETED = "DOWNSCALE_COMPLETED"
  183. AUTOSCALING = "AUTOSCALING"
  184. REPLICA_STARTUP_FAILED = "REPLICA_STARTUP_FAILED"
  185. HEALTH_CHECK_FAILED = "HEALTH_CHECK_FAILED"
  186. INTERNAL_ERROR = "INTERNAL_ERROR"
  187. DELETING = "DELETING"
  188. # Internal Enum used to manage deployment state machine
  189. class DeploymentStatusInternalTrigger(str, Enum):
  190. HEALTHY = "HEALTHY"
  191. CONFIG_UPDATE = "CONFIG_UPDATE"
  192. AUTOSCALE_UP = "AUTOSCALE_UP"
  193. AUTOSCALE_DOWN = "AUTOSCALE_DOWN"
  194. # MANUALLY_INCREASE_NUM_REPLICAS and MANUALLY_DECREASE_NUM_REPLICAS are used
  195. # instead of CONFIG_UPDATE when the config update only scales
  196. # the number of replicas.
  197. MANUALLY_INCREASE_NUM_REPLICAS = "MANUALLY_INCREASE_NUM_REPLICAS"
  198. MANUALLY_DECREASE_NUM_REPLICAS = "MANUALLY_DECREASE_NUM_REPLICAS"
  199. REPLICA_STARTUP_FAILED = "REPLICA_STARTUP_FAILED"
  200. HEALTH_CHECK_FAILED = "HEALTH_CHECK_FAILED"
  201. INTERNAL_ERROR = "INTERNAL_ERROR"
  202. DELETE = "DELETE"
  203. # List of states in ranked order.
  204. #
  205. # Each ranked state has the format of a tuple with either 1 or 2 items.
  206. # If 1 item: contains a single DeploymentStatus, representing states with
  207. # that DeploymentStatus and any DeploymentStatusTrigger.
  208. # If 2 items: tuple contains a DeploymentStatus and a DeploymentStatusTrigger,
  209. # representing a state with that status and status trigger.
  210. DEPLOYMENT_STATUS_RANKING_ORDER = {
  211. # Status ranking order is defined in a following fashion:
  212. # 0. (Highest) State signaling a deploy failure.
  213. (DeploymentStatus.DEPLOY_FAILED,): 0,
  214. # 1. State signaling any non-deploy failures in the system.
  215. (DeploymentStatus.UNHEALTHY,): 1,
  216. # 2. States signaling the user updated the configuration.
  217. (DeploymentStatus.UPDATING,): 2,
  218. (DeploymentStatus.UPSCALING, DeploymentStatusTrigger.CONFIG_UPDATE_STARTED): 2,
  219. (
  220. DeploymentStatus.DOWNSCALING,
  221. DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  222. ): 2,
  223. # 3. Steady state or autoscaling.
  224. (DeploymentStatus.UPSCALING, DeploymentStatusTrigger.AUTOSCALING): 3,
  225. (DeploymentStatus.DOWNSCALING, DeploymentStatusTrigger.AUTOSCALING): 3,
  226. (DeploymentStatus.HEALTHY,): 3,
  227. }
  228. @dataclass(eq=True)
  229. class DeploymentStatusInfo:
  230. name: str
  231. status: DeploymentStatus
  232. status_trigger: DeploymentStatusTrigger
  233. message: str = ""
  234. @property
  235. def rank(self) -> int:
  236. """Get priority of state based on ranking_order().
  237. The ranked order indicates what the status should be of a
  238. hierarchically "higher" resource when derived from a group of
  239. `DeploymentStatusInfo` sub-resources.
  240. """
  241. if (self.status,) in DEPLOYMENT_STATUS_RANKING_ORDER:
  242. return DEPLOYMENT_STATUS_RANKING_ORDER[(self.status,)]
  243. elif (self.status, self.status_trigger) in DEPLOYMENT_STATUS_RANKING_ORDER:
  244. return DEPLOYMENT_STATUS_RANKING_ORDER[(self.status, self.status_trigger)]
  245. def debug_string(self):
  246. return json.dumps(asdict(self), indent=4)
  247. def _updated_copy(
  248. self,
  249. status: DeploymentStatus = None,
  250. status_trigger: DeploymentStatusTrigger = None,
  251. message: str = "",
  252. ):
  253. """Returns a copy of the current object with the passed in kwargs updated."""
  254. return DeploymentStatusInfo(
  255. name=self.name,
  256. status=status if status else self.status,
  257. status_trigger=status_trigger if status_trigger else self.status_trigger,
  258. message=message,
  259. )
  260. def update_message(self, message: str):
  261. return self._updated_copy(message=message)
  262. def handle_transition(
  263. self,
  264. trigger: DeploymentStatusInternalTrigger,
  265. message: str = "",
  266. ) -> "DeploymentStatusInfo":
  267. """Handles a transition from the current state to the next state.
  268. Args:
  269. trigger: An internal trigger that determines the state
  270. transition. This is the new incoming trigger causing the
  271. transition.
  272. message: The message to set in status info.
  273. Returns:
  274. New instance of DeploymentStatusInfo representing the
  275. next state to transition to.
  276. """
  277. # If there was an unexpected internal error during reconciliation, set
  278. # status to unhealthy immediately and return
  279. if trigger == DeploymentStatusInternalTrigger.INTERNAL_ERROR:
  280. return self._updated_copy(
  281. status=DeploymentStatus.UNHEALTHY,
  282. status_trigger=DeploymentStatusTrigger.INTERNAL_ERROR,
  283. message=message,
  284. )
  285. # If deployment is being deleted, set status immediately and return
  286. elif trigger == DeploymentStatusInternalTrigger.DELETE:
  287. return self._updated_copy(
  288. status=DeploymentStatus.UPDATING,
  289. status_trigger=DeploymentStatusTrigger.DELETING,
  290. message=message,
  291. )
  292. # Otherwise, go through normal state machine transitions
  293. elif self.status == DeploymentStatus.UPDATING:
  294. # Finished updating configuration and transition to healthy
  295. if trigger == DeploymentStatusInternalTrigger.HEALTHY:
  296. return self._updated_copy(
  297. status=DeploymentStatus.HEALTHY,
  298. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_COMPLETED,
  299. message=message,
  300. )
  301. # A new configuration has been deployed before deployment
  302. # has finished updating
  303. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
  304. return self._updated_copy(
  305. status=DeploymentStatus.UPDATING,
  306. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  307. message=message,
  308. )
  309. # Autoscaling.
  310. elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP:
  311. return self._updated_copy(
  312. status=DeploymentStatus.UPSCALING,
  313. status_trigger=DeploymentStatusTrigger.AUTOSCALING,
  314. message=message,
  315. )
  316. elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN:
  317. return self._updated_copy(
  318. status=DeploymentStatus.DOWNSCALING,
  319. status_trigger=DeploymentStatusTrigger.AUTOSCALING,
  320. message=message,
  321. )
  322. # Manually increasing or decreasing num replicas does not
  323. # change the status while deployment is still updating.
  324. elif trigger in {
  325. DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS,
  326. DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS,
  327. }:
  328. return self
  329. # Failures occurred while a deployment was being updated
  330. elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
  331. return self._updated_copy(
  332. status=DeploymentStatus.DEPLOY_FAILED,
  333. status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
  334. message=message,
  335. )
  336. elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
  337. return self._updated_copy(
  338. status=DeploymentStatus.DEPLOY_FAILED,
  339. status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
  340. message=message,
  341. )
  342. elif self.status in {DeploymentStatus.UPSCALING, DeploymentStatus.DOWNSCALING}:
  343. # Failures occurred while upscaling/downscaling
  344. if trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
  345. return self._updated_copy(
  346. status=DeploymentStatus.UNHEALTHY,
  347. status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
  348. message=message,
  349. )
  350. elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
  351. return self._updated_copy(
  352. status=DeploymentStatus.UNHEALTHY,
  353. status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
  354. message=message,
  355. )
  356. # Deployment transitions to healthy
  357. elif trigger == DeploymentStatusInternalTrigger.HEALTHY:
  358. return self._updated_copy(
  359. status=DeploymentStatus.HEALTHY,
  360. status_trigger=DeploymentStatusTrigger.UPSCALE_COMPLETED
  361. if self.status == DeploymentStatus.UPSCALING
  362. else DeploymentStatusTrigger.DOWNSCALE_COMPLETED,
  363. message=message,
  364. )
  365. # Configuration is updated before scaling is finished
  366. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
  367. return self._updated_copy(
  368. status=DeploymentStatus.UPDATING,
  369. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  370. message=message,
  371. )
  372. elif self.status_trigger == DeploymentStatusTrigger.AUTOSCALING:
  373. # Upscale replicas before previous autoscaling has finished
  374. if trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP:
  375. return self._updated_copy(
  376. status=DeploymentStatus.UPSCALING,
  377. message=message,
  378. )
  379. # Downscale replicas before previous autoscaling has finished
  380. elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN:
  381. return self._updated_copy(
  382. status=DeploymentStatus.DOWNSCALING,
  383. message=message,
  384. )
  385. # Manually upscale replicas with config update before previous autoscaling has finished
  386. elif (
  387. trigger
  388. == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS
  389. ):
  390. return self._updated_copy(
  391. status=DeploymentStatus.UPSCALING,
  392. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  393. message=message,
  394. )
  395. # Manually downscale replicas with config update before previous autoscaling has finished
  396. elif (
  397. trigger
  398. == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS
  399. ):
  400. return self._updated_copy(
  401. status=DeploymentStatus.DOWNSCALING,
  402. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  403. message=message,
  404. )
  405. elif self.status_trigger == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED:
  406. # Upscale replicas before previous config update has finished
  407. if (
  408. trigger
  409. == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS
  410. ):
  411. return self._updated_copy(
  412. status=DeploymentStatus.UPSCALING, message=message
  413. )
  414. # Downscale replicas before previous config update has finished
  415. elif (
  416. trigger
  417. == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS
  418. ):
  419. return self._updated_copy(
  420. status=DeploymentStatus.DOWNSCALING, message=message
  421. )
  422. elif self.status == DeploymentStatus.HEALTHY:
  423. # Deployment remains healthy
  424. if trigger == DeploymentStatusInternalTrigger.HEALTHY:
  425. return self
  426. # New configuration is deployed
  427. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
  428. return self._updated_copy(
  429. status=DeploymentStatus.UPDATING,
  430. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  431. message=message,
  432. )
  433. # Manually scaling / autoscaling num replicas
  434. elif (
  435. trigger
  436. == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS
  437. ):
  438. return self._updated_copy(
  439. status=DeploymentStatus.UPSCALING,
  440. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  441. message=message,
  442. )
  443. elif (
  444. trigger
  445. == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS
  446. ):
  447. return self._updated_copy(
  448. status=DeploymentStatus.DOWNSCALING,
  449. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  450. message=message,
  451. )
  452. elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP:
  453. return self._updated_copy(
  454. status=DeploymentStatus.UPSCALING,
  455. status_trigger=DeploymentStatusTrigger.AUTOSCALING,
  456. message=message,
  457. )
  458. elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN:
  459. return self._updated_copy(
  460. status=DeploymentStatus.DOWNSCALING,
  461. status_trigger=DeploymentStatusTrigger.AUTOSCALING,
  462. message=message,
  463. )
  464. # Health check for one or more replicas has failed
  465. elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
  466. return self._updated_copy(
  467. status=DeploymentStatus.UNHEALTHY,
  468. status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
  469. message=message,
  470. )
  471. elif self.status == DeploymentStatus.UNHEALTHY:
  472. # The deployment recovered
  473. if trigger == DeploymentStatusInternalTrigger.HEALTHY:
  474. return self._updated_copy(
  475. status=DeploymentStatus.HEALTHY,
  476. status_trigger=DeploymentStatusTrigger.UNSPECIFIED,
  477. message=message,
  478. )
  479. # A new configuration is being deployed.
  480. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
  481. return self._updated_copy(
  482. status=DeploymentStatus.UPDATING,
  483. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  484. message=message,
  485. )
  486. # Old failures keep getting triggered, or new failures occurred.
  487. elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
  488. return self._updated_copy(
  489. status=DeploymentStatus.UNHEALTHY,
  490. status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
  491. message=message,
  492. )
  493. elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
  494. return self._updated_copy(
  495. status=DeploymentStatus.UNHEALTHY,
  496. status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
  497. message=message,
  498. )
  499. elif self.status == DeploymentStatus.DEPLOY_FAILED:
  500. # The deployment recovered
  501. if trigger == DeploymentStatusInternalTrigger.HEALTHY:
  502. return self._updated_copy(
  503. status=DeploymentStatus.HEALTHY,
  504. status_trigger=DeploymentStatusTrigger.UNSPECIFIED,
  505. message=message,
  506. )
  507. # A new configuration is being deployed.
  508. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE:
  509. return self._updated_copy(
  510. status=DeploymentStatus.UPDATING,
  511. status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  512. message=message,
  513. )
  514. # Old failures keep getting triggered, or new failures occurred.
  515. elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED:
  516. return self._updated_copy(
  517. status=DeploymentStatus.DEPLOY_FAILED,
  518. status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED,
  519. message=message,
  520. )
  521. elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED:
  522. return self._updated_copy(
  523. status=DeploymentStatus.DEPLOY_FAILED,
  524. status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED,
  525. message=message,
  526. )
  527. # If it's any other transition, ignore it.
  528. return self
  529. def to_proto(self):
  530. return DeploymentStatusInfoProto(
  531. name=self.name,
  532. status=f"DEPLOYMENT_STATUS_{self.status.name}",
  533. status_trigger=f"DEPLOYMENT_STATUS_TRIGGER_{self.status_trigger.name}",
  534. message=self.message,
  535. )
  536. @classmethod
  537. def from_proto(cls, proto: DeploymentStatusInfoProto):
  538. status = DeploymentStatusProto.Name(proto.status)[len("DEPLOYMENT_STATUS_") :]
  539. status_trigger = DeploymentStatusTriggerProto.Name(proto.status_trigger)[
  540. len("DEPLOYMENT_STATUS_TRIGGER_") :
  541. ]
  542. return cls(
  543. name=proto.name,
  544. status=DeploymentStatus(status),
  545. status_trigger=DeploymentStatusTrigger(status_trigger),
  546. message=proto.message,
  547. )
  548. @dataclass(frozen=True)
  549. class RunningReplicaInfo:
  550. replica_id: ReplicaID
  551. node_id: Optional[str]
  552. node_ip: Optional[str]
  553. availability_zone: Optional[str]
  554. actor_name: str
  555. max_ongoing_requests: int
  556. is_cross_language: bool = False
  557. multiplexed_model_ids: List[str] = field(default_factory=list)
  558. routing_stats: Dict[str, Any] = field(default_factory=dict)
  559. port: Optional[int] = None
  560. def __post_init__(self):
  561. # Set hash value when object is constructed.
  562. # We use _actor_id to hash the ActorHandle object
  563. # instead of actor_handle itself to make sure
  564. # it is consistently same actor handle between different
  565. # object ids.
  566. hash_val = hash(
  567. " ".join(
  568. [
  569. self.replica_id.to_full_id_str(),
  570. self.node_id if self.node_id else "",
  571. self.actor_name,
  572. str(self.max_ongoing_requests),
  573. str(self.is_cross_language),
  574. str(self.multiplexed_model_ids),
  575. str(self.routing_stats),
  576. ]
  577. )
  578. )
  579. # RunningReplicaInfo class set frozen=True, this is the hacky way to set
  580. # new attribute for the class.
  581. object.__setattr__(self, "_hash", hash_val)
  582. def __hash__(self):
  583. return self._hash
  584. def __eq__(self, other):
  585. return all(
  586. [
  587. isinstance(other, RunningReplicaInfo),
  588. self._hash == other._hash,
  589. ]
  590. )
  591. def get_actor_handle(self) -> ActorHandle:
  592. actor_handle = ray.get_actor(self.actor_name, namespace=SERVE_NAMESPACE)
  593. return actor_handle
  594. @dataclass(frozen=True)
  595. class DeploymentTargetInfo:
  596. is_available: bool
  597. running_replicas: List[RunningReplicaInfo]
  598. class ServeDeployMode(str, Enum):
  599. MULTI_APP = "MULTI_APP"
  600. class ServeComponentType(str, Enum):
  601. REPLICA = "replica"
  602. @dataclass
  603. class RequestRoutingInfo:
  604. """Information about the request routing.
  605. It includes deployment name (from ReplicaID), replica tag (from ReplicaID),
  606. multiplex model ids, and routing stats.
  607. """
  608. replica_id: ReplicaID
  609. multiplexed_model_ids: Optional[List[str]] = None
  610. routing_stats: Optional[Dict[str, Any]] = None
  611. @dataclass
  612. class gRPCRequest:
  613. """Sent from the GRPC proxy to replicas on both unary and streaming codepaths."""
  614. user_request_proto: Any
  615. class RequestProtocol(str, Enum):
  616. UNDEFINED = "UNDEFINED"
  617. HTTP = "HTTP"
  618. GRPC = "gRPC"
  619. class DeploymentHandleSource(str, Enum):
  620. UNKNOWN = "UNKNOWN"
  621. PROXY = "PROXY"
  622. REPLICA = "REPLICA"
  623. @dataclass
  624. class RequestMetadata:
  625. # request_id can be passed by the client and is only generated by the proxy if the
  626. # client did not pass it in the headers. It is used for logging across different
  627. # system. We can not guarantee the uniqueness of its value.
  628. request_id: str
  629. # internal_request_id is always generated by the proxy and is used for tracking
  630. # request objects. We can assume this is always unique between requests.
  631. internal_request_id: str
  632. # Method of the user callable to execute.
  633. call_method: str = "__call__"
  634. # HTTP route path of the request.
  635. route: str = ""
  636. # Application name.
  637. app_name: str = ""
  638. # Multiplexed model ID.
  639. multiplexed_model_id: str = ""
  640. # If this request expects a streaming response.
  641. is_streaming: bool = False
  642. _http_method: str = ""
  643. # The protocol to serve this request
  644. _request_protocol: RequestProtocol = RequestProtocol.UNDEFINED
  645. # Serve's gRPC context associated with this request for getting and setting metadata
  646. grpc_context: Optional[RayServegRPCContext] = None
  647. # Tracing context
  648. tracing_context: Optional[Dict[str, str]] = None
  649. # Whether it is a direct ingress request
  650. is_direct_ingress: bool = False
  651. # By reference or value
  652. _by_reference: bool = True
  653. _on_separate_loop: bool = True
  654. # gRPC serialization options
  655. request_serialization: str = "cloudpickle"
  656. response_serialization: str = "cloudpickle"
  657. @property
  658. def is_http_request(self) -> bool:
  659. return self._request_protocol == RequestProtocol.HTTP
  660. @property
  661. def is_grpc_request(self) -> bool:
  662. return self._request_protocol == RequestProtocol.GRPC
  663. class StreamingHTTPRequest:
  664. """Sent from the HTTP proxy to replicas on the streaming codepath."""
  665. def __init__(
  666. self,
  667. asgi_scope: Scope,
  668. *,
  669. proxy_actor_name: Optional[str] = None,
  670. receive_asgi_messages: Optional[
  671. Callable[[RequestMetadata], Awaitable[bytes]]
  672. ] = None,
  673. ):
  674. self._asgi_scope: Scope = asgi_scope
  675. if proxy_actor_name is None and receive_asgi_messages is None:
  676. raise ValueError(
  677. "Either proxy_actor_name or receive_asgi_messages must be provided."
  678. )
  679. # If receive_asgi_messages is passed, it'll be called directly.
  680. # If proxy_actor_name is passed, the actor will be fetched and its
  681. # `receive_asgi_messages` method will be called.
  682. self._proxy_actor_name: Optional[str] = proxy_actor_name
  683. # Need to keep the actor handle cached to avoid "lost reference to actor" error.
  684. self._cached_proxy_actor: Optional[ActorHandle] = None
  685. self._receive_asgi_messages: Optional[
  686. Callable[[RequestMetadata], Awaitable[bytes]]
  687. ] = receive_asgi_messages
  688. @property
  689. def asgi_scope(self) -> Scope:
  690. return self._asgi_scope
  691. @property
  692. def receive_asgi_messages(self) -> Callable[[RequestMetadata], Awaitable[bytes]]:
  693. if self._receive_asgi_messages is None:
  694. self._cached_proxy_actor = ray.get_actor(
  695. self._proxy_actor_name, namespace=SERVE_NAMESPACE
  696. )
  697. self._receive_asgi_messages = (
  698. self._cached_proxy_actor.receive_asgi_messages.remote
  699. )
  700. return self._receive_asgi_messages
  701. class TargetCapacityDirection(str, Enum):
  702. """Determines what direction the target capacity is scaling."""
  703. UP = "UP"
  704. DOWN = "DOWN"
  705. @dataclass(frozen=True)
  706. class ReplicaQueueLengthInfo:
  707. accepted: bool
  708. num_ongoing_requests: int
  709. @dataclass(frozen=True)
  710. class CreatePlacementGroupRequest:
  711. bundles: List[Dict[str, float]]
  712. strategy: str
  713. target_node_id: str
  714. name: str
  715. runtime_env: Optional[str] = None
  716. bundle_label_selector: Optional[List[Dict[str, str]]] = None
  717. fallback_strategy: Optional[List[Dict[str, Any]]] = None
  718. # This error is used to raise when a by-value DeploymentResponse is converted to an
  719. # ObjectRef.
  720. OBJ_REF_NOT_SUPPORTED_ERROR = RuntimeError(
  721. "Converting by-value DeploymentResponses to ObjectRefs is not supported. "
  722. "Use handle.options(_by_reference=True) to enable it."
  723. )
  724. class AutoscalingStatus(str, Enum):
  725. UPSCALE = "AUTOSCALING_UPSCALE"
  726. DOWNSCALE = "AUTOSCALING_DOWNSCALE"
  727. STABLE = "AUTOSCALING_STABLE"
  728. @staticmethod
  729. def format_scaling_status(trigger: "AutoscalingStatus") -> str:
  730. mapping = {
  731. AutoscalingStatus.UPSCALE: "scaling up",
  732. AutoscalingStatus.DOWNSCALE: "scaling down",
  733. AutoscalingStatus.STABLE: "stable",
  734. }
  735. return mapping.get(trigger, str(trigger).lower())
  736. class DeploymentSnapshot(BaseModel):
  737. snapshot_type: str = "deployment"
  738. timestamp_str: str
  739. app: str
  740. deployment: str
  741. current_replicas: int
  742. target_replicas: int
  743. min_replicas: Optional[int]
  744. max_replicas: Optional[int]
  745. scaling_status: str
  746. policy_name: str
  747. look_back_period_s: Optional[float]
  748. queued_requests: Optional[float]
  749. ongoing_requests: float
  750. metrics_health: str
  751. errors: List[str]
  752. @staticmethod
  753. def format_metrics_health_text(
  754. *,
  755. time_since_last_collected_metrics_s: Optional[float],
  756. look_back_period_s: Optional[float],
  757. ) -> str:
  758. """
  759. - < 1s -> integer milliseconds
  760. - >= 1s -> seconds with two decimals
  761. """
  762. if time_since_last_collected_metrics_s is None:
  763. return "unknown"
  764. val = time_since_last_collected_metrics_s
  765. if val < 1.0:
  766. return f"{val * 1000:.0f}ms"
  767. return f"{val:.2f}s"
  768. def is_scaling_equivalent(self, other: "DeploymentSnapshot") -> bool:
  769. """Return True if scaling-related fields are equal.
  770. Used for autoscaling snapshot log deduplication. Compares only:
  771. target_replicas, min_replicas, max_replicas, scaling_status
  772. """
  773. if not isinstance(other, DeploymentSnapshot):
  774. return False
  775. return (
  776. self.app == other.app
  777. and self.deployment == other.deployment
  778. and self.target_replicas == other.target_replicas
  779. and self.min_replicas == other.min_replicas
  780. and self.max_replicas == other.max_replicas
  781. and self.scaling_status == other.scaling_status
  782. )
  783. RUNNING_REQUESTS_KEY = "running_requests"
  784. ONGOING_REQUESTS_KEY = "ongoing_requests"
  785. QUEUED_REQUESTS_KEY = "queued_requests"
  786. @dataclass(order=True)
  787. class TimeStampedValue:
  788. timestamp: float
  789. value: float = field(compare=False)
  790. # Type alias for time series data
  791. TimeSeries = List[TimeStampedValue]
  792. @dataclass
  793. class HandleMetricReport:
  794. """Report from a deployment handle on queued and ongoing requests.
  795. Args:
  796. deployment_id: The deployment ID of the deployment handle.
  797. handle_id: The handle ID of the deployment handle.
  798. actor_id: If the deployment handle (from which this metric was
  799. sent) lives on an actor, the ID of that actor.
  800. handle_source: Describes what kind of entity holds this
  801. deployment handle: a Serve proxy, a Serve replica, or
  802. unknown.
  803. aggregated_queued_requests: average number of queued requests at the
  804. handle over the past look_back_period_s seconds.
  805. queued_requests: list of values of queued requests at the
  806. handle over the past look_back_period_s seconds. This is a list because
  807. we take multiple measurements over time.
  808. aggregated_metrics: A map of metric name to the aggregated value over the past
  809. look_back_period_s seconds at the handle for each replica.
  810. metrics: A map of metric name to the list of values running at that handle for each replica
  811. over the past look_back_period_s seconds. This is a list because
  812. we take multiple measurements over time.
  813. timestamp: The time at which this report was created.
  814. """
  815. deployment_id: DeploymentID
  816. handle_id: str
  817. actor_id: str
  818. handle_source: DeploymentHandleSource
  819. aggregated_queued_requests: float
  820. queued_requests: TimeSeries
  821. aggregated_metrics: Dict[str, Dict[ReplicaID, float]]
  822. metrics: Dict[str, Dict[ReplicaID, TimeSeries]]
  823. timestamp: float
  824. @property
  825. def total_requests(self) -> float:
  826. """Total number of queued and running requests."""
  827. return self.aggregated_queued_requests + sum(
  828. self.aggregated_metrics.get(RUNNING_REQUESTS_KEY, {}).values()
  829. )
  830. @property
  831. def is_serve_component_source(self) -> bool:
  832. """Whether the handle source is a Serve actor.
  833. More specifically, this returns whether a Serve actor tracked
  834. by the controller holds the deployment handle that sent this
  835. report. If the deployment handle lives on a driver, a Ray task,
  836. or an actor that's not a Serve replica, then this returns False.
  837. """
  838. return self.handle_source in [
  839. DeploymentHandleSource.PROXY,
  840. DeploymentHandleSource.REPLICA,
  841. ]
  842. @dataclass
  843. class ReplicaMetricReport:
  844. """Report from a replica on ongoing requests.
  845. Args:
  846. replica_id: The replica ID of the replica.
  847. aggregated_metrics: A map of metric name to the aggregated value over the past
  848. look_back_period_s seconds at the replica.
  849. metrics: A map of metric name to the list of values running at that replica
  850. over the past look_back_period_s seconds. This is a list because
  851. we take multiple measurements over time.
  852. timestamp: The time at which this report was created.
  853. """
  854. replica_id: ReplicaID
  855. aggregated_metrics: Dict[str, float]
  856. metrics: Dict[str, TimeSeries]
  857. timestamp: float
  858. class AutoscalingSnapshotError(str, Enum):
  859. METRICS_UNAVAILABLE = "METRICS_UNAVAILABLE"