deployment_state.py 165 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187
  1. import json
  2. import logging
  3. import math
  4. import os
  5. import random
  6. import time
  7. import traceback
  8. from collections import defaultdict
  9. from copy import copy
  10. from dataclasses import dataclass
  11. from enum import Enum
  12. from typing import Any, Callable, Dict, List, Optional, Set, Tuple
  13. import ray
  14. from ray import ObjectRef, cloudpickle
  15. from ray._common import ray_constants
  16. from ray.actor import ActorHandle
  17. from ray.exceptions import RayActorError, RayError, RayTaskError, RuntimeEnvSetupError
  18. from ray.serve import metrics
  19. from ray.serve._private import default_impl
  20. from ray.serve._private.autoscaling_state import AutoscalingStateManager
  21. from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
  22. from ray.serve._private.common import (
  23. DeploymentID,
  24. DeploymentStatus,
  25. DeploymentStatusInfo,
  26. DeploymentStatusInternalTrigger,
  27. DeploymentStatusTrigger,
  28. DeploymentTargetInfo,
  29. Duration,
  30. ReplicaID,
  31. ReplicaState,
  32. RequestRoutingInfo,
  33. RunningReplicaInfo,
  34. )
  35. from ray.serve._private.config import DeploymentConfig
  36. from ray.serve._private.constants import (
  37. DEFAULT_LATENCY_BUCKET_MS,
  38. MAX_PER_REPLICA_RETRY_COUNT,
  39. RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S,
  40. RAY_SERVE_ENABLE_DIRECT_INGRESS,
  41. RAY_SERVE_ENABLE_TASK_EVENTS,
  42. RAY_SERVE_FAIL_ON_RANK_ERROR,
  43. RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
  44. RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY,
  45. REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
  46. REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
  47. REQUEST_LATENCY_BUCKETS_MS,
  48. SERVE_LOGGER_NAME,
  49. SERVE_NAMESPACE,
  50. )
  51. from ray.serve._private.deployment_info import DeploymentInfo
  52. from ray.serve._private.deployment_scheduler import (
  53. DeploymentDownscaleRequest,
  54. DeploymentScheduler,
  55. ReplicaSchedulingRequest,
  56. ReplicaSchedulingRequestStatus,
  57. SpreadDeploymentSchedulingPolicy,
  58. )
  59. from ray.serve._private.exceptions import DeploymentIsBeingDeletedError
  60. from ray.serve._private.long_poll import LongPollHost, LongPollNamespace
  61. from ray.serve._private.storage.kv_store import KVStoreBase
  62. from ray.serve._private.usage import ServeUsageTag
  63. from ray.serve._private.utils import (
  64. JavaActorHandleProxy,
  65. check_obj_ref_ready_nowait,
  66. get_capacity_adjusted_num_replicas,
  67. get_random_string,
  68. msgpack_deserialize,
  69. msgpack_serialize,
  70. )
  71. from ray.serve._private.version import DeploymentVersion
  72. from ray.serve.generated.serve_pb2 import DeploymentLanguage
  73. from ray.serve.schema import (
  74. DeploymentDetails,
  75. ReplicaDetails,
  76. ReplicaRank,
  77. _deployment_info_to_schema,
  78. )
  79. from ray.util import metrics as ray_metrics
  80. from ray.util.placement_group import PlacementGroup
  81. logger = logging.getLogger(SERVE_LOGGER_NAME)
  82. class ReplicaStartupStatus(Enum):
  83. PENDING_ALLOCATION = 1
  84. PENDING_INITIALIZATION = 2
  85. SUCCEEDED = 3
  86. FAILED = 4
  87. class ReplicaHealthCheckResponse(Enum):
  88. NONE = 1
  89. SUCCEEDED = 2
  90. APP_FAILURE = 3
  91. ACTOR_CRASHED = 4
  92. @dataclass
  93. class DeploymentTargetState:
  94. """The current goal state for a deployment.
  95. info: contains the information needed to initialize a replica.
  96. target_num_replicas: the number of replicas to run. This should already
  97. be adjusted by the target_capacity.
  98. version: the goal version of the deployment.
  99. deleting: whether the deployment is being deleted.
  100. """
  101. info: Optional[DeploymentInfo]
  102. target_num_replicas: int
  103. version: Optional[DeploymentVersion]
  104. deleting: bool
  105. @classmethod
  106. def default(cls) -> "DeploymentTargetState":
  107. return cls(None, -1, None, False)
  108. @classmethod
  109. def create(
  110. cls,
  111. info: DeploymentInfo,
  112. target_num_replicas: int,
  113. *,
  114. deleting: bool = False,
  115. ) -> "DeploymentTargetState":
  116. if deleting:
  117. if target_num_replicas != 0:
  118. raise ValueError(
  119. "target_num_replicas must be 0 when setting target state "
  120. f"to deleting. Got {target_num_replicas} instead."
  121. )
  122. version = DeploymentVersion(
  123. info.version,
  124. deployment_config=info.deployment_config,
  125. ray_actor_options=info.replica_config.ray_actor_options,
  126. placement_group_bundles=info.replica_config.placement_group_bundles,
  127. placement_group_strategy=info.replica_config.placement_group_strategy,
  128. max_replicas_per_node=info.replica_config.max_replicas_per_node,
  129. route_prefix=info.route_prefix,
  130. placement_group_bundle_label_selector=(
  131. info.replica_config.placement_group_bundle_label_selector
  132. ),
  133. placement_group_fallback_strategy=(
  134. info.replica_config.placement_group_fallback_strategy
  135. ),
  136. )
  137. return cls(info, target_num_replicas, version, deleting)
  138. def is_scaled_copy_of(self, other_target_state: "DeploymentTargetState") -> bool:
  139. """Checks if this target state is a scaled copy of another target state.
  140. A target state is a scaled copy of another target state if all
  141. configurable info is identical, other than target_num_replicas.
  142. Returns: True if this target state contains a non-None DeploymentInfo
  143. and is a scaled copy of the other target state.
  144. """
  145. if other_target_state.info is None:
  146. return False
  147. if self.info is None:
  148. return False
  149. actor_options_match = (
  150. self.info.replica_config.ray_actor_options
  151. == other_target_state.info.replica_config.ray_actor_options
  152. )
  153. bundles_match = (
  154. self.info.replica_config.placement_group_bundles
  155. == other_target_state.info.replica_config.placement_group_bundles
  156. )
  157. strategy_match = (
  158. self.info.replica_config.placement_group_strategy
  159. == other_target_state.info.replica_config.placement_group_strategy
  160. )
  161. max_replicas_match = (
  162. self.info.replica_config.max_replicas_per_node
  163. == other_target_state.info.replica_config.max_replicas_per_node
  164. )
  165. deployment_config_match = self.info.deployment_config.dict(
  166. exclude={"num_replicas"}
  167. ) == other_target_state.info.deployment_config.dict(exclude={"num_replicas"})
  168. # Backward compatibility check for older versions of Ray without these fields.
  169. current_bundle_label_selector = getattr(
  170. self.info.replica_config, "placement_group_bundle_label_selector", None
  171. )
  172. other_bundle_label_selector = getattr(
  173. other_target_state.info.replica_config,
  174. "placement_group_bundle_label_selector",
  175. None,
  176. )
  177. bundle_label_selector_match = (
  178. current_bundle_label_selector == other_bundle_label_selector
  179. )
  180. current_fallback = getattr(
  181. self.info.replica_config, "placement_group_fallback_strategy", None
  182. )
  183. other_fallback = getattr(
  184. other_target_state.info.replica_config,
  185. "placement_group_fallback_strategy",
  186. None,
  187. )
  188. fallback_match = current_fallback == other_fallback
  189. # TODO(zcin): version can be None, this is from an outdated codepath.
  190. # We should remove outdated code, so version can never be None.
  191. version_match = (
  192. self.version is not None and self.version == other_target_state.version
  193. )
  194. return all(
  195. [
  196. actor_options_match,
  197. bundles_match,
  198. strategy_match,
  199. bundle_label_selector_match,
  200. fallback_match,
  201. max_replicas_match,
  202. deployment_config_match,
  203. version_match,
  204. ]
  205. )
  206. @dataclass
  207. class DeploymentStateUpdateResult:
  208. deleted: bool
  209. any_replicas_recovering: bool
  210. upscale: List[ReplicaSchedulingRequest]
  211. downscale: Optional[DeploymentDownscaleRequest]
  212. CHECKPOINT_KEY = "serve-deployment-state-checkpoint"
  213. SLOW_STARTUP_WARNING_S = int(
  214. os.environ.get(
  215. "RAY_SERVE_SLOW_STARTUP_WARNING_S",
  216. os.environ.get("SERVE_SLOW_STARTUP_WARNING_S", 30),
  217. )
  218. )
  219. SLOW_STARTUP_WARNING_PERIOD_S = int(
  220. os.environ.get(
  221. "RAY_SERVE_SLOW_STARTUP_WARNING_PERIOD_S",
  222. os.environ.get("SERVE_SLOW_STARTUP_WARNING_PERIOD_S", 30),
  223. )
  224. )
  225. ALL_REPLICA_STATES = list(ReplicaState)
  226. _SCALING_LOG_ENABLED = os.environ.get("SERVE_ENABLE_SCALING_LOG", "0") != "0"
  227. def print_verbose_scaling_log():
  228. assert _SCALING_LOG_ENABLED
  229. log_path = "/tmp/ray/session_latest/logs/monitor.log"
  230. last_n_lines = 50
  231. autoscaler_log_last_n_lines = []
  232. if os.path.exists(log_path):
  233. with open(log_path) as f:
  234. autoscaler_log_last_n_lines = f.readlines()[-last_n_lines:]
  235. debug_info = {
  236. "nodes": ray.nodes(),
  237. "available_resources": ray.available_resources(),
  238. "total_resources": ray.cluster_resources(),
  239. "autoscaler_logs": autoscaler_log_last_n_lines,
  240. }
  241. logger.error(f"Scaling information\n{json.dumps(debug_info, indent=2)}")
  242. class ActorReplicaWrapper:
  243. """Wraps a Ray actor for a deployment replica.
  244. This is primarily defined so that we can mock out actual Ray operations
  245. for unit testing.
  246. *All Ray API calls should be made here, not in DeploymentState.*
  247. """
  248. def __init__(
  249. self,
  250. replica_id: ReplicaID,
  251. version: DeploymentVersion,
  252. ):
  253. self._replica_id = replica_id
  254. self._deployment_id = replica_id.deployment_id
  255. self._actor_name = replica_id.to_full_id_str()
  256. # Populated in either self.start() or self.recover()
  257. self._allocated_obj_ref: ObjectRef = None
  258. self._ready_obj_ref: ObjectRef = None
  259. self._actor_resources: Dict[str, float] = None
  260. # If the replica is being started, this will be the true version
  261. # If the replica is being recovered, this will be the target
  262. # version, which may be inconsistent with the actual replica
  263. # version. If so, the actual version will be updated later after
  264. # recover() and check_ready()
  265. self._version: DeploymentVersion = version
  266. self._healthy: bool = True
  267. self._health_check_ref: Optional[ObjectRef] = None
  268. self._last_health_check_time: float = 0.0
  269. self._consecutive_health_check_failures = 0
  270. self._last_health_check_latency_ms: Optional[float] = None
  271. self._last_health_check_failed: Optional[bool] = None
  272. self._initialization_latency_s: Optional[float] = None
  273. self._reconfigure_start_time: Optional[float] = None
  274. self._internal_grpc_port: Optional[int] = None
  275. self._docs_path: Optional[str] = None
  276. self._route_patterns: Optional[List[str]] = None
  277. # Rank assigned to the replica.
  278. self._assign_rank_callback: Optional[Callable[[ReplicaID], ReplicaRank]] = None
  279. self._rank: Optional[ReplicaRank] = None
  280. # Populated in `on_scheduled` or `recover`.
  281. self._actor_handle: ActorHandle = None
  282. self._placement_group: PlacementGroup = None
  283. # Populated after replica is allocated.
  284. self._pid: int = None
  285. self._actor_id: str = None
  286. self._worker_id: str = None
  287. self._node_id: str = None
  288. self._node_ip: str = None
  289. self._node_instance_id: str = None
  290. self._log_file_path: str = None
  291. self._http_port: int = None
  292. self._grpc_port: int = None
  293. # Populated in self.stop().
  294. self._graceful_shutdown_ref: ObjectRef = None
  295. # todo: will be confused with deployment_config.is_cross_language
  296. self._is_cross_language = False
  297. self._deployment_is_cross_language = False
  298. self._routing_stats: Dict[str, Any] = {}
  299. self._record_routing_stats_ref: Optional[ObjectRef] = None
  300. self._last_record_routing_stats_time: float = 0.0
  301. self._ingress: bool = False
  302. # Outbound deployments polling state
  303. self._outbound_deployments: Optional[List[DeploymentID]] = None
  304. # Histogram to track routing stats delay from replica to controller
  305. self._routing_stats_delay_histogram = metrics.Histogram(
  306. "serve_routing_stats_delay_ms",
  307. description=(
  308. "The delay in milliseconds for routing stats to propagate "
  309. "from replica to controller."
  310. ),
  311. boundaries=DEFAULT_LATENCY_BUCKET_MS,
  312. tag_keys=("deployment", "replica", "application"),
  313. )
  314. self._routing_stats_delay_histogram.set_default_tags(
  315. {
  316. "deployment": self._deployment_id.name,
  317. "replica": self._replica_id.unique_id,
  318. "application": self._deployment_id.app_name,
  319. }
  320. )
  321. # Counter to track exceptions/timeouts when getting routing stats
  322. self._routing_stats_error_counter = metrics.Counter(
  323. "serve_routing_stats_error",
  324. description=(
  325. "The number of errors (exceptions or timeouts) when getting "
  326. "routing stats from replica."
  327. ),
  328. tag_keys=("deployment", "replica", "application", "error_type"),
  329. )
  330. self._routing_stats_error_counter.set_default_tags(
  331. {
  332. "deployment": self._deployment_id.name,
  333. "replica": self._replica_id.unique_id,
  334. "application": self._deployment_id.app_name,
  335. }
  336. )
  337. @property
  338. def replica_id(self) -> str:
  339. return self._replica_id
  340. @property
  341. def deployment_name(self) -> str:
  342. return self._deployment_id.name
  343. @property
  344. def rank(self) -> Optional[ReplicaRank]:
  345. return self._rank
  346. @property
  347. def app_name(self) -> str:
  348. return self._deployment_id.app_name
  349. @property
  350. def is_cross_language(self) -> bool:
  351. return self._is_cross_language
  352. @property
  353. def actor_handle(self) -> Optional[ActorHandle]:
  354. if not self._actor_handle:
  355. try:
  356. self._actor_handle = ray.get_actor(
  357. self._actor_name, namespace=SERVE_NAMESPACE
  358. )
  359. except ValueError:
  360. self._actor_handle = None
  361. if self._is_cross_language:
  362. assert isinstance(self._actor_handle, JavaActorHandleProxy)
  363. return self._actor_handle.handle
  364. return self._actor_handle
  365. @property
  366. def placement_group_bundles(self) -> Optional[List[Dict[str, float]]]:
  367. if not self._placement_group:
  368. return None
  369. return self._placement_group.bundle_specs
  370. @property
  371. def version(self) -> DeploymentVersion:
  372. """Replica version. This can be incorrect during state recovery.
  373. If the controller crashes and the deployment state is being
  374. recovered, this will temporarily be the deployment-wide target
  375. version, which may be inconsistent with the actual version
  376. running on the replica actor. If so, the actual version will be
  377. updated when the replica transitions from RECOVERING -> RUNNING
  378. """
  379. return self._version
  380. @property
  381. def deployment_config(self) -> DeploymentConfig:
  382. """Deployment config. This can return an incorrect config during state recovery.
  383. If the controller hasn't yet recovered the up-to-date version
  384. from the running replica actor, this property will return the
  385. current target config for the deployment.
  386. """
  387. return self._version.deployment_config
  388. @property
  389. def docs_path(self) -> Optional[str]:
  390. return self._docs_path
  391. @property
  392. def route_patterns(self) -> Optional[List[str]]:
  393. return self._route_patterns
  394. @property
  395. def max_ongoing_requests(self) -> int:
  396. return self.deployment_config.max_ongoing_requests
  397. @property
  398. def max_queued_requests(self) -> int:
  399. return self.deployment_config.max_queued_requests
  400. @property
  401. def graceful_shutdown_timeout_s(self) -> float:
  402. return self.deployment_config.graceful_shutdown_timeout_s
  403. @property
  404. def health_check_period_s(self) -> float:
  405. return self.deployment_config.health_check_period_s
  406. @property
  407. def health_check_timeout_s(self) -> float:
  408. return self.deployment_config.health_check_timeout_s
  409. @property
  410. def http_port(self) -> Optional[int]:
  411. return self._http_port
  412. @property
  413. def grpc_port(self) -> Optional[int]:
  414. return self._grpc_port
  415. @property
  416. def request_routing_stats_period_s(self) -> float:
  417. return (
  418. self.deployment_config.request_router_config.request_routing_stats_period_s
  419. )
  420. @property
  421. def request_routing_stats_timeout_s(self) -> float:
  422. return (
  423. self.deployment_config.request_router_config.request_routing_stats_timeout_s
  424. )
  425. @property
  426. def pid(self) -> Optional[int]:
  427. """Returns the pid of the actor, None if not started."""
  428. return self._pid
  429. @property
  430. def actor_id(self) -> Optional[str]:
  431. """Returns the actor id, None if not started."""
  432. return self._actor_id
  433. @property
  434. def worker_id(self) -> Optional[str]:
  435. """Returns the worker id, None if not started."""
  436. return self._worker_id
  437. @property
  438. def node_id(self) -> Optional[str]:
  439. """Returns the node id of the actor, None if not placed."""
  440. return self._node_id
  441. @property
  442. def node_ip(self) -> Optional[str]:
  443. """Returns the node ip of the actor, None if not placed."""
  444. return self._node_ip
  445. @property
  446. def node_instance_id(self) -> Optional[str]:
  447. """Returns the node instance id of the actor, None if not placed."""
  448. return self._node_instance_id
  449. @property
  450. def log_file_path(self) -> Optional[str]:
  451. """Returns the relative log file path of the actor, None if not placed."""
  452. return self._log_file_path
  453. @property
  454. def initialization_latency_s(self) -> Optional[float]:
  455. """Returns the initialization latency for the replica actor.
  456. Returns None if the replica hasn't started yet.
  457. Note: this value isn't checkpointed, so if the controller restarts,
  458. this value goes back to None.
  459. """
  460. return self._initialization_latency_s
  461. @property
  462. def reconfigure_start_time(self) -> Optional[float]:
  463. """Returns the start time of the last reconfigure operation.
  464. Returns None if no reconfigure operation has started.
  465. """
  466. return self._reconfigure_start_time
  467. @property
  468. def last_health_check_latency_ms(self) -> Optional[float]:
  469. """Returns the latency of the last completed health check in milliseconds.
  470. Returns None if no health check has completed in the current check cycle.
  471. """
  472. return self._last_health_check_latency_ms
  473. @property
  474. def last_health_check_failed(self) -> Optional[bool]:
  475. """Returns whether the last completed health check failed.
  476. Returns False if no health check has completed in the current check cycle.
  477. """
  478. return self._last_health_check_failed
    def start(
        self,
        deployment_info: DeploymentInfo,
        assign_rank_callback: Callable[..., ReplicaRank],
    ) -> ReplicaSchedulingRequest:
        """Start the current DeploymentReplica instance.

        The replica will be in the STARTING and PENDING_ALLOCATION states
        until the deployment scheduler schedules the underlying actor.

        This method does not create the actor itself. It builds the actor
        class, constructor arguments, and actor options, and returns them as a
        scheduling request. `self.on_scheduled` is passed as the request's
        callback (presumably invoked by the scheduler once the actor exists).

        Args:
            deployment_info: Deployment info whose replica/deployment config
                determines the actor definition, init args and options.
            assign_rank_callback: Stored and later invoked from `check_ready`
                as `assign_rank_callback(replica unique id, node id)` to
                assign this replica's rank.

        Returns:
            The ReplicaSchedulingRequest describing how to create the actor.
        """
        self._assign_rank_callback = assign_rank_callback
        self._actor_resources = deployment_info.replica_config.resource_dict
        self._ingress = deployment_info.ingress
        # Cache the deployment-level cross-language flag; it selects the
        # msgpack <-> cloudpickle conversions applied to init args below.
        self._deployment_is_cross_language = (
            deployment_info.deployment_config.is_cross_language
        )

        logger.info(
            f"Starting {self.replica_id}.",
            extra={"log_to_stderr": False},
        )

        actor_def = deployment_info.actor_def
        # NOTE(review): if deployment_language is neither PYTHON nor JAVA,
        # `init_args` is never bound and the return below raises
        # UnboundLocalError. Confirm no other languages can reach this point.
        if (
            deployment_info.deployment_config.deployment_language
            == DeploymentLanguage.PYTHON
        ):
            if deployment_info.replica_config.serialized_init_args is None:
                serialized_init_args = cloudpickle.dumps(())
            else:
                # Cross-language deployments carry msgpack-encoded init args;
                # re-encode them with cloudpickle for the Python replica.
                serialized_init_args = (
                    cloudpickle.dumps(
                        msgpack_deserialize(
                            deployment_info.replica_config.serialized_init_args
                        )
                    )
                    if self._deployment_is_cross_language
                    else deployment_info.replica_config.serialized_init_args
                )
            init_args = (
                self.replica_id,
                cloudpickle.dumps(deployment_info.replica_config.deployment_def)
                if self._deployment_is_cross_language
                else deployment_info.replica_config.serialized_deployment_def,
                serialized_init_args,
                deployment_info.replica_config.serialized_init_kwargs
                if deployment_info.replica_config.serialized_init_kwargs
                else cloudpickle.dumps({}),
                deployment_info.deployment_config.to_proto_bytes(),
                self._version,
                deployment_info.ingress,
                deployment_info.route_prefix,
            )
        # TODO(simon): unify the constructor arguments across language
        elif (
            deployment_info.deployment_config.deployment_language
            == DeploymentLanguage.JAVA
        ):
            # Marks this replica as a Java actor; other methods branch on it.
            self._is_cross_language = True
            actor_def = ray.cross_language.java_actor_class(
                "io.ray.serve.replica.RayServeWrappedReplica"
            )
            init_args = (
                # String deploymentName,
                self.deployment_name,
                # String replicaID,
                self.replica_id.to_full_id_str(),
                # String deploymentDef
                deployment_info.replica_config.deployment_def_name,
                # byte[] initArgsbytes
                # NOTE(review): unlike the Python branch, a None
                # serialized_init_args is not special-cased here; with a
                # cross-language deployment cloudpickle.loads(None) would raise.
                msgpack_serialize(
                    cloudpickle.loads(
                        deployment_info.replica_config.serialized_init_args
                    )
                )
                if self._deployment_is_cross_language
                else deployment_info.replica_config.serialized_init_args,
                # byte[] deploymentConfigBytes,
                deployment_info.deployment_config.to_proto_bytes(),
                # byte[] deploymentVersionBytes,
                self._version.to_proto().SerializeToString(),
                # NOTE(review): no controllerName argument is passed here;
                # confirm the Java constructor no longer expects one.
                # String appName
                self.app_name,
            )

        actor_options = {
            "name": self._actor_name,
            "namespace": SERVE_NAMESPACE,
            "lifetime": "detached",
            "enable_task_events": RAY_SERVE_ENABLE_TASK_EVENTS,
        }
        # User-supplied ray_actor_options override the defaults above.
        actor_options.update(deployment_info.replica_config.ray_actor_options)

        # A replica's default `max_concurrency` value can prevent it from
        # respecting the configured `max_ongoing_requests`. To avoid this
        # unintentional behavior, use `max_ongoing_requests` to override
        # the Actor's `max_concurrency` if it is larger.
        if (
            deployment_info.deployment_config.max_ongoing_requests
            > ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC
        ):
            actor_options[
                "max_concurrency"
            ] = deployment_info.deployment_config.max_ongoing_requests

        return ReplicaSchedulingRequest(
            replica_id=self.replica_id,
            actor_def=actor_def,
            actor_resources=self._actor_resources,
            actor_options=actor_options,
            actor_init_args=init_args,
            placement_group_bundles=(
                deployment_info.replica_config.placement_group_bundles
            ),
            placement_group_strategy=(
                deployment_info.replica_config.placement_group_strategy
            ),
            placement_group_bundle_label_selector=(
                deployment_info.replica_config.placement_group_bundle_label_selector
            ),
            placement_group_fallback_strategy=(
                deployment_info.replica_config.placement_group_fallback_strategy
            ),
            max_replicas_per_node=(
                deployment_info.replica_config.max_replicas_per_node
            ),
            on_scheduled=self.on_scheduled,
        )
  604. def on_scheduled(
  605. self,
  606. actor_handle: ActorHandle,
  607. placement_group: Optional[PlacementGroup] = None,
  608. ):
  609. self._actor_handle = actor_handle
  610. self._placement_group = placement_group
  611. if self._is_cross_language:
  612. self._actor_handle = JavaActorHandleProxy(self._actor_handle)
  613. self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
  614. else:
  615. self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
  616. def _format_user_config(self, user_config: Any):
  617. temp = copy(user_config)
  618. if user_config is not None and self._deployment_is_cross_language:
  619. if self._is_cross_language:
  620. temp = msgpack_serialize(temp)
  621. else:
  622. temp = msgpack_deserialize(temp)
  623. return temp
  624. def reconfigure(self, version: DeploymentVersion, rank: ReplicaRank) -> bool:
  625. """
  626. Update replica version. Also, updates the deployment config on the actor
  627. behind this DeploymentReplica instance if necessary.
  628. Returns: whether the actor is being updated.
  629. """
  630. updating = False
  631. # Determine if we need heavyweight reconfiguration
  632. # vs lightweight updates
  633. needs_actor_reconfigure = self._version.requires_actor_reconfigure(version)
  634. has_rank_changes = self._rank != rank
  635. if needs_actor_reconfigure or has_rank_changes:
  636. # Call into replica actor reconfigure() with updated user config and
  637. # graceful_shutdown_wait_loop_s
  638. # Setting updating=True because we want to transition to UPDATING state
  639. # when rank is updated or deployment config changes.
  640. updating = True
  641. self._reconfigure_start_time = time.time()
  642. deployment_config = copy(version.deployment_config)
  643. deployment_config.user_config = self._format_user_config(
  644. deployment_config.user_config
  645. )
  646. self._ready_obj_ref = self._actor_handle.reconfigure.remote(
  647. deployment_config,
  648. rank,
  649. version.route_prefix,
  650. )
  651. self._version = version
  652. self._rank = rank
  653. return updating
  654. def recover(self, ingress: bool = False) -> bool:
  655. """Recover replica version from a live replica actor.
  656. When controller dies, the deployment state loses the info on the version that's
  657. running on each individual replica actor, so as part of the recovery process, we
  658. need to recover the version that is running on the replica actor.
  659. Also confirm that actor is allocated and initialized before marking as running.
  660. Args:
  661. ingress: Whether this replica is an ingress replica.
  662. Returns:
  663. False if the replica actor is no longer alive; the
  664. actor could have been killed in the time between when the
  665. controller fetching all Serve actors in the cluster and when
  666. the controller tries to recover it. Otherwise, return True.
  667. """
  668. logger.info(f"Recovering {self.replica_id}.")
  669. self._ingress = ingress
  670. try:
  671. self._actor_handle = ray.get_actor(
  672. self._actor_name, namespace=SERVE_NAMESPACE
  673. )
  674. except ValueError:
  675. logger.warning(
  676. f"Failed to get handle to replica {self._actor_name} "
  677. "during controller recovery. Marking as dead."
  678. )
  679. return False
  680. try:
  681. self._placement_group = ray.util.get_placement_group(
  682. self._actor_name,
  683. )
  684. except ValueError:
  685. # ValueError is raised if the placement group does not exist.
  686. self._placement_group = None
  687. # Re-fetch initialization proof
  688. self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
  689. # Running actor handle already has all info needed, thus successful
  690. # starting simply means retrieving replica version hash from actor
  691. if self._is_cross_language:
  692. self._ready_obj_ref = self._actor_handle.check_health.remote()
  693. else:
  694. self._ready_obj_ref = (
  695. self._actor_handle.initialize_and_get_metadata.remote()
  696. )
  697. return True
    def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
        """
        Check if current replica has started by making ray API calls on
        relevant actor / object ref.

        Replica initialization calls __init__(), reconfigure(), and check_health().

        The check proceeds in stages across repeated calls:
          1. Wait for the `is_allocated` ref; once ready (Python replicas),
             read the actor's pid/ids/node info from it.
          2. If no ready ref exists yet, assign the rank (Python replicas) and
             kick off initialization, then report PENDING_INITIALIZATION.
          3. Wait for the ready ref; once ready (Python, non-cross-language
             deployments), read version and metadata back from it.

        Returns:
            state (ReplicaStartupStatus):
                PENDING_ALLOCATION: replica is waiting for a worker to start
                PENDING_INITIALIZATION: replica initialization hasn't finished.
                FAILED: replica initialization failed.
                SUCCEEDED: replica initialization succeeded.
            error_msg:
                None: for PENDING_ALLOCATION, PENDING_INITIALIZATION or SUCCEEDED states
                str: for FAILED state
        """

        # Check whether the replica has been allocated.
        if self._allocated_obj_ref is None or not check_obj_ref_ready_nowait(
            self._allocated_obj_ref
        ):
            return ReplicaStartupStatus.PENDING_ALLOCATION, None

        if not self._is_cross_language:
            try:
                (
                    self._pid,
                    self._actor_id,
                    self._worker_id,
                    self._node_id,
                    self._node_ip,
                    self._node_instance_id,
                    self._log_file_path,
                ) = ray.get(self._allocated_obj_ref)
            except RayTaskError as e:
                logger.exception(
                    f"Exception in {self._replica_id}, the replica will be stopped."
                )
                return ReplicaStartupStatus.FAILED, str(e.as_instanceof_cause())
            except RuntimeEnvSetupError as e:
                msg = f"Exception when allocating {self._replica_id}: {str(e)}"
                logger.exception(msg)
                return ReplicaStartupStatus.FAILED, msg
            except Exception:
                # NOTE(review): logger.exception already appends the traceback,
                # so the log line shows it twice; `msg` is also the returned
                # error text, so changing it needs care.
                msg = (
                    f"Exception when allocating {self._replica_id}:\n"
                    + traceback.format_exc()
                )
                logger.exception(msg)
                return ReplicaStartupStatus.FAILED, msg

        if self._ready_obj_ref is None:
            # Perform auto method name translation for java handles.
            # See https://github.com/ray-project/ray/issues/21474
            deployment_config = copy(self._version.deployment_config)
            deployment_config.user_config = self._format_user_config(
                deployment_config.user_config
            )
            if self._is_cross_language:
                self._ready_obj_ref = self._actor_handle.is_initialized.remote(
                    deployment_config.to_proto_bytes()
                )
            else:
                replica_ready_check_func = (
                    self._actor_handle.initialize_and_get_metadata
                )
                # this guarantees that node_id is set before rank is assigned
                # (node_id was read from the allocation ref above).
                self._rank = self._assign_rank_callback(
                    self._replica_id.unique_id, self._node_id
                )
                self._ready_obj_ref = replica_ready_check_func.remote(
                    deployment_config, self._rank
                )

            return ReplicaStartupStatus.PENDING_INITIALIZATION, None

        # Check whether replica initialization has completed.
        replica_ready = check_obj_ref_ready_nowait(self._ready_obj_ref)
        # In case of deployment constructor failure, ray.get will help to
        # surface exception to each update() cycle.
        if not replica_ready:
            return ReplicaStartupStatus.PENDING_INITIALIZATION, None
        else:
            try:
                # TODO(simon): fully implement reconfigure for Java replicas.
                if self._is_cross_language:
                    return ReplicaStartupStatus.SUCCEEDED, None

                # TODO: the user_config of a replica created by the Java client
                #  differs from the controller's user_config, so metadata is
                #  not read back for cross-language deployments.
                if not self._deployment_is_cross_language:
                    # This should only update version if the replica is being recovered.
                    # If this is checking on a replica that is newly started, this
                    # should return a version that is identical to what's already stored
                    (
                        _,
                        self._version,
                        self._initialization_latency_s,
                        self._internal_grpc_port,
                        self._docs_path,
                        self._http_port,
                        self._grpc_port,
                        self._rank,
                        self._route_patterns,
                        self._outbound_deployments,
                    ) = ray.get(self._ready_obj_ref)
            except RayTaskError as e:
                logger.exception(
                    f"Exception in {self._replica_id}, the replica will be stopped."
                )
                # NOTE(zcin): we should use str(e) instead of traceback.format_exc()
                # here because the full details of the error is not displayed properly
                # with traceback.format_exc().
                return ReplicaStartupStatus.FAILED, str(e.as_instanceof_cause())
            except Exception as e:
                logger.exception(
                    f"Exception in {self._replica_id}, the replica will be stopped."
                )
                return ReplicaStartupStatus.FAILED, repr(e)

        return ReplicaStartupStatus.SUCCEEDED, None
  811. @property
  812. def actor_resources(self) -> Optional[Dict[str, float]]:
  813. return self._actor_resources
  814. @property
  815. def available_resources(self) -> Dict[str, float]:
  816. return ray.available_resources()
  817. def graceful_stop(self) -> Duration:
  818. """Request the actor to exit gracefully.
  819. Returns the timeout after which to kill the actor.
  820. """
  821. try:
  822. handle = ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE)
  823. if self._is_cross_language:
  824. handle = JavaActorHandleProxy(handle)
  825. self._graceful_shutdown_ref = handle.perform_graceful_shutdown.remote()
  826. except ValueError:
  827. # ValueError thrown from ray.get_actor means actor has already been deleted.
  828. pass
  829. return self.graceful_shutdown_timeout_s
  830. def check_stopped(self) -> bool:
  831. """Check if the actor has exited."""
  832. try:
  833. handle = ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE)
  834. stopped = check_obj_ref_ready_nowait(self._graceful_shutdown_ref)
  835. if stopped:
  836. try:
  837. ray.get(self._graceful_shutdown_ref)
  838. except Exception:
  839. logger.exception(
  840. "Exception when trying to gracefully shutdown replica:\n"
  841. + traceback.format_exc()
  842. )
  843. ray.kill(handle, no_restart=True)
  844. except ValueError:
  845. # ValueError thrown from ray.get_actor means actor has already been deleted.
  846. stopped = True
  847. finally:
  848. # Remove the placement group both if the actor has already been deleted or
  849. # it was just killed above.
  850. if stopped and self._placement_group is not None:
  851. ray.util.remove_placement_group(self._placement_group)
  852. return stopped
  853. def _check_active_health_check(self) -> ReplicaHealthCheckResponse:
  854. """Check the active health check (if any).
  855. self._health_check_ref will be reset to `None` when the active health
  856. check is deemed to have succeeded or failed. This method *does not*
  857. start a new health check, that's up to the caller.
  858. Returns:
  859. - NONE if there's no active health check, or it hasn't returned
  860. yet and the timeout is not up.
  861. - SUCCEEDED if the active health check succeeded.
  862. - APP_FAILURE if the active health check failed (or didn't return
  863. before the timeout).
  864. - ACTOR_CRASHED if the underlying actor crashed.
  865. """
  866. # Reset the last health check status for this check cycle.
  867. # We do this because _check_active_health_check is being called in a loop,
  868. # and we want to avoid accumulating latency and failure metrics over multiple
  869. # check cycles.
  870. self._last_health_check_latency_ms = None
  871. self._last_health_check_failed = None
  872. if self._health_check_ref is None:
  873. # There is no outstanding health check.
  874. response = ReplicaHealthCheckResponse.NONE
  875. elif check_obj_ref_ready_nowait(self._health_check_ref):
  876. # Object ref is ready, ray.get it to check for exceptions.
  877. try:
  878. ray.get(self._health_check_ref)
  879. # Calculate health check latency.
  880. self._last_health_check_latency_ms = (
  881. time.time() - self._last_health_check_time
  882. ) * 1000
  883. self._last_health_check_failed = False
  884. # Health check succeeded without exception.
  885. response = ReplicaHealthCheckResponse.SUCCEEDED
  886. except RayActorError:
  887. # Health check failed due to actor crashing.
  888. response = ReplicaHealthCheckResponse.ACTOR_CRASHED
  889. self._last_health_check_failed = True
  890. except RayError as e:
  891. # Health check failed due to application-level exception.
  892. logger.warning(f"Health check for {self._replica_id} failed: {e}")
  893. response = ReplicaHealthCheckResponse.APP_FAILURE
  894. self._last_health_check_failed = True
  895. elif time.time() - self._last_health_check_time > self.health_check_timeout_s:
  896. # Health check hasn't returned and the timeout is up, consider it failed.
  897. logger.warning(
  898. "Didn't receive health check response for replica "
  899. f"{self._replica_id} after "
  900. f"{self.health_check_timeout_s}s, marking it unhealthy."
  901. )
  902. response = ReplicaHealthCheckResponse.APP_FAILURE
  903. # Calculate latency for timeout case.
  904. self._last_health_check_latency_ms = (
  905. time.time() - self._last_health_check_time
  906. ) * 1000
  907. self._last_health_check_failed = True
  908. else:
  909. # Health check hasn't returned and the timeout isn't up yet.
  910. response = ReplicaHealthCheckResponse.NONE
  911. if response is not ReplicaHealthCheckResponse.NONE:
  912. self._health_check_ref = None
  913. return response
  914. def _should_start_new_health_check(self) -> bool:
  915. """Determines if a new health check should be kicked off.
  916. A health check will be started if:
  917. 1) There is not already an active health check.
  918. 2) It has been more than health_check_period_s since the
  919. previous health check was *started*.
  920. This assumes that self._health_check_ref is reset to `None` when an
  921. active health check succeeds or fails (due to returning or timeout).
  922. """
  923. if self._health_check_ref is not None:
  924. # There's already an active health check.
  925. return False
  926. # If there's no active health check, kick off another and reset
  927. # the timer if it's been long enough since the last health
  928. # check. Add some randomness to avoid synchronizing across all
  929. # replicas.
  930. time_since_last = time.time() - self._last_health_check_time
  931. randomized_period = self.health_check_period_s * random.uniform(0.9, 1.1)
  932. return time_since_last > randomized_period
  933. def _should_record_routing_stats(self) -> bool:
  934. """Determines if a new record routing stats should be kicked off.
  935. A record routing stats will be started if:
  936. 1) There is not already an active record routing stats.
  937. 2) It has been more than request_routing_stats_period_s since
  938. the previous record routing stats was *started*.
  939. This assumes that self._record_routing_stats_ref is reset to `None`
  940. when an active record routing stats succeeds or fails (due to
  941. returning or timeout).
  942. """
  943. if self._record_routing_stats_ref is not None:
  944. # There's already an active record routing stats.
  945. return False
  946. # If there's no active record routing stats, kick off another and
  947. # reset the timer if it's been long enough since the last record
  948. # routing stats. Add some randomness to avoid synchronizing across
  949. # all replicas.
  950. time_since_last = time.time() - self._last_record_routing_stats_time
  951. randomized_period = self.request_routing_stats_period_s * random.uniform(
  952. 0.9, 1.1
  953. )
  954. return time_since_last > randomized_period
  955. def check_health(self) -> bool:
  956. """Check if the actor is healthy.
  957. self._healthy should *only* be modified in this method.
  958. This is responsible for:
  959. 1) Checking the outstanding health check (if any).
  960. 2) Determining the replica health based on the health check results.
  961. 3) Kicking off a new health check if needed.
  962. """
  963. response: ReplicaHealthCheckResponse = self._check_active_health_check()
  964. if response is ReplicaHealthCheckResponse.NONE:
  965. # No info; don't update replica health.
  966. pass
  967. elif response is ReplicaHealthCheckResponse.SUCCEEDED:
  968. # Health check succeeded. Reset the consecutive failure counter
  969. # and mark the replica healthy.
  970. if self._consecutive_health_check_failures > 0:
  971. logger.info(
  972. f"{self._replica_id} passed the health check after "
  973. f"{self._consecutive_health_check_failures} consecutive failures."
  974. )
  975. self._consecutive_health_check_failures = 0
  976. self._healthy = True
  977. elif response is ReplicaHealthCheckResponse.APP_FAILURE:
  978. # Health check failed. If it has failed more than N times in a row,
  979. # mark the replica unhealthy.
  980. self._consecutive_health_check_failures += 1
  981. if (
  982. self._consecutive_health_check_failures
  983. >= REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD
  984. ):
  985. logger.warning(
  986. f"Replica {self._replica_id} failed the health "
  987. f"check {self._consecutive_health_check_failures} "
  988. "times in a row, marking it unhealthy."
  989. )
  990. self._healthy = False
  991. elif response is ReplicaHealthCheckResponse.ACTOR_CRASHED:
  992. # Actor crashed, mark the replica unhealthy immediately.
  993. logger.warning(
  994. f"Actor for {self._replica_id} crashed, marking "
  995. "it unhealthy immediately."
  996. )
  997. self._healthy = False
  998. else:
  999. assert False, f"Unknown response type: {response}."
  1000. if self._should_start_new_health_check():
  1001. self._last_health_check_time = time.time()
  1002. self._health_check_ref = self._actor_handle.check_health.remote()
  1003. return self._healthy
  1004. def get_routing_stats(self) -> Dict[str, Any]:
  1005. """Get the routing stats for the replica."""
  1006. if self._record_routing_stats_ref is None:
  1007. # There's no active record routing stats.
  1008. pass
  1009. elif check_obj_ref_ready_nowait(self._record_routing_stats_ref):
  1010. # Object ref is ready, ray.get it to check for exceptions.
  1011. try:
  1012. self._routing_stats = ray.get(self._record_routing_stats_ref)
  1013. # Record the round-trip delay for routing stats
  1014. delay_ms = (time.time() - self._last_record_routing_stats_time) * 1000
  1015. self._routing_stats_delay_histogram.observe(delay_ms)
  1016. except Exception:
  1017. logger.exception(
  1018. "Exception when trying to get routing stats:\n"
  1019. + traceback.format_exc()
  1020. )
  1021. self._routing_stats_error_counter.inc(tags={"error_type": "exception"})
  1022. self._record_routing_stats_ref = None
  1023. elif (
  1024. time.time() - self._last_record_routing_stats_time
  1025. > self.request_routing_stats_timeout_s
  1026. ):
  1027. # Record routing stats hasn't returned and the timeout is up, retrying.
  1028. logger.warning(
  1029. "Didn't receive routing stats response for replica "
  1030. f"{self._replica_id} after "
  1031. f"{self.request_routing_stats_timeout_s}s, retrying."
  1032. )
  1033. self._routing_stats_error_counter.inc(tags={"error_type": "timeout"})
  1034. self._record_routing_stats_ref = None
  1035. if self._should_record_routing_stats():
  1036. self._last_record_routing_stats_time = time.time()
  1037. self._record_routing_stats_ref = (
  1038. self._actor_handle.record_routing_stats.remote()
  1039. )
  1040. return self._routing_stats
  1041. def force_stop(self):
  1042. """Force the actor to exit without shutting down gracefully."""
  1043. try:
  1044. ray.kill(ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE))
  1045. except ValueError:
  1046. pass
  1047. def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
  1048. return self._outbound_deployments
  1049. class DeploymentReplica:
  1050. """Manages state transitions for deployment replicas.
  1051. This is basically a checkpointable lightweight state machine.
  1052. """
  1053. def __init__(
  1054. self,
  1055. replica_id: ReplicaID,
  1056. version: DeploymentVersion,
  1057. ):
  1058. self._replica_id = replica_id
  1059. self._actor = ActorReplicaWrapper(replica_id, version)
  1060. self._start_time = None
  1061. self._shutdown_start_time: Optional[float] = None
  1062. self._actor_details = ReplicaDetails(
  1063. actor_name=replica_id.to_full_id_str(),
  1064. replica_id=self._replica_id.unique_id,
  1065. state=ReplicaState.STARTING,
  1066. start_time_s=0,
  1067. )
  1068. self._multiplexed_model_ids: List[str] = []
  1069. self._routing_stats: Dict[str, Any] = {}
  1070. def get_running_replica_info(
  1071. self, cluster_node_info_cache: ClusterNodeInfoCache
  1072. ) -> RunningReplicaInfo:
  1073. return RunningReplicaInfo(
  1074. replica_id=self._replica_id,
  1075. node_id=self.actor_node_id,
  1076. node_ip=self._actor.node_ip,
  1077. availability_zone=cluster_node_info_cache.get_node_az(self.actor_node_id),
  1078. actor_name=self._actor._actor_name,
  1079. max_ongoing_requests=self._actor.max_ongoing_requests,
  1080. is_cross_language=self._actor.is_cross_language,
  1081. multiplexed_model_ids=self.multiplexed_model_ids,
  1082. routing_stats=self.routing_stats,
  1083. port=self._actor._internal_grpc_port,
  1084. )
  1085. def record_multiplexed_model_ids(self, multiplexed_model_ids: List[str]):
  1086. """Record the multiplexed model ids for this replica."""
  1087. self._multiplexed_model_ids = multiplexed_model_ids
  1088. def record_routing_stats(self, routing_stats: Optional[Dict[str, Any]]):
  1089. """Record the routing stats for this replica.
  1090. Recording routing_stats as an empty dictionary is valid. But skip
  1091. update if the routing_stats is None.
  1092. """
  1093. if routing_stats is not None:
  1094. self._routing_stats = routing_stats
  1095. @property
  1096. def multiplexed_model_ids(self) -> List[str]:
  1097. return self._multiplexed_model_ids
  1098. @property
  1099. def routing_stats(self) -> Dict[str, Any]:
  1100. return self._routing_stats
  1101. @property
  1102. def actor_details(self) -> ReplicaDetails:
  1103. return self._actor_details
  1104. @property
  1105. def replica_id(self) -> ReplicaID:
  1106. return self._replica_id
  1107. @property
  1108. def deployment_name(self) -> str:
  1109. return self._replica_id.deployment_id.name
  1110. @property
  1111. def app_name(self) -> str:
  1112. return self._replica_id.deployment_id.app_name
  1113. @property
  1114. def version(self):
  1115. return self._actor.version
  1116. @property
  1117. def docs_path(self) -> Optional[str]:
  1118. return self._actor.docs_path
  1119. @property
  1120. def route_patterns(self) -> Optional[List[str]]:
  1121. return self._actor.route_patterns
  1122. @property
  1123. def actor_id(self) -> str:
  1124. return self._actor.actor_id
  1125. @property
  1126. def actor_handle(self) -> ActorHandle:
  1127. return self._actor.actor_handle
  1128. @property
  1129. def actor_node_id(self) -> Optional[str]:
  1130. """Returns the node id of the actor, None if not placed."""
  1131. return self._actor.node_id
  1132. @property
  1133. def actor_http_port(self) -> Optional[int]:
  1134. return self._actor.http_port
  1135. @property
  1136. def actor_grpc_port(self) -> Optional[int]:
  1137. return self._actor.grpc_port
  1138. @property
  1139. def actor_pid(self) -> Optional[int]:
  1140. """Returns the node id of the actor, None if not placed."""
  1141. return self._actor.pid
  1142. @property
  1143. def initialization_latency_s(self) -> Optional[float]:
  1144. """Returns how long the replica took to initialize."""
  1145. return self._actor.initialization_latency_s
  1146. @property
  1147. def reconfigure_start_time(self) -> Optional[float]:
  1148. """Returns the start time of the last reconfigure operation."""
  1149. return self._actor.reconfigure_start_time
  1150. @property
  1151. def last_health_check_latency_ms(self) -> Optional[float]:
  1152. """Returns the latency of the last completed health check in milliseconds."""
  1153. return self._actor.last_health_check_latency_ms
  1154. @property
  1155. def last_health_check_failed(self) -> Optional[bool]:
  1156. """Returns whether the last completed health check failed."""
  1157. return self._actor.last_health_check_failed
  1158. @property
  1159. def shutdown_start_time(self) -> Optional[float]:
  1160. """Returns the start time of the shutdown operation."""
  1161. return self._shutdown_start_time
  1162. def start(
  1163. self,
  1164. deployment_info: DeploymentInfo,
  1165. assign_rank_callback: Callable[[ReplicaID], ReplicaRank],
  1166. ) -> ReplicaSchedulingRequest:
  1167. """
  1168. Start a new actor for current DeploymentReplica instance.
  1169. """
  1170. replica_scheduling_request = self._actor.start(
  1171. deployment_info, assign_rank_callback=assign_rank_callback
  1172. )
  1173. self._start_time = time.time()
  1174. self.update_actor_details(start_time_s=self._start_time)
  1175. return replica_scheduling_request
  1176. def reconfigure(
  1177. self,
  1178. version: DeploymentVersion,
  1179. rank: ReplicaRank,
  1180. ) -> bool:
  1181. """
  1182. Update replica version. Also, updates the deployment config on the actor
  1183. behind this DeploymentReplica instance if necessary.
  1184. Returns: whether the actor is being updated.
  1185. """
  1186. return self._actor.reconfigure(version, rank=rank)
  1187. def recover(self, deployment_info: DeploymentInfo) -> bool:
  1188. """
  1189. Recover states in DeploymentReplica instance by fetching running actor
  1190. status
  1191. Args:
  1192. deployment_info: The deployment info for this replica.
  1193. Returns:
  1194. True if the replica actor is alive and recovered successfully.
  1195. False if the replica actor is no longer alive.
  1196. """
  1197. # If replica is no longer alive
  1198. if not self._actor.recover(ingress=deployment_info.ingress):
  1199. return False
  1200. self._start_time = time.time()
  1201. self.update_actor_details(start_time_s=self._start_time)
  1202. return True
  1203. @property
  1204. def rank(self) -> Optional[ReplicaRank]:
  1205. """Get the rank assigned to the replica."""
  1206. return self._actor.rank
  1207. def check_started(
  1208. self,
  1209. ) -> Tuple[ReplicaStartupStatus, Optional[str], Optional[float]]:
  1210. """Check if the replica has started. If so, transition to RUNNING.
  1211. Should handle the case where the replica has already stopped.
  1212. Returns:
  1213. status: Most recent state of replica by
  1214. querying actor obj ref
  1215. """
  1216. is_ready = self._actor.check_ready()
  1217. self.update_actor_details(
  1218. pid=self._actor.pid,
  1219. node_id=self._actor.node_id,
  1220. node_ip=self._actor.node_ip,
  1221. node_instance_id=self._actor.node_instance_id,
  1222. actor_id=self._actor.actor_id,
  1223. worker_id=self._actor.worker_id,
  1224. log_file_path=self._actor.log_file_path,
  1225. )
  1226. return is_ready
  1227. def stop(self, graceful: bool = True) -> None:
  1228. """Stop the replica.
  1229. Should handle the case where the replica is already stopped.
  1230. """
  1231. state = self._actor_details.state
  1232. logger.info(
  1233. f"Stopping {self.replica_id} (currently {state}).",
  1234. extra={"log_to_stderr": False},
  1235. )
  1236. self._shutdown_start_time = time.time()
  1237. timeout_s = self._actor.graceful_stop()
  1238. if not graceful:
  1239. timeout_s = 0
  1240. elif self._actor._ingress and RAY_SERVE_ENABLE_DIRECT_INGRESS:
  1241. # In direct ingress mode, ensure we wait at least
  1242. # RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S to give external
  1243. # load balancers (e.g., ALB) time to deregister the replica.
  1244. timeout_s = max(timeout_s, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S)
  1245. self._shutdown_deadline = time.time() + timeout_s
  1246. def check_stopped(self) -> bool:
  1247. """Check if the replica has finished stopping."""
  1248. if self._actor.check_stopped():
  1249. return True
  1250. timeout_passed = time.time() >= self._shutdown_deadline
  1251. if timeout_passed:
  1252. logger.info(
  1253. f"{self.replica_id} did not shut down after grace "
  1254. "period, force-killing it."
  1255. )
  1256. self._actor.force_stop()
  1257. return False
  1258. def check_health(self) -> bool:
  1259. """Check if the replica is healthy.
  1260. Returns `True` if the replica is healthy, else `False`.
  1261. """
  1262. return self._actor.check_health()
  1263. def pull_routing_stats(self) -> Optional[Dict[str, Any]]:
  1264. """Get the latest response from the routing stats on the replica.
  1265. Returns None if the replica is still calculating the stats.
  1266. """
  1267. return self._actor.get_routing_stats()
  1268. def update_state(self, state: ReplicaState) -> None:
  1269. """Updates state in actor details."""
  1270. self.update_actor_details(state=state)
  1271. def update_actor_details(self, **kwargs) -> None:
  1272. details_kwargs = self._actor_details.dict()
  1273. details_kwargs.update(kwargs)
  1274. self._actor_details = ReplicaDetails(**details_kwargs)
  1275. def resource_requirements(self) -> Tuple[str, str]:
  1276. """Returns required and currently available resources.
  1277. Only resources with nonzero requirements will be included in the
  1278. required dict and only resources in the required dict will be
  1279. included in the available dict (filtered for relevance).
  1280. """
  1281. if self._actor.actor_resources is None:
  1282. return "UNKNOWN", "UNKNOWN"
  1283. if self._actor.placement_group_bundles is not None:
  1284. required = self._actor.placement_group_bundles
  1285. else:
  1286. required = {
  1287. k: v
  1288. for k, v in self._actor.actor_resources.items()
  1289. if v is not None and v > 0
  1290. }
  1291. available = {
  1292. k: v for k, v in self._actor.available_resources.items() if k in required
  1293. }
  1294. # Use json.dumps() instead of str() here to avoid double-quoting keys
  1295. # when dumping these objects. See
  1296. # https://github.com/ray-project/ray/issues/26210 for the issue.
  1297. return json.dumps(required), json.dumps(available)
  1298. def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
  1299. return self._actor.get_outbound_deployments()
  1300. class ReplicaStateContainer:
  1301. """Container for mapping ReplicaStates to lists of DeploymentReplicas."""
  1302. def __init__(self):
  1303. self._replicas: Dict[ReplicaState, List[DeploymentReplica]] = defaultdict(list)
  1304. def add(self, state: ReplicaState, replica: DeploymentReplica):
  1305. """Add the provided replica under the provided state.
  1306. Args:
  1307. state: state to add the replica under.
  1308. replica: replica to add.
  1309. """
  1310. assert isinstance(state, ReplicaState), f"Type: {type(state)}"
  1311. replica.update_state(state)
  1312. self._replicas[state].append(replica)
  1313. def get(
  1314. self, states: Optional[List[ReplicaState]] = None
  1315. ) -> List[DeploymentReplica]:
  1316. """Get all replicas of the given states.
  1317. This does not remove them from the container. Replicas are returned
  1318. in order of state as passed in.
  1319. Args:
  1320. states: states to consider. If not specified, all replicas
  1321. are considered.
  1322. """
  1323. if states is None:
  1324. states = ALL_REPLICA_STATES
  1325. assert isinstance(states, list)
  1326. return sum((self._replicas[state] for state in states), [])
  1327. def pop(
  1328. self,
  1329. exclude_version: Optional[DeploymentVersion] = None,
  1330. states: Optional[List[ReplicaState]] = None,
  1331. max_replicas: Optional[int] = math.inf,
  1332. ) -> List[DeploymentReplica]:
  1333. """Get and remove all replicas of the given states.
  1334. This removes the replicas from the container. Replicas are returned
  1335. in order of state as passed in.
  1336. Args:
  1337. exclude_version: if specified, replicas of the
  1338. provided version will *not* be removed.
  1339. states: states to consider. If not specified, all replicas
  1340. are considered.
  1341. max_replicas: max number of replicas to return. If not
  1342. specified, will pop all replicas matching the criteria.
  1343. """
  1344. if states is None:
  1345. states = ALL_REPLICA_STATES
  1346. assert exclude_version is None or isinstance(exclude_version, DeploymentVersion)
  1347. assert isinstance(states, list)
  1348. replicas = []
  1349. for state in states:
  1350. popped = []
  1351. remaining = []
  1352. for replica in self._replicas[state]:
  1353. if len(replicas) + len(popped) == max_replicas:
  1354. remaining.append(replica)
  1355. elif exclude_version is not None and replica.version == exclude_version:
  1356. remaining.append(replica)
  1357. else:
  1358. popped.append(replica)
  1359. self._replicas[state] = remaining
  1360. replicas.extend(popped)
  1361. return replicas
  1362. def count(
  1363. self,
  1364. exclude_version: Optional[DeploymentVersion] = None,
  1365. version: Optional[DeploymentVersion] = None,
  1366. states: Optional[List[ReplicaState]] = None,
  1367. ):
  1368. """Get the total count of replicas of the given states.
  1369. Args:
  1370. exclude_version: version to exclude. If not
  1371. specified, all versions are considered.
  1372. version: version to filter to. If not specified,
  1373. all versions are considered.
  1374. states: states to consider. If not specified, all replicas
  1375. are considered.
  1376. """
  1377. if states is None:
  1378. states = ALL_REPLICA_STATES
  1379. assert isinstance(states, list)
  1380. assert exclude_version is None or isinstance(exclude_version, DeploymentVersion)
  1381. assert version is None or isinstance(version, DeploymentVersion)
  1382. if exclude_version is None and version is None:
  1383. return sum(len(self._replicas[state]) for state in states)
  1384. elif exclude_version is None and version is not None:
  1385. return sum(
  1386. len(list(filter(lambda r: r.version == version, self._replicas[state])))
  1387. for state in states
  1388. )
  1389. elif exclude_version is not None and version is None:
  1390. return sum(
  1391. len(
  1392. list(
  1393. filter(
  1394. lambda r: r.version != exclude_version,
  1395. self._replicas[state],
  1396. )
  1397. )
  1398. )
  1399. for state in states
  1400. )
  1401. else:
  1402. raise ValueError(
  1403. "Only one of `version` or `exclude_version` may be provided."
  1404. )
  1405. def __str__(self):
  1406. return str(self._replicas)
  1407. def __repr__(self):
  1408. return repr(self._replicas)
  1409. class RankManager:
  1410. """Manages ranks for a single node."""
  1411. def __init__(self):
  1412. self._ranks: Dict[str, int] = {}
  1413. self._released_ranks: Set[int] = set()
  1414. self._next_rank: int = 0
  1415. def assign_rank(self, key: str) -> int:
  1416. if key in self._ranks:
  1417. raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}")
  1418. if self._released_ranks:
  1419. # Reuse the smallest released rank
  1420. rank = min(self._released_ranks)
  1421. self._released_ranks.remove(rank)
  1422. else:
  1423. # Assign the next available rank
  1424. # This is the first time we're assigning a rank to this replica
  1425. rank = self._next_rank
  1426. self._next_rank += 1
  1427. self._ranks[key] = rank
  1428. return rank
  1429. def release_rank(self, key: str) -> None:
  1430. if key not in self._ranks:
  1431. raise RuntimeError(f"Rank for {key} not assigned")
  1432. rank = self._ranks.pop(key)
  1433. # Add the released rank to the set of released ranks
  1434. # This rank can be reused for a new replica
  1435. self._released_ranks.add(rank)
  1436. def recover_rank(self, key: str, rank: int) -> None:
  1437. if key in self._ranks:
  1438. raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}")
  1439. self._ranks[key] = rank
  1440. self._released_ranks.discard(rank)
  1441. if rank >= self._next_rank:
  1442. self._next_rank = rank + 1
  1443. def get_rank(self, key: str) -> int:
  1444. if key not in self._ranks:
  1445. raise RuntimeError(f"Rank for {key} not assigned")
  1446. return self._ranks[key]
  1447. def has_rank(self, key: str) -> bool:
  1448. return key in self._ranks
  1449. def get_ranks_mapping(self) -> Dict[str, int]:
  1450. return self._ranks.copy()
  1451. def clear(self) -> None:
  1452. self._ranks.clear()
  1453. self._released_ranks.clear()
  1454. self._next_rank = 0
  1455. def check_rank_consistency_and_reassign_minimally(
  1456. self,
  1457. active_keys: List[str],
  1458. ) -> List[str]:
  1459. """Verify rank system invariants and reassign ranks when needed.
  1460. This method ensures:
  1461. 1. All active keys have ranks
  1462. 2. No duplicate ranks exist
  1463. 3. Ranks are contiguous when at target count
  1464. Args:
  1465. active_keys: List of currently active keys
  1466. Returns:
  1467. List of keys that need to be reconfigured with new ranks
  1468. Raises:
  1469. RuntimeError: If rank system invariants are violated and fail_on_error=True
  1470. """
  1471. if not active_keys:
  1472. return []
  1473. active_keys_set = set(active_keys)
  1474. # Check for stale ranks - this should never happen
  1475. stale_keys = set(self._ranks.keys()) - active_keys_set
  1476. if stale_keys:
  1477. logger.error(
  1478. f"Found stale ranks for keys: {stale_keys}. "
  1479. "This should never happen. Please report this as a bug."
  1480. )
  1481. raise RuntimeError("Rank system is in an invalid state.")
  1482. # Verify system invariants - all active keys must have ranks
  1483. unranked_keys = active_keys_set - set(self._ranks.keys())
  1484. if unranked_keys:
  1485. logger.error(
  1486. f"Found active keys without ranks: {unranked_keys}. "
  1487. "This should never happen. Please report this as a bug."
  1488. )
  1489. raise RuntimeError("Rank system is in an invalid state.")
  1490. # Check for duplicate ranks - this should never happen
  1491. rank_counts = {}
  1492. for key, rank in self._ranks.copy().items():
  1493. if key in active_keys_set: # Only check active keys
  1494. rank_counts[rank] = rank_counts.get(rank, 0) + 1
  1495. if rank_counts[rank] > 1:
  1496. logger.error(
  1497. f"Found duplicate rank {rank} assigned to multiple keys. "
  1498. "This should never happen. Please report this as a bug."
  1499. )
  1500. raise RuntimeError("Rank system is in an invalid state.")
  1501. # Check if we need to reassign ranks for contiguity
  1502. # Only force contiguity when at target count (e.g., after autoscaling down)
  1503. current_ranks = sorted(self._ranks.values())
  1504. expected_ranks = list(range(len(active_keys)))
  1505. keys_needing_reconfiguration_from_reassignment = []
  1506. if current_ranks != expected_ranks:
  1507. logger.debug(
  1508. f"At target count but ranks are not contiguous. "
  1509. f"Current: {current_ranks}, Expected: {expected_ranks}. "
  1510. "Performing minimal reassignment."
  1511. )
  1512. keys_needing_reconfiguration_from_reassignment = (
  1513. self._perform_minimal_rank_reassignment(active_keys)
  1514. )
  1515. return keys_needing_reconfiguration_from_reassignment
  1516. def _perform_minimal_rank_reassignment(self, active_keys: List[str]) -> List[str]:
  1517. """Perform minimal rank reassignment to achieve contiguity.
  1518. This method reassigns ranks while minimizing the number of keys that need
  1519. to be reconfigured. It prioritizes keeping existing ranks when possible.
  1520. Args:
  1521. active_keys: List of currently active keys
  1522. Returns:
  1523. List of keys that need to be reconfigured with new ranks
  1524. """
  1525. target_ranks_set = set(range(len(active_keys)))
  1526. # Find which keys need new ranks
  1527. keys_needing_ranks = []
  1528. keys_keeping_ranks = []
  1529. for key in active_keys:
  1530. current_rank = self.get_rank(key)
  1531. if current_rank in target_ranks_set:
  1532. # This key can keep its rank
  1533. target_ranks_set.remove(current_rank) # O(1) operation
  1534. keys_keeping_ranks.append(key)
  1535. else:
  1536. # This key needs a new rank
  1537. keys_needing_ranks.append(key)
  1538. # Convert remaining target ranks to sorted list for deterministic assignment
  1539. available_ranks = sorted(target_ranks_set)
  1540. # Assign new ranks to keys that need them
  1541. for i, key in enumerate(keys_needing_ranks):
  1542. new_rank = available_ranks[i] # O(1) operation
  1543. # Store the old rank before updating
  1544. old_rank = self._ranks[key]
  1545. logger.debug(f"Reassigning key {key}: rank {old_rank} -> {new_rank}")
  1546. # Update the rank mapping
  1547. self._ranks[key] = new_rank
  1548. # Remove the newly assigned rank from available ranks
  1549. self._released_ranks.discard(new_rank)
  1550. # Add the old rank back to available ranks for reuse
  1551. self._released_ranks.add(old_rank)
  1552. # Log the reassignment summary
  1553. logger.debug(
  1554. f"Minimal reassignment complete: {len(keys_keeping_ranks)} keys kept ranks, "
  1555. f"{len(keys_needing_ranks)} keys reassigned"
  1556. )
  1557. return keys_needing_ranks
  1558. class DeploymentRankManager:
  1559. """Manages replica ranks for a deployment.
  1560. This class handles rank assignment, release, consistency checking, and reassignment.
  1561. It maintains the rank system invariants and provides a clean interface for rank operations.
  1562. Maintains three levels of rank tracking:
  1563. - Global rank: Replica-level rank across all nodes (0, 1, 2, ...)
  1564. - Local rank: Replica's rank within its node (0, 1, 2, ... per node)
  1565. - Node rank ID: Index assigned to each node (0, 1, 2, ...)
  1566. """
  1567. def __init__(self, fail_on_rank_error: bool = True):
  1568. # Global rank manager (existing replica-level rank)
  1569. self._replica_rank_manager = RankManager()
  1570. self._fail_on_rank_error = fail_on_rank_error
  1571. # Node rank manager (assigns rank IDs to nodes)
  1572. self._node_rank_manager = RankManager()
  1573. # Local rank managers (one per node, manages replica ranks within each node)
  1574. self._local_rank_managers: Dict[str, RankManager] = {}
  1575. # Track which node each replica is on
  1576. self._replica_to_node: Dict[str, str] = {}
  1577. def _execute_with_error_handling(self, func, safe_default, *args, **kwargs):
  1578. if self._fail_on_rank_error:
  1579. # Let exceptions propagate
  1580. return func(*args, **kwargs)
  1581. else:
  1582. # Catch exceptions and return safe default
  1583. try:
  1584. return func(*args, **kwargs)
  1585. except Exception as e:
  1586. logger.error(f"Error executing function {func.__name__}: {e}")
  1587. return safe_default
  1588. def assign_rank(self, replica_id: str, node_id: str) -> ReplicaRank:
  1589. """Assign a rank to a new replica.
  1590. Args:
  1591. replica_id: The unique ID of the replica
  1592. node_id: The unique ID of the node
  1593. Returns:
  1594. ReplicaRank object with the assigned rank
  1595. Raises:
  1596. RuntimeError: If the replica already has a rank assigned
  1597. """
  1598. def _assign_rank_impl():
  1599. if self.has_replica_rank(replica_id):
  1600. raise RuntimeError(
  1601. f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}"
  1602. )
  1603. # Track the replica-to-node mapping
  1604. self._replica_to_node[replica_id] = node_id
  1605. # Assign global rank
  1606. rank = self._replica_rank_manager.assign_rank(replica_id)
  1607. # Assign node rank if this node doesn't have one yet
  1608. if node_id not in self._local_rank_managers:
  1609. self._node_rank_manager.assign_rank(node_id)
  1610. self._local_rank_managers[node_id] = RankManager()
  1611. node_rank = self._node_rank_manager.get_rank(node_id)
  1612. # Assign local rank within the node
  1613. local_rank = self._local_rank_managers[node_id].assign_rank(replica_id)
  1614. return ReplicaRank(rank=rank, node_rank=node_rank, local_rank=local_rank)
  1615. return self._execute_with_error_handling(
  1616. _assign_rank_impl, ReplicaRank(rank=0, node_rank=0, local_rank=0)
  1617. )
  1618. def release_rank(self, replica_id: str) -> None:
  1619. """Release rank for a replica.
  1620. Args:
  1621. replica_id: ID of the replica
  1622. Raises:
  1623. RuntimeError: If replica doesn't have ranks
  1624. """
  1625. def _release_rank_impl():
  1626. if not self.has_replica_rank(replica_id):
  1627. raise RuntimeError(f"Rank for {replica_id} not assigned")
  1628. # Get the node_id from the replica mapping
  1629. node_id = self._replica_to_node[replica_id]
  1630. # Release global rank
  1631. self._replica_rank_manager.release_rank(replica_id)
  1632. # Release local rank
  1633. self._local_rank_managers[node_id].release_rank(replica_id)
  1634. # Release node rank if this was the last replica on the node
  1635. if len(self._local_rank_managers[node_id].get_ranks_mapping()) == 0:
  1636. self._node_rank_manager.release_rank(node_id)
  1637. del self._local_rank_managers[node_id]
  1638. # Remove replica from node mapping
  1639. del self._replica_to_node[replica_id]
  1640. return self._execute_with_error_handling(_release_rank_impl, None)
  1641. def recover_rank(
  1642. self,
  1643. replica_id: str,
  1644. node_id: str,
  1645. rank: ReplicaRank,
  1646. ) -> None:
  1647. """Recover rank for a replica (e.g., after controller restart).
  1648. Args:
  1649. replica_id: ID of the replica
  1650. node_id: ID of the node
  1651. rank: The rank to recover
  1652. Raises:
  1653. RuntimeError: If replica already has ranks assigned
  1654. """
  1655. def _recover_rank_impl():
  1656. if self.has_replica_rank(replica_id):
  1657. raise RuntimeError(
  1658. f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}"
  1659. )
  1660. # Recover global rank
  1661. self._replica_rank_manager.recover_rank(replica_id, rank.rank)
  1662. # Recover node rank only if this node doesn't already have one
  1663. if not self._node_rank_manager.has_rank(node_id):
  1664. self._node_rank_manager.recover_rank(node_id, rank.node_rank)
  1665. # Recover local rank
  1666. if node_id not in self._local_rank_managers:
  1667. self._local_rank_managers[node_id] = RankManager()
  1668. self._local_rank_managers[node_id].recover_rank(replica_id, rank.local_rank)
  1669. # Track the replica-to-node mapping
  1670. self._replica_to_node[replica_id] = node_id
  1671. return self._execute_with_error_handling(_recover_rank_impl, None)
  1672. def has_replica_rank(self, replica_id: str) -> bool:
  1673. """Check if replica has a rank assigned.
  1674. Args:
  1675. replica_id: The unique ID of the replica
  1676. Returns:
  1677. True if the replica has a rank assigned, False otherwise
  1678. Raises:
  1679. RuntimeError: If the replica doesn't have ranks assigned
  1680. """
  1681. if replica_id not in self._replica_to_node:
  1682. return False
  1683. node_id = self._replica_to_node[replica_id]
  1684. return (
  1685. self._replica_rank_manager.has_rank(replica_id)
  1686. and node_id in self._local_rank_managers
  1687. and self._node_rank_manager.has_rank(node_id)
  1688. and self._local_rank_managers[node_id].has_rank(replica_id)
  1689. )
  1690. def get_replica_rank(self, replica_id: str) -> ReplicaRank:
  1691. """Get the rank for a replica.
  1692. Args:
  1693. replica_id: ID of the replica
  1694. Returns:
  1695. ReplicaRank object
  1696. Raises:
  1697. RuntimeError: If replica doesn't have ranks assigned
  1698. """
  1699. def _get_replica_rank_impl():
  1700. if not self.has_replica_rank(replica_id):
  1701. raise RuntimeError(f"Rank for {replica_id} not assigned")
  1702. global_rank = self._replica_rank_manager.get_rank(replica_id)
  1703. node_id = self._replica_to_node[replica_id]
  1704. node_rank = self._node_rank_manager.get_rank(node_id)
  1705. local_rank = self._local_rank_managers[node_id].get_rank(replica_id)
  1706. return ReplicaRank(
  1707. rank=global_rank, node_rank=node_rank, local_rank=local_rank
  1708. )
  1709. return self._execute_with_error_handling(
  1710. _get_replica_rank_impl, ReplicaRank(rank=0, node_rank=0, local_rank=0)
  1711. )
  1712. def check_rank_consistency_and_reassign_minimally(
  1713. self,
  1714. active_replicas: List["DeploymentReplica"],
  1715. ) -> List["DeploymentReplica"]:
  1716. """Verify rank system invariants and reassign ranks when needed across all three levels.
  1717. This method ensures:
  1718. 1. Global ranks are contiguous [0, N-1] for N replicas
  1719. 2. Node ranks are contiguous [0, M-1] for M nodes
  1720. 3. Local ranks are contiguous [0, K-1] for K replicas on each node
  1721. Args:
  1722. active_replicas: List of currently active replicas
  1723. Returns:
  1724. List of replicas that need to be reconfigured with new ranks
  1725. """
  1726. def _check_rank_consistency_impl():
  1727. if not active_replicas:
  1728. return []
  1729. # Extract replica IDs from replicas
  1730. active_replica_ids = [
  1731. replica.replica_id.unique_id for replica in active_replicas
  1732. ]
  1733. # Create a mapping from replica ID to replica object for quick lookup
  1734. replica_id_to_replica = {
  1735. replica.replica_id.unique_id: replica for replica in active_replicas
  1736. }
  1737. # Track all replicas needing reconfiguration from any rank system
  1738. all_replica_ids_needing_reconfiguration = set()
  1739. # STEP 1: Check global rank consistency
  1740. replica_ids_from_global = self._replica_rank_manager.check_rank_consistency_and_reassign_minimally(
  1741. active_replica_ids
  1742. )
  1743. all_replica_ids_needing_reconfiguration.update(replica_ids_from_global)
  1744. # STEP 2: Group replicas by node and check local rank consistency per node
  1745. replicas_by_node: Dict[str, List[str]] = {}
  1746. for replica_id in active_replica_ids:
  1747. node_id = self._replica_to_node.get(replica_id)
  1748. assert (
  1749. node_id is not None
  1750. ), f"Replica {replica_id} not assigned to any node"
  1751. if node_id not in replicas_by_node:
  1752. replicas_by_node[node_id] = []
  1753. replicas_by_node[node_id].append(replica_id)
  1754. for node_id, replica_ids_on_node in replicas_by_node.items():
  1755. replica_ids_from_local = self._local_rank_managers[
  1756. node_id
  1757. ].check_rank_consistency_and_reassign_minimally(replica_ids_on_node)
  1758. all_replica_ids_needing_reconfiguration.update(replica_ids_from_local)
  1759. # STEP 3: Check node rank consistency
  1760. active_node_ids = list(replicas_by_node.keys())
  1761. if active_node_ids:
  1762. node_ids_needing_reassignment = self._node_rank_manager.check_rank_consistency_and_reassign_minimally(
  1763. active_node_ids,
  1764. )
  1765. # If any nodes were reassigned, all replicas on those nodes need reconfiguration
  1766. for node_id in node_ids_needing_reassignment:
  1767. all_replica_ids_needing_reconfiguration.update(
  1768. replicas_by_node[node_id]
  1769. )
  1770. # Convert replica IDs back to replica objects
  1771. # Filter out stale replicas that are not in the active set
  1772. replicas_needing_reconfiguration = [
  1773. replica_id_to_replica[replica_id]
  1774. for replica_id in all_replica_ids_needing_reconfiguration
  1775. if replica_id in replica_id_to_replica
  1776. ]
  1777. return replicas_needing_reconfiguration
  1778. return self._execute_with_error_handling(_check_rank_consistency_impl, [])
  1779. def clear(self) -> None:
  1780. self._replica_rank_manager.clear()
  1781. self._node_rank_manager.clear()
  1782. self._local_rank_managers.clear()
  1783. self._replica_to_node.clear()
  1784. def get_replica_ranks_mapping(self) -> Dict[str, ReplicaRank]:
  1785. """Get the current mapping of replica IDs to ReplicaRank objects.
  1786. Returns:
  1787. Dict mapping replica_id to ReplicaRank object
  1788. """
  1789. result = {}
  1790. for replica_id in self._replica_rank_manager.get_ranks_mapping().keys():
  1791. result[replica_id] = self.get_replica_rank(replica_id)
  1792. return result
  1793. class DeploymentState:
  1794. """Manages the target state and replicas for a single deployment."""
  1795. FORCE_STOP_UNHEALTHY_REPLICAS = RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS
  1796. def __init__(
  1797. self,
  1798. id: DeploymentID,
  1799. long_poll_host: LongPollHost,
  1800. deployment_scheduler: DeploymentScheduler,
  1801. cluster_node_info_cache: ClusterNodeInfoCache,
  1802. autoscaling_state_manager: AutoscalingStateManager,
  1803. ):
  1804. self._id = id
  1805. self._long_poll_host: LongPollHost = long_poll_host
  1806. self._deployment_scheduler = deployment_scheduler
  1807. self._cluster_node_info_cache = cluster_node_info_cache
  1808. self._autoscaling_state_manager = autoscaling_state_manager
  1809. # Each time we set a new deployment goal, we're trying to save new
  1810. # DeploymentInfo and bring current deployment to meet new status.
  1811. self._target_state: DeploymentTargetState = DeploymentTargetState.default()
  1812. self._prev_startup_warning: float = time.time()
  1813. self._replica_constructor_error_msg: Optional[str] = None
  1814. # Counter for how many times replicas failed to start. This is reset to 0 when:
  1815. # (1) The deployment is deployed / re-deployed.
  1816. # (2) The deployment reaches the HEALTHY state.
  1817. self._replica_constructor_retry_counter: int = 0
  1818. # Flag for whether any replicas of the target version has successfully started.
  1819. # This is reset to False when the deployment is re-deployed.
  1820. self._replica_has_started: bool = False
  1821. self._replicas: ReplicaStateContainer = ReplicaStateContainer()
  1822. self._curr_status_info: DeploymentStatusInfo = DeploymentStatusInfo(
  1823. self._id.name,
  1824. DeploymentStatus.UPDATING,
  1825. DeploymentStatusTrigger.CONFIG_UPDATE_STARTED,
  1826. )
  1827. self._rank_manager = DeploymentRankManager(
  1828. fail_on_rank_error=RAY_SERVE_FAIL_ON_RANK_ERROR
  1829. )
  1830. self.replica_average_ongoing_requests: Dict[str, float] = {}
  1831. self.health_check_gauge = metrics.Gauge(
  1832. "serve_deployment_replica_healthy",
  1833. description=(
  1834. "Tracks whether this deployment replica is healthy. 1 means "
  1835. "healthy, 0 means unhealthy."
  1836. ),
  1837. tag_keys=("deployment", "replica", "application"),
  1838. )
  1839. self.health_check_gauge.set_default_tags(
  1840. {"deployment": self._id.name, "application": self._id.app_name}
  1841. )
  1842. # Histogram for replica startup latency (time from creation to ready state).
  1843. self.replica_startup_latency_histogram = metrics.Histogram(
  1844. "serve_replica_startup_latency_ms",
  1845. description=("Time from replica creation to ready state in milliseconds."),
  1846. boundaries=REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
  1847. tag_keys=("deployment", "replica", "application"),
  1848. )
  1849. self.replica_startup_latency_histogram.set_default_tags(
  1850. {"deployment": self._id.name, "application": self._id.app_name}
  1851. )
  1852. # Histogram for replica initialization latency.
  1853. self.replica_initialization_latency_histogram = metrics.Histogram(
  1854. "serve_replica_initialization_latency_ms",
  1855. description=("Time for replica to initialize in milliseconds."),
  1856. boundaries=REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
  1857. tag_keys=("deployment", "replica", "application"),
  1858. )
  1859. self.replica_initialization_latency_histogram.set_default_tags(
  1860. {"deployment": self._id.name, "application": self._id.app_name}
  1861. )
  1862. # Histogram for replica reconfigure latency.
  1863. # NOTE(abrar): value of this metric represents reconfigure + time until next controller loop
  1864. self.replica_reconfigure_latency_histogram = metrics.Histogram(
  1865. "serve_replica_reconfigure_latency_ms",
  1866. description=("Time for replica to complete reconfigure in milliseconds."),
  1867. boundaries=REQUEST_LATENCY_BUCKETS_MS,
  1868. tag_keys=("deployment", "replica", "application"),
  1869. )
  1870. self.replica_reconfigure_latency_histogram.set_default_tags(
  1871. {"deployment": self._id.name, "application": self._id.app_name}
  1872. )
  1873. # Histogram for health check latency.
  1874. self.health_check_latency_histogram = metrics.Histogram(
  1875. "serve_health_check_latency_ms",
  1876. description=("Duration of health check calls in milliseconds."),
  1877. boundaries=REQUEST_LATENCY_BUCKETS_MS,
  1878. tag_keys=("deployment", "replica", "application"),
  1879. )
  1880. self.health_check_latency_histogram.set_default_tags(
  1881. {"deployment": self._id.name, "application": self._id.app_name}
  1882. )
  1883. # Counter for health check failures.
  1884. self.health_check_failures_counter = metrics.Counter(
  1885. "serve_health_check_failures_total",
  1886. description=("Count of failed health checks."),
  1887. tag_keys=("deployment", "replica", "application"),
  1888. )
  1889. self.health_check_failures_counter.set_default_tags(
  1890. {"deployment": self._id.name, "application": self._id.app_name}
  1891. )
  1892. # Histogram for replica shutdown duration.
  1893. self.replica_shutdown_duration_histogram = metrics.Histogram(
  1894. "serve_replica_shutdown_duration_ms",
  1895. description=(
  1896. "Time from shutdown signal to replica fully stopped in milliseconds."
  1897. ),
  1898. boundaries=REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
  1899. tag_keys=("deployment", "replica", "application"),
  1900. )
  1901. self.replica_shutdown_duration_histogram.set_default_tags(
  1902. {"deployment": self._id.name, "application": self._id.app_name}
  1903. )
  1904. self.target_replicas_gauge = metrics.Gauge(
  1905. "serve_autoscaling_target_replicas",
  1906. description=(
  1907. "The target number of replicas for this deployment. "
  1908. "This is the number the autoscaler is trying to reach."
  1909. ),
  1910. tag_keys=("deployment", "application"),
  1911. )
  1912. self.target_replicas_gauge.set_default_tags(
  1913. {"deployment": self._id.name, "application": self._id.app_name}
  1914. )
  1915. # Whether the request routing info have been updated since the last
  1916. # time we checked.
  1917. self._request_routing_info_updated = False
  1918. self._last_broadcasted_running_replica_infos: List[RunningReplicaInfo] = []
  1919. self._last_broadcasted_availability: bool = True
  1920. self._last_broadcasted_deployment_config = None
  1921. self._docs_path: Optional[str] = None
  1922. self._route_patterns: Optional[List[str]] = None
  1923. def should_autoscale(self) -> bool:
  1924. """
  1925. Check if the deployment is under autoscaling
  1926. """
  1927. return self._autoscaling_state_manager.should_autoscale_deployment(self._id)
  1928. def get_checkpoint_data(self) -> DeploymentTargetState:
  1929. """
  1930. Return deployment's target state submitted by user's deployment call.
  1931. Should be persisted and outlive current ray cluster.
  1932. """
  1933. return self._target_state
  1934. def recover_target_state_from_checkpoint(
  1935. self, target_state_checkpoint: DeploymentTargetState
  1936. ):
  1937. logger.info(f"Recovering target state for {self._id} from checkpoint.")
  1938. self._target_state = target_state_checkpoint
  1939. self._deployment_scheduler.on_deployment_deployed(
  1940. self._id, self._target_state.info.replica_config
  1941. )
  1942. if self._target_state.info.deployment_config.autoscaling_config:
  1943. self._autoscaling_state_manager.register_deployment(
  1944. self._id,
  1945. self._target_state.info,
  1946. self._target_state.target_num_replicas,
  1947. )
  1948. def recover_current_state_from_replica_actor_names(
  1949. self, replica_actor_names: List[str]
  1950. ):
  1951. """Recover deployment state from live replica actors found in the cluster."""
  1952. assert self._target_state is not None, (
  1953. "Target state should be recovered successfully first before "
  1954. "recovering current state from replica actor names."
  1955. )
  1956. logger.info(
  1957. f"Recovering current state for {self._id} "
  1958. f"from {len(replica_actor_names)} live actors."
  1959. )
  1960. # All current states use default value, only attach running replicas.
  1961. for replica_actor_name in replica_actor_names:
  1962. replica_id = ReplicaID.from_full_id_str(replica_actor_name)
  1963. new_deployment_replica = DeploymentReplica(
  1964. replica_id,
  1965. self._target_state.version,
  1966. )
  1967. # If replica is no longer alive, simply don't add it to the
  1968. # deployment state manager to track.
  1969. if not new_deployment_replica.recover(self._target_state.info):
  1970. logger.warning(f"{replica_id} died before controller could recover it.")
  1971. continue
  1972. self._replicas.add(ReplicaState.RECOVERING, new_deployment_replica)
  1973. self._deployment_scheduler.on_replica_recovering(replica_id)
  1974. logger.debug(f"RECOVERING {replica_id}.")
  1975. # TODO(jiaodong): this currently halts all traffic in the cluster
  1976. # briefly because we will broadcast a replica update with everything in
  1977. # RECOVERING. We should have a grace period where we recover the state
  1978. # of the replicas before doing this update.
  1979. @property
  1980. def target_info(self) -> DeploymentInfo:
  1981. return self._target_state.info
  1982. @property
  1983. def target_version(self) -> DeploymentVersion:
  1984. return self._target_state.version
  1985. @property
  1986. def target_num_replicas(self) -> int:
  1987. return self._target_state.target_num_replicas
  1988. @property
  1989. def curr_status_info(self) -> DeploymentStatusInfo:
  1990. return self._curr_status_info
  1991. @property
  1992. def deployment_name(self) -> str:
  1993. return self._id.name
  1994. @property
  1995. def app_name(self) -> str:
  1996. return self._id.app_name
  1997. @property
  1998. def docs_path(self) -> Optional[str]:
  1999. return self._docs_path
  2000. @property
  2001. def route_patterns(self) -> Optional[List[str]]:
  2002. return self._route_patterns
  2003. @property
  2004. def _failed_to_start_threshold(self) -> int:
  2005. return min(
  2006. self._target_state.info.deployment_config.max_constructor_retry_count,
  2007. self._target_state.target_num_replicas * MAX_PER_REPLICA_RETRY_COUNT,
  2008. )
  2009. def _replica_startup_failing(self) -> bool:
  2010. """Check whether replicas are currently failing and the number of
  2011. failures has exceeded a threshold.
  2012. """
  2013. return (
  2014. self._target_state.target_num_replicas > 0
  2015. and self._replica_constructor_retry_counter
  2016. >= self._failed_to_start_threshold
  2017. )
  2018. def _terminally_failed(self) -> bool:
  2019. """Check whether the current version is terminally errored.
  2020. The version is considered terminally errored if the number of
  2021. replica failures has exceeded a threshold, and there hasn't been
  2022. any replicas of the target version that has successfully started.
  2023. """
  2024. return not self._replica_has_started and self._replica_startup_failing()
  2025. def get_alive_replica_actor_ids(self) -> Set[str]:
  2026. return {replica.actor_id for replica in self._replicas.get()}
  2027. def get_running_replica_ids(self) -> List[ReplicaID]:
  2028. return [
  2029. replica.replica_id
  2030. for replica in self._replicas.get(
  2031. [ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION]
  2032. )
  2033. ]
  2034. def get_running_replica_infos(self) -> List[RunningReplicaInfo]:
  2035. return [
  2036. replica.get_running_replica_info(self._cluster_node_info_cache)
  2037. for replica in self._replicas.get(
  2038. [ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION]
  2039. )
  2040. ]
  2041. def get_num_running_replicas(self, version: DeploymentVersion = None) -> int:
  2042. return self._replicas.count(states=[ReplicaState.RUNNING], version=version)
  2043. def get_active_node_ids(self) -> Set[str]:
  2044. """Get the node ids of all running replicas in this deployment.
  2045. This is used to determine which node has replicas. Only nodes with replicas and
  2046. head node should have active proxies.
  2047. """
  2048. active_states = [
  2049. ReplicaState.STARTING,
  2050. ReplicaState.UPDATING,
  2051. ReplicaState.RECOVERING,
  2052. ReplicaState.RUNNING,
  2053. # NOTE(zcin): We still want a proxy to run on a draining
  2054. # node before all the replicas are migrated.
  2055. ReplicaState.PENDING_MIGRATION,
  2056. ]
  2057. return {
  2058. replica.actor_node_id
  2059. for replica in self._replicas.get(active_states)
  2060. if replica.actor_node_id is not None
  2061. }
  2062. def list_replica_details(self) -> List[ReplicaDetails]:
  2063. return [replica.actor_details for replica in self._replicas.get()]
  2064. def broadcast_running_replicas_if_changed(self) -> None:
  2065. """Broadcasts the set of running replicas over long poll if it has changed.
  2066. Keeps an in-memory record of the last set of running replicas that was broadcast
  2067. to determine if it has changed.
  2068. The set will also be broadcast if any replicas have an updated set of
  2069. multiplexed model IDs.
  2070. """
  2071. running_replica_infos = self.get_running_replica_infos()
  2072. is_available = not self._terminally_failed()
  2073. running_replicas_changed = (
  2074. set(self._last_broadcasted_running_replica_infos)
  2075. != set(running_replica_infos)
  2076. or self._request_routing_info_updated
  2077. )
  2078. availability_changed = is_available != self._last_broadcasted_availability
  2079. if not running_replicas_changed and not availability_changed:
  2080. return
  2081. deployment_metadata = DeploymentTargetInfo(
  2082. is_available=is_available,
  2083. running_replicas=running_replica_infos,
  2084. )
  2085. self._long_poll_host.notify_changed(
  2086. {
  2087. (
  2088. LongPollNamespace.DEPLOYMENT_TARGETS,
  2089. self._id,
  2090. ): deployment_metadata,
  2091. # NOTE(zcin): notify changed for Java routers. Since Java only
  2092. # supports 1.x API, there is no concept of applications in Java,
  2093. # so the key should remain a string describing the deployment
  2094. # name. If there are no Java routers, this is a no-op.
  2095. (
  2096. LongPollNamespace.DEPLOYMENT_TARGETS,
  2097. self._id.name,
  2098. ): deployment_metadata,
  2099. }
  2100. )
  2101. self._last_broadcasted_running_replica_infos = running_replica_infos
  2102. self._last_broadcasted_availability = is_available
  2103. self._request_routing_info_updated = False
  2104. def broadcast_deployment_config_if_changed(self) -> None:
  2105. """Broadcasts the deployment config over long poll if it has changed.
  2106. Keeps an in-memory record of the last config that was broadcast to determine
  2107. if it has changed.
  2108. """
  2109. current_deployment_config = self._target_state.info.deployment_config
  2110. if self._last_broadcasted_deployment_config == current_deployment_config:
  2111. return
  2112. self._long_poll_host.notify_changed(
  2113. {(LongPollNamespace.DEPLOYMENT_CONFIG, self._id): current_deployment_config}
  2114. )
  2115. self._last_broadcasted_deployment_config = current_deployment_config
  2116. def _set_target_state_deleting(self) -> None:
  2117. """Set the target state for the deployment to be deleted."""
  2118. target_state = DeploymentTargetState.create(
  2119. info=self._target_state.info,
  2120. target_num_replicas=0,
  2121. deleting=True,
  2122. )
  2123. self._target_state = target_state
  2124. self._curr_status_info = self._curr_status_info.handle_transition(
  2125. trigger=DeploymentStatusInternalTrigger.DELETE
  2126. )
  2127. logger.info(
  2128. f"Deleting {self._id}",
  2129. extra={"log_to_stderr": False},
  2130. )
  2131. def _set_target_state(
  2132. self,
  2133. target_info: DeploymentInfo,
  2134. target_num_replicas: int,
  2135. updated_via_api: bool = False,
  2136. ) -> None:
  2137. """Set the target state for the deployment to the provided info.
  2138. Args:
  2139. target_info: The info with which to set the target state.
  2140. target_num_replicas: The number of replicas that this deployment
  2141. should attempt to run.
  2142. status_trigger: The driver that triggered this change of state.
  2143. updated_via_api: Whether the target state update was triggered via API.
  2144. """
  2145. new_target_state = DeploymentTargetState.create(
  2146. target_info, target_num_replicas, deleting=False
  2147. )
  2148. if self._target_state.version == new_target_state.version:
  2149. # Record either num replica or autoscaling config lightweight update
  2150. if (
  2151. self._target_state.version.deployment_config.autoscaling_config
  2152. != new_target_state.version.deployment_config.autoscaling_config
  2153. ):
  2154. ServeUsageTag.AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED.record("True")
  2155. elif updated_via_api:
  2156. ServeUsageTag.NUM_REPLICAS_VIA_API_CALL_UPDATED.record("True")
  2157. elif (
  2158. self._target_state.version.deployment_config.num_replicas
  2159. != new_target_state.version.deployment_config.num_replicas
  2160. ):
  2161. ServeUsageTag.NUM_REPLICAS_LIGHTWEIGHT_UPDATED.record("True")
  2162. self._target_state = new_target_state
  2163. # Emit target replicas metric
  2164. self.target_replicas_gauge.set(target_num_replicas)
  2165. def deploy(self, deployment_info: DeploymentInfo) -> bool:
  2166. """Deploy the deployment.
  2167. If the deployment already exists with the same version, config,
  2168. target_capacity, and target_capacity_direction,
  2169. this method returns False.
  2170. Returns:
  2171. bool: Whether the target state has changed.
  2172. """
  2173. curr_deployment_info = self._target_state.info
  2174. if curr_deployment_info is not None:
  2175. # Redeploying should not reset the deployment's start time.
  2176. if not self._target_state.deleting:
  2177. deployment_info.start_time_ms = curr_deployment_info.start_time_ms
  2178. deployment_settings_changed = (
  2179. self._target_state.deleting
  2180. or curr_deployment_info.deployment_config
  2181. != deployment_info.deployment_config
  2182. or curr_deployment_info.replica_config.ray_actor_options
  2183. != deployment_info.replica_config.ray_actor_options
  2184. or curr_deployment_info.route_prefix != deployment_info.route_prefix
  2185. or deployment_info.version is None
  2186. or curr_deployment_info.version != deployment_info.version
  2187. )
  2188. target_capacity_changed = (
  2189. curr_deployment_info.target_capacity != deployment_info.target_capacity
  2190. or curr_deployment_info.target_capacity_direction
  2191. != deployment_info.target_capacity_direction
  2192. )
  2193. else:
  2194. deployment_settings_changed = True
  2195. target_capacity_changed = True
  2196. # Exit early if the deployment info hasn't changed. Ensures this method
  2197. # is idempotent.
  2198. if not deployment_settings_changed and not target_capacity_changed:
  2199. # Emit target replicas metric when the deployment info hasn't changed.
  2200. self.target_replicas_gauge.set(self._target_state.target_num_replicas)
  2201. return False
  2202. logger.debug(f"Deploying '{self._id}': {deployment_info.to_dict()}")
  2203. logger.debug(
  2204. f"Current target state for '{self._id}': "
  2205. f"{self._target_state.info.to_dict() if self._target_state.info is not None else None}"
  2206. )
  2207. if deployment_info.deployment_config.autoscaling_config:
  2208. target_num_replicas = self._autoscaling_state_manager.register_deployment(
  2209. self._id, deployment_info, self._target_state.target_num_replicas
  2210. )
  2211. else:
  2212. self._autoscaling_state_manager.deregister_deployment(self._id)
  2213. target_num_replicas = get_capacity_adjusted_num_replicas(
  2214. deployment_info.deployment_config.num_replicas,
  2215. deployment_info.target_capacity,
  2216. )
  2217. old_target_state = self._target_state
  2218. self._set_target_state(deployment_info, target_num_replicas=target_num_replicas)
  2219. self._deployment_scheduler.on_deployment_deployed(
  2220. self._id, deployment_info.replica_config
  2221. )
  2222. # Determine if the updated target state simply scales the current state.
  2223. # Although the else branch handles the CONFIG_UPDATE, we also take this branch
  2224. # for a config update whose only effect is changing `num_replicas`.
  2225. # Treating it as a scaling event keeps the user-visible deployment status more
  2226. # consistent for observability.
  2227. if self._target_state.is_scaled_copy_of(old_target_state):
  2228. old_num = old_target_state.target_num_replicas
  2229. new_num = self._target_state.target_num_replicas
  2230. if new_num > old_num:
  2231. self._curr_status_info = self._curr_status_info.handle_transition(
  2232. trigger=DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS, # noqa: E501
  2233. message=f"Upscaling from {old_num} to {new_num} replicas.",
  2234. )
  2235. elif new_num < old_num:
  2236. self._curr_status_info = self._curr_status_info.handle_transition(
  2237. trigger=DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS, # noqa: E501
  2238. message=f"Downscaling from {old_num} to {new_num} replicas.",
  2239. )
  2240. else:
  2241. # Otherwise, the deployment configuration has actually been updated.
  2242. self._curr_status_info = self._curr_status_info.handle_transition(
  2243. trigger=DeploymentStatusInternalTrigger.CONFIG_UPDATE
  2244. )
  2245. logger.info(
  2246. f"Deploying new version of {self._id} "
  2247. f"(initial target replicas: {target_num_replicas})."
  2248. )
  2249. self._replica_constructor_retry_counter = 0
  2250. self._replica_has_started = False
  2251. return True
  2252. def autoscale(self, decision_num_replicas: int) -> bool:
  2253. """
  2254. Apply the given scaling decision by updating the target replica count.
  2255. Skips if deleting, if `decision_num_replicas` is None, or matches the
  2256. current target. Otherwise updates the state and logs an up/down scaling.
  2257. Args:
  2258. decision_num_replicas: target replica count to apply.
  2259. Returns:
  2260. bool: True if the target state was updated, False if no change occurred.
  2261. """
  2262. if self._target_state.deleting:
  2263. return False
  2264. if decision_num_replicas == self._target_state.target_num_replicas:
  2265. return False
  2266. new_info = copy(self._target_state.info)
  2267. new_info.version = self._target_state.version.code_version
  2268. old_num = self._target_state.target_num_replicas
  2269. self._set_target_state(new_info, decision_num_replicas)
  2270. # The deployment should only transition to UPSCALING/DOWNSCALING
  2271. # if it's within the autoscaling bounds
  2272. if not self._autoscaling_state_manager.is_within_bounds(
  2273. self._id,
  2274. self._replicas.count(
  2275. states=[ReplicaState.RUNNING], version=self._target_state.version
  2276. ),
  2277. ):
  2278. return True
  2279. curr_stats_str = (
  2280. f"Current ongoing requests: "
  2281. f"{self._autoscaling_state_manager.get_total_num_requests_for_deployment(self._id):.2f}, "
  2282. f"current running replicas: "
  2283. f"{self._replicas.count(states=[ReplicaState.RUNNING])}."
  2284. )
  2285. new_num = self._target_state.target_num_replicas
  2286. if new_num > old_num:
  2287. logger.info(
  2288. f"Upscaling {self._id} from {old_num} to {new_num} replicas. "
  2289. f"{curr_stats_str}"
  2290. )
  2291. self._curr_status_info = self._curr_status_info.handle_transition(
  2292. trigger=DeploymentStatusInternalTrigger.AUTOSCALE_UP,
  2293. message=f"Upscaling from {old_num} to {new_num} replicas.",
  2294. )
  2295. self._autoscaling_state_manager.record_scale_up(self._id)
  2296. elif new_num < old_num:
  2297. logger.info(
  2298. f"Downscaling {self._id} from {old_num} to {new_num} replicas. "
  2299. f"{curr_stats_str}"
  2300. )
  2301. self._curr_status_info = self._curr_status_info.handle_transition(
  2302. trigger=DeploymentStatusInternalTrigger.AUTOSCALE_DOWN,
  2303. message=f"Downscaling from {old_num} to {new_num} replicas.",
  2304. )
  2305. self._autoscaling_state_manager.record_scale_down(self._id)
  2306. return True
  2307. def delete(self) -> bool:
  2308. if not self._target_state.deleting:
  2309. self._set_target_state_deleting()
  2310. return True
  2311. return False
  2312. def set_target_num_replicas(
  2313. self,
  2314. target_num_replicas: int,
  2315. ) -> None:
  2316. """Set the target state for the deployment to the provided info."""
  2317. self._set_target_state(
  2318. self._target_state.info, target_num_replicas, updated_via_api=True
  2319. )
    def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> bool:
        """Stop or update replicas with outdated versions.

        Stop replicas with versions that require the actor to be restarted, and
        reconfigure replicas that require refreshing deployment config values.

        Args:
            max_to_stop: max number of replicas to stop, by default,
                it stops all replicas with an outdated version. Both stopped
                and reconfigured replicas count against this budget; once it
                is reached, remaining outdated replicas are put back unchanged.

        Returns:
            bool: True if any replica was stopped, or was reconfigured to a
            version that requires a long poll broadcast.
        """
        # Pop every non-target-version replica in an eligible state. Each one is
        # either stopped or re-added to the container below.
        replicas_to_update = self._replicas.pop(
            exclude_version=self._target_state.version,
            states=[
                ReplicaState.STARTING,
                ReplicaState.PENDING_MIGRATION,
                ReplicaState.RUNNING,
            ],
        )
        replicas_changed = False
        code_version_changes = 0
        reconfigure_changes = 0
        for replica in replicas_to_update:
            # Budget exhausted: put the replica back in its current state as-is.
            if (code_version_changes + reconfigure_changes) >= max_to_stop:
                self._replicas.add(replica.actor_details.state, replica)
            # If the new version requires the actors to be restarted, stop the replica.
            # A new one with the correct version will be started later as part of the
            # normal scale-up process.
            elif replica.version.requires_actor_restart(self._target_state.version):
                code_version_changes += 1
                # If the replica is still `STARTING`, we don't need to go through the
                # graceful stop period.
                # NOTE(review): PENDING_MIGRATION replicas also get
                # graceful_stop=False here; confirm that is intended.
                graceful_stop = replica.actor_details.state == ReplicaState.RUNNING
                self._stop_replica(replica, graceful_stop=graceful_stop)
                replicas_changed = True
            # Otherwise, only lightweight options in deployment config is a mismatch, so
            # we update it dynamically without restarting the replica.
            elif replica.actor_details.state == ReplicaState.RUNNING:
                reconfigure_changes += 1
                if replica.version.requires_long_poll_broadcast(
                    self._target_state.version
                ):
                    replicas_changed = True
                # Get current rank for the replica
                current_rank = self._rank_manager.get_replica_rank(
                    replica.replica_id.unique_id
                )
                actor_updating = replica.reconfigure(
                    self._target_state.version, rank=current_rank.rank
                )
                # Presumably `reconfigure` returns True while the actor is still
                # applying the new config; such replicas are tracked as UPDATING.
                if actor_updating:
                    self._replicas.add(ReplicaState.UPDATING, replica)
                else:
                    self._replicas.add(ReplicaState.RUNNING, replica)
            # We don't allow going from STARTING, PENDING_MIGRATION to UPDATING.
            else:
                self._replicas.add(replica.actor_details.state, replica)

        if code_version_changes > 0:
            logger.info(
                f"Stopping {code_version_changes} replicas of {self._id} "
                "with outdated versions."
            )

        if reconfigure_changes > 0:
            logger.info(
                f"Updating {reconfigure_changes} replicas of {self._id} "
                "with outdated deployment configs."
            )
            # Record user config lightweight update
            ServeUsageTag.USER_CONFIG_LIGHTWEIGHT_UPDATED.record("True")

        return replicas_changed
  2387. def _check_and_stop_outdated_version_replicas(self) -> bool:
  2388. """Stops replicas with outdated versions to implement rolling updates.
  2389. This includes both explicit code version updates and changes to the
  2390. user_config.
  2391. Returns whether any replicas were stopped.
  2392. """
  2393. # Short circuit if target replicas is 0 (the deployment is being
  2394. # deleted) because this will be handled in the main loop.
  2395. if self._target_state.target_num_replicas == 0:
  2396. return False
  2397. # We include STARTING and UPDATING replicas here
  2398. # because if there are replicas still pending startup, we may as well
  2399. # terminate them and start new version replicas instead.
  2400. old_running_replicas = self._replicas.count(
  2401. exclude_version=self._target_state.version,
  2402. states=[
  2403. ReplicaState.STARTING,
  2404. ReplicaState.UPDATING,
  2405. ReplicaState.RUNNING,
  2406. ],
  2407. )
  2408. old_stopping_replicas = self._replicas.count(
  2409. exclude_version=self._target_state.version, states=[ReplicaState.STOPPING]
  2410. )
  2411. new_running_replicas = self._replicas.count(
  2412. version=self._target_state.version, states=[ReplicaState.RUNNING]
  2413. )
  2414. # If the deployment is currently scaling down, let the scale down
  2415. # complete before doing a rolling update.
  2416. if (
  2417. self._target_state.target_num_replicas
  2418. < old_running_replicas + old_stopping_replicas
  2419. ):
  2420. return False
  2421. # The number of replicas that are currently in transition between
  2422. # an old version and the new version. Note that we cannot directly
  2423. # count the number of stopping replicas because once replicas finish
  2424. # stopping, they are removed from the data structure.
  2425. pending_replicas = (
  2426. self._target_state.target_num_replicas
  2427. - new_running_replicas
  2428. - old_running_replicas
  2429. )
  2430. # Maximum number of replicas that can be updating at any given time.
  2431. # There should never be more than rollout_size old replicas stopping
  2432. # or rollout_size new replicas starting.
  2433. rollout_size = max(int(0.2 * self._target_state.target_num_replicas), 1)
  2434. max_to_stop = max(rollout_size - pending_replicas, 0)
  2435. return self._stop_or_update_outdated_version_replicas(max_to_stop)
  2436. def scale_deployment_replicas(
  2437. self,
  2438. ) -> Tuple[List[ReplicaSchedulingRequest], DeploymentDownscaleRequest]:
  2439. """Scale the given deployment to the number of replicas."""
  2440. assert (
  2441. self._target_state.target_num_replicas >= 0
  2442. ), "Target number of replicas must be greater than or equal to 0."
  2443. upscale = []
  2444. downscale = None
  2445. self._check_and_stop_outdated_version_replicas()
  2446. current_replicas = self._replicas.count(
  2447. states=[ReplicaState.STARTING, ReplicaState.UPDATING, ReplicaState.RUNNING]
  2448. )
  2449. recovering_replicas = self._replicas.count(states=[ReplicaState.RECOVERING])
  2450. delta_replicas = (
  2451. self._target_state.target_num_replicas
  2452. - current_replicas
  2453. - recovering_replicas
  2454. )
  2455. if delta_replicas == 0:
  2456. return (upscale, downscale)
  2457. elif delta_replicas > 0:
  2458. to_add = delta_replicas
  2459. if to_add > 0 and not self._terminally_failed():
  2460. logger.info(f"Adding {to_add} replica{'s' * (to_add>1)} to {self._id}.")
  2461. for _ in range(to_add):
  2462. replica_id = ReplicaID(get_random_string(), deployment_id=self._id)
  2463. new_deployment_replica = DeploymentReplica(
  2464. replica_id,
  2465. self._target_state.version,
  2466. )
  2467. scheduling_request = new_deployment_replica.start(
  2468. self._target_state.info,
  2469. assign_rank_callback=self._rank_manager.assign_rank,
  2470. )
  2471. upscale.append(scheduling_request)
  2472. self._replicas.add(ReplicaState.STARTING, new_deployment_replica)
  2473. elif delta_replicas < 0:
  2474. to_remove = -delta_replicas
  2475. removed_replicas = f"{to_remove} replica{'s' if to_remove > 1 else ''}"
  2476. logger.info(f"Removing {removed_replicas} from {self._id}.")
  2477. downscale = DeploymentDownscaleRequest(
  2478. deployment_id=self._id, num_to_stop=to_remove
  2479. )
  2480. return upscale, downscale
  2481. def check_curr_status(self) -> Tuple[bool, bool]:
  2482. """Check the current deployment status.
  2483. Checks the difference between the target vs. running replica count for
  2484. the target version.
  2485. This will update the current deployment status depending on the state
  2486. of the replicas.
  2487. Returns (deleted, any_replicas_recovering).
  2488. """
  2489. # TODO(edoakes): we could make this more efficient in steady-state by
  2490. # having a "healthy" flag that gets flipped if an update or replica
  2491. # failure happens.
  2492. target_version = self._target_state.version
  2493. any_replicas_recovering = (
  2494. self._replicas.count(states=[ReplicaState.RECOVERING]) > 0
  2495. )
  2496. all_running_replica_cnt = self._replicas.count(states=[ReplicaState.RUNNING])
  2497. running_at_target_version_replica_cnt = self._replicas.count(
  2498. states=[ReplicaState.RUNNING], version=target_version
  2499. )
  2500. # Got to make a call to complete current deploy() goal after
  2501. # start failure threshold reached, while we might still have
  2502. # pending replicas in current goal.
  2503. if running_at_target_version_replica_cnt > 0:
  2504. # At least one RUNNING replica at target state, partial
  2505. # success; We can stop tracking constructor failures and
  2506. # leave it to the controller to fully scale to target
  2507. # number of replicas and only return as completed once
  2508. # reached target replica count
  2509. self._replica_has_started = True
  2510. elif self._replica_startup_failing():
  2511. self._curr_status_info = self._curr_status_info.handle_transition(
  2512. trigger=DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED,
  2513. message=(
  2514. "The deployment failed to start "
  2515. f"{self._replica_constructor_retry_counter} times "
  2516. "in a row. This may be due to a problem with its "
  2517. "constructor or initial health check failing. See "
  2518. "controller logs for details. Error:\n"
  2519. f"{self._replica_constructor_error_msg}"
  2520. ),
  2521. )
  2522. return False, any_replicas_recovering
  2523. # If we have pending ops, the current goal is *not* ready.
  2524. if (
  2525. self._replicas.count(
  2526. states=[
  2527. ReplicaState.STARTING,
  2528. ReplicaState.UPDATING,
  2529. ReplicaState.RECOVERING,
  2530. ReplicaState.STOPPING,
  2531. ]
  2532. )
  2533. == 0
  2534. ):
  2535. # Check for deleting and a non-zero number of deployments.
  2536. if self._target_state.deleting and all_running_replica_cnt == 0:
  2537. return True, any_replicas_recovering
  2538. if (
  2539. self._target_state.target_num_replicas
  2540. == running_at_target_version_replica_cnt
  2541. and running_at_target_version_replica_cnt == all_running_replica_cnt
  2542. ):
  2543. self._curr_status_info = self._curr_status_info.handle_transition(
  2544. trigger=DeploymentStatusInternalTrigger.HEALTHY
  2545. )
  2546. self._replica_constructor_retry_counter = 0
  2547. return False, any_replicas_recovering
  2548. return False, any_replicas_recovering
    def _check_startup_replicas(
        self, original_state: ReplicaState, stop_on_slow=False
    ) -> List[Tuple[DeploymentReplica, ReplicaStartupStatus]]:
        """
        Common helper function for startup actions tracking and status
        transition: STARTING, UPDATING and RECOVERING.

        Every replica currently in ``original_state`` is popped and, depending
        on ``replica.check_started()``:

        - SUCCEEDED: the replica is moved to RUNNING (its rank is recovered
          first if it was RECOVERING), startup is logged, and startup /
          initialization (STARTING) or reconfigure (UPDATING) latency metrics
          are recorded.
        - FAILED: the failure is recorded and the replica is stopped.
        - PENDING_ALLOCATION / PENDING_INITIALIZATION: the replica is put back
          into ``original_state``, unless it is slow and ``stop_on_slow`` is
          set, in which case it is stopped non-gracefully.

        Args:
            original_state: The replica state whose replicas are polled.
            stop_on_slow: If we consider a replica failed upon observing it's
                slow to reach running state (pending for longer than
                SLOW_STARTUP_WARNING_S).

        Returns:
            (replica, startup_status) pairs for every replica that was still
            pending for longer than SLOW_STARTUP_WARNING_S.
        """
        slow_replicas = []
        for replica in self._replicas.pop(states=[original_state]):
            start_status, error_msg = replica.check_started()
            if start_status == ReplicaStartupStatus.SUCCEEDED:
                if original_state == ReplicaState.RECOVERING:
                    # If the previous state was RECOVERING, the replica is
                    # starting up again and its rank must be recovered from
                    # the replica actor. The invariant is that the rank is
                    # assigned during startup and before the replica is added
                    # to the replicas data structure with RUNNING state.
                    # Recover rank from the replica actor during controller restart.
                    replica_id = replica.replica_id.unique_id
                    self._rank_manager.recover_rank(
                        replica_id, replica.actor_node_id, replica.rank
                    )
                # This replica should now be added to the handle's replica
                # set.
                self._replicas.add(ReplicaState.RUNNING, replica)
                self._deployment_scheduler.on_replica_running(
                    replica.replica_id, replica.actor_node_id
                )
                # If the replica version is the same as the target version,
                # update the docs path and route patterns.
                if replica.version == self._target_state.version:
                    self._docs_path = replica.docs_path
                    self._route_patterns = replica.route_patterns
                # Log the startup latency.
                e2e_replica_start_latency = time.time() - replica._start_time
                replica_startup_message = (
                    f"{replica.replica_id} started successfully "
                    f"on node '{replica.actor_node_id}' after "
                    f"{e2e_replica_start_latency:.1f}s (PID: {replica.actor_pid})."
                )
                if replica.initialization_latency_s is not None:
                    # This condition should always be True. The initialization
                    # latency is only None before the replica has initialized.
                    replica_startup_message += (
                        " Replica constructor, "
                        "reconfigure method, and initial health check took "
                        f"{replica.initialization_latency_s:.1f}s."
                    )
                logger.info(replica_startup_message, extra={"log_to_stderr": False})
                # Record startup or reconfigure latency metrics. RECOVERING
                # replicas record no latency metric here.
                metric_tags = {
                    "replica": replica.replica_id.unique_id,
                }
                if original_state == ReplicaState.STARTING:
                    # Record replica startup latency (end-to-end from creation to ready).
                    # This includes the time taken from starting a node, scheduling the replica,
                    # and the replica constructor.
                    e2e_replica_start_latency_ms = e2e_replica_start_latency * 1000
                    self.replica_startup_latency_histogram.observe(
                        e2e_replica_start_latency_ms, tags=metric_tags
                    )
                    # Record replica initialization latency.
                    if replica.initialization_latency_s is not None:
                        initialization_latency_ms = (
                            replica.initialization_latency_s * 1000
                        )
                        self.replica_initialization_latency_histogram.observe(
                            initialization_latency_ms, tags=metric_tags
                        )
                elif original_state == ReplicaState.UPDATING:
                    # Record replica reconfigure latency.
                    if replica.reconfigure_start_time is not None:
                        reconfigure_latency_ms = (
                            time.time() - replica.reconfigure_start_time
                        ) * 1000
                        self.replica_reconfigure_latency_histogram.observe(
                            reconfigure_latency_ms, tags=metric_tags
                        )
            elif start_status == ReplicaStartupStatus.FAILED:
                # The replica failed to start or to reconfigure (deploy / upgrade).
                self.record_replica_startup_failure(error_msg)
                self._stop_replica(replica)
            elif start_status in [
                ReplicaStartupStatus.PENDING_ALLOCATION,
                ReplicaStartupStatus.PENDING_INITIALIZATION,
            ]:
                is_slow = time.time() - replica._start_time > SLOW_STARTUP_WARNING_S
                if is_slow:
                    slow_replicas.append((replica, start_status))
                # Does it make sense to stop replicas in PENDING_ALLOCATION
                # state?
                if is_slow and stop_on_slow:
                    self._stop_replica(replica, graceful_stop=False)
                else:
                    self._replicas.add(original_state, replica)
        return slow_replicas
  2648. def record_replica_startup_failure(self, error_msg: str):
  2649. """Record that a replica failed to start."""
  2650. # There is no need to record replica failures if the target is 0.
  2651. if self._target_state.target_num_replicas == 0:
  2652. return
  2653. # Increase startup failure counter
  2654. self._replica_constructor_retry_counter += 1
  2655. self._replica_constructor_error_msg = error_msg
  2656. # Update the deployment message only if replicas are failing during
  2657. # the very first time the controller is trying to start replicas of
  2658. # this version.
  2659. retrying_msg = ""
  2660. if not self._replica_has_started:
  2661. remaining_retries = max(
  2662. self._failed_to_start_threshold
  2663. - self._replica_constructor_retry_counter,
  2664. 0,
  2665. )
  2666. retrying_msg = f" {remaining_retries} more time(s)"
  2667. message = (
  2668. f"A replica failed to start with exception. Retrying{retrying_msg}. "
  2669. f"Error:\n{error_msg}"
  2670. )
  2671. self._curr_status_info = self._curr_status_info.update_message(message)
  2672. def stop_replicas(self, replicas_to_stop) -> None:
  2673. for replica in self._replicas.pop():
  2674. if replica.replica_id in replicas_to_stop:
  2675. self._stop_replica(replica)
  2676. else:
  2677. self._replicas.add(replica.actor_details.state, replica)
  2678. def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
  2679. """Stop replica
  2680. 1. Stop the replica.
  2681. 2. Change the replica into stopping state.
  2682. 3. Set the health replica stats to 0.
  2683. """
  2684. logger.debug(f"Adding STOPPING to replica: {replica.replica_id}.")
  2685. replica.stop(graceful=graceful_stop)
  2686. self._replicas.add(ReplicaState.STOPPING, replica)
  2687. self._deployment_scheduler.on_replica_stopping(replica.replica_id)
  2688. self.health_check_gauge.set(
  2689. 0,
  2690. tags={
  2691. "replica": replica.replica_id.unique_id,
  2692. },
  2693. )
  2694. def check_and_update_replicas(self):
  2695. """
  2696. Check current state of all DeploymentReplica being tracked, and compare
  2697. with state container from previous update() cycle to see if any state
  2698. transition happened.
  2699. """
  2700. for replica in self._replicas.pop(
  2701. states=[ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION]
  2702. ):
  2703. is_healthy = replica.check_health()
  2704. # Record health check latency and failure metrics.
  2705. metric_tags = {
  2706. "replica": replica.replica_id.unique_id,
  2707. }
  2708. if replica.last_health_check_latency_ms is not None:
  2709. self.health_check_latency_histogram.observe(
  2710. replica.last_health_check_latency_ms, tags=metric_tags
  2711. )
  2712. if replica.last_health_check_failed:
  2713. self.health_check_failures_counter.inc(tags=metric_tags)
  2714. if is_healthy:
  2715. self._replicas.add(replica.actor_details.state, replica)
  2716. self.health_check_gauge.set(
  2717. 1,
  2718. tags={
  2719. "replica": replica.replica_id.unique_id,
  2720. },
  2721. )
  2722. routing_stats = replica.pull_routing_stats()
  2723. replica.record_routing_stats(routing_stats)
  2724. else:
  2725. logger.warning(
  2726. f"Replica {replica.replica_id} failed health check, stopping it."
  2727. )
  2728. self.health_check_gauge.set(
  2729. 0,
  2730. tags={
  2731. "replica": replica.replica_id.unique_id,
  2732. },
  2733. )
  2734. self._stop_replica(
  2735. replica, graceful_stop=not self.FORCE_STOP_UNHEALTHY_REPLICAS
  2736. )
  2737. # If this is a replica of the target version, the deployment
  2738. # enters the "UNHEALTHY" status until the replica is
  2739. # recovered or a new deploy happens.
  2740. if replica.version == self._target_state.version:
  2741. self._curr_status_info = self._curr_status_info.handle_transition(
  2742. trigger=DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED,
  2743. message="A replica's health check failed. This "
  2744. "deployment will be UNHEALTHY until the replica "
  2745. "recovers or a new deploy happens.",
  2746. )
  2747. slow_start_replicas = []
  2748. slow_start = self._check_startup_replicas(ReplicaState.STARTING)
  2749. slow_update = self._check_startup_replicas(ReplicaState.UPDATING)
  2750. slow_recover = self._check_startup_replicas(
  2751. ReplicaState.RECOVERING, stop_on_slow=True
  2752. )
  2753. slow_start_replicas = slow_start + slow_update + slow_recover
  2754. if (
  2755. len(slow_start_replicas)
  2756. and time.time() - self._prev_startup_warning > SLOW_STARTUP_WARNING_PERIOD_S
  2757. ):
  2758. pending_allocation = []
  2759. pending_initialization = []
  2760. for replica, startup_status in slow_start_replicas:
  2761. if startup_status == ReplicaStartupStatus.PENDING_ALLOCATION:
  2762. pending_allocation.append(replica)
  2763. if startup_status == ReplicaStartupStatus.PENDING_INITIALIZATION:
  2764. pending_initialization.append(replica)
  2765. if len(pending_allocation) > 0:
  2766. required, available = pending_allocation[0].resource_requirements()
  2767. message = (
  2768. f"Deployment '{self.deployment_name}' in application "
  2769. f"'{self.app_name}' has {len(pending_allocation)} replicas that "
  2770. f"have taken more than {SLOW_STARTUP_WARNING_S}s to be scheduled. "
  2771. "This may be due to waiting for the cluster to auto-scale or for a "
  2772. "runtime environment to be installed. "
  2773. f"Resources required for each replica: {required}, "
  2774. f"total resources available: {available}. "
  2775. "Use `ray status` for more details."
  2776. )
  2777. logger.warning(message)
  2778. if _SCALING_LOG_ENABLED:
  2779. print_verbose_scaling_log()
  2780. # If status is UNHEALTHY, leave the status and message as is.
  2781. # The issue that caused the deployment to be unhealthy should be
  2782. # prioritized over this resource availability issue.
  2783. if self._curr_status_info.status not in [
  2784. DeploymentStatus.UNHEALTHY,
  2785. DeploymentStatus.DEPLOY_FAILED,
  2786. ]:
  2787. self._curr_status_info = self._curr_status_info.update_message(
  2788. message
  2789. )
  2790. if len(pending_initialization) > 0:
  2791. message = (
  2792. f"Deployment '{self.deployment_name}' in application "
  2793. f"'{self.app_name}' has {len(pending_initialization)} replicas "
  2794. f"that have taken more than {SLOW_STARTUP_WARNING_S}s to "
  2795. "initialize.\n"
  2796. "This may be caused by a slow __init__ or reconfigure method."
  2797. )
  2798. logger.warning(message)
  2799. # If status is UNHEALTHY, leave the status and message as is.
  2800. # The issue that caused the deployment to be unhealthy should be
  2801. # prioritized over this resource availability issue.
  2802. if self._curr_status_info.status not in [
  2803. DeploymentStatus.UNHEALTHY,
  2804. DeploymentStatus.DEPLOY_FAILED,
  2805. ]:
  2806. self._curr_status_info = self._curr_status_info.update_message(
  2807. message
  2808. )
  2809. self._prev_startup_warning = time.time()
  2810. for replica in self._replicas.pop(states=[ReplicaState.STOPPING]):
  2811. stopped = replica.check_stopped()
  2812. if not stopped:
  2813. self._replicas.add(ReplicaState.STOPPING, replica)
  2814. else:
  2815. logger.info(f"{replica.replica_id} is stopped.")
  2816. # Record shutdown duration metric.
  2817. if replica.shutdown_start_time is not None:
  2818. shutdown_duration_ms = (
  2819. time.time() - replica.shutdown_start_time
  2820. ) * 1000
  2821. self.replica_shutdown_duration_histogram.observe(
  2822. shutdown_duration_ms,
  2823. tags={
  2824. "replica": replica.replica_id.unique_id,
  2825. },
  2826. )
  2827. # Release rank only after replica is successfully stopped
  2828. # This ensures rank is available during draining/graceful shutdown
  2829. replica_id = replica.replica_id.unique_id
  2830. if self._rank_manager.has_replica_rank(replica_id):
  2831. # Only release rank if assigned. Replicas that failed allocation
  2832. # or never reached RUNNING state won't have ranks.
  2833. self._rank_manager.release_rank(replica_id)
  2834. logger.debug(
  2835. f"Released rank from replica {replica_id} in deployment {self._id}"
  2836. )
  2837. self._autoscaling_state_manager.on_replica_stopped(replica.replica_id)
  2838. # After replica state updates, check rank consistency and perform minimal reassignment if needed
  2839. # This ensures ranks are continuous after lifecycle events
  2840. # Only do consistency check when deployment is stable (not during active updates)
  2841. # maybe this constraint need to be relaxed in the future. The implication is that
  2842. # if we delay the rank reassignment, the rank system will be in an invalid state
  2843. # for a longer period of time. Abrar made this decision because he is not confident
  2844. # about how rollouts work in the deployment state machine.
  2845. active_replicas = self._replicas.get()
  2846. if (
  2847. active_replicas
  2848. and self._curr_status_info.status == DeploymentStatus.HEALTHY
  2849. ):
  2850. replicas_to_reconfigure = (
  2851. self._rank_manager.check_rank_consistency_and_reassign_minimally(
  2852. active_replicas,
  2853. )
  2854. )
  2855. # Reconfigure replicas that had their ranks reassigned
  2856. self._reconfigure_replicas_with_new_ranks(replicas_to_reconfigure)
  2857. def _reconfigure_replicas_with_new_ranks(
  2858. self, replicas_to_reconfigure: List["DeploymentReplica"]
  2859. ):
  2860. """Reconfigure replicas with their new ranks after reassignment.
  2861. This uses the reconfigure() mechanism to update replicas with their new ranks.
  2862. """
  2863. if not replicas_to_reconfigure:
  2864. return
  2865. logger.debug(
  2866. f"Reconfiguring {len(replicas_to_reconfigure)} replicas with rank changes in deployment {self._id}"
  2867. )
  2868. updated_count = 0
  2869. for replica in replicas_to_reconfigure:
  2870. replica_id = replica.replica_id.unique_id
  2871. new_rank = self._rank_manager.get_replica_rank(replica_id)
  2872. # Use reconfigure() to update rank
  2873. # World size is calculated automatically from deployment config
  2874. _ = replica.reconfigure(
  2875. self._target_state.version,
  2876. rank=new_rank,
  2877. )
  2878. updated_count += 1
  2879. logger.debug(
  2880. f"Successfully reconfigured {updated_count} replicas with new ranks in deployment {self._id}"
  2881. )
  2882. def _get_replica_ranks_mapping(self) -> Dict[str, ReplicaRank]:
  2883. """Get the current mapping of replica IDs to ReplicaRank objects.
  2884. Returns:
  2885. Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank).
  2886. """
  2887. return self._rank_manager.get_replica_ranks_mapping()
  2888. def _choose_pending_migration_replicas_to_stop(
  2889. self,
  2890. replicas: List[DeploymentReplica],
  2891. deadlines: Dict[str, int],
  2892. min_replicas_to_stop: int,
  2893. ) -> Tuple[List[DeploymentReplica], List[DeploymentReplica]]:
  2894. """Returns a partition of replicas to stop and to keep.
  2895. Args:
  2896. replicas: The current list of replicas pending migration.
  2897. deadlines: The current draining node deadlines.
  2898. min_replicas_to_stop: The minimum number of replicas to stop.
  2899. """
  2900. to_stop = []
  2901. remaining = []
  2902. # Stop replicas whose deadline is up
  2903. for replica in replicas:
  2904. assert replica.actor_node_id in deadlines
  2905. curr_timestamp_ms = time.time() * 1000
  2906. timeout_ms = replica._actor.graceful_shutdown_timeout_s * 1000
  2907. if curr_timestamp_ms >= deadlines[replica.actor_node_id] - timeout_ms:
  2908. to_stop.append(replica)
  2909. else:
  2910. remaining.append(replica)
  2911. # Stop excess PENDING_MIGRATION replicas when new "replacement"
  2912. # replicas have transitioned to RUNNING. The replicas with the
  2913. # earliest deadlines should be chosen greedily.
  2914. remaining.sort(key=lambda r: deadlines[r.actor_node_id])
  2915. num_excess = min_replicas_to_stop - len(to_stop)
  2916. if num_excess > 0:
  2917. to_stop.extend(remaining[:num_excess])
  2918. remaining = remaining[num_excess:]
  2919. return to_stop, remaining
    def migrate_replicas_on_draining_nodes(self, draining_nodes: Dict[str, int]):
        """Move replicas off nodes that are being drained.

        1. PENDING_MIGRATION replicas whose node is no longer draining are
           moved back to RUNNING.
        2. Replicas on draining nodes are handled by state. RUNNING replicas
           become PENDING_MIGRATION (the log message notes that a new replica
           will be created on another node). STARTING and UPDATING replicas
           are stopped immediately.
        3. PENDING_MIGRATION replicas are stopped when their node deadline
           requires it, plus as many more (earliest deadline first) as needed
           so that RUNNING + PENDING_MIGRATION is at most the target number
           of replicas.

        Args:
            draining_nodes: Mapping of draining node ID to that node's
                draining deadline. Forwarded as ``deadlines`` to
                ``_choose_pending_migration_replicas_to_stop``.
        """
        # Move replicas back to running if they are no longer on a draining node.
        # If this causes the number of replicas to exceed the target state,
        # they will be scaled down because `scale_deployment_replicas` is called on
        # each deployment after this
        for replica in self._replicas.pop(states=[ReplicaState.PENDING_MIGRATION]):
            if replica.actor_node_id not in draining_nodes:
                self._replicas.add(ReplicaState.RUNNING, replica)
            else:
                self._replicas.add(ReplicaState.PENDING_MIGRATION, replica)
        # Migrate replicas on draining nodes
        for replica in self._replicas.pop(
            states=[ReplicaState.UPDATING, ReplicaState.RUNNING, ReplicaState.STARTING]
        ):
            if replica.actor_node_id in draining_nodes:
                # For RUNNING replicas, migrate them safely by starting
                # a replacement replica first.
                if replica.actor_details.state == ReplicaState.RUNNING:
                    logger.info(
                        f"Migrating {replica.replica_id} from draining node "
                        f"'{replica.actor_node_id}'. A new replica will be created on "
                        "another node."
                    )
                    self._replicas.add(ReplicaState.PENDING_MIGRATION, replica)
                # For replicas that are STARTING or UPDATING, might as
                # well terminate them immediately to allow replacement
                # replicas to start. Otherwise we need to wait for them
                # to transition to RUNNING before starting migration.
                else:
                    self._stop_replica(replica, graceful_stop=True)
            else:
                self._replicas.add(replica.actor_details.state, replica)
        # Number of PENDING_MIGRATION replicas that must go so that RUNNING +
        # PENDING_MIGRATION does not exceed the target. This value may be zero
        # or negative; the chooser then only stops deadline-expired replicas.
        num_running = self._replicas.count(states=[ReplicaState.RUNNING])
        num_draining = self._replicas.count(states=[ReplicaState.PENDING_MIGRATION])
        num_pending_migration_replicas_to_stop = (
            num_running + num_draining - self._target_state.target_num_replicas
        )
        (
            replicas_to_stop,
            replicas_to_keep,
        ) = self._choose_pending_migration_replicas_to_stop(
            self._replicas.pop(states=[ReplicaState.PENDING_MIGRATION]),
            draining_nodes,
            num_pending_migration_replicas_to_stop,
        )
        for replica in replicas_to_stop:
            logger.info(
                f"Stopping {replica.replica_id} "
                f"on draining node {replica.actor_node_id}."
            )
            self._stop_replica(replica, graceful_stop=True)
        # Everything popped above that is not stopped goes back unchanged.
        for replica in replicas_to_keep:
            self._replicas.add(ReplicaState.PENDING_MIGRATION, replica)
  2973. def record_request_routing_info(self, info: RequestRoutingInfo) -> None:
  2974. """Records the multiplexed model IDs of a replica.
  2975. Args:
  2976. info: RequestRoutingInfo including deployment name, replica tag,
  2977. multiplex model ids, and routing stats.
  2978. """
  2979. # Find the replica
  2980. for replica in self._replicas.get():
  2981. if replica.replica_id == info.replica_id:
  2982. if info.multiplexed_model_ids is not None:
  2983. replica.record_multiplexed_model_ids(info.multiplexed_model_ids)
  2984. if info.routing_stats is not None:
  2985. replica.record_routing_stats(info.routing_stats)
  2986. self._request_routing_info_updated = True
  2987. return
  2988. logger.warning(f"{info.replica_id} not found.")
  2989. def _stop_one_running_replica_for_testing(self):
  2990. running_replicas = self._replicas.pop(states=[ReplicaState.RUNNING])
  2991. replica_to_stop = running_replicas.pop()
  2992. replica_to_stop.stop(graceful=False)
  2993. self._replicas.add(ReplicaState.STOPPING, replica_to_stop)
  2994. for replica in running_replicas:
  2995. self._replicas.add(ReplicaState.RUNNING, replica)
  2996. def is_ingress(self) -> bool:
  2997. return self._target_state.info.ingress
  2998. def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
  2999. """Get the outbound deployments.
  3000. Returns:
  3001. Sorted list of deployment IDs that this deployment calls. None if
  3002. outbound deployments are not yet polled.
  3003. """
  3004. result: Set[DeploymentID] = set()
  3005. has_outbound_deployments = False
  3006. for replica in self._replicas.get([ReplicaState.RUNNING]):
  3007. if replica.version != self._target_state.version:
  3008. # Only consider replicas of the target version
  3009. continue
  3010. outbound_deployments = replica.get_outbound_deployments()
  3011. if outbound_deployments is not None:
  3012. result.update(outbound_deployments)
  3013. has_outbound_deployments = True
  3014. if not has_outbound_deployments:
  3015. return None
  3016. return sorted(result, key=lambda d: (d.name))
  3017. class DeploymentStateManager:
  3018. """Manages all state for deployments in the system.
  3019. This class is *not* thread safe, so any state-modifying methods should be
  3020. called with a lock held.
  3021. """
  3022. def __init__(
  3023. self,
  3024. kv_store: KVStoreBase,
  3025. long_poll_host: LongPollHost,
  3026. all_current_actor_names: List[str],
  3027. all_current_placement_group_names: List[str],
  3028. cluster_node_info_cache: ClusterNodeInfoCache,
  3029. autoscaling_state_manager: AutoscalingStateManager,
  3030. head_node_id_override: Optional[str] = None,
  3031. create_placement_group_fn_override: Optional[Callable] = None,
  3032. ):
  3033. self._kv_store = kv_store
  3034. self._long_poll_host = long_poll_host
  3035. self._cluster_node_info_cache = cluster_node_info_cache
  3036. self._deployment_scheduler = default_impl.create_deployment_scheduler(
  3037. cluster_node_info_cache,
  3038. head_node_id_override,
  3039. create_placement_group_fn_override,
  3040. )
  3041. self._autoscaling_state_manager = autoscaling_state_manager
  3042. self._shutting_down = False
  3043. self._deployment_states: Dict[DeploymentID, DeploymentState] = {}
  3044. self._app_deployment_mapping: Dict[str, Set[str]] = defaultdict(set)
  3045. # Metric for tracking deployment status
  3046. self._deployment_status_gauge = ray_metrics.Gauge(
  3047. "serve_deployment_status",
  3048. description=(
  3049. "Numeric status of deployment. "
  3050. "0=UNKNOWN, 1=DEPLOY_FAILED, 2=UNHEALTHY, 3=UPDATING, "
  3051. "4=UPSCALING, 5=DOWNSCALING, 6=HEALTHY."
  3052. ),
  3053. tag_keys=("deployment", "application"),
  3054. )
  3055. self._recover_from_checkpoint(
  3056. all_current_actor_names, all_current_placement_group_names
  3057. )
  3058. def _create_deployment_state(self, deployment_id):
  3059. self._deployment_scheduler.on_deployment_created(
  3060. deployment_id, SpreadDeploymentSchedulingPolicy()
  3061. )
  3062. return DeploymentState(
  3063. deployment_id,
  3064. self._long_poll_host,
  3065. self._deployment_scheduler,
  3066. self._cluster_node_info_cache,
  3067. self._autoscaling_state_manager,
  3068. )
  3069. def _map_actor_names_to_deployment(
  3070. self, all_current_actor_names: List[str]
  3071. ) -> Dict[str, List[str]]:
  3072. """
  3073. Given a list of all actor names queried from current ray cluster,
  3074. map them to corresponding deployments.
  3075. Example:
  3076. Args:
  3077. [A#zxc123, B#xcv234, A#qwe234]
  3078. Returns:
  3079. {
  3080. A: [A#zxc123, A#qwe234]
  3081. B: [B#xcv234]
  3082. }
  3083. """
  3084. all_replica_names = [
  3085. actor_name
  3086. for actor_name in all_current_actor_names
  3087. if ReplicaID.is_full_id_str(actor_name)
  3088. ]
  3089. deployment_to_current_replicas = defaultdict(list)
  3090. if len(all_replica_names) > 0:
  3091. for replica_name in all_replica_names:
  3092. replica_id = ReplicaID.from_full_id_str(replica_name)
  3093. deployment_to_current_replicas[replica_id.deployment_id].append(
  3094. replica_name
  3095. )
  3096. return deployment_to_current_replicas
  3097. def _detect_and_remove_leaked_placement_groups(
  3098. self,
  3099. all_current_actor_names: List[str],
  3100. all_current_placement_group_names: List[str],
  3101. ):
  3102. """Detect and remove any placement groups not associated with a replica.
  3103. This can happen under certain rare circumstances:
  3104. - The controller creates a placement group then crashes before creating
  3105. the associated replica actor.
  3106. - While the controller is down, a replica actor crashes but its placement
  3107. group still exists.
  3108. In both of these (or any other unknown cases), we simply need to remove the
  3109. leaked placement groups.
  3110. """
  3111. leaked_pg_names = []
  3112. for pg_name in all_current_placement_group_names:
  3113. if (
  3114. ReplicaID.is_full_id_str(pg_name)
  3115. and pg_name not in all_current_actor_names
  3116. ):
  3117. leaked_pg_names.append(pg_name)
  3118. if len(leaked_pg_names) > 0:
  3119. logger.warning(
  3120. f"Detected leaked placement groups: {leaked_pg_names}. "
  3121. "The placement groups will be removed. This can happen in rare "
  3122. "circumstances when the controller crashes and should not cause any "
  3123. "issues. If this happens repeatedly, please file an issue on GitHub."
  3124. )
  3125. for leaked_pg_name in leaked_pg_names:
  3126. try:
  3127. pg = ray.util.get_placement_group(leaked_pg_name)
  3128. ray.util.remove_placement_group(pg)
  3129. except Exception:
  3130. logger.exception(
  3131. f"Failed to remove leaked placement group {leaked_pg_name}."
  3132. )
  3133. def _recover_from_checkpoint(
  3134. self,
  3135. all_current_actor_names: List[str],
  3136. all_current_placement_group_names: List[str],
  3137. ):
  3138. """
  3139. Recover from checkpoint upon controller failure with all actor names
  3140. found in current cluster.
  3141. Each deployment resumes target state from checkpoint if available.
  3142. For current state it will prioritize reconstructing from current
  3143. actor names found that matches deployment tag if applicable.
  3144. """
  3145. self._detect_and_remove_leaked_placement_groups(
  3146. all_current_actor_names,
  3147. all_current_placement_group_names,
  3148. )
  3149. deployment_to_current_replicas = self._map_actor_names_to_deployment(
  3150. all_current_actor_names
  3151. )
  3152. checkpoint = self._kv_store.get(CHECKPOINT_KEY)
  3153. if checkpoint is not None:
  3154. deployment_state_info = cloudpickle.loads(checkpoint)
  3155. for deployment_id, checkpoint_data in deployment_state_info.items():
  3156. deployment_state = self._create_deployment_state(deployment_id)
  3157. deployment_state.recover_target_state_from_checkpoint(checkpoint_data)
  3158. if len(deployment_to_current_replicas[deployment_id]) > 0:
  3159. deployment_state.recover_current_state_from_replica_actor_names( # noqa: E501
  3160. deployment_to_current_replicas[deployment_id]
  3161. )
  3162. self._deployment_states[deployment_id] = deployment_state
  3163. self._app_deployment_mapping[deployment_id.app_name].add(
  3164. deployment_id.name
  3165. )
  3166. def shutdown(self):
  3167. """
  3168. Shutdown all running replicas by notifying the controller, and leave
  3169. it to the controller event loop to take actions afterwards.
  3170. Once shutdown signal is received, it will also prevent any new
  3171. deployments or replicas from being created.
  3172. One can send multiple shutdown signals but won't effectively make any
  3173. difference compare to calling it once.
  3174. """
  3175. self._shutting_down = True
  3176. for deployment_state in self._deployment_states.values():
  3177. deployment_state.delete()
  3178. # TODO(jiaodong): This might not be 100% safe since we deleted
  3179. # everything without ensuring all shutdown goals are completed
  3180. # yet. Need to address in follow-up PRs.
  3181. self._kv_store.delete(CHECKPOINT_KEY)
  3182. # TODO(jiaodong): Need to add some logic to prevent new replicas
  3183. # from being created once shutdown signal is sent.
  3184. def is_ready_for_shutdown(self) -> bool:
  3185. """Return whether all deployments are shutdown.
  3186. Check there are no deployment states and no checkpoints.
  3187. """
  3188. return (
  3189. self._shutting_down
  3190. and len(self._deployment_states) == 0
  3191. and self._kv_store.get(CHECKPOINT_KEY) is None
  3192. )
  3193. def save_checkpoint(self) -> None:
  3194. """Write a checkpoint of all deployment states."""
  3195. if self._shutting_down:
  3196. # Once we're told to shut down, stop writing checkpoints.
  3197. # Calling .shutdown() deletes any existing checkpoint.
  3198. return
  3199. deployment_state_info = {
  3200. deployment_id: deployment_state.get_checkpoint_data()
  3201. for deployment_id, deployment_state in self._deployment_states.items()
  3202. }
  3203. self._kv_store.put(
  3204. CHECKPOINT_KEY,
  3205. cloudpickle.dumps(deployment_state_info),
  3206. )
  3207. def get_running_replica_infos(
  3208. self,
  3209. ) -> Dict[DeploymentID, List[RunningReplicaInfo]]:
  3210. return {
  3211. id: deployment_state.get_running_replica_infos()
  3212. for id, deployment_state in self._deployment_states.items()
  3213. }
  3214. def get_deployment_infos(self) -> Dict[DeploymentID, DeploymentInfo]:
  3215. infos: Dict[DeploymentID, DeploymentInfo] = {}
  3216. for deployment_id, deployment_state in self._deployment_states.items():
  3217. infos[deployment_id] = deployment_state.target_info
  3218. return infos
  3219. def get_deployment(self, deployment_id: DeploymentID) -> Optional[DeploymentInfo]:
  3220. if deployment_id in self._deployment_states:
  3221. return self._deployment_states[deployment_id].target_info
  3222. else:
  3223. return None
  3224. def get_deployment_docs_path(self, deployment_id: DeploymentID) -> Optional[str]:
  3225. if deployment_id in self._deployment_states:
  3226. return self._deployment_states[deployment_id].docs_path
  3227. def get_deployment_route_patterns(
  3228. self, deployment_id: DeploymentID
  3229. ) -> Optional[List[str]]:
  3230. """Get route patterns for a deployment if available."""
  3231. if deployment_id in self._deployment_states:
  3232. return self._deployment_states[deployment_id].route_patterns
  3233. return None
  3234. def get_deployment_target_num_replicas(
  3235. self, deployment_id: DeploymentID
  3236. ) -> Optional[int]:
  3237. if deployment_id not in self._deployment_states:
  3238. return None
  3239. return self._deployment_states[deployment_id].target_num_replicas
  3240. def get_deployment_details(self, id: DeploymentID) -> Optional[DeploymentDetails]:
  3241. """Gets detailed info on a deployment.
  3242. Returns:
  3243. DeploymentDetails: if the deployment is live.
  3244. None: if the deployment is deleted.
  3245. """
  3246. statuses = self.get_deployment_statuses([id])
  3247. if len(statuses) == 0:
  3248. return None
  3249. else:
  3250. status_info = statuses[0]
  3251. deployment_state = self._deployment_states[id]
  3252. return DeploymentDetails(
  3253. name=id.name,
  3254. status=status_info.status,
  3255. status_trigger=status_info.status_trigger,
  3256. message=status_info.message,
  3257. deployment_config=_deployment_info_to_schema(
  3258. id.name, self.get_deployment(id)
  3259. ),
  3260. target_num_replicas=deployment_state._target_state.target_num_replicas,
  3261. required_resources=deployment_state.target_info.replica_config.resource_dict,
  3262. replicas=deployment_state.list_replica_details(),
  3263. )
  3264. def get_deployment_statuses(
  3265. self, ids: Optional[List[DeploymentID]] = None
  3266. ) -> List[DeploymentStatusInfo]:
  3267. """
  3268. Return the statuses of the deployments with the given `ids`.
  3269. If `ids` is `None`, returns the status of all deployments.
  3270. """
  3271. if ids is None:
  3272. # fast path for returning all deployments,
  3273. # avoids checking `if ids is None` in a loop
  3274. return [
  3275. state.curr_status_info for state in self._deployment_states.values()
  3276. ]
  3277. else:
  3278. statuses = []
  3279. for id in ids:
  3280. state = self._deployment_states.get(id)
  3281. if state is not None:
  3282. statuses.append(state.curr_status_info)
  3283. return statuses
  3284. def get_alive_replica_actor_ids(self) -> Set[str]:
  3285. alive_replica_actor_ids = set()
  3286. for ds in self._deployment_states.values():
  3287. alive_replica_actor_ids |= ds.get_alive_replica_actor_ids()
  3288. return alive_replica_actor_ids
  3289. def deploy(
  3290. self,
  3291. deployment_id: DeploymentID,
  3292. deployment_info: DeploymentInfo,
  3293. ) -> bool:
  3294. """Deploy the deployment.
  3295. If the deployment already exists with the same version and config,
  3296. this is a no-op and returns False.
  3297. Returns:
  3298. bool: Whether the target state has changed.
  3299. """
  3300. if deployment_id not in self._deployment_states:
  3301. self._deployment_states[deployment_id] = self._create_deployment_state(
  3302. deployment_id
  3303. )
  3304. self._app_deployment_mapping[deployment_id.app_name].add(deployment_id.name)
  3305. self._record_deployment_usage()
  3306. return self._deployment_states[deployment_id].deploy(deployment_info)
  3307. def get_deployments_in_application(self, app_name: str) -> List[str]:
  3308. """Return list of deployment names in application."""
  3309. return list(self._app_deployment_mapping[app_name])
  3310. def delete_deployment(self, id: DeploymentID):
  3311. # This method must be idempotent. We should validate that the
  3312. # specified deployment exists on the client.
  3313. if id in self._deployment_states:
  3314. return self._deployment_states[id].delete()
  3315. return False
  3316. def _validate_deployment_state_for_num_replica_update(
  3317. self, deployment_id: DeploymentID
  3318. ):
  3319. """Validate the state of a deployment for num replica update."""
  3320. statuses = self.get_deployment_statuses([deployment_id])
  3321. if statuses is None or len(statuses) == 0:
  3322. raise ValueError(f"Deployment {deployment_id} not found")
  3323. elif statuses[0].status_trigger == DeploymentStatusTrigger.DELETING:
  3324. raise DeploymentIsBeingDeletedError(
  3325. f"Deployment {deployment_id} is being deleted. Scaling operations are not allowed."
  3326. )
  3327. def set_target_num_replicas(
  3328. self, deployment_id: DeploymentID, target_num_replicas: int
  3329. ):
  3330. """Set target number of replicas for a deployment."""
  3331. self._validate_deployment_state_for_num_replica_update(deployment_id)
  3332. deployment_state = self._deployment_states[deployment_id]
  3333. if target_num_replicas != deployment_state.target_num_replicas:
  3334. logger.info(
  3335. f"Target number of replicas changed from {deployment_state.target_num_replicas} to {target_num_replicas} for deployment {deployment_id}"
  3336. )
  3337. deployment_state.set_target_num_replicas(target_num_replicas)
  3338. self.save_checkpoint()
  3339. else:
  3340. logger.info(
  3341. f"Skipping updating target number of replicas as it did not change for deployment {deployment_id}"
  3342. )
    def update(self) -> bool:
        """Updates the state of all deployments to match their goal state.

        One iteration of the control loop, run over every tracked deployment:
          1. refresh replica state (``check_and_update_replicas``);
          2. refresh deployment status (``check_curr_status``);
          3. migrate replicas off draining nodes, or off a node chosen for
             compaction when pack scheduling is enabled;
          4. collect upscale / downscale requests;
          5. refresh status again, recording deployments that finished
             deleting and whether any replicas are recovering;
          6. hand the requests to the scheduler, stop the replicas it selects,
             and handle scheduling failures;
          7. broadcast running-replica / config changes and pass running
             replica IDs to the autoscaling state manager;
          8. export a per-deployment status gauge;
          9. drop all bookkeeping for deleted deployments.

        Returns True if any of the deployments have replicas in the RECOVERING state.
        """
        deleted_ids = []
        any_recovering = False
        upscales: Dict[DeploymentID, List[ReplicaSchedulingRequest]] = {}
        downscales: Dict[DeploymentID, DeploymentDownscaleRequest] = {}
        # NOTE(review): nothing in this method sets this flag to True, so the
        # `save_checkpoint()` call at the end is currently unreachable. Confirm
        # whether this is leftover from a refactor.
        target_state_changed = False
        # STEP 1: Update current state
        for deployment_state in self._deployment_states.values():
            deployment_state.check_and_update_replicas()
        # STEP 2: Check current status
        # The return value is discarded here; STEP 5 calls check_curr_status()
        # again after scaling and consumes its result.
        for deployment_state in self._deployment_states.values():
            deployment_state.check_curr_status()
        # STEP 3: Drain nodes
        draining_nodes = self._cluster_node_info_cache.get_draining_nodes()
        # New compaction is only allowed when no node is draining and every
        # deployment is HEALTHY with exactly its target number of replicas,
        # all of them running at the target version.
        allow_new_compaction = len(draining_nodes) == 0 and all(
            ds.curr_status_info.status == DeploymentStatus.HEALTHY
            # TODO(zcin): Make sure that status should never be healthy if
            # the number of running replicas at target version is not at
            # target number, so we can remove this defensive check.
            and ds.get_num_running_replicas(ds.target_version) == ds.target_num_replicas
            # To be extra conservative, only actively compact if there
            # are no non-running replicas
            and len(ds._replicas.get()) == ds.target_num_replicas
            for ds in self._deployment_states.values()
        )
        if RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY:
            # Tuple of target node to compact, and its draining deadline
            node_info: Optional[
                Tuple[str, float]
            ] = self._deployment_scheduler.get_node_to_compact(
                allow_new_compaction=allow_new_compaction
            )
            if node_info:
                # The compaction target replaces (does not extend) the set of
                # draining nodes used for migration below.
                target_node_id, deadline = node_info
                draining_nodes = {target_node_id: deadline}
        for deployment_id, deployment_state in self._deployment_states.items():
            deployment_state.migrate_replicas_on_draining_nodes(draining_nodes)
        # STEP 4: Scale replicas
        for deployment_id, deployment_state in self._deployment_states.items():
            upscale, downscale = deployment_state.scale_deployment_replicas()
            if upscale:
                upscales[deployment_id] = upscale
            if downscale:
                downscales[deployment_id] = downscale
        # STEP 5: Update status
        for deployment_id, deployment_state in self._deployment_states.items():
            deleted, any_replicas_recovering = deployment_state.check_curr_status()
            if deleted:
                deleted_ids.append(deployment_id)
            any_recovering |= any_replicas_recovering
        # STEP 6: Schedule all STARTING replicas and stop all STOPPING replicas
        deployment_to_replicas_to_stop = self._deployment_scheduler.schedule(
            upscales, downscales
        )
        for deployment_id, replicas_to_stop in deployment_to_replicas_to_stop.items():
            self._deployment_states[deployment_id].stop_replicas(replicas_to_stop)
        # Requests whose placement group / actor creation failed are recorded
        # as startup failures and their replicas are stopped.
        for deployment_id, scheduling_requests in upscales.items():
            self._handle_scheduling_request_failures(deployment_id, scheduling_requests)
        # STEP 7: Broadcast long poll information
        for deployment_id, deployment_state in self._deployment_states.items():
            deployment_state.broadcast_running_replicas_if_changed()
            deployment_state.broadcast_deployment_config_if_changed()
            if deployment_state.should_autoscale():
                self._autoscaling_state_manager.update_running_replica_ids(
                    deployment_id=deployment_id,
                    running_replicas=deployment_state.get_running_replica_ids(),
                )
        # STEP 8: Record deployment status metrics
        for deployment_id, deployment_state in self._deployment_states.items():
            status = deployment_state.curr_status_info.status
            self._deployment_status_gauge.set(
                status.to_numeric(),
                tags={
                    "deployment": deployment_id.name,
                    "application": deployment_id.app_name,
                },
            )
        # STEP 9: Cleanup
        # Deleted deployments are removed only after all iteration over
        # `self._deployment_states` above has finished.
        for deployment_id in deleted_ids:
            self._deployment_scheduler.on_deployment_deleted(deployment_id)
            self._autoscaling_state_manager.deregister_deployment(deployment_id)
            del self._deployment_states[deployment_id]
            if (
                deployment_id.app_name in self._app_deployment_mapping
                and deployment_id.name
                in self._app_deployment_mapping[deployment_id.app_name]
            ):
                self._app_deployment_mapping[deployment_id.app_name].remove(
                    deployment_id.name
                )
                # Clean up the app_name entry if no deployments are left
                if not self._app_deployment_mapping[deployment_id.app_name]:
                    del self._app_deployment_mapping[deployment_id.app_name]
        if len(deleted_ids):
            self._record_deployment_usage()
        if target_state_changed:
            self.save_checkpoint()
        return any_recovering
  3444. def autoscale(self, deployment_id: DeploymentID, target_num_replicas: int) -> bool:
  3445. """Autoscale the deployment to the target number of replicas.
  3446. Args:
  3447. deployment_id: The deployment ID.
  3448. target_num_replicas: The target number of replicas.
  3449. Returns:
  3450. True if the deployment was autoscaled, False otherwise.
  3451. """
  3452. if deployment_id not in self._deployment_states:
  3453. return False
  3454. return self._deployment_states[deployment_id].autoscale(target_num_replicas)
  3455. def _handle_scheduling_request_failures(
  3456. self,
  3457. deployment_id: DeploymentID,
  3458. scheduling_requests: List[ReplicaSchedulingRequest],
  3459. ):
  3460. """Updates internal datastructures when replicas fail to be scheduled."""
  3461. failed_replicas: List[ReplicaID] = []
  3462. for scheduling_request in scheduling_requests:
  3463. if (
  3464. scheduling_request.status
  3465. == ReplicaSchedulingRequestStatus.PLACEMENT_GROUP_CREATION_FAILED
  3466. ):
  3467. failed_replicas.append(scheduling_request.replica_id)
  3468. self._deployment_states[deployment_id].record_replica_startup_failure(
  3469. "Replica scheduling failed. Failed to create a placement "
  3470. f"group for replica {scheduling_request.replica_id}. "
  3471. "See Serve controller logs for more details."
  3472. )
  3473. elif (
  3474. scheduling_request.status
  3475. == ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED
  3476. ):
  3477. failed_replicas.append(scheduling_request.replica_id)
  3478. self._deployment_states[deployment_id].record_replica_startup_failure(
  3479. "Replica scheduling failed. Failed to create an actor "
  3480. f"for replica {scheduling_request.replica_id}. "
  3481. "See Serve controller logs for more details."
  3482. )
  3483. if failed_replicas:
  3484. self._deployment_states[deployment_id].stop_replicas(failed_replicas)
  3485. def _record_deployment_usage(self):
  3486. ServeUsageTag.NUM_DEPLOYMENTS.record(str(len(self._deployment_states)))
  3487. num_gpu_deployments = 0
  3488. for deployment_state in self._deployment_states.values():
  3489. if (
  3490. deployment_state.target_info is not None
  3491. and deployment_state.target_info.replica_config is not None
  3492. and deployment_state.target_info.replica_config.ray_actor_options
  3493. is not None
  3494. and (
  3495. deployment_state.target_info.replica_config.ray_actor_options.get(
  3496. "num_gpus", 0
  3497. )
  3498. > 0
  3499. )
  3500. ):
  3501. num_gpu_deployments += 1
  3502. ServeUsageTag.NUM_GPU_DEPLOYMENTS.record(str(num_gpu_deployments))
  3503. def record_request_routing_info(self, info: RequestRoutingInfo) -> None:
  3504. """
  3505. Record request routing information for a replica.
  3506. Args:
  3507. info: Request routing info including deployment name, replica tag,
  3508. multiplex model ids, and routing stats.
  3509. """
  3510. deployment_id = info.replica_id.deployment_id
  3511. if deployment_id not in self._deployment_states:
  3512. app_msg = f" in application '{deployment_id.app_name}'"
  3513. logger.error(
  3514. f"Deployment '{deployment_id.name}'{app_msg} not found in state "
  3515. "manager."
  3516. )
  3517. return
  3518. self._deployment_states[deployment_id].record_request_routing_info(info)
  3519. def get_active_node_ids(self) -> Set[str]:
  3520. """Return set of node ids with running replicas of any deployment.
  3521. This is used to determine which node has replicas. Only nodes with replicas and
  3522. head node should have active proxies.
  3523. """
  3524. node_ids = set()
  3525. for deployment_state in self._deployment_states.values():
  3526. node_ids.update(deployment_state.get_active_node_ids())
  3527. return node_ids
  3528. def get_ingress_replicas_info(self) -> List[Tuple[str, str, int, int]]:
  3529. """Get all ingress replicas info for all deployments."""
  3530. ingress_replicas_list = [
  3531. deployment_state._replicas.get()
  3532. for deployment_state in self._deployment_states.values()
  3533. if deployment_state.is_ingress()
  3534. ]
  3535. ingress_replicas_info = []
  3536. for replicas in ingress_replicas_list:
  3537. for replica in replicas:
  3538. ingress_replicas_info.append(
  3539. (
  3540. replica.actor_node_id,
  3541. replica.replica_id.unique_id,
  3542. replica.actor_http_port,
  3543. replica.actor_grpc_port,
  3544. )
  3545. )
  3546. return ingress_replicas_info
  3547. def _get_replica_ranks_mapping(
  3548. self, deployment_id: DeploymentID
  3549. ) -> Dict[str, ReplicaRank]:
  3550. """Get the current rank mapping for all replicas in a deployment.
  3551. Args:
  3552. deployment_id: The deployment ID to get ranks for.
  3553. Returns:
  3554. Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank).
  3555. """
  3556. deployment_state = self._deployment_states.get(deployment_id)
  3557. if deployment_state is None:
  3558. return {}
  3559. return deployment_state._get_replica_ranks_mapping()
  3560. def get_deployment_outbound_deployments(
  3561. self, deployment_id: DeploymentID
  3562. ) -> Optional[List[DeploymentID]]:
  3563. """Get the cached outbound deployments for a specific deployment.
  3564. Args:
  3565. deployment_id: The deployment ID to get outbound deployments for.
  3566. Returns:
  3567. List of deployment IDs that this deployment calls, or None if
  3568. the deployment doesn't exist or hasn't been polled yet.
  3569. """
  3570. deployment_state = self._deployment_states.get(deployment_id)
  3571. if deployment_state is None:
  3572. return None
  3573. return deployment_state.get_outbound_deployments()