worker.py 143 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820
  1. import atexit
  2. import faulthandler
  3. import functools
  4. import inspect
  5. import io
  6. import json
  7. import logging
  8. import os
  9. import sys
  10. import threading
  11. import time
  12. import traceback
  13. import urllib
  14. import warnings
  15. from abc import ABCMeta, abstractmethod
  16. from collections.abc import Mapping
  17. from contextlib import contextmanager
  18. from dataclasses import dataclass
  19. from functools import wraps
  20. from typing import (
  21. TYPE_CHECKING,
  22. Any,
  23. AnyStr,
  24. Callable,
  25. Dict,
  26. Generic,
  27. Iterator,
  28. List,
  29. Literal,
  30. Optional,
  31. Protocol,
  32. Sequence,
  33. Tuple,
  34. Type,
  35. TypeVar,
  36. Union,
  37. overload,
  38. )
  39. from urllib.parse import urlparse
  40. if TYPE_CHECKING:
  41. import torch
  42. import colorama
  43. import ray
  44. import ray._private.node
  45. import ray._private.parameter
  46. import ray._private.profiling as profiling
  47. import ray._private.ray_constants as ray_constants
  48. import ray._private.serialization as serialization
  49. import ray._private.services as services
  50. import ray._private.state
  51. import ray._private.worker
  52. # Ray modules
  53. import ray.actor
  54. import ray.cloudpickle as pickle # noqa
  55. import ray.job_config
  56. import ray.remote_function
  57. from ray import ActorID, JobID, Language, ObjectRef
  58. from ray._common import ray_option_utils
  59. from ray._common.constants import RAY_WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR
  60. from ray._common.utils import load_class
  61. from ray._private.authentication.authentication_token_setup import (
  62. ensure_token_if_auth_enabled,
  63. )
  64. from ray._private.client_mode_hook import client_mode_hook
  65. from ray._private.function_manager import FunctionActorManager
  66. from ray._private.inspect_util import is_cython
  67. from ray._private.ray_logging import (
  68. global_worker_stdstream_dispatcher,
  69. setup_logger,
  70. stderr_deduplicator,
  71. stdout_deduplicator,
  72. )
  73. from ray._private.ray_logging.logging_config import LoggingConfig
  74. from ray._private.resource_isolation_config import ResourceIsolationConfig
  75. from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
  76. from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
  77. from ray._private.runtime_env.setup_hook import (
  78. upload_worker_process_setup_hook_if_needed,
  79. )
  80. from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
  81. from ray._private.utils import get_ray_doc_version
  82. from ray._raylet import (
  83. ObjectRefGenerator,
  84. TaskID,
  85. WorkerID,
  86. raise_sys_exit_with_custom_error_message,
  87. )
  88. from ray.actor import ActorClass
  89. from ray.exceptions import (
  90. ActorHandleNotFoundError,
  91. ObjectStoreFullError,
  92. RayError,
  93. RaySystemError,
  94. RayTaskError,
  95. )
  96. from ray.experimental import tqdm_ray
  97. from ray.experimental.compiled_dag_ref import CompiledDAGRef
  98. from ray.experimental.internal_kv import (
  99. _initialize_internal_kv,
  100. _internal_kv_get,
  101. _internal_kv_initialized,
  102. _internal_kv_reset,
  103. )
  104. from ray.experimental.tqdm_ray import RAY_TQDM_MAGIC
  105. from ray.runtime_env.runtime_env import _merge_runtime_env
  106. from ray.util.annotations import Deprecated, PublicAPI
  107. from ray.util.debug import log_once
  108. from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
  109. from ray.util.tracing.tracing_helper import _import_from_string
  110. from ray.widgets import Template
  111. from ray.widgets.util import repr_with_fallback
  112. SCRIPT_MODE = 0
  113. WORKER_MODE = 1
  114. LOCAL_MODE = 2
  115. SPILL_WORKER_MODE = 3
  116. RESTORE_WORKER_MODE = 4
  117. # Logger for this module. It should be configured at the entry point
  118. # into the program using Ray. Ray provides a default configuration at
  119. # entry/init points.
  120. logger = logging.getLogger(__name__)
  121. T = TypeVar("T")
  122. T0 = TypeVar("T0")
  123. T1 = TypeVar("T1")
  124. T2 = TypeVar("T2")
  125. T3 = TypeVar("T3")
  126. T4 = TypeVar("T4")
  127. T5 = TypeVar("T5")
  128. T6 = TypeVar("T6")
  129. T7 = TypeVar("T7")
  130. T8 = TypeVar("T8")
  131. T9 = TypeVar("T9")
  132. R = TypeVar("R")
  133. DAGNode = TypeVar("DAGNode")
  134. # Only used for type annotations as a placeholder
  135. Undefined: Any = object()
  136. # TypeVar for self-referential generics in `RemoteFunction[N]`.
  137. RF = TypeVar("RF", bound="HasOptions")
  138. class HasOptions(Protocol):
  139. def options(self: RF, **task_options) -> RF:
  140. ...
  141. class RemoteFunctionNoArgs(HasOptions, Generic[R]):
  142. def __init__(self, function: Callable[[], R]) -> None:
  143. pass
  144. def remote(
  145. self,
  146. ) -> "ObjectRef[R]":
  147. ...
  148. def bind(
  149. self,
  150. ) -> "DAGNode[R]":
  151. ...
  152. class RemoteFunction0(HasOptions, Generic[R, T0]):
  153. def __init__(self, function: Callable[[T0], R]) -> None:
  154. pass
  155. def remote(
  156. self,
  157. __arg0: "Union[T0, ObjectRef[T0]]",
  158. ) -> "ObjectRef[R]":
  159. ...
  160. def bind(
  161. self,
  162. __arg0: "Union[T0, DAGNode[T0]]",
  163. ) -> "DAGNode[R]":
  164. ...
  165. class RemoteFunction1(HasOptions, Generic[R, T0, T1]):
  166. def __init__(self, function: Callable[[T0, T1], R]) -> None:
  167. pass
  168. def remote(
  169. self,
  170. __arg0: "Union[T0, ObjectRef[T0]]",
  171. __arg1: "Union[T1, ObjectRef[T1]]",
  172. ) -> "ObjectRef[R]":
  173. ...
  174. def bind(
  175. self,
  176. __arg0: "Union[T0, DAGNode[T0]]",
  177. __arg1: "Union[T1, DAGNode[T1]]",
  178. ) -> "DAGNode[R]":
  179. ...
  180. class RemoteFunction2(HasOptions, Generic[R, T0, T1, T2]):
  181. def __init__(self, function: Callable[[T0, T1, T2], R]) -> None:
  182. pass
  183. def remote(
  184. self,
  185. __arg0: "Union[T0, ObjectRef[T0]]",
  186. __arg1: "Union[T1, ObjectRef[T1]]",
  187. __arg2: "Union[T2, ObjectRef[T2]]",
  188. ) -> "ObjectRef[R]":
  189. ...
  190. def bind(
  191. self,
  192. __arg0: "Union[T0, DAGNode[T0]]",
  193. __arg1: "Union[T1, DAGNode[T1]]",
  194. __arg2: "Union[T2, DAGNode[T2]]",
  195. ) -> "DAGNode[R]":
  196. ...
  197. class RemoteFunction3(HasOptions, Generic[R, T0, T1, T2, T3]):
  198. def __init__(self, function: Callable[[T0, T1, T2, T3], R]) -> None:
  199. pass
  200. def remote(
  201. self,
  202. __arg0: "Union[T0, ObjectRef[T0]]",
  203. __arg1: "Union[T1, ObjectRef[T1]]",
  204. __arg2: "Union[T2, ObjectRef[T2]]",
  205. __arg3: "Union[T3, ObjectRef[T3]]",
  206. ) -> "ObjectRef[R]":
  207. ...
  208. def bind(
  209. self,
  210. __arg0: "Union[T0, DAGNode[T0]]",
  211. __arg1: "Union[T1, DAGNode[T1]]",
  212. __arg2: "Union[T2, DAGNode[T2]]",
  213. __arg3: "Union[T3, DAGNode[T3]]",
  214. ) -> "DAGNode[R]":
  215. ...
  216. class RemoteFunction4(HasOptions, Generic[R, T0, T1, T2, T3, T4]):
  217. def __init__(self, function: Callable[[T0, T1, T2, T3, T4], R]) -> None:
  218. pass
  219. def remote(
  220. self,
  221. __arg0: "Union[T0, ObjectRef[T0]]",
  222. __arg1: "Union[T1, ObjectRef[T1]]",
  223. __arg2: "Union[T2, ObjectRef[T2]]",
  224. __arg3: "Union[T3, ObjectRef[T3]]",
  225. __arg4: "Union[T4, ObjectRef[T4]]",
  226. ) -> "ObjectRef[R]":
  227. ...
  228. def bind(
  229. self,
  230. __arg0: "Union[T0, DAGNode[T0]]",
  231. __arg1: "Union[T1, DAGNode[T1]]",
  232. __arg2: "Union[T2, DAGNode[T2]]",
  233. __arg3: "Union[T3, DAGNode[T3]]",
  234. __arg4: "Union[T4, DAGNode[T4]]",
  235. ) -> "DAGNode[R]":
  236. ...
  237. class RemoteFunction5(HasOptions, Generic[R, T0, T1, T2, T3, T4, T5]):
  238. def __init__(self, function: Callable[[T0, T1, T2, T3, T4, T5], R]) -> None:
  239. pass
  240. def remote(
  241. self,
  242. __arg0: "Union[T0, ObjectRef[T0]]",
  243. __arg1: "Union[T1, ObjectRef[T1]]",
  244. __arg2: "Union[T2, ObjectRef[T2]]",
  245. __arg3: "Union[T3, ObjectRef[T3]]",
  246. __arg4: "Union[T4, ObjectRef[T4]]",
  247. __arg5: "Union[T5, ObjectRef[T5]]",
  248. ) -> "ObjectRef[R]":
  249. ...
  250. def bind(
  251. self,
  252. __arg0: "Union[T0, DAGNode[T0]]",
  253. __arg1: "Union[T1, DAGNode[T1]]",
  254. __arg2: "Union[T2, DAGNode[T2]]",
  255. __arg3: "Union[T3, DAGNode[T3]]",
  256. __arg4: "Union[T4, DAGNode[T4]]",
  257. __arg5: "Union[T5, DAGNode[T5]]",
  258. ) -> "DAGNode[R]":
  259. ...
  260. class RemoteFunction6(HasOptions, Generic[R, T0, T1, T2, T3, T4, T5, T6]):
  261. def __init__(self, function: Callable[[T0, T1, T2, T3, T4, T5, T6], R]) -> None:
  262. pass
  263. def remote(
  264. self,
  265. __arg0: "Union[T0, ObjectRef[T0]]",
  266. __arg1: "Union[T1, ObjectRef[T1]]",
  267. __arg2: "Union[T2, ObjectRef[T2]]",
  268. __arg3: "Union[T3, ObjectRef[T3]]",
  269. __arg4: "Union[T4, ObjectRef[T4]]",
  270. __arg5: "Union[T5, ObjectRef[T5]]",
  271. __arg6: "Union[T6, ObjectRef[T6]]",
  272. ) -> "ObjectRef[R]":
  273. ...
  274. def bind(
  275. self,
  276. __arg0: "Union[T0, DAGNode[T0]]",
  277. __arg1: "Union[T1, DAGNode[T1]]",
  278. __arg2: "Union[T2, DAGNode[T2]]",
  279. __arg3: "Union[T3, DAGNode[T3]]",
  280. __arg4: "Union[T4, DAGNode[T4]]",
  281. __arg5: "Union[T5, DAGNode[T5]]",
  282. __arg6: "Union[T6, DAGNode[T6]]",
  283. ) -> "DAGNode[R]":
  284. ...
  285. class RemoteFunction7(HasOptions, Generic[R, T0, T1, T2, T3, T4, T5, T6, T7]):
  286. def __init__(self, function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7], R]) -> None:
  287. pass
  288. def remote(
  289. self,
  290. __arg0: "Union[T0, ObjectRef[T0]]",
  291. __arg1: "Union[T1, ObjectRef[T1]]",
  292. __arg2: "Union[T2, ObjectRef[T2]]",
  293. __arg3: "Union[T3, ObjectRef[T3]]",
  294. __arg4: "Union[T4, ObjectRef[T4]]",
  295. __arg5: "Union[T5, ObjectRef[T5]]",
  296. __arg6: "Union[T6, ObjectRef[T6]]",
  297. __arg7: "Union[T7, ObjectRef[T7]]",
  298. ) -> "ObjectRef[R]":
  299. ...
  300. def bind(
  301. self,
  302. __arg0: "Union[T0, DAGNode[T0]]",
  303. __arg1: "Union[T1, DAGNode[T1]]",
  304. __arg2: "Union[T2, DAGNode[T2]]",
  305. __arg3: "Union[T3, DAGNode[T3]]",
  306. __arg4: "Union[T4, DAGNode[T4]]",
  307. __arg5: "Union[T5, DAGNode[T5]]",
  308. __arg6: "Union[T6, DAGNode[T6]]",
  309. __arg7: "Union[T7, DAGNode[T7]]",
  310. ) -> "DAGNode[R]":
  311. ...
  312. class RemoteFunction8(HasOptions, Generic[R, T0, T1, T2, T3, T4, T5, T6, T7, T8]):
  313. def __init__(
  314. self, function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7, T8], R]
  315. ) -> None:
  316. pass
  317. def remote(
  318. self,
  319. __arg0: "Union[T0, ObjectRef[T0]]",
  320. __arg1: "Union[T1, ObjectRef[T1]]",
  321. __arg2: "Union[T2, ObjectRef[T2]]",
  322. __arg3: "Union[T3, ObjectRef[T3]]",
  323. __arg4: "Union[T4, ObjectRef[T4]]",
  324. __arg5: "Union[T5, ObjectRef[T5]]",
  325. __arg6: "Union[T6, ObjectRef[T6]]",
  326. __arg7: "Union[T7, ObjectRef[T7]]",
  327. __arg8: "Union[T8, ObjectRef[T8]]",
  328. ) -> "ObjectRef[R]":
  329. ...
  330. def bind(
  331. self,
  332. __arg0: "Union[T0, DAGNode[T0]]",
  333. __arg1: "Union[T1, DAGNode[T1]]",
  334. __arg2: "Union[T2, DAGNode[T2]]",
  335. __arg3: "Union[T3, DAGNode[T3]]",
  336. __arg4: "Union[T4, DAGNode[T4]]",
  337. __arg5: "Union[T5, DAGNode[T5]]",
  338. __arg6: "Union[T6, DAGNode[T6]]",
  339. __arg7: "Union[T7, DAGNode[T7]]",
  340. __arg8: "Union[T8, DAGNode[T8]]",
  341. ) -> "DAGNode[R]":
  342. ...
  343. class RemoteFunction9(HasOptions, Generic[R, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]):
  344. def __init__(
  345. self, function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9], R]
  346. ) -> None:
  347. pass
  348. def remote(
  349. self,
  350. __arg0: "Union[T0, ObjectRef[T0]]",
  351. __arg1: "Union[T1, ObjectRef[T1]]",
  352. __arg2: "Union[T2, ObjectRef[T2]]",
  353. __arg3: "Union[T3, ObjectRef[T3]]",
  354. __arg4: "Union[T4, ObjectRef[T4]]",
  355. __arg5: "Union[T5, ObjectRef[T5]]",
  356. __arg6: "Union[T6, ObjectRef[T6]]",
  357. __arg7: "Union[T7, ObjectRef[T7]]",
  358. __arg8: "Union[T8, ObjectRef[T8]]",
  359. __arg9: "Union[T9, ObjectRef[T9]]",
  360. ) -> "ObjectRef[R]":
  361. ...
  362. def bind(
  363. self,
  364. __arg0: "Union[T0, DAGNode[T0]]",
  365. __arg1: "Union[T1, DAGNode[T1]]",
  366. __arg2: "Union[T2, DAGNode[T2]]",
  367. __arg3: "Union[T3, DAGNode[T3]]",
  368. __arg4: "Union[T4, DAGNode[T4]]",
  369. __arg5: "Union[T5, DAGNode[T5]]",
  370. __arg6: "Union[T6, DAGNode[T6]]",
  371. __arg7: "Union[T7, DAGNode[T7]]",
  372. __arg8: "Union[T8, DAGNode[T8]]",
  373. __arg9: "Union[T9, DAGNode[T9]]",
  374. ) -> "DAGNode[R]":
  375. ...
  376. # Visible for testing.
  377. def _unhandled_error_handler(e: Exception):
  378. logger.error(
  379. f"Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): {e}"
  380. )
  381. class Worker:
  382. """A class used to define the control flow of a worker process.
  383. Note:
  384. The methods in this class are considered unexposed to the user. The
  385. functions outside of this class are considered exposed.
  386. Attributes:
  387. node (ray._private.node.Node): The node this worker is attached to.
  388. mode: The mode of the worker. One of SCRIPT_MODE, LOCAL_MODE, and
  389. WORKER_MODE.
  390. """
  391. def __init__(self):
  392. """Initialize a Worker object."""
  393. self.node = None
  394. self.mode = None
  395. self.actors = {}
  396. # GPU object manager to manage GPU object lifecycles, including coordinating out-of-band
  397. # tensor transfers between actors, storing and retrieving GPU objects, and garbage collection.
  398. # We create the GPU object manager lazily, if a user specifies a
  399. # non-default tensor_transport, to avoid circular import and because it
  400. # imports third-party dependencies like PyTorch.
  401. self._gpu_object_manager = None
  402. # When the worker is constructed. Record the original value of the
  403. # (CUDA_VISIBLE_DEVICES, ONEAPI_DEVICE_SELECTOR, HIP_VISIBLE_DEVICES,
  404. # NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS, ..) environment variables.
  405. self.original_visible_accelerator_ids = (
  406. ray._private.utils.get_visible_accelerator_ids()
  407. )
  408. # A dictionary that maps from driver id to SerializationContext
  409. # TODO: clean up the SerializationContext once the job finished.
  410. self.serialization_context_map = {}
  411. self.function_actor_manager = FunctionActorManager(self)
  412. # This event is checked regularly by all of the threads so that they
  413. # know when to exit.
  414. self.threads_stopped = threading.Event()
  415. # If this is set, the next .remote call should drop into the
  416. # debugger, at the specified breakpoint ID.
  417. self.debugger_breakpoint = b""
  418. # If this is set, ray.get calls invoked on the object ID returned
  419. # by the worker should drop into the debugger at the specified
  420. # breakpoint ID.
  421. self.debugger_get_breakpoint = b""
  422. # If True, make the debugger external to the node this worker is
  423. # running on.
  424. self.ray_debugger_external = False
  425. self._load_code_from_local = False
  426. # Opened file descriptor to stdout/stderr for this python worker.
  427. self._enable_record_actor_task_log = (
  428. ray_constants.RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING
  429. )
  430. # Whether rotation is enabled for out file and err file, task log position report will be skipped if rotation enabled, since the position cannot be accurate.
  431. self._file_rotation_enabled = False
  432. self._out_filepath = None
  433. self._err_filepath = None
  434. # Create the lock here because the serializer will use it before
  435. # initializing Ray.
  436. self.lock = threading.RLock()
  437. # By default, don't show logs from other drivers. This is set to true by Serve
  438. # in order to stream logs from the controller and replica actors across
  439. # different drivers that connect to the same Serve instance.
  440. # See https://github.com/ray-project/ray/pull/35070.
  441. self._filter_logs_by_job = True
  442. # the debugger port for this worker
  443. self._debugger_port = None
  444. # Cache the job id from initialize_job_config() to optimize lookups.
  445. # This is on the critical path of ray.get()/put() calls.
  446. self._cached_job_id = None
  447. # Indicates whether the worker is connected to the Ray cluster.
  448. # It should be set to True in `connect` and False in `disconnect`.
  449. self._is_connected: bool = False
  450. @property
  451. def gpu_object_manager(self) -> "ray.experimental.GPUObjectManager":
  452. if self._gpu_object_manager is None:
  453. # We create the GPU object manager lazily, if a user specifies a
  454. # non-default tensor_transport, to avoid circular import and because it
  455. # imports third-party dependencies like PyTorch.
  456. from ray.experimental import GPUObjectManager
  457. self._gpu_object_manager = GPUObjectManager()
  458. return self._gpu_object_manager
  459. @property
  460. def connected(self):
  461. """bool: True if Ray has been started and False otherwise."""
  462. return self._is_connected
  463. def set_is_connected(self, is_connected: bool):
  464. self._is_connected = is_connected
  465. @property
  466. def node_ip_address(self):
  467. self.check_connected()
  468. return self.node.node_ip_address
  469. @property
  470. def load_code_from_local(self):
  471. self.check_connected()
  472. return self._load_code_from_local
  473. @property
  474. def current_job_id(self):
  475. if self._cached_job_id is not None:
  476. return self._cached_job_id
  477. elif hasattr(self, "core_worker"):
  478. return self.core_worker.get_current_job_id()
  479. return JobID.nil()
  480. @property
  481. def actor_id(self):
  482. if hasattr(self, "core_worker"):
  483. return self.core_worker.get_actor_id()
  484. return ActorID.nil()
  485. @property
  486. def actor_name(self):
  487. if hasattr(self, "core_worker"):
  488. return self.core_worker.get_actor_name().decode("utf-8")
  489. return None
  490. @property
  491. def current_task_id(self):
  492. return self.core_worker.get_current_task_id()
  493. @property
  494. def current_task_name(self):
  495. return self.core_worker.get_current_task_name()
  496. @property
  497. def current_task_function_name(self):
  498. return self.core_worker.get_current_task_function_name()
  499. @property
  500. def current_node_id(self):
  501. return self.core_worker.get_current_node_id()
  502. @property
  503. def task_depth(self):
  504. return self.core_worker.get_task_depth()
  505. @property
  506. def namespace(self):
  507. return self.core_worker.get_job_config().ray_namespace
  508. @property
  509. def placement_group_id(self):
  510. return self.core_worker.get_placement_group_id()
  511. @property
  512. def worker_id(self):
  513. return self.core_worker.get_worker_id().binary()
  514. @property
  515. def should_capture_child_tasks_in_placement_group(self):
  516. return self.core_worker.should_capture_child_tasks_in_placement_group()
  517. @property
  518. def current_cluster_and_job(self):
  519. """Get the current session index and job id as pair."""
  520. assert isinstance(self.node.cluster_id, ray.ClusterID)
  521. assert isinstance(self.current_job_id, ray.JobID)
  522. return self.node.cluster_id, self.current_job_id
  523. @property
  524. def runtime_env(self):
  525. """Get the runtime env in json format"""
  526. return self.core_worker.get_current_runtime_env()
  527. @property
  528. def debugger_port(self):
  529. """Get the debugger port for this worker"""
  530. worker_id = self.core_worker.get_worker_id()
  531. return ray._private.state.get_worker_debugger_port(worker_id)
  532. @property
  533. def job_logging_config(self):
  534. """Get the job's logging config for this worker"""
  535. if not hasattr(self, "core_worker"):
  536. return None
  537. job_config = self.core_worker.get_job_config()
  538. if not job_config.serialized_py_logging_config:
  539. return None
  540. logging_config = pickle.loads(job_config.serialized_py_logging_config)
  541. return logging_config
  542. @property
  543. def current_node_labels(self):
  544. # Return the node labels of this worker's current node.
  545. return self.node.node_labels
  546. def set_debugger_port(self, port):
  547. worker_id = self.core_worker.get_worker_id()
  548. ray._private.state.update_worker_debugger_port(worker_id, port)
  549. def set_cached_job_id(self, job_id):
  550. """Set the cached job id to speed `current_job_id()`."""
  551. self._cached_job_id = job_id
  552. @property
  553. def is_canceled(self):
  554. return self.core_worker.is_canceled()
  555. @contextmanager
  556. def task_paused_by_debugger(self):
  557. """Use while the task is paused by debugger"""
  558. try:
  559. self.core_worker.update_task_is_debugger_paused(
  560. ray.get_runtime_context()._get_current_task_id(), True
  561. )
  562. yield
  563. finally:
  564. self.core_worker.update_task_is_debugger_paused(
  565. ray.get_runtime_context()._get_current_task_id(), False
  566. )
  567. @contextmanager
  568. def worker_paused_by_debugger(self):
  569. """
  570. Updates the worker num paused threads when the worker is paused by debugger
  571. """
  572. try:
  573. worker_id = self.core_worker.get_worker_id()
  574. ray._private.state.update_worker_num_paused_threads(worker_id, 1)
  575. yield
  576. finally:
  577. ray._private.state.update_worker_num_paused_threads(worker_id, -1)
  578. def set_file_rotation_enabled(self, rotation_enabled: bool) -> None:
  579. """Set whether rotation is enabled for outfile and errfile."""
  580. self._file_rotation_enabled = rotation_enabled
  581. def set_err_file(self, err_filepath=Optional[AnyStr]) -> None:
  582. """Set the worker's err file where stderr is redirected to"""
  583. self._err_filepath = err_filepath
  584. def set_out_file(self, out_filepath=Optional[AnyStr]) -> None:
  585. """Set the worker's out file where stdout is redirected to"""
  586. self._out_filepath = out_filepath
  587. def record_task_log_start(self, task_id: TaskID, attempt_number: int):
  588. """Record the task log info when task starts executing for
  589. non concurrent actor tasks."""
  590. if not self._enable_record_actor_task_log and not self.actor_id.is_nil():
  591. # We are not recording actor task log if not enabled explicitly.
  592. # Recording actor task log is expensive and should be enabled only
  593. # when needed.
  594. # https://github.com/ray-project/ray/issues/35598
  595. return
  596. if not hasattr(self, "core_worker"):
  597. return
  598. if self._file_rotation_enabled:
  599. return
  600. self.core_worker.record_task_log_start(
  601. task_id,
  602. attempt_number,
  603. self.get_out_file_path(),
  604. self.get_err_file_path(),
  605. self.get_current_out_offset(),
  606. self.get_current_err_offset(),
  607. )
  608. def record_task_log_end(self, task_id: TaskID, attempt_number: int):
  609. """Record the task log info when task finishes executing for
  610. non concurrent actor tasks."""
  611. if not self._enable_record_actor_task_log and not self.actor_id.is_nil():
  612. # We are not recording actor task log if not enabled explicitly.
  613. # Recording actor task log is expensive and should be enabled only
  614. # when needed.
  615. # https://github.com/ray-project/ray/issues/35598
  616. return
  617. if not hasattr(self, "core_worker"):
  618. return
  619. # Disable file offset fetch if rotation enabled (since file offset doesn't make sense for rotated files).
  620. if self._file_rotation_enabled:
  621. return
  622. self.core_worker.record_task_log_end(
  623. task_id,
  624. attempt_number,
  625. self.get_current_out_offset(),
  626. self.get_current_err_offset(),
  627. )
  628. def get_out_file_path(self) -> str:
  629. """Get the out log file path"""
  630. return self._out_filepath if self._out_filepath is not None else ""
  631. def get_err_file_path(self) -> str:
  632. """Get the err log file path"""
  633. return self._err_filepath if self._err_filepath is not None else ""
  634. def get_current_out_offset(self) -> int:
  635. """Get the current offset of the out file if seekable, else 0"""
  636. if self._out_filepath is not None:
  637. return os.path.getsize(self._out_filepath)
  638. return 0
  639. def get_current_err_offset(self) -> int:
  640. """Get the current offset of the err file if seekable, else 0"""
  641. if self._err_filepath is not None:
  642. return os.path.getsize(self._err_filepath)
  643. return 0
  644. def get_serialization_context(self):
  645. """Get the SerializationContext of the job that this worker is processing.
  646. Returns:
  647. The serialization context of the given job.
  648. """
  649. # This function needs to be protected by a lock, because it will be
  650. # called by`register_class_for_serialization`, as well as the import
  651. # thread, from different threads. Also, this function will recursively
  652. # call itself, so we use RLock here.
  653. job_id = self.current_job_id
  654. context_map = self.serialization_context_map
  655. with self.lock:
  656. if job_id not in context_map:
  657. # The job ID is nil before initializing Ray.
  658. if JobID.nil() in context_map:
  659. # Transfer the serializer context used before initializing Ray.
  660. context_map[job_id] = context_map.pop(JobID.nil())
  661. else:
  662. context_map[job_id] = serialization.SerializationContext(self)
  663. return context_map[job_id]
  664. def check_connected(self):
  665. """Check if the worker is connected.
  666. Raises:
  667. Exception: An exception is raised if the worker is not connected.
  668. """
  669. if not self.connected:
  670. raise RaySystemError(
  671. "Ray has not been started yet. You can start Ray with 'ray.init()'."
  672. )
  673. def set_mode(self, mode):
  674. """Set the mode of the worker.
  675. The mode SCRIPT_MODE should be used if this Worker is a driver that is
  676. being run as a Python script or interactively in a shell. It will print
  677. information about task failures.
  678. The mode WORKER_MODE should be used if this Worker is not a driver. It
  679. will not print information about tasks.
  680. The mode LOCAL_MODE should be used if this Worker is a driver and if
  681. you want to run the driver in a manner equivalent to serial Python for
  682. debugging purposes. It will not send remote function calls to the
  683. scheduler and will instead execute them in a blocking fashion.
  684. Args:
  685. mode: One of SCRIPT_MODE, WORKER_MODE, and LOCAL_MODE.
  686. """
  687. self.mode = mode
  688. def set_load_code_from_local(self, load_code_from_local):
  689. self._load_code_from_local = load_code_from_local
    def put_object(
        self,
        value: Any,
        owner_address: Optional[str] = None,
        _is_experimental_channel: bool = False,
        _tensor_transport: Optional[str] = None,
    ):
        """Put value in the local object store.

        If the plasma store is full, the worker will automatically
        retry up to DEFAULT_PUT_OBJECT_RETRIES times. Each retry
        will delay for an exponentially doubling amount of time,
        starting with DEFAULT_PUT_OBJECT_DELAY. After this, exception
        will be raised.
        NOTE(review): no retry loop is visible in this method; presumably the
        retries happen inside `core_worker.put_object` — confirm.

        Args:
            value: The value to put in the object store.
            owner_address: The serialized address of object's owner.
            _is_experimental_channel: An experimental flag for mutable
                objects. If True, then the returned object will not have a
                valid value. The object must be written to using the
                ray.experimental.channel API before readers can read.
                It also disables pinning of the object (see below).
            _tensor_transport: [Alpha] The tensor transport backend to use.
                Currently, this only supports one-sided transports like "nixl".
                When set, tensors inside `value` are extracted during
                serialization and handed to the GPU object manager.

        Returns:
            ObjectRef: The object ref the object was put under.

        Raises:
            TypeError: If `value` is itself an ObjectRef, or if it cannot be
                serialized (the message includes an inspect_serializability
                report).
            ray.exceptions.ObjectStoreFullError: This is raised if the attempt
                to store the object fails because the object store is full even
                after multiple retries.
        """
        # Make sure that the value is not an object ref.
        if isinstance(value, ObjectRef):
            raise TypeError(
                "Calling 'put' on an ray.ObjectRef is not allowed. "
                "If you really want to do this, you can wrap the "
                "ray.ObjectRef in a list and call 'put' on it."
            )
        # Tensors extracted from `value`; only populated on the
        # tensor-transport path below.
        tensors = None
        from ray.experimental.gpu_object_manager.util import (
            normalize_and_validate_tensor_transport,
            validate_one_sided,
        )

        tensor_transport = None
        if _tensor_transport is not None:
            # Normalize the user-supplied name and reject transports that
            # ray.put cannot use (only one-sided transports are accepted).
            tensor_transport = normalize_and_validate_tensor_transport(
                _tensor_transport
            )
            validate_one_sided(tensor_transport, "ray.put")
        try:
            if tensor_transport is not None:
                # Serialize with tensors pulled out so they can be stored by
                # the GPU object manager instead of the object store.
                (
                    serialized_value,
                    tensors,
                ) = self.get_serialization_context().serialize_gpu_objects(value)
            else:
                serialized_value = self.get_serialization_context().serialize(value)
        except TypeError as e:
            # Re-raise with a report of which parts of `value` are not
            # serializable, chained to the original error.
            sio = io.StringIO()
            ray.util.inspect_serializability(value, print_file=sio)
            msg = (
                "Could not serialize the put value "
                f"{repr(value)}:\n"
                f"{sio.getvalue()}"
            )
            raise TypeError(msg) from e

        # If the object is mutable, then the raylet should never read the
        # object. Instead, clients will keep the object pinned.
        pin_object = not _is_experimental_channel

        # This *must* be the first place that we construct this python
        # ObjectRef because an entry with 0 local references is created when
        # the object is Put() in the core worker, expecting that this python
        # reference will be created. If another reference is created and
        # removed before this one, it will corrupt the state in the
        # reference counter.
        ret = self.core_worker.put_object(
            serialized_value,
            pin_object=pin_object,
            owner_address=owner_address,
            inline_small_object=True,
            _is_experimental_channel=_is_experimental_channel,
            tensor_transport=tensor_transport,
        )
        # Register extracted tensors under the new ref only after the ref
        # exists (see the ordering constraint above).
        if tensors:
            self.gpu_object_manager.put_object(ret, tensor_transport, tensors)
        return ret
  773. def raise_errors(self, serialized_objects, object_refs):
  774. out = self.deserialize_objects(serialized_objects, object_refs)
  775. if "RAY_IGNORE_UNHANDLED_ERRORS" in os.environ:
  776. return
  777. for e in out:
  778. _unhandled_error_handler(e)
  779. def deserialize_objects(
  780. self,
  781. serialized_objects,
  782. object_refs,
  783. use_object_store: bool = False,
  784. ):
  785. gpu_objects: Dict[str, List["torch.Tensor"]] = {}
  786. for obj_ref, (_, _, tensor_transport) in zip(object_refs, serialized_objects):
  787. if tensor_transport is None:
  788. # The object is not a gpu object, so we cannot use other external transport to
  789. # fetch it.
  790. continue
  791. object_id = obj_ref.hex()
  792. if object_id not in gpu_objects:
  793. # If using a non-object store transport, then tensors will be sent
  794. # out-of-band. Get them before deserializing the object store data.
  795. # The user can set use_object_store to fetch the RDT object
  796. # through the object store.
  797. gpu_objects[object_id] = self.gpu_object_manager.get_gpu_object(
  798. object_id, use_object_store
  799. )
  800. # Function actor manager or the import thread may call pickle.loads
  801. # at the same time which can lead to failed imports
  802. # TODO: We may be better off locking on all imports or injecting a lock
  803. # into pickle.loads (https://github.com/ray-project/ray/issues/16304)
  804. with self.function_actor_manager.lock:
  805. context = self.get_serialization_context()
  806. return context.deserialize_objects(
  807. serialized_objects, object_refs, gpu_objects
  808. )
  809. def get_objects(
  810. self,
  811. object_refs: list,
  812. timeout: Optional[float] = None,
  813. return_exceptions: bool = False,
  814. skip_deserialization: bool = False,
  815. use_object_store: bool = False,
  816. ) -> Tuple[List[serialization.SerializedRayObject], bytes]:
  817. """Get the values in the object store associated with the IDs.
  818. Return the values from the local object store for object_refs. This
  819. will block until all the values for object_refs have been written to
  820. the local object store.
  821. Args:
  822. object_refs: A list of the object refs
  823. whose values should be retrieved.
  824. timeout: The maximum amount of time in
  825. seconds to wait before returning.
  826. return_exceptions: If any of the objects deserialize to an
  827. Exception object, whether to return them as values in the
  828. returned list. If False, then the first found exception will be
  829. raised.
  830. skip_deserialization: If true, only the buffer will be released and
  831. the object associated with the buffer will not be deserialized.
  832. use_object_store: [Alpha] To fetch an RDT object through the object store.
  833. Returns:
  834. list: List of deserialized objects or None if skip_deserialization is True.
  835. bytes: UUID of the debugger breakpoint we should drop
  836. into or b"" if there is no breakpoint.
  837. """
  838. # Make sure that the values are object refs.
  839. for object_ref in object_refs:
  840. if not isinstance(object_ref, ObjectRef):
  841. raise TypeError(
  842. f"Attempting to call `get` on the value {object_ref}, "
  843. "which is not an ray.ObjectRef."
  844. )
  845. timeout_ms = (
  846. int(timeout * 1000) if timeout is not None and timeout != -1 else -1
  847. )
  848. serialized_objects: List[
  849. serialization.SerializedRayObject
  850. ] = self.core_worker.get_objects(
  851. object_refs,
  852. timeout_ms,
  853. )
  854. debugger_breakpoint = b""
  855. for _, metadata, _ in serialized_objects:
  856. if metadata:
  857. metadata_fields = metadata.split(b",")
  858. if len(metadata_fields) >= 2 and metadata_fields[1].startswith(
  859. ray_constants.OBJECT_METADATA_DEBUG_PREFIX
  860. ):
  861. debugger_breakpoint = metadata_fields[1][
  862. len(ray_constants.OBJECT_METADATA_DEBUG_PREFIX) :
  863. ]
  864. if skip_deserialization:
  865. return None, debugger_breakpoint
  866. values = self.deserialize_objects(
  867. serialized_objects, object_refs, use_object_store
  868. )
  869. if not return_exceptions:
  870. # Raise exceptions instead of returning them to the user.
  871. for value in values:
  872. if isinstance(value, RayError):
  873. if isinstance(
  874. value, ray.exceptions.ObjectLostError
  875. ) and not isinstance(value, ray.exceptions.OwnerDiedError):
  876. global_worker.core_worker.log_plasma_usage()
  877. if isinstance(value, RayTaskError):
  878. raise value.as_instanceof_cause()
  879. else:
  880. raise value
  881. return values, debugger_breakpoint
  882. def main_loop(self):
  883. """The main loop a worker runs to receive and execute tasks."""
  884. def sigterm_handler(signum, frame):
  885. raise_sys_exit_with_custom_error_message(
  886. "The process receives a SIGTERM.", exit_code=1
  887. )
  888. # Note: shutdown() function is called from atexit handler.
  889. ray._private.utils.set_sigterm_handler(sigterm_handler)
  890. self.core_worker.run_task_loop()
  891. sys.exit(0)
  892. def print_logs(self):
  893. """Prints log messages from workers on all nodes in the same job."""
  894. subscriber = self.gcs_log_subscriber
  895. subscriber.subscribe()
  896. exception_type = ray.exceptions.RpcError
  897. localhost = services.get_node_ip_address()
  898. try:
  899. # Number of messages received from the last polling. When the batch
  900. # size exceeds 100 and keeps increasing, the worker and the user
  901. # probably will not be able to consume the log messages as rapidly
  902. # as they are coming in.
  903. # This is meaningful only for GCS subscriber.
  904. last_polling_batch_size = 0
  905. job_id_hex = self.current_job_id.hex()
  906. while True:
  907. # Exit if we received a signal that we should stop.
  908. if self.threads_stopped.is_set():
  909. return
  910. data = subscriber.poll()
  911. # GCS subscriber only returns None on unavailability.
  912. if data is None:
  913. last_polling_batch_size = 0
  914. continue
  915. if (
  916. self._filter_logs_by_job
  917. and data["job"]
  918. and data["job"] != job_id_hex
  919. ):
  920. last_polling_batch_size = 0
  921. continue
  922. data["localhost"] = localhost
  923. global_worker_stdstream_dispatcher.emit(data)
  924. lagging = 100 <= last_polling_batch_size < subscriber.last_batch_size
  925. if lagging:
  926. logger.warning(
  927. "The driver may not be able to keep up with the "
  928. "stdout/stderr of the workers. To avoid forwarding "
  929. "logs to the driver, use "
  930. "'ray.init(log_to_driver=False)'."
  931. )
  932. last_polling_batch_size = subscriber.last_batch_size
  933. except (OSError, exception_type) as e:
  934. logger.error(f"print_logs: {e}")
  935. finally:
  936. # Close the pubsub client to avoid leaking file descriptors.
  937. subscriber.close()
  938. def get_accelerator_ids_for_accelerator_resource(
  939. self, resource_name: str, resource_regex: str
  940. ) -> Union[List[str], List[int]]:
  941. """Get the accelerator IDs that are assigned to the given accelerator resource.
  942. Args:
  943. resource_name: The name of the resource.
  944. resource_regex: The regex of the resource.
  945. Returns:
  946. (List[str]) The IDs that are assigned to the given resource pre-configured.
  947. (List[int]) The IDs that are assigned to the given resource.
  948. """
  949. resource_ids = self.core_worker.resource_ids()
  950. assigned_ids = set()
  951. # Handle both normal and placement group accelerator resources.
  952. # Note: We should only get the accelerator ids from the placement
  953. # group resource that does not contain the bundle index!
  954. import re
  955. for resource, assignment in resource_ids.items():
  956. if resource == resource_name or re.match(resource_regex, resource):
  957. for resource_id, _ in assignment:
  958. assigned_ids.add(resource_id)
  959. # If the user had already set the environment variables
  960. # (CUDA_VISIBLE_DEVICES, ONEAPI_DEVICE_SELECTOR, NEURON_RT_VISIBLE_CORES,
  961. # TPU_VISIBLE_CHIPS, ..) then respect that in the sense that only IDs
  962. # that appear in (CUDA_VISIBLE_DEVICES, ONEAPI_DEVICE_SELECTOR,
  963. # HIP_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS, ..)
  964. # should be returned.
  965. if self.original_visible_accelerator_ids.get(resource_name, None) is not None:
  966. original_ids = self.original_visible_accelerator_ids[resource_name]
  967. assigned_ids = {str(original_ids[i]) for i in assigned_ids}
  968. # Give all accelerator ids in local_mode.
  969. if self.mode == LOCAL_MODE:
  970. if resource_name == ray_constants.GPU:
  971. max_accelerators = self.node.get_resource_and_label_spec().num_gpus
  972. else:
  973. max_accelerators = (
  974. self.node.get_resource_and_label_spec().resources.get(
  975. resource_name, None
  976. )
  977. )
  978. if max_accelerators:
  979. assigned_ids = original_ids[:max_accelerators]
  980. return list(assigned_ids)
  981. def shutdown_gpu_object_manager(self):
  982. if self._gpu_object_manager:
  983. self._gpu_object_manager.shutdown()
  984. _connect_or_shutdown_lock = threading.RLock()
  985. def with_connect_or_shutdown_lock(func: Callable) -> Callable:
  986. @wraps(func)
  987. def wrapper(*args, **kwargs):
  988. with _connect_or_shutdown_lock:
  989. return func(*args, **kwargs)
  990. return wrapper
  991. @PublicAPI
  992. @client_mode_hook
  993. def get_gpu_ids() -> Union[List[int], List[str]]:
  994. """Get the IDs of the GPUs that are available to the worker.
  995. This method should only be called inside of a task or actor, and not a driver.
  996. If the CUDA_VISIBLE_DEVICES environment variable was set when the worker
  997. started up, then the IDs returned by this method will be a subset of the
  998. IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range
  999. [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has.
  1000. Returns:
  1001. A list of GPU IDs.
  1002. """
  1003. worker = global_worker
  1004. worker.check_connected()
  1005. return worker.get_accelerator_ids_for_accelerator_resource(
  1006. ray_constants.GPU, f"^{ray_constants.GPU}_group_[0-9A-Za-z]+$"
  1007. )
  1008. @Deprecated(
  1009. message="Use ray.get_runtime_context().get_assigned_resources() instead.",
  1010. warning=True,
  1011. )
  1012. def get_resource_ids():
  1013. """Get the IDs of the resources that are available to the worker.
  1014. Returns:
  1015. A dictionary mapping the name of a resource to a list of pairs, where
  1016. each pair consists of the ID of a resource and the fraction of that
  1017. resource reserved for this worker.
  1018. """
  1019. worker = global_worker
  1020. worker.check_connected()
  1021. if _mode() == LOCAL_MODE:
  1022. raise RuntimeError(
  1023. "ray._private.worker.get_resource_ids() does not work in local_mode."
  1024. )
  1025. return global_worker.core_worker.resource_ids()
  1026. @Deprecated(message="Use ray.init().address_info['webui_url'] instead.")
  1027. def get_dashboard_url():
  1028. """Get the URL to access the Ray dashboard.
  1029. Note that the URL does not specify which node the dashboard is on.
  1030. Returns:
  1031. The URL of the dashboard as a string.
  1032. """
  1033. if ray_constants.RAY_OVERRIDE_DASHBOARD_URL in os.environ:
  1034. return _remove_protocol_from_url(
  1035. os.environ.get(ray_constants.RAY_OVERRIDE_DASHBOARD_URL)
  1036. )
  1037. else:
  1038. worker = global_worker
  1039. worker.check_connected()
  1040. return _global_node.webui_url
  1041. def _remove_protocol_from_url(url: Optional[str]) -> str:
  1042. """
  1043. Helper function to remove protocol from URL if it exists.
  1044. """
  1045. if not url:
  1046. return url
  1047. parsed_url = urllib.parse.urlparse(url)
  1048. if parsed_url.scheme:
  1049. # Construct URL without protocol
  1050. scheme = f"{parsed_url.scheme}://"
  1051. return parsed_url.geturl().replace(scheme, "", 1)
  1052. return url
  1053. class BaseContext(metaclass=ABCMeta):
  1054. """
  1055. Base class for RayContext and ClientContext
  1056. """
  1057. dashboard_url: Optional[str]
  1058. python_version: str
  1059. ray_version: str
  1060. @abstractmethod
  1061. def disconnect(self):
  1062. """
  1063. If this context is for directly attaching to a cluster, disconnect
  1064. will call ray.shutdown(). Otherwise, if the context is for a ray
  1065. client connection, the client will be disconnected.
  1066. """
  1067. pass
  1068. @abstractmethod
  1069. def __enter__(self):
  1070. pass
  1071. @abstractmethod
  1072. def __exit__(self):
  1073. pass
  1074. def _context_table_template(self):
  1075. if self.dashboard_url:
  1076. dashboard_row = Template("context_dashrow.html.j2").render(
  1077. dashboard_url="http://" + self.dashboard_url
  1078. )
  1079. else:
  1080. dashboard_row = None
  1081. return Template("context_table.html.j2").render(
  1082. python_version=self.python_version,
  1083. ray_version=self.ray_version,
  1084. dashboard_row=dashboard_row,
  1085. )
  1086. def _repr_html_(self):
  1087. return Template("context.html.j2").render(
  1088. context_logo=Template("context_logo.html.j2").render(),
  1089. context_table=self._context_table_template(),
  1090. )
  1091. @repr_with_fallback(["ipywidgets", "8"])
  1092. def _get_widget_bundle(self, **kwargs) -> Dict[str, Any]:
  1093. """Get the mimebundle for the widget representation of the context.
  1094. Args:
  1095. **kwargs: Passed to the _repr_mimebundle_() function for the widget
  1096. Returns:
  1097. Dictionary ("mimebundle") of the widget representation of the context.
  1098. """
  1099. import ipywidgets
  1100. disconnect_button = ipywidgets.Button(
  1101. description="Disconnect",
  1102. disabled=False,
  1103. button_style="",
  1104. tooltip="Disconnect from the Ray cluster",
  1105. layout=ipywidgets.Layout(margin="auto 0px 0px 0px"),
  1106. )
  1107. def disconnect_callback(button):
  1108. button.disabled = True
  1109. button.description = "Disconnecting..."
  1110. self.disconnect()
  1111. button.description = "Disconnected"
  1112. disconnect_button.on_click(disconnect_callback)
  1113. left_content = ipywidgets.VBox(
  1114. [
  1115. ipywidgets.HTML(Template("context_logo.html.j2").render()),
  1116. disconnect_button,
  1117. ],
  1118. layout=ipywidgets.Layout(),
  1119. )
  1120. right_content = ipywidgets.HTML(self._context_table_template())
  1121. widget = ipywidgets.HBox(
  1122. [left_content, right_content], layout=ipywidgets.Layout(width="100%")
  1123. )
  1124. return widget._repr_mimebundle_(**kwargs)
  1125. def _repr_mimebundle_(self, **kwargs):
  1126. bundle = self._get_widget_bundle(**kwargs)
  1127. # Overwrite the widget html repr and default repr with those of the BaseContext
  1128. bundle.update({"text/html": self._repr_html_(), "text/plain": repr(self)})
  1129. return bundle
  1130. @dataclass
  1131. class RayContext(BaseContext, Mapping):
  1132. """
  1133. Context manager for attached drivers.
  1134. """
  1135. dashboard_url: Optional[str]
  1136. python_version: str
  1137. ray_version: str
  1138. ray_commit: str
  1139. def __init__(self, address_info: Dict[str, Optional[str]]):
  1140. super().__init__()
  1141. self.dashboard_url = get_dashboard_url()
  1142. self.python_version = "{}.{}.{}".format(*sys.version_info[:3])
  1143. self.ray_version = ray.__version__
  1144. self.ray_commit = ray.__commit__
  1145. self.address_info = address_info
  1146. def __getitem__(self, key):
  1147. if log_once("ray_context_getitem"):
  1148. warnings.warn(
  1149. f'Accessing values through ctx["{key}"] is deprecated. '
  1150. f'Use ctx.address_info["{key}"] instead.',
  1151. DeprecationWarning,
  1152. stacklevel=2,
  1153. )
  1154. return self.address_info[key]
  1155. def __len__(self):
  1156. if log_once("ray_context_len"):
  1157. warnings.warn("len(ctx) is deprecated. Use len(ctx.address_info) instead.")
  1158. return len(self.address_info)
  1159. def __iter__(self):
  1160. if log_once("ray_context_len"):
  1161. warnings.warn(
  1162. "iter(ctx) is deprecated. Use iter(ctx.address_info) instead."
  1163. )
  1164. return iter(self.address_info)
  1165. def __enter__(self) -> "RayContext":
  1166. return self
  1167. def __exit__(self, *exc):
  1168. ray.shutdown()
  1169. def disconnect(self):
  1170. # Include disconnect() to stay consistent with ClientContext
  1171. ray.shutdown()
# Process-wide Worker singleton. Module-level helpers such as get_gpu_ids()
# and get_resource_ids() reach the worker through this name.
global_worker = Worker()
"""Worker: The global Worker object for this worker process.
We use a global Worker object to ensure that there is a single worker object
per worker process.
"""
# None at import time; get_dashboard_url() reads `_global_node.webui_url`
# once connected.
_global_node = None
"""ray._private.node.Node: The global node object that is created by ray.init()."""
  1179. def _maybe_modify_runtime_env(
  1180. runtime_env: Optional[Dict[str, Any]], _skip_env_hook: bool
  1181. ) -> Dict[str, Any]:
  1182. """
  1183. If you set RAY_ENABLE_UV_RUN_RUNTIME_ENV, which is the default, and run the driver with `uv run`,
  1184. this function sets up a runtime environment that replicates the driver's environment to the
  1185. workers. Otherwise, if a runtime environment hook is present it will modify the runtime environment.
  1186. """
  1187. if ray_constants.RAY_ENABLE_UV_RUN_RUNTIME_ENV:
  1188. from ray._private.runtime_env.uv_runtime_env_hook import (
  1189. _get_uv_run_cmdline,
  1190. hook,
  1191. )
  1192. cmdline = _get_uv_run_cmdline()
  1193. if cmdline:
  1194. # This means the current driver is running in `uv run`, in which case we want
  1195. # to propagate the uv environment to the workers.
  1196. return hook(runtime_env)
  1197. if ray_constants.RAY_RUNTIME_ENV_HOOK in os.environ and not _skip_env_hook:
  1198. return load_class(os.environ[ray_constants.RAY_RUNTIME_ENV_HOOK])(runtime_env)
  1199. return runtime_env
@PublicAPI
@client_mode_hook
def init(
    address: Optional[str] = None,
    *,
    num_cpus: Optional[int] = None,
    num_gpus: Optional[int] = None,
    resources: Optional[Dict[str, float]] = None,
    labels: Optional[Dict[str, str]] = None,
    object_store_memory: Optional[int] = None,
    local_mode: bool = False,
    ignore_reinit_error: bool = False,
    include_dashboard: Optional[bool] = None,
    dashboard_host: str = ray_constants.DEFAULT_DASHBOARD_IP,
    dashboard_port: Optional[int] = None,
    job_config: "ray.job_config.JobConfig" = None,
    configure_logging: bool = True,
    logging_level: int = ray_constants.LOGGER_LEVEL,
    logging_format: Optional[str] = None,
    logging_config: Optional[LoggingConfig] = None,
    log_to_driver: Optional[bool] = None,
    namespace: Optional[str] = None,
    runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None,  # noqa: F821
    enable_resource_isolation: bool = False,
    cgroup_path: Optional[str] = None,
    system_reserved_cpu: Optional[float] = None,
    system_reserved_memory: Optional[int] = None,
    **kwargs,
) -> BaseContext:
    """
    Connect to an existing Ray cluster or start one and connect to it.

    This method handles two cases: either a Ray cluster already exists and we
    just attach this driver to it, or we start all of the processes associated
    with a Ray cluster and attach to the newly started cluster.

    Note: When called from the main thread, this method overwrites the SIGTERM
    handler of the driver process.

    In most cases, it is enough to just call this method with no arguments.
    This will autodetect an existing Ray cluster or start a new Ray instance if
    no existing cluster is found:

    .. testcode::

        ray.init()

    To explicitly connect to an existing local cluster, use this as follows. A
    ConnectionError will be thrown if no existing local cluster is found.

    .. testcode::
        :skipif: True

        ray.init(address="auto")

    To connect to an existing remote cluster, use this as follows (substituting
    in the appropriate address). Note the addition of "ray://" at the beginning
    of the address. This requires `ray[client]`.

    .. testcode::
        :skipif: True

        ray.init(address="ray://123.45.67.89:10001")

    More details for starting and connecting to a remote cluster can be found
    here: https://docs.ray.io/en/master/cluster/getting-started.html

    You can also define an environment variable called `RAY_ADDRESS` in
    the same format as the `address` parameter to connect to an existing
    cluster with ray.init() or ray.init(address="auto").

    Args:
        address: The address of the Ray cluster to connect to. The provided
            address is resolved as follows:
            1. If a concrete address (e.g., localhost:<port>) is provided, try to
            connect to it. Concrete addresses can be prefixed with "ray://" to
            connect to a remote cluster. For example, passing in the address
            "ray://123.45.67.89:50005" will connect to the cluster at the given
            address.
            2. If no address is provided, try to find an existing Ray instance
            to connect to. This is done by first checking the environment
            variable `RAY_ADDRESS`. If this is not defined, check the address
            of the latest cluster started (found in
            /tmp/ray/ray_current_cluster) if available. If this is also empty,
            then start a new local Ray instance.
            3. If the provided address is "auto", then follow the same process
            as above. However, if there is no existing cluster found, this will
            throw a ConnectionError instead of starting a new local Ray
            instance.
            4. If the provided address is "local", start a new local Ray
            instance, even if there is already an existing local Ray instance.
        num_cpus: Number of CPUs the user wishes to assign to each
            raylet. By default, this is set based on virtual cores.
        num_gpus: Number of GPUs the user wishes to assign to each
            raylet. By default, this is set based on detected GPUs.
        resources: A dictionary mapping the names of custom resources to the
            quantities for them available.
        labels: [Experimental] The key-value labels of the node.
        object_store_memory: The amount of memory (in bytes) to start the
            object store with.
            By default, this is 30% of available system memory capped by
            the shm size and 200G but can be set higher.
        local_mode: Deprecated: consider using the Ray Distributed Debugger instead.
        ignore_reinit_error: If true, Ray suppresses errors from calling
            ray.init() a second time. Ray won't be restarted.
        include_dashboard: Boolean flag indicating whether or not to start the
            Ray dashboard, which displays the status of the Ray
            cluster. If this argument is None, then the UI will be started if
            the relevant dependencies are present.
        dashboard_host: The host to bind the dashboard server to. Can either be
            localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces).
            By default, this is set to localhost to prevent access from
            external machines.
        dashboard_port: The port to bind the dashboard server to.
            Defaults to 8265 and Ray will automatically find a free port if
            8265 is not available.
        job_config: The job configuration (a ``ray.job_config.JobConfig``).
            A fresh ``JobConfig`` is created when this is None.
        configure_logging: True (default) if configuration of logging is
            allowed here. Otherwise, the user may want to configure it
            separately.
        logging_level: Logging level for the "ray" logger of the driver process,
            defaults to ``ray_constants.LOGGER_LEVEL``. Ignored unless
            "configure_logging" is true.
        logging_format: Logging format for the "ray" logger of the driver process,
            defaults to ``ray_constants.LOGGER_FORMAT`` (a string containing a
            timestamp, filename, line number, and message). Ignored unless
            "configure_logging" is true.
        logging_config: [Experimental] Logging configuration will be applied to the
            root loggers for both the driver process and all worker processes belonging
            to the current job. See :class:`~ray.LoggingConfig` for details.
        log_to_driver: If true, the output from all of the worker
            processes on all nodes will be directed to the driver. If None,
            the value of ``ray_constants.RAY_LOG_TO_DRIVER`` is used.
        namespace: A namespace is a logical grouping of jobs and named actors.
        runtime_env: The runtime environment to use
            for this job (see :ref:`runtime-environments` for details).
        object_spilling_directory: The path to spill objects to. The same path will
            be used as the object store fallback directory as well.
        enable_resource_isolation: Enable resource isolation through cgroupv2 by reserving
            memory and cpu resources for ray system processes. To use, only cgroupv2 (not cgroupv1)
            must be enabled with read and write permissions for the raylet. Cgroup memory and
            cpu controllers must also be enabled.
        cgroup_path: The path for the cgroup the raylet should use to enforce resource isolation.
            By default, the cgroup used for resource isolation will be /sys/fs/cgroup.
            The process starting ray must have read/write permissions to this path.
            Cgroup memory and cpu controllers must be enabled for this cgroup.
            This option only works if enable_resource_isolation is True.
        system_reserved_cpu: The number of cpu cores to reserve for ray system processes.
            Cores can be fractional, i.e. 1.5 means one and a half cpu cores.
            By default, the value will be at least 1 core, and at most 3 cores. The default value
            is calculated using the formula min(3.0, max(1.0, 0.05 * num_cores_on_the_system))
            This option only works if enable_resource_isolation is True.
        system_reserved_memory: The amount of memory (in bytes) to reserve for ray system processes.
            By default, the value will be at least 500MB, and at most 10GB. The default value is
            calculated using the formula min(10GB, max(500MB, 0.10 * memory_available_on_the_system))
            This option only works if enable_resource_isolation is True.
        _enable_object_reconstruction: If True, when an object stored in
            the distributed plasma store is lost due to node failure, Ray will
            attempt to reconstruct the object by re-executing the task that
            created the object. Arguments to the task will be recursively
            reconstructed. If False, then ray.ObjectLostError will be
            thrown.
        _plasma_directory: Override the plasma mmap file directory.
        _node_ip_address: The IP address of the node that we are on.
        _driver_object_store_memory: Deprecated.
        _memory: Amount of reservable memory resource in bytes rounded
            down to the nearest integer.
        _redis_username: Prevents external clients without the username
            from connecting to Redis if provided.
        _redis_password: Prevents external clients without the password
            from connecting to Redis if provided.
        _temp_dir: If provided, specifies the root temporary
            directory for the Ray process. Must be an absolute path. Defaults to an
            OS-specific conventional location, e.g., "/tmp/ray".
        _metrics_export_port: Port number Ray exposes system metrics
            through a Prometheus endpoint. It is currently under active
            development, and the API is subject to change.
        _system_config: Configuration for overriding
            RayConfig defaults. For testing purposes ONLY.
        _tracing_startup_hook: If provided, turns on and sets up tracing
            for Ray. Must be the name of a function that takes no arguments and
            sets up a Tracer Provider, Remote Span Processors, and
            (optional) additional instruments. See more at
            docs.ray.io/tracing.html. It is currently under active development,
            and the API is subject to change.
        _node_name: User-provided node name or identifier. Defaults to
            the node IP address.

    Returns:
        If the provided address includes a protocol, for example by prepending
        "ray://" to the address to get "ray://1.2.3.4:10001", then a
        ClientContext is returned with information such as settings, server
        versions for ray and python, and the dashboard_url. Otherwise,
        a RayContext is returned with ray and python versions, and address
        information about the started processes.

    Raises:
        RuntimeError: If ray.init() was already called and
            ``ignore_reinit_error`` is False, if ``allow_multiple`` or
            ``storage`` is passed without using the Ray client, or if unknown
            keyword arguments are passed.
        ValueError: If the Job's injected runtime env conflicts with
            ``runtime_env``, or if node-level options (``num_cpus``,
            ``num_gpus``, ``resources``, ``labels``, ``object_store_memory``,
            ``_system_config``, ``_enable_object_reconstruction``,
            ``_node_name``) are passed while connecting to an existing cluster.
        TypeError: If ``_system_config`` is not a dict.
        ConnectionError: If connecting to an existing cluster fails.
    """
    if log_to_driver is None:
        log_to_driver = ray_constants.RAY_LOG_TO_DRIVER

    # Configure the "ray" logger for the driver process.
    if configure_logging:
        setup_logger(logging_level, logging_format or ray_constants.LOGGER_FORMAT)
    else:
        logging.getLogger("ray").handlers.clear()

    # Configure the logging settings for the driver process. A LoggingConfig is
    # synthesized from RAY_LOGGING_CONFIG_ENCODING when the caller passed none.
    if logging_config or ray_constants.RAY_LOGGING_CONFIG_ENCODING:
        logging_config = logging_config or LoggingConfig(
            encoding=ray_constants.RAY_LOGGING_CONFIG_ENCODING
        )
        logging_config._apply()

    # Parse the hidden options. Each one is popped so that whatever remains in
    # `kwargs` afterwards can be reported as unknown (or forwarded to the
    # Ray client).
    _enable_object_reconstruction: bool = kwargs.pop(
        "_enable_object_reconstruction", False
    )
    _plasma_directory: Optional[str] = kwargs.pop("_plasma_directory", None)
    _object_spilling_directory: Optional[str] = kwargs.pop(
        "object_spilling_directory", None
    )
    _node_ip_address: Optional[str] = kwargs.pop("_node_ip_address", None)
    _driver_object_store_memory: Optional[int] = kwargs.pop(
        "_driver_object_store_memory", None
    )
    _memory: Optional[int] = kwargs.pop("_memory", None)
    _redis_username: str = kwargs.pop(
        "_redis_username", ray_constants.REDIS_DEFAULT_USERNAME
    )
    _redis_password: str = kwargs.pop(
        "_redis_password", ray_constants.REDIS_DEFAULT_PASSWORD
    )
    _temp_dir: Optional[str] = kwargs.pop("_temp_dir", None)
    _metrics_export_port: Optional[int] = kwargs.pop("_metrics_export_port", None)
    _system_config: Optional[Dict[str, str]] = kwargs.pop("_system_config", None)
    _tracing_startup_hook: Optional[Callable] = kwargs.pop(
        "_tracing_startup_hook", None
    )
    _node_name: Optional[str] = kwargs.pop("_node_name", None)
    # Fix for https://github.com/ray-project/ray/issues/26729
    _skip_env_hook: bool = kwargs.pop("_skip_env_hook", False)

    # Install a SIGTERM handler that exits the driver (with the signal number
    # as exit status) before connecting.
    def sigterm_handler(signum, frame):
        sys.exit(signum)

    # Signal handlers can only be installed from the main thread.
    if threading.current_thread() is threading.main_thread():
        ray._private.utils.set_sigterm_handler(sigterm_handler)
    else:
        logger.warning(
            "SIGTERM handler is not set because current thread "
            "is not the main thread."
        )

    # If available, use RAY_ADDRESS to override if the address was left
    # unspecified, or set to "auto" in the call to init
    address_env_var = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
    if address_env_var and (address is None or address == "auto"):
        address = address_env_var
        logger.info(
            f"Using address {address_env_var} set in the environment "
            f"variable {ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE}"
        )

    if address is not None and "://" in address:
        # Address specified a protocol, use ray client
        builder = ray.client(address, _deprecation_warn_enabled=False)

        # Forward any keyword arguments that were changed from their default
        # values to the builder.
        # NOTE: values are looked up by parameter name in locals(), so they
        # reflect any rebinding done above. `log_to_driver` (resolved from
        # ray_constants.RAY_LOG_TO_DRIVER when None) and `logging_config`
        # (possibly synthesized from RAY_LOGGING_CONFIG_ENCODING) can therefore
        # be forwarded even if the caller did not pass them explicitly.
        init_sig = inspect.signature(init)
        passed_kwargs = {}
        for argument_name, param_obj in init_sig.parameters.items():
            if argument_name in {"kwargs", "address"}:
                # kwargs and address are handled separately
                continue
            default_value = param_obj.default
            passed_value = locals()[argument_name]
            if passed_value != default_value:
                # passed value is different than default, pass to the client
                # builder
                passed_kwargs[argument_name] = passed_value
        # Remaining unparsed kwargs (e.g. `allow_multiple`) go to the builder too.
        passed_kwargs.update(kwargs)
        builder._init_args(**passed_kwargs)
        ctx = builder.connect()
        from ray._common.usage import usage_lib

        # With allow_multiple, the context is not made global, so enter it
        # explicitly while recording usage stats.
        if passed_kwargs.get("allow_multiple") is True:
            with ctx:
                usage_lib.put_pre_init_usage_stats()
        else:
            usage_lib.put_pre_init_usage_stats()

        usage_lib.record_library_usage("client")
        return ctx

    # From here on we are NOT using the Ray client, so client-only options
    # and any leftover kwargs are errors.
    if kwargs.get("allow_multiple"):
        raise RuntimeError(
            "`allow_multiple` argument is passed to `ray.init` when the "
            "ray client is not used ("
            f"https://docs.ray.io/en/{get_ray_doc_version()}/cluster"
            "/running-applications/job-submission"
            "/ray-client.html#connect-to-multiple-ray-clusters-experimental). "
            "Do not pass the `allow_multiple` to `ray.init` to fix the issue."
        )

    # NOTE(review): a falsy `storage=...` value skips this message and is
    # reported by the generic unknown-kwarg error below instead.
    if kwargs.get("storage"):
        raise RuntimeError(
            "Cluster-wide storage configuration has been removed. "
            "The last Ray version supporting the `storage` argument is `ray==2.47`."
        )

    if kwargs:
        # User passed in extra keyword arguments but isn't connecting through
        # ray client. Raise an error, since this is most likely a typo in a
        # keyword name.
        unknown = ", ".join(kwargs)
        raise RuntimeError(f"Unknown keyword argument(s): {unknown}")

    # Try to increase the file descriptor limit, which is too low by
    # default for Ray: https://github.com/ray-project/ray/issues/11239
    try:
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft < hard:
            # https://github.com/ray-project/ray/issues/12059
            # Raise the soft limit toward the hard limit, capped at 65536.
            # NOTE(review): the debug message reports `hard` as the new value
            # even though the soft limit is capped at 65536.
            soft = max(soft, min(hard, 65536))
            logger.debug(
                f"Automatically increasing RLIMIT_NOFILE to max value of {hard}"
            )
            try:
                resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))
            except ValueError:
                logger.debug("Failed to raise limit.")
        # Re-read the limit actually in effect (setrlimit may have failed).
        soft, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft < 4096:
            logger.warning(
                "File descriptor limit {} is too low for production "
                "servers and may result in connection errors. "
                "At least 8192 is recommended. --- "
                "Fix with 'ulimit -n 8192'".format(soft)
            )
    except ImportError:
        logger.debug("Could not import resource module (on Windows)")
        pass

    if job_config is None:
        job_config = ray.job_config.JobConfig()

    # RAY_JOB_CONFIG_JSON_ENV_VAR is only set at ray job manager level and has
    # higher priority in case user also provided runtime_env for ray.init()
    if RAY_JOB_CONFIG_JSON_ENV_VAR in os.environ:
        injected_job_config_json = json.loads(
            os.environ.get(RAY_JOB_CONFIG_JSON_ENV_VAR)
        )
        injected_job_config: ray.job_config.JobConfig = (
            ray.job_config.JobConfig.from_json(injected_job_config_json)
        )
        driver_runtime_env = runtime_env
        runtime_env = _merge_runtime_env(
            injected_job_config.runtime_env,
            driver_runtime_env,
            override=os.getenv("RAY_OVERRIDE_JOB_RUNTIME_ENV") == "1",
        )
        if runtime_env is None:
            # None means there was a conflict.
            raise ValueError(
                "Failed to merge the Job's runtime env "
                f"{injected_job_config.runtime_env} with "
                f"a ray.init's runtime env {driver_runtime_env} because "
                "of a conflict. Specifying the same runtime_env fields "
                "or the same environment variable keys is not allowed. "
                "Use RAY_OVERRIDE_JOB_RUNTIME_ENV=1 to instruct Ray to "
                "combine Job and Driver's runtime environment in the event of "
                "a conflict."
            )

        runtime_env = _maybe_modify_runtime_env(runtime_env, _skip_env_hook)

        job_config.set_runtime_env(runtime_env)
        # Similarly, we prefer metadata provided via job submission API
        for key, value in injected_job_config.metadata.items():
            job_config.set_metadata(key, value)
    else:
        runtime_env = _maybe_modify_runtime_env(runtime_env, _skip_env_hook)

        if runtime_env:
            # Set runtime_env in job_config if passed in as part of ray.init()
            job_config.set_runtime_env(runtime_env)

    # Pass the logging_config to job_config to configure loggers of all worker
    # processes belonging to the job.
    if logging_config is not None:
        job_config.set_py_logging_config(logging_config)

    # A non-None bootstrap address means "connect to an existing cluster";
    # None means "start a new local cluster" (see the branch further below).
    redis_address, gcs_address = None, None
    bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
    if bootstrap_address is not None:
        gcs_address = bootstrap_address
        logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address)

    if _node_ip_address is not None:
        _node_ip_address = services.resolve_ip_for_localhost(_node_ip_address)

    if local_mode:
        driver_mode = LOCAL_MODE
        warnings.warn(
            "`local_mode` is an experimental feature that is no "
            "longer maintained and will be removed in the near future. "
            "For debugging consider using the Ray distributed debugger.",
            FutureWarning,
            stacklevel=2,
        )
    else:
        driver_mode = SCRIPT_MODE

    global _global_node

    # Already connected: either return the existing context or refuse.
    if global_worker.connected:
        if ignore_reinit_error:
            logger.info("Calling ray.init() again after it has already been called.")
            node_id = global_worker.core_worker.get_current_node_id()
            return RayContext(dict(_global_node.address_info, node_id=node_id.hex()))
        else:
            raise RuntimeError(
                "Maybe you called ray.init twice by accident? "
                "This error can be suppressed by passing in "
                "'ignore_reinit_error=True' or by calling "
                "'ray.shutdown()' prior to 'ray.init()'."
            )

    _system_config = _system_config or {}
    if not isinstance(_system_config, dict):
        raise TypeError("The _system_config must be a dict.")

    if bootstrap_address is None:
        # In this case, we need to start a new cluster.

        # Setup and verify authentication for new cluster
        ensure_token_if_auth_enabled(_system_config, create_token_if_missing=True)

        # Don't collect usage stats in ray.init() unless it's a nightly wheel.
        from ray._common.usage import usage_lib

        if usage_lib.is_nightly_wheel():
            usage_lib.show_usage_stats_prompt(cli=False)
        else:
            usage_lib.set_usage_stats_enabled_via_env_var(False)

        available_memory_bytes = ray._private.utils.estimate_available_memory()
        object_store_memory = ray._private.utils.resolve_object_store_memory(
            available_memory_bytes, object_store_memory
        )

        resource_isolation_config = ResourceIsolationConfig(
            enable_resource_isolation=enable_resource_isolation,
            cgroup_path=cgroup_path,
            system_reserved_cpu=system_reserved_cpu,
            system_reserved_memory=system_reserved_memory,
            object_store_memory=object_store_memory,
        )

        # Use a random port by not specifying Redis port / GCS server port.
        ray_params = ray._private.parameter.RayParams(
            node_ip_address=_node_ip_address,
            driver_mode=driver_mode,
            redirect_output=None,
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            resources=resources,
            labels=labels,
            num_redis_shards=None,
            redis_max_clients=None,
            redis_username=_redis_username,
            redis_password=_redis_password,
            plasma_directory=_plasma_directory,
            object_spilling_directory=_object_spilling_directory,
            huge_pages=None,
            include_dashboard=include_dashboard,
            dashboard_host=dashboard_host,
            dashboard_port=dashboard_port,
            memory=_memory,
            available_memory_bytes=available_memory_bytes,
            object_store_memory=object_store_memory,
            plasma_store_socket_name=None,
            temp_dir=_temp_dir,
            _system_config=_system_config,
            enable_object_reconstruction=_enable_object_reconstruction,
            metrics_export_port=_metrics_export_port,
            tracing_startup_hook=_tracing_startup_hook,
            node_name=_node_name,
            resource_isolation_config=resource_isolation_config,
        )
        # Start the Ray processes. We set shutdown_at_exit=False because we
        # shutdown the node in the ray.shutdown call that happens in the atexit
        # handler. We still spawn a reaper process in case the atexit handler
        # isn't called.
        _global_node = ray._private.node.Node(
            ray_params=ray_params,
            head=True,
            shutdown_at_exit=False,
            spawn_reaper=True,
            ray_init_cluster=True,
        )
    else:
        # In this case, we are connecting to an existing cluster. Node-level
        # options only make sense when starting a cluster, so reject them.
        if num_cpus is not None or num_gpus is not None:
            raise ValueError(
                "When connecting to an existing cluster, num_cpus "
                "and num_gpus must not be provided."
            )
        if resources is not None:
            raise ValueError(
                "When connecting to an existing cluster, "
                "resources must not be provided."
            )
        if labels is not None:
            raise ValueError(
                "When connecting to an existing cluster, "
                "labels must not be provided."
            )
        if object_store_memory is not None:
            raise ValueError(
                "When connecting to an existing cluster, "
                "object_store_memory must not be provided."
            )
        if _system_config is not None and len(_system_config) != 0:
            raise ValueError(
                "When connecting to an existing cluster, "
                "_system_config must not be provided."
            )
        if _enable_object_reconstruction:
            raise ValueError(
                "When connecting to an existing cluster, "
                "_enable_object_reconstruction must not be provided."
            )
        if _node_name is not None:
            raise ValueError(
                "_node_name cannot be configured when connecting to "
                "an existing cluster."
            )

        # Setup and verify authentication for connecting to existing cluster
        ensure_token_if_auth_enabled(_system_config, create_token_if_missing=False)

        # In this case, we only need to connect the node.
        ray_params = ray._private.parameter.RayParams(
            node_ip_address=_node_ip_address,
            gcs_address=gcs_address,
            redis_address=redis_address,
            redis_username=_redis_username,
            redis_password=_redis_password,
            temp_dir=_temp_dir,
            _system_config=_system_config,
            enable_object_reconstruction=_enable_object_reconstruction,
            metrics_export_port=_metrics_export_port,
        )
        try:
            _global_node = ray._private.node.Node(
                ray_params,
                head=False,
                shutdown_at_exit=False,
                spawn_reaper=False,
                connect_only=True,
            )
        except (ConnectionError, RuntimeError):
            # If we were auto-connecting to the address recorded in the temp
            # dir, give the user a hint that it is probably stale.
            if gcs_address == ray._private.utils.read_ray_address(_temp_dir):
                logger.info(
                    "Failed to connect to the default Ray cluster address at "
                    f"{gcs_address}. This is most likely due to a previous Ray "
                    "instance that has since crashed. To reset the default "
                    "address to connect to, run `ray stop` or restart Ray with "
                    "`ray start`."
                )
            # Either failure type is surfaced to the caller as ConnectionError
            # (the original exception remains attached as __context__).
            raise ConnectionError

    # Log a message to find the Ray address that we connected to and the
    # dashboard URL.
    if ray_constants.RAY_OVERRIDE_DASHBOARD_URL in os.environ:
        dashboard_url = os.environ.get(ray_constants.RAY_OVERRIDE_DASHBOARD_URL)
    else:
        dashboard_url = _global_node.webui_url
    # Add http protocol to dashboard URL if it doesn't
    # already contain a protocol.
    if dashboard_url and not urlparse(dashboard_url).scheme:
        dashboard_url = "http://" + dashboard_url

    # We logged the address before attempting the connection, so we don't need
    # to log it again.
    info_str = "Connected to Ray cluster."
    if gcs_address is None:
        info_str = "Started a local Ray instance."
    if dashboard_url:
        logger.info(
            info_str + " View the dashboard at %s%s%s %s%s",
            colorama.Style.BRIGHT,
            colorama.Fore.GREEN,
            dashboard_url,
            colorama.Fore.RESET,
            colorama.Style.NORMAL,
        )
    else:
        logger.info(info_str)

    connect(
        _global_node,
        _global_node.session_name,
        mode=driver_mode,
        log_to_driver=log_to_driver,
        worker=global_worker,
        driver_object_store_memory=_driver_object_store_memory,
        job_id=None,
        namespace=namespace,
        job_config=job_config,
        entrypoint=ray._private.utils.get_entrypoint_name(),
    )
    if job_config and job_config.code_search_path:
        global_worker.set_load_code_from_local(True)
    else:
        # Because `ray.shutdown()` doesn't reset this flag, for multiple
        # sessions in one process, the 2nd `ray.init()` will reuse the
        # flag of last session. For example:
        #     ray.init(load_code_from_local=True)
        #     ray.shutdown()
        #     ray.init()
        #     # Here the flag `load_code_from_local` would still be True if we
        #     # didn't have this `else` branch.
        #     ray.shutdown()
        global_worker.set_load_code_from_local(False)

    # Run the callbacks registered in the module-level `_post_init_hooks`.
    for hook in _post_init_hooks:
        hook()

    # Check and show accelerator override warning during driver initialization
    from ray._private.ray_constants import env_bool

    override_on_zero = env_bool(
        ray._private.accelerators.RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO_ENV_VAR,
        True,
    )
    if override_on_zero and log_once("ray_accel_env_var_override_on_zero"):
        warnings.warn(
            "Tip: In future versions of Ray, Ray will no longer override accelerator "
            "visible devices env var if num_gpus=0 or num_gpus=None (default). To enable "
            "this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0",
            FutureWarning,
        )

    # Check for Pydantic v1 and emit deprecation warning
    from ray._common.pydantic_compat import PYDANTIC_MAJOR_VERSION

    if (
        PYDANTIC_MAJOR_VERSION
        and PYDANTIC_MAJOR_VERSION == 1
        and log_once("pydantic_v1_deprecation")
    ):
        warnings.warn(
            "Pydantic v1 is deprecated and will no longer be supported in Ray 2.56. "
            "Please upgrade to Pydantic v2 by running `pip install pydantic>=2`. "
            "See https://github.com/ray-project/ray/issues/58876 for more details.",
            FutureWarning,
        )

    # Build the returned context from a copy of the node's address info, with
    # the dashboard URL (stripped of its protocol) and the current node id.
    node_id = global_worker.core_worker.get_current_node_id()
    global_node_address_info = _global_node.address_info.copy()
    global_node_address_info["webui_url"] = _remove_protocol_from_url(dashboard_url)
    return RayContext(dict(global_node_address_info, node_id=node_id.hex()))
  1806. # Functions to run as callback after a successful ray init.
  1807. _post_init_hooks = []
@PublicAPI
@client_mode_hook
@with_connect_or_shutdown_lock
def shutdown(_exiting_interpreter: bool = False):
    """Disconnect the worker, and terminate processes started by ray.init().

    This will automatically run at the end when a Python process that uses Ray
    exits. It is ok to run this twice in a row. The primary use case for this
    function is to clean up state between tests.

    Note that this will clear any remote function definitions, actor
    definitions, and existing actors, so if you wish to use any previously
    defined remote functions or actors after calling ray.shutdown(), then you
    need to redefine them. If they were defined in an imported module, then you
    will need to reload the module.

    .. note::

        The behavior of ``ray.shutdown()`` differs depending on how the cluster
        was initialized:

        * If a new local Ray cluster was started by ``ray.init()`` (i.e., no
          ``address`` argument was provided and no existing cluster was found,
          or ``address="local"`` was explicitly used), ``ray.shutdown()`` will
          terminate all the local Ray processes (raylet, object store, etc.)
          that were spawned by ``ray.init()``.
        * If you connected to an existing cluster (e.g., via
          ``ray.init(address="auto")`` or ``ray.init(address="ray://<ip>:<port>")``),
          ``ray.shutdown()`` will only disconnect the client from the cluster.
          It will **not** shut down the remote cluster. The cluster will
          continue running and can be connected to again.

    Args:
        _exiting_interpreter: True if this is called by the atexit hook
            and False otherwise. When True and the worker is in SCRIPT_MODE,
            shutdown first sleeps for 0.5 seconds so pending log messages can
            finish printing.
    """
    # Make sure to clean up compiled dag node if exists.
    from ray.dag.compiled_dag_node import _shutdown_all_compiled_dags

    _shutdown_all_compiled_dags()
    global_worker.shutdown_gpu_object_manager()

    if _exiting_interpreter and global_worker.mode == SCRIPT_MODE:
        # This is a duration to sleep before shutting down everything in order
        # to make sure that log messages finish printing.
        time.sleep(0.5)
    disconnect(_exiting_interpreter)

    # disconnect internal kv
    if hasattr(global_worker, "gcs_client"):
        del global_worker.gcs_client
    _internal_kv_reset()

    # We need to destruct the core worker here because after this function,
    # we will tear down any processes spawned by ray.init() and the background
    # IO thread in the core worker doesn't currently handle that gracefully.
    if hasattr(global_worker, "core_worker"):
        # Only drivers (script or local mode) perform the driver shutdown.
        if global_worker.mode == SCRIPT_MODE or global_worker.mode == LOCAL_MODE:
            global_worker.core_worker.shutdown_driver()
        del global_worker.core_worker
    # We need to reset function actor manager to clear the context
    global_worker.function_actor_manager = FunctionActorManager(global_worker)
    # Disconnect global state from GCS.
    ray._private.state.state.disconnect()

    # Shut down the Ray processes. External storage is only destroyed from
    # the head node.
    global _global_node
    if _global_node is not None:
        if _global_node.is_head():
            _global_node.destroy_external_storage()
        _global_node.kill_all_processes(check_alive=False, allow_graceful=True)
        _global_node = None

    # TODO(rkn): Instead of manually resetting some of the worker fields, we
    # should simply set "global_worker" to equal "None" or something like that.
    global_worker.set_mode(None)
    global_worker.set_cached_job_id(None)
  1874. atexit.register(shutdown, True)
  1875. # Define a custom excepthook so that if the driver exits with an exception, we
  1876. # can push that exception to Redis.
  1877. normal_excepthook = sys.excepthook
  1878. def custom_excepthook(type, value, tb):
  1879. import ray.core.generated.common_pb2 as common_pb2
  1880. # If this is a driver, push the exception to GCS worker table.
  1881. if global_worker.mode == SCRIPT_MODE and hasattr(global_worker, "worker_id"):
  1882. error_message = "".join(traceback.format_tb(tb))
  1883. worker_id = global_worker.worker_id
  1884. worker_type = common_pb2.DRIVER
  1885. worker_info = {"exception": error_message}
  1886. ray._private.state.state._connect_and_get_accessor()
  1887. ray._private.state.state.add_worker(worker_id, worker_type, worker_info)
  1888. # Call the normal excepthook.
  1889. normal_excepthook(type, value, tb)
  1890. sys.excepthook = custom_excepthook
  1891. def print_to_stdstream(data, ignore_prefix: bool):
  1892. should_dedup = data.get("pid") not in ["autoscaler"]
  1893. if data["is_err"]:
  1894. if should_dedup:
  1895. batches = stderr_deduplicator.deduplicate(data)
  1896. else:
  1897. batches = [data]
  1898. sink = sys.stderr
  1899. else:
  1900. if should_dedup:
  1901. batches = stdout_deduplicator.deduplicate(data)
  1902. else:
  1903. batches = [data]
  1904. sink = sys.stdout
  1905. for batch in batches:
  1906. print_worker_logs(batch, sink, ignore_prefix)
# Start time of this process, used for relative time logs.
# time_string() reports elapsed time relative to this timestamp.
t0 = time.time()
# Set to True by filter_autoscaler_events() the first time it yields the
# "use `ray status`" tip, so that tip is emitted at most once per process.
autoscaler_log_fyi_printed = False
  1910. def filter_autoscaler_events(lines: List[str]) -> Iterator[str]:
  1911. """Given raw log lines from the monitor, return only autoscaler events.
  1912. For Autoscaler V1:
  1913. Autoscaler events are denoted by the ":event_summary:" magic token.
  1914. For Autoscaler V2:
  1915. Autoscaler events are published from log_monitor.py which read
  1916. them from the `event_AUTOSCALER.log`.
  1917. """
  1918. if not ray_constants.AUTOSCALER_EVENTS:
  1919. return
  1920. AUTOSCALER_LOG_FYI = (
  1921. "Tip: use `ray status` to view detailed "
  1922. "cluster status. To disable these "
  1923. "messages, set RAY_SCHEDULER_EVENTS=0."
  1924. )
  1925. def autoscaler_log_fyi_needed() -> bool:
  1926. global autoscaler_log_fyi_printed
  1927. if not autoscaler_log_fyi_printed:
  1928. autoscaler_log_fyi_printed = True
  1929. return True
  1930. return False
  1931. from ray.autoscaler.v2.utils import is_autoscaler_v2
  1932. if is_autoscaler_v2():
  1933. from ray._private.event.event_logger import filter_event_by_level, parse_event
  1934. for event_line in lines:
  1935. if autoscaler_log_fyi_needed():
  1936. yield AUTOSCALER_LOG_FYI
  1937. event = parse_event(event_line)
  1938. if not event or not event.message:
  1939. continue
  1940. if filter_event_by_level(
  1941. event, ray_constants.RAY_LOG_TO_DRIVER_EVENT_LEVEL
  1942. ):
  1943. continue
  1944. yield event.message
  1945. else:
  1946. # Print out autoscaler events only, ignoring other messages.
  1947. for line in lines:
  1948. if ray_constants.LOG_PREFIX_EVENT_SUMMARY in line:
  1949. if autoscaler_log_fyi_needed():
  1950. yield AUTOSCALER_LOG_FYI
  1951. # The event text immediately follows the ":event_summary:"
  1952. # magic token.
  1953. yield line.split(ray_constants.LOG_PREFIX_EVENT_SUMMARY)[1]
  1954. def time_string() -> str:
  1955. """Return the relative time from the start of this job.
  1956. For example, 15m30s.
  1957. """
  1958. delta = time.time() - t0
  1959. hours = 0
  1960. minutes = 0
  1961. while delta > 3600:
  1962. hours += 1
  1963. delta -= 3600
  1964. while delta > 60:
  1965. minutes += 1
  1966. delta -= 60
  1967. output = ""
  1968. if hours:
  1969. output += f"{hours}h"
  1970. if minutes:
  1971. output += f"{minutes}m"
  1972. output += f"{int(delta)}s"
  1973. return output
# When we enter a breakpoint, worker logs are automatically disabled via this.
# While this is False, print_worker_logs() returns without printing anything.
_worker_logs_enabled = True
  1976. def print_worker_logs(
  1977. data: Dict[str, str], print_file: Any, ignore_prefix: bool = False
  1978. ):
  1979. if not _worker_logs_enabled:
  1980. return
  1981. def prefix_for(data: Dict[str, str]) -> str:
  1982. """The PID prefix for this log line."""
  1983. if data.get("pid") in ["autoscaler", "raylet"]:
  1984. return ""
  1985. else:
  1986. res = "pid="
  1987. if data.get("actor_name"):
  1988. res = f"{data['actor_name']} {res}"
  1989. elif data.get("task_name"):
  1990. res = f"{data['task_name']} {res}"
  1991. return res
  1992. def message_for(data: Dict[str, str], line: str) -> str:
  1993. """The printed message of this log line."""
  1994. if ray_constants.LOG_PREFIX_INFO_MESSAGE in line:
  1995. return line.split(ray_constants.LOG_PREFIX_INFO_MESSAGE)[1]
  1996. return line
  1997. def color_for(data: Dict[str, str], line: str) -> str:
  1998. """The color for this log line."""
  1999. if (
  2000. data.get("pid") == "raylet"
  2001. and ray_constants.LOG_PREFIX_INFO_MESSAGE not in line
  2002. ):
  2003. return colorama.Fore.YELLOW
  2004. elif data.get("pid") == "autoscaler":
  2005. if "Error:" in line or "Warning:" in line:
  2006. return colorama.Fore.YELLOW
  2007. else:
  2008. return colorama.Fore.CYAN
  2009. elif os.getenv("RAY_COLOR_PREFIX") == "1":
  2010. colors = [
  2011. # colorama.Fore.BLUE, # Too dark
  2012. colorama.Fore.MAGENTA,
  2013. colorama.Fore.CYAN,
  2014. colorama.Fore.GREEN,
  2015. # colorama.Fore.WHITE, # Too light
  2016. # colorama.Fore.RED,
  2017. colorama.Fore.LIGHTBLACK_EX,
  2018. colorama.Fore.LIGHTBLUE_EX,
  2019. # colorama.Fore.LIGHTCYAN_EX, # Too light
  2020. # colorama.Fore.LIGHTGREEN_EX, # Too light
  2021. colorama.Fore.LIGHTMAGENTA_EX,
  2022. # colorama.Fore.LIGHTWHITE_EX, # Too light
  2023. # colorama.Fore.LIGHTYELLOW_EX, # Too light
  2024. ]
  2025. pid = data.get("pid", 0)
  2026. try:
  2027. i = int(pid)
  2028. except ValueError:
  2029. i = 0
  2030. return colors[i % len(colors)]
  2031. else:
  2032. return colorama.Fore.CYAN
  2033. if data.get("pid") == "autoscaler":
  2034. pid = "autoscaler +{}".format(time_string())
  2035. lines = filter_autoscaler_events(data.get("lines", []))
  2036. else:
  2037. pid = data.get("pid")
  2038. lines = data.get("lines", [])
  2039. ip = data.get("ip")
  2040. ip_prefix = "" if ip == data.get("localhost") else f", ip={ip}"
  2041. for line in lines:
  2042. if RAY_TQDM_MAGIC in line:
  2043. process_tqdm(line)
  2044. else:
  2045. hide_tqdm()
  2046. # If RAY_COLOR_PREFIX=0, do not wrap with any color codes
  2047. if os.getenv("RAY_COLOR_PREFIX") == "0":
  2048. color_pre = ""
  2049. color_post = ""
  2050. else:
  2051. color_pre = color_for(data, line)
  2052. color_post = colorama.Style.RESET_ALL
  2053. if ignore_prefix:
  2054. print(
  2055. f"{message_for(data, line)}",
  2056. file=print_file,
  2057. )
  2058. else:
  2059. print(
  2060. f"{color_pre}({prefix_for(data)}{pid}{ip_prefix}){color_post} "
  2061. f"{message_for(data, line)}",
  2062. file=print_file,
  2063. )
  2064. # Restore once at end of batch to avoid excess hiding/unhiding of tqdm.
  2065. restore_tqdm()
  2066. def process_tqdm(line):
  2067. """Experimental distributed tqdm: see ray.experimental.tqdm_ray."""
  2068. try:
  2069. data = json.loads(line)
  2070. tqdm_ray.instance().process_state_update(data)
  2071. except Exception:
  2072. if log_once("tqdm_corruption"):
  2073. logger.warning(
  2074. f"[tqdm_ray] Failed to decode {line}, this may be due to "
  2075. "logging too fast. This warning will not be printed again."
  2076. )
  2077. def hide_tqdm():
  2078. """Hide distributed tqdm bars temporarily to avoid conflicts with other logs."""
  2079. tqdm_ray.instance().hide_bars()
  2080. def restore_tqdm():
  2081. """Undo hide_tqdm()."""
  2082. tqdm_ray.instance().unhide_bars()
  2083. def listen_error_messages(worker, threads_stopped):
  2084. """Listen to error messages in the background on the driver.
  2085. This runs in a separate thread on the driver and pushes (error, time)
  2086. tuples to be published.
  2087. Args:
  2088. worker: The worker class that this thread belongs to.
  2089. threads_stopped (threading.Event): A threading event used to signal to
  2090. the thread that it should exit.
  2091. """
  2092. # TODO: we should just subscribe to the errors for this specific job.
  2093. worker.gcs_error_subscriber.subscribe()
  2094. try:
  2095. if _internal_kv_initialized():
  2096. # Get any autoscaler errors that occurred before the call to
  2097. # subscribe.
  2098. error_message = _internal_kv_get(ray_constants.DEBUG_AUTOSCALING_ERROR)
  2099. if error_message is not None:
  2100. logger.warning(error_message.decode())
  2101. while True:
  2102. # Exit if received a signal that the thread should stop.
  2103. if threads_stopped.is_set():
  2104. return
  2105. _, error_data = worker.gcs_error_subscriber.poll()
  2106. if error_data is None:
  2107. continue
  2108. if error_data["job_id"] is not None and error_data["job_id"] not in [
  2109. worker.current_job_id.binary(),
  2110. JobID.nil().binary(),
  2111. ]:
  2112. continue
  2113. error_message = error_data["error_message"]
  2114. print_to_stdstream(
  2115. {
  2116. "lines": [error_message],
  2117. "pid": "raylet",
  2118. "is_err": False,
  2119. },
  2120. ignore_prefix=False,
  2121. )
  2122. except (OSError, ConnectionError) as e:
  2123. logger.error(f"listen_error_messages: {e}")
  2124. @PublicAPI
  2125. @client_mode_hook
  2126. def is_initialized() -> bool:
  2127. """Check if ray.init has been called yet.
  2128. Returns:
  2129. True if ray.init has already been called and false otherwise.
  2130. """
  2131. return ray._private.worker.global_worker.connected
  2132. @with_connect_or_shutdown_lock
  2133. def connect(
  2134. node,
  2135. session_name: str,
  2136. mode=WORKER_MODE,
  2137. log_to_driver: bool = False,
  2138. worker=global_worker,
  2139. driver_object_store_memory: Optional[int] = None,
  2140. job_id=None,
  2141. namespace: Optional[str] = None,
  2142. job_config=None,
  2143. runtime_env_hash: int = 0,
  2144. worker_id: WorkerID = WorkerID.nil(),
  2145. ray_debugger_external: bool = False,
  2146. entrypoint: str = "",
  2147. worker_launch_time_ms: int = -1,
  2148. worker_launched_time_ms: int = -1,
  2149. debug_source: str = "",
  2150. ):
  2151. """Connect this worker to the raylet, to Plasma, and to GCS.
  2152. Args:
  2153. node (ray._private.node.Node): The node to connect.
  2154. session_name: The current Ray session name.
  2155. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and LOCAL_MODE.
  2156. log_to_driver: If true, then output from all of the worker
  2157. processes on all nodes will be directed to the driver.
  2158. worker: The ray.Worker instance.
  2159. driver_object_store_memory: Deprecated.
  2160. job_id: The ID of job. If it's None, then we will generate one.
  2161. namespace: Namespace to use.
  2162. job_config (ray.job_config.JobConfig): The job configuration.
  2163. runtime_env_hash: The hash of the runtime env for this worker.
  2164. worker_id: The worker ID assigned by raylet when starting the worker
  2165. process (hex string). Nil for drivers.
  2166. ray_debugger_external: If True, make the debugger external to the
  2167. node this worker is running on.
  2168. entrypoint: The name of the entrypoint script. Ignored if the
  2169. mode != SCRIPT_MODE
  2170. worker_launch_time_ms: The time when the worker process for this worker
  2171. is launched. If the worker is not launched by raylet (e.g.,
  2172. driver), this must be -1 (default value).
  2173. worker_launched_time_ms: The time when the worker process for this worker
  2174. finshes launching. If the worker is not launched by raylet (e.g.,
  2175. driver), this must be -1 (default value).
  2176. debug_source: Source information for `CoreWorker`, used for debugging and informational purpose, rather than functional purpose.
  2177. """
  2178. # Do some basic checking to make sure we didn't call ray.init twice.
  2179. error_message = "Perhaps you called ray.init twice by accident?"
  2180. assert not worker.connected, error_message
  2181. # Enable nice stack traces on SIGSEGV etc.
  2182. try:
  2183. if not faulthandler.is_enabled():
  2184. faulthandler.enable(all_threads=False)
  2185. except io.UnsupportedOperation:
  2186. pass # ignore
  2187. worker.gcs_client = node.get_gcs_client()
  2188. assert worker.gcs_client is not None
  2189. _initialize_internal_kv(worker.gcs_client)
  2190. ray._private.state.state._initialize_global_state(
  2191. ray._raylet.GcsClientOptions.create(
  2192. node.gcs_address,
  2193. node.cluster_id.hex(),
  2194. allow_cluster_id_nil=False,
  2195. fetch_cluster_id_if_nil=False,
  2196. )
  2197. )
  2198. # Initialize some fields.
  2199. if mode in (WORKER_MODE, RESTORE_WORKER_MODE, SPILL_WORKER_MODE):
  2200. # We should not specify the job_id if it's `WORKER_MODE`.
  2201. assert job_id is None
  2202. job_id = JobID.nil()
  2203. else:
  2204. # This is the code path of driver mode.
  2205. if job_id is None:
  2206. job_id = ray._private.state.next_job_id()
  2207. if mode is not SCRIPT_MODE and mode is not LOCAL_MODE:
  2208. process_name = ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER
  2209. if mode is SPILL_WORKER_MODE:
  2210. process_name = ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE
  2211. elif mode is RESTORE_WORKER_MODE:
  2212. process_name = ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE
  2213. ray._raylet.setproctitle(process_name)
  2214. if not isinstance(job_id, JobID):
  2215. raise TypeError("The type of given job id must be JobID.")
  2216. # All workers start out as non-actors. A worker can be turned into an actor
  2217. # after it is created.
  2218. worker.node = node
  2219. worker.set_mode(mode)
  2220. # For driver's check that the version information matches the version
  2221. # information that the Ray cluster was started with.
  2222. try:
  2223. node.check_version_info()
  2224. except Exception as e:
  2225. if mode == SCRIPT_MODE:
  2226. raise e
  2227. elif mode == WORKER_MODE:
  2228. traceback_str = traceback.format_exc()
  2229. ray._private.utils.publish_error_to_driver(
  2230. ray_constants.VERSION_MISMATCH_PUSH_ERROR,
  2231. traceback_str,
  2232. gcs_client=worker.gcs_client,
  2233. )
  2234. driver_name = ""
  2235. interactive_mode = False
  2236. if mode == SCRIPT_MODE:
  2237. import __main__ as main
  2238. if hasattr(main, "__file__"):
  2239. driver_name = main.__file__
  2240. else:
  2241. interactive_mode = True
  2242. driver_name = "INTERACTIVE MODE"
  2243. elif not LOCAL_MODE:
  2244. raise ValueError("Invalid worker mode. Expected DRIVER, WORKER or LOCAL.")
  2245. gcs_options = ray._raylet.GcsClientOptions.create(
  2246. node.gcs_address,
  2247. node.cluster_id.hex(),
  2248. allow_cluster_id_nil=False,
  2249. fetch_cluster_id_if_nil=False,
  2250. )
  2251. if job_config is None:
  2252. job_config = ray.job_config.JobConfig()
  2253. if namespace is not None:
  2254. ray._private.utils.validate_namespace(namespace)
  2255. # The namespace field of job config may have already been set in code
  2256. # paths such as the client.
  2257. job_config.set_ray_namespace(namespace)
  2258. # Make sure breakpoint() in the user's code will
  2259. # invoke the Ray debugger if we are in a worker or actor process
  2260. # (but not on the driver).
  2261. if mode == WORKER_MODE:
  2262. os.environ["PYTHONBREAKPOINT"] = "ray.util.rpdb.set_trace"
  2263. else:
  2264. # Add hook to suppress worker logs during breakpoint.
  2265. os.environ["PYTHONBREAKPOINT"] = "ray.util.rpdb._driver_set_trace"
  2266. worker.ray_debugger_external = ray_debugger_external
  2267. # If it's a driver and it's not coming from ray client, we'll prepare the
  2268. # environment here. If it's ray client, the environment will be prepared
  2269. # at the server side.
  2270. if mode == SCRIPT_MODE and not job_config._client_job and job_config.runtime_env:
  2271. from ray._private.ray_constants import RAY_RUNTIME_ENV_IGNORE_GITIGNORE
  2272. scratch_dir: str = worker.node.get_runtime_env_dir_path()
  2273. runtime_env = job_config.runtime_env or {}
  2274. # Determine whether to respect .gitignore files based on environment variable
  2275. # Default is True (respect .gitignore). Set to False if env var is "1".
  2276. include_gitignore = os.environ.get(RAY_RUNTIME_ENV_IGNORE_GITIGNORE, "0") != "1"
  2277. runtime_env = upload_py_modules_if_needed(
  2278. runtime_env,
  2279. include_gitignore=include_gitignore,
  2280. scratch_dir=scratch_dir,
  2281. logger=logger,
  2282. )
  2283. runtime_env = upload_working_dir_if_needed(
  2284. runtime_env,
  2285. include_gitignore=include_gitignore,
  2286. scratch_dir=scratch_dir,
  2287. logger=logger,
  2288. )
  2289. runtime_env = upload_worker_process_setup_hook_if_needed(
  2290. runtime_env,
  2291. worker,
  2292. )
  2293. # Remove excludes, it isn't relevant after the upload step.
  2294. runtime_env.pop("excludes", None)
  2295. job_config.set_runtime_env(runtime_env, validate=True)
  2296. if mode == SCRIPT_MODE:
  2297. # Add the directory containing the script that is running to the Python
  2298. # paths of the workers. Also add the current directory. Note that this
  2299. # assumes that the directory structures on the machines in the clusters
  2300. # are the same.
  2301. # When using an interactive shell, there is no script directory.
  2302. # We also want to skip adding script directory when running from dashboard.
  2303. code_paths = []
  2304. if not interactive_mode and not (
  2305. namespace and namespace == ray._raylet.RAY_INTERNAL_DASHBOARD_NAMESPACE
  2306. ):
  2307. script_directory = os.path.dirname(os.path.realpath(sys.argv[0]))
  2308. # If driver's sys.path doesn't include the script directory
  2309. # (e.g driver is started via `python -m`,
  2310. # see https://peps.python.org/pep-0338/),
  2311. # then we shouldn't add it to the workers.
  2312. if script_directory in sys.path:
  2313. code_paths.append(script_directory)
  2314. # In client mode, if we use runtime envs with "working_dir", then
  2315. # it'll be handled automatically. Otherwise, add the current dir.
  2316. if not job_config._client_job and not job_config._runtime_env_has_working_dir():
  2317. current_directory = os.path.abspath(os.path.curdir)
  2318. code_paths.append(current_directory)
  2319. if len(code_paths) != 0:
  2320. job_config._py_driver_sys_path.extend(code_paths)
  2321. serialized_job_config = job_config._serialize()
  2322. if not node.should_redirect_logs():
  2323. # Logging to stderr, so give core worker empty logs directory.
  2324. logs_dir = ""
  2325. else:
  2326. logs_dir = node.get_logs_dir_path()
  2327. worker.core_worker = ray._raylet.CoreWorker(
  2328. mode,
  2329. node.plasma_store_socket_name,
  2330. node.raylet_socket_name,
  2331. job_id,
  2332. gcs_options,
  2333. logs_dir,
  2334. node.node_ip_address,
  2335. node.node_manager_port,
  2336. (mode == LOCAL_MODE),
  2337. driver_name,
  2338. serialized_job_config,
  2339. node.metrics_agent_port,
  2340. runtime_env_hash,
  2341. worker_id,
  2342. session_name,
  2343. node.cluster_id.hex(),
  2344. "" if mode != SCRIPT_MODE else entrypoint,
  2345. worker_launch_time_ms,
  2346. worker_launched_time_ms,
  2347. debug_source,
  2348. )
  2349. if mode == SCRIPT_MODE:
  2350. worker_id = worker.worker_id
  2351. worker.gcs_error_subscriber = ray._raylet.GcsErrorSubscriber(
  2352. worker_id=worker_id, address=worker.gcs_client.address
  2353. )
  2354. worker.gcs_log_subscriber = ray._raylet.GcsLogSubscriber(
  2355. worker_id=worker_id, address=worker.gcs_client.address
  2356. )
  2357. if driver_object_store_memory is not None:
  2358. logger.warning(
  2359. "`driver_object_store_memory` is deprecated"
  2360. " and will be removed in the future."
  2361. )
  2362. # If this is a driver running in SCRIPT_MODE, start a thread to print error
  2363. # messages asynchronously in the background. Ideally the scheduler would
  2364. # push messages to the driver's worker service, but we ran into bugs when
  2365. # trying to properly shutdown the driver's worker service, so we are
  2366. # temporarily using this implementation which constantly queries the
  2367. # scheduler for new error messages.
  2368. if mode == SCRIPT_MODE:
  2369. worker.listener_thread = threading.Thread(
  2370. target=listen_error_messages,
  2371. name="ray_listen_error_messages",
  2372. args=(worker, worker.threads_stopped),
  2373. )
  2374. worker.listener_thread.daemon = True
  2375. worker.listener_thread.start()
  2376. # If the job's logging config is set, don't add the prefix
  2377. # (task/actor's name and its PID) to the logs.
  2378. ignore_prefix = global_worker.job_logging_config is not None
  2379. if log_to_driver:
  2380. global_worker_stdstream_dispatcher.add_handler(
  2381. "ray_print_logs",
  2382. functools.partial(print_to_stdstream, ignore_prefix=ignore_prefix),
  2383. )
  2384. worker.logger_thread = threading.Thread(
  2385. target=worker.print_logs, name="ray_print_logs"
  2386. )
  2387. worker.logger_thread.daemon = True
  2388. worker.logger_thread.start()
  2389. # Setup tracing here
  2390. tracing_hook_val = worker.gcs_client.internal_kv_get(
  2391. b"tracing_startup_hook", ray_constants.KV_NAMESPACE_TRACING
  2392. )
  2393. if tracing_hook_val is not None:
  2394. ray.util.tracing.tracing_helper._enable_tracing()
  2395. if not getattr(ray, "__traced__", False):
  2396. _setup_tracing = _import_from_string(tracing_hook_val.decode("utf-8"))
  2397. _setup_tracing()
  2398. ray.__traced__ = True
  2399. # Mark the worker as connected.
  2400. worker.set_is_connected(True)
  2401. def disconnect(exiting_interpreter=False):
  2402. """Disconnect this worker from the raylet and object store."""
  2403. # Reset the list of cached remote functions and actors so that if more
  2404. # remote functions or actors are defined and then connect is called again,
  2405. # the remote functions will be exported. This is mostly relevant for the
  2406. # tests.
  2407. worker = global_worker
  2408. if worker.connected:
  2409. # Shutdown all of the threads that we've started. TODO(rkn): This
  2410. # should be handled cleanly in the worker object's destructor and not
  2411. # in this disconnect method.
  2412. worker.threads_stopped.set()
  2413. if hasattr(worker, "gcs_error_subscriber"):
  2414. worker.gcs_error_subscriber.close()
  2415. if hasattr(worker, "gcs_log_subscriber"):
  2416. worker.gcs_log_subscriber.close()
  2417. if hasattr(worker, "listener_thread"):
  2418. worker.listener_thread.join()
  2419. if hasattr(worker, "logger_thread"):
  2420. worker.logger_thread.join()
  2421. worker.threads_stopped.clear()
  2422. # Ignore the prefix if the logging config is set.
  2423. ignore_prefix = worker.job_logging_config is not None
  2424. for leftover in stdout_deduplicator.flush():
  2425. print_worker_logs(leftover, sys.stdout, ignore_prefix)
  2426. for leftover in stderr_deduplicator.flush():
  2427. print_worker_logs(leftover, sys.stderr, ignore_prefix)
  2428. global_worker_stdstream_dispatcher.remove_handler("ray_print_logs")
  2429. worker.node = None # Disconnect the worker from the node.
  2430. worker.serialization_context_map.clear()
  2431. try:
  2432. ray_actor = ray.actor
  2433. except AttributeError:
  2434. ray_actor = None # This can occur during program termination
  2435. if ray_actor is not None:
  2436. ray_actor._ActorClassMethodMetadata.reset_cache()
  2437. # Mark the worker as disconnected.
  2438. worker.set_is_connected(False)
  2439. @contextmanager
  2440. def _changeproctitle(title, next_title):
  2441. if _mode() is not LOCAL_MODE:
  2442. ray._raylet.setproctitle(title)
  2443. try:
  2444. yield
  2445. finally:
  2446. if _mode() is not LOCAL_MODE:
  2447. ray._raylet.setproctitle(next_title)
# Global variable to make sure we only send out the warning once.
# get() sets this after handling its first call made from inside an asyncio
# actor, so the "blocking ray.get inside async actor" warning is not repeated.
blocking_get_inside_async_warned = False
# Typing-only overloads for get(). They let static type checkers infer the
# return type from the argument type; at runtime the name `get` is bound to
# the decorated implementation defined right after these stubs.
# NOTE(review): the private `_use_object_store` keyword accepted by the
# implementation is not declared in any of these overloads.


# A sequence of typed ObjectRefs resolves to a list of their values.
@overload
def get(
    object_refs: "Sequence[ObjectRef[R]]", *, timeout: Optional[float] = None
) -> List[R]:
    ...


# Fallback for sequences whose element type is not known precisely.
@overload
def get(
    object_refs: "Sequence[ObjectRef[Any]]", *, timeout: Optional[float] = None
) -> List[Any]:
    ...


# A single ObjectRef resolves to its value.
@overload
def get(object_refs: "ObjectRef[R]", *, timeout: Optional[float] = None) -> R:
    ...


# A sequence of compiled-DAG refs resolves to a list of values.
@overload
def get(
    object_refs: Sequence[CompiledDAGRef], *, timeout: Optional[float] = None
) -> List[Any]:
    ...


# A single compiled-DAG ref resolves to its value.
@overload
def get(object_refs: CompiledDAGRef, *, timeout: Optional[float] = None) -> Any:
    ...
  2471. @PublicAPI
  2472. @client_mode_hook
  2473. def get(
  2474. object_refs: Union[
  2475. "ObjectRef[Any]",
  2476. Sequence["ObjectRef[Any]"],
  2477. CompiledDAGRef,
  2478. Sequence[CompiledDAGRef],
  2479. ],
  2480. *,
  2481. timeout: Optional[float] = None,
  2482. _use_object_store: bool = False,
  2483. ) -> Union[Any, List[Any]]:
  2484. """Get a remote object or a list of remote objects from the object store.
  2485. This method blocks until the object corresponding to the object ref is
  2486. available in the local object store. If this object is not in the local
  2487. object store, it will be shipped from an object store that has it (once the
  2488. object has been created). If object_refs is a list, then the objects
  2489. corresponding to each object in the list will be returned.
  2490. Ordering for an input list of object refs is preserved for each object
  2491. returned. That is, if an object ref to A precedes an object ref to B in the
  2492. input list, then A will precede B in the returned list.
  2493. This method will issue a warning if it's running inside async context,
  2494. you can use ``await object_ref`` instead of ``ray.get(object_ref)``. For
  2495. a list of object refs, you can use ``await asyncio.gather(*object_refs)``.
  2496. Passing :class:`~ObjectRefGenerator` is not allowed.
  2497. Related patterns and anti-patterns:
  2498. - :doc:`/ray-core/patterns/ray-get-loop`
  2499. - :doc:`/ray-core/patterns/unnecessary-ray-get`
  2500. - :doc:`/ray-core/patterns/ray-get-submission-order`
  2501. - :doc:`/ray-core/patterns/ray-get-too-many-objects`
  2502. Args:
  2503. object_refs: Object ref of the object to get or a list of object refs
  2504. to get.
  2505. timeout (Optional[float]): The maximum amount of time in seconds to
  2506. wait before returning. Set this to None will block until the
  2507. corresponding object becomes available. Setting ``timeout=0`` will
  2508. return the object immediately if it's available, else raise
  2509. GetTimeoutError in accordance with the above docstring.
  2510. _use_object_store: [Alpha] To fetch an RDT object through the object store
  2511. instead of using its designated tensor transport. You can set this to True
  2512. for cases where the caller does not support the object's tensor transport,
  2513. e.g., the tensor transport is "nccl" and the caller is not part of the collective.
  2514. When this is False (default), Ray will use the object store for normal objects,
  2515. and attempt to use the object's tensor transport for RDT objects.
  2516. Returns:
  2517. A Python object or a list of Python objects.
  2518. Raises:
  2519. GetTimeoutError: A GetTimeoutError is raised if a timeout is set and
  2520. the get takes longer than timeout to return.
  2521. Exception: An exception is raised immediately if any task that created
  2522. the object or that created one of the objects raised an exception,
  2523. without waiting for the remaining ones to finish.
  2524. """
  2525. worker = global_worker
  2526. worker.check_connected()
  2527. if hasattr(worker, "core_worker") and worker.core_worker.current_actor_is_asyncio():
  2528. global blocking_get_inside_async_warned
  2529. if not blocking_get_inside_async_warned:
  2530. if ray_constants.env_bool(
  2531. RAY_WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR,
  2532. True,
  2533. ):
  2534. logger.warning(
  2535. "Using blocking ray.get inside async actor. "
  2536. "This blocks the event loop. Please use `await` "
  2537. "on object ref with asyncio.gather if you want to "
  2538. "yield execution to the event loop instead."
  2539. )
  2540. blocking_get_inside_async_warned = True
  2541. with profiling.profile("ray.get"):
  2542. # TODO(sang): Should make ObjectRefGenerator
  2543. # compatible to ray.get for dataset.
  2544. if isinstance(object_refs, ObjectRefGenerator):
  2545. return object_refs
  2546. if isinstance(object_refs, CompiledDAGRef):
  2547. return object_refs.get(timeout=timeout)
  2548. if isinstance(object_refs, list):
  2549. all_compiled_dag_refs = True
  2550. any_compiled_dag_refs = False
  2551. for object_ref in object_refs:
  2552. is_dag_ref = isinstance(object_ref, CompiledDAGRef)
  2553. all_compiled_dag_refs = all_compiled_dag_refs and is_dag_ref
  2554. any_compiled_dag_refs = any_compiled_dag_refs or is_dag_ref
  2555. if all_compiled_dag_refs:
  2556. return [object_ref.get(timeout=timeout) for object_ref in object_refs]
  2557. elif any_compiled_dag_refs:
  2558. raise ValueError(
  2559. "Invalid type of object refs. 'object_refs' must be a list of "
  2560. "CompiledDAGRefs if there is any CompiledDAGRef within it. "
  2561. )
  2562. is_individual_id = isinstance(object_refs, ray.ObjectRef)
  2563. if is_individual_id:
  2564. object_refs = [object_refs]
  2565. if not isinstance(object_refs, list):
  2566. raise ValueError(
  2567. f"Invalid type of object refs, {type(object_refs)}, is given. "
  2568. "'object_refs' must either be an ObjectRef or a list of ObjectRefs. "
  2569. )
  2570. values, debugger_breakpoint = worker.get_objects(
  2571. object_refs, timeout, use_object_store=_use_object_store
  2572. )
  2573. for i, value in enumerate(values):
  2574. if isinstance(value, RayError):
  2575. # If the object was lost and it wasn't due to owner death, it may be
  2576. # because the object store is full and objects needed to be evicted.
  2577. if isinstance(value, ray.exceptions.ObjectLostError) and not isinstance(
  2578. value, ray.exceptions.OwnerDiedError
  2579. ):
  2580. worker.core_worker.log_plasma_usage()
  2581. if isinstance(value, RayTaskError):
  2582. raise value.as_instanceof_cause()
  2583. else:
  2584. raise value
  2585. if is_individual_id:
  2586. values = values[0]
  2587. if debugger_breakpoint != b"":
  2588. frame = sys._getframe().f_back
  2589. rdb = ray.util.pdb._connect_ray_pdb(
  2590. host=None,
  2591. port=None,
  2592. patch_stdstreams=False,
  2593. quiet=None,
  2594. breakpoint_uuid=(
  2595. debugger_breakpoint.decode() if debugger_breakpoint else None
  2596. ),
  2597. debugger_external=worker.ray_debugger_external,
  2598. )
  2599. rdb.set_trace(frame=frame)
  2600. return values
  2601. @PublicAPI
  2602. @client_mode_hook
  2603. def put(
  2604. value: Any,
  2605. *,
  2606. _owner: Optional["ray.actor.ActorHandle"] = None,
  2607. _tensor_transport: Optional[str] = None,
  2608. ) -> "ray.ObjectRef":
  2609. """Store an object in the object store.
  2610. The object may not be evicted while a reference to the returned ID exists.
  2611. Related patterns and anti-patterns:
  2612. - :doc:`/ray-core/patterns/return-ray-put`
  2613. - :doc:`/ray-core/patterns/pass-large-arg-by-value`
  2614. - :doc:`/ray-core/patterns/closure-capture-large-objects`
  2615. Args:
  2616. value: The Python object to be stored.
  2617. _owner [Experimental]: The actor that should own this object. This
  2618. allows creating objects with lifetimes decoupled from that of the
  2619. creating process. The owner actor must be passed a reference to the
  2620. object prior to the object creator exiting, otherwise the reference
  2621. will still be lost. *Note that this argument is an experimental API
  2622. and should be avoided if possible.*
  2623. _tensor_transport: [Alpha] The tensor transport to use for the GPU object.
  2624. Currently, this only supports one-sided tensor transports such as "nixl".
  2625. When this is None (default), Ray will use the object store.
  2626. Returns:
  2627. The object ref assigned to this value.
  2628. """
  2629. worker = global_worker
  2630. worker.check_connected()
  2631. if _owner is None:
  2632. serialize_owner_address = None
  2633. elif isinstance(_owner, ray.actor.ActorHandle):
  2634. # Ensure GlobalState is connected
  2635. ray._private.state.state._connect_and_get_accessor()
  2636. serialize_owner_address = (
  2637. ray._raylet._get_actor_serialized_owner_address_or_none(
  2638. ray._private.state.state.get_actor_info(_owner._actor_id)
  2639. )
  2640. )
  2641. if not serialize_owner_address:
  2642. raise RuntimeError(f"{_owner} is not alive, it's worker_id is empty!")
  2643. else:
  2644. raise TypeError(f"Expect an `ray.actor.ActorHandle`, but got: {type(_owner)}")
  2645. with profiling.profile("ray.put"):
  2646. try:
  2647. object_ref = worker.put_object(
  2648. value,
  2649. owner_address=serialize_owner_address,
  2650. _tensor_transport=_tensor_transport,
  2651. )
  2652. except ObjectStoreFullError:
  2653. logger.info(
  2654. "Put failed since the value was either too large or the "
  2655. "store was full of pinned objects."
  2656. )
  2657. raise
  2658. return object_ref
  2659. # Global variable to make sure we only send out the warning once.
  2660. blocking_wait_inside_async_warned = False
  2661. @PublicAPI
  2662. @client_mode_hook
  2663. def wait(
  2664. ray_waitables: List[Union[ObjectRef, ObjectRefGenerator]],
  2665. *,
  2666. num_returns: int = 1,
  2667. timeout: Optional[float] = None,
  2668. fetch_local: bool = True,
  2669. ) -> Tuple[
  2670. List[Union[ObjectRef, ObjectRefGenerator]],
  2671. List[Union[ObjectRef, ObjectRefGenerator]],
  2672. ]:
  2673. """Return a list of IDs that are ready and a list of IDs that are not.
  2674. If timeout is set, the function returns either when the requested number of
  2675. IDs are ready or when the timeout is reached, whichever occurs first. If it
  2676. is not set, the function simply waits until that number of objects is ready
  2677. and returns that exact number of object refs.
  2678. `ray_waitables` is a list of :class:`~ray.ObjectRef` and
  2679. :class:`~ray.ObjectRefGenerator`.
  2680. The method returns two lists, ready and unready `ray_waitables`.
  2681. ObjectRef:
  2682. object refs that correspond to objects that are available
  2683. in the object store are in the first list.
  2684. The rest of the object refs are in the second list.
  2685. ObjectRefGenerator:
  2686. Generators whose next reference (that will be obtained
  2687. via `next(generator)`) has a corresponding object available
  2688. in the object store are in the first list.
  2689. All other generators are placed in the second list.
  2690. Ordering of the input list of ray_waitables is preserved. That is, if A
  2691. precedes B in the input list, and both are in the ready list, then A will
  2692. precede B in the ready list. This also holds true if A and B are both in
  2693. the remaining list.
  2694. This method will issue a warning if it's running inside an async context.
  2695. Instead of ``ray.wait(ray_waitables)``, you can use
  2696. ``await asyncio.wait(ray_waitables)``.
  2697. Related patterns and anti-patterns:
  2698. - :doc:`/ray-core/patterns/limit-pending-tasks`
  2699. - :doc:`/ray-core/patterns/ray-get-submission-order`
  2700. Args:
  2701. ray_waitables: List of :class:`~ObjectRef` or
  2702. :class:`~ObjectRefGenerator` for objects that may or may
  2703. not be ready. Note that these must be unique.
  2704. num_returns: The number of ray_waitables that should be returned.
  2705. timeout: The maximum amount of time in seconds to wait before
  2706. returning.
  2707. fetch_local: If True, wait for the object to be downloaded onto
  2708. the local node before returning it as ready. If the `ray_waitable`
  2709. is a generator, it will wait until the next object in the generator
  2710. is downloaed. If False, ray.wait() will not trigger fetching of
  2711. objects to the local node and will return immediately once the
  2712. object is available anywhere in the cluster.
  2713. Returns:
  2714. A list of object refs that are ready and a list of the remaining object
  2715. IDs.
  2716. """
  2717. worker = global_worker
  2718. worker.check_connected()
  2719. if (
  2720. hasattr(worker, "core_worker")
  2721. and worker.core_worker.current_actor_is_asyncio()
  2722. and timeout != 0
  2723. ):
  2724. global blocking_wait_inside_async_warned
  2725. if not blocking_wait_inside_async_warned:
  2726. logger.debug(
  2727. "Using blocking ray.wait inside async method. "
  2728. "This blocks the event loop. Please use `await` "
  2729. "on object ref with asyncio.wait. "
  2730. )
  2731. blocking_wait_inside_async_warned = True
  2732. if isinstance(ray_waitables, ObjectRef) or isinstance(
  2733. ray_waitables, ObjectRefGenerator
  2734. ):
  2735. raise TypeError(
  2736. "wait() expected a list of ray.ObjectRef or ray.ObjectRefGenerator"
  2737. ", got a single ray.ObjectRef or ray.ObjectRefGenerator "
  2738. f"{ray_waitables}"
  2739. )
  2740. if not isinstance(ray_waitables, list):
  2741. raise TypeError(
  2742. "wait() expected a list of ray.ObjectRef or "
  2743. "ray.ObjectRefGenerator, "
  2744. f"got {type(ray_waitables)}"
  2745. )
  2746. if timeout is not None and timeout < 0:
  2747. raise ValueError(
  2748. "The 'timeout' argument must be nonnegative. " f"Received {timeout}"
  2749. )
  2750. for ray_waitable in ray_waitables:
  2751. if not isinstance(ray_waitable, ObjectRef) and not isinstance(
  2752. ray_waitable, ObjectRefGenerator
  2753. ):
  2754. raise TypeError(
  2755. "wait() expected a list of ray.ObjectRef or "
  2756. "ray.ObjectRefGenerator, "
  2757. f"got list containing {type(ray_waitable)}"
  2758. )
  2759. worker.check_connected()
  2760. # TODO(swang): Check main thread.
  2761. with profiling.profile("ray.wait"):
  2762. # TODO(rkn): This is a temporary workaround for
  2763. # https://github.com/ray-project/ray/issues/997. However, it should be
  2764. # fixed in Arrow instead of here.
  2765. if len(ray_waitables) == 0:
  2766. return [], []
  2767. if len(ray_waitables) != len(set(ray_waitables)):
  2768. raise ValueError("Wait requires a list of unique ray_waitables.")
  2769. if num_returns <= 0:
  2770. raise ValueError("Invalid number of objects to return %d." % num_returns)
  2771. if num_returns > len(ray_waitables):
  2772. raise ValueError(
  2773. "num_returns cannot be greater than the number "
  2774. "of ray_waitables provided to ray.wait."
  2775. )
  2776. timeout = timeout if timeout is not None else 10**6
  2777. timeout_milliseconds = int(timeout * 1000)
  2778. ready_ids, remaining_ids = worker.core_worker.wait(
  2779. ray_waitables,
  2780. num_returns,
  2781. timeout_milliseconds,
  2782. fetch_local,
  2783. )
  2784. return ready_ids, remaining_ids
  2785. @PublicAPI
  2786. @client_mode_hook
  2787. def get_actor(name: str, namespace: Optional[str] = None) -> "ray.actor.ActorHandle":
  2788. """Get a handle to a named actor.
  2789. Gets a handle to an actor with the given name. The actor must
  2790. have been created with Actor.options(name="name").remote(). This
  2791. works for both detached & non-detached actors.
  2792. This method is a sync call and it'll timeout after 60s. This can be modified
  2793. by setting OS env RAY_gcs_server_request_timeout_seconds before starting
  2794. the cluster.
  2795. Args:
  2796. name: The name of the actor.
  2797. namespace: The namespace of the actor, or None to specify the current
  2798. namespace.
  2799. Returns:
  2800. ActorHandle to the actor.
  2801. Raises:
  2802. ValueError: if the named actor does not exist.
  2803. """
  2804. if not name:
  2805. raise ValueError("Please supply a non-empty value to get_actor")
  2806. if namespace is not None:
  2807. ray._private.utils.validate_namespace(namespace)
  2808. worker = global_worker
  2809. worker.check_connected()
  2810. return worker.core_worker.get_named_actor_handle(name, namespace or "")
  2811. @PublicAPI
  2812. @client_mode_hook
  2813. def kill(actor: "ray.actor.ActorHandle", *, no_restart: bool = True):
  2814. """Kill an actor forcefully.
  2815. This will interrupt any running tasks on the actor, causing them to fail
  2816. immediately. ``atexit`` handlers installed in the actor will not be run.
  2817. If you want to kill the actor but let pending tasks finish,
  2818. you can call ``actor.__ray_terminate__.remote()`` instead to queue a
  2819. termination task. Any ``atexit`` handlers installed in the actor *will*
  2820. be run in this case.
  2821. If the actor is a detached actor, subsequent calls to get its handle via
  2822. ray.get_actor will fail.
  2823. Args:
  2824. actor: Handle to the actor to kill.
  2825. no_restart: Whether or not this actor should be restarted if
  2826. it's a restartable actor.
  2827. """
  2828. worker = global_worker
  2829. worker.check_connected()
  2830. if not isinstance(actor, ray.actor.ActorHandle):
  2831. raise ValueError(
  2832. "ray.kill() only supported for actors. For tasks, try ray.cancel(). "
  2833. "Got: {}.".format(type(actor))
  2834. )
  2835. try:
  2836. worker.core_worker.kill_actor(actor._ray_actor_id, no_restart)
  2837. except ActorHandleNotFoundError as e:
  2838. actor_job_id = actor._ray_actor_id.job_id
  2839. current_job_id = worker.current_job_id
  2840. raise ActorHandleNotFoundError(
  2841. f"ActorHandle objects are not valid across Ray sessions. "
  2842. f"The actor handle was created in job {actor_job_id.hex()}, "
  2843. f"but the current job is {current_job_id.hex()}. "
  2844. f"This typically happens when you try to use an actor handle "
  2845. f"from a previous session after calling ray.shutdown() and ray.init(). "
  2846. f"Please create a new actor handle in the current session."
  2847. ) from e
  2848. @PublicAPI
  2849. @client_mode_hook
  2850. def cancel(
  2851. ray_waitable: Union["ObjectRef[R]", "ObjectRefGenerator[R]"],
  2852. *,
  2853. force: bool = False,
  2854. recursive: bool = True,
  2855. ) -> None:
  2856. """Cancels a task.
  2857. Cancel API has a different behavior depending on if it is a remote function
  2858. (Task) or a remote Actor method (Actor Task).
  2859. Task:
  2860. If the specified Task is pending execution, it is cancelled and not
  2861. executed. If the Task is currently executing, the behavior depends
  2862. on the `force` flag. When `force=False`, a KeyboardInterrupt is
  2863. raised in Python and when `force=True`, the executing Task
  2864. immediately exits. If the Task is already finished, nothing happens.
  2865. Cancelled Tasks aren't retried. `max_task_retries` aren't respected.
  2866. Calling ray.get on a cancelled Task raises a TaskCancelledError
  2867. if the Task has been scheduled or interrupted.
  2868. It raises a WorkerCrashedError if `force=True`.
  2869. If `recursive=True`, all the child Tasks and Actor Tasks
  2870. are cancelled. If `force=True` and `recursive=True`, `force=True`
  2871. is ignored for child Actor Tasks.
  2872. Actor Task:
  2873. If the specified Task is pending execution, it is cancelled and not
  2874. executed. If the Task is currently executing, the behavior depends
  2875. on the execution model of an Actor. If it is a regular Actor
  2876. or a threaded Actor, Ray sets a cancellation flag that can be checked
  2877. via `ray.get_runtime_context().is_canceled()` within the task body.
  2878. This allows for graceful cancellation by periodically checking the
  2879. cancellation status. If it is an async Actor, Ray cancels a `asyncio.Task`.
  2880. The semantic of cancellation is equivalent to asyncio's cancellation.
  2881. https://docs.python.org/3/library/asyncio-task.html#task-cancellation
  2882. Note: `is_canceled()` is not supported for async actors and will raise
  2883. a RuntimeError. If the Task has finished, nothing happens.
  2884. Only `force=False` is allowed for an Actor Task. Otherwise, it raises
  2885. `ValueError`. Use `ray.kill(actor)` instead to kill an Actor.
  2886. Cancelled Tasks aren't retried. `max_task_retries` aren't respected.
  2887. Calling ray.get on a cancelled Task raises a TaskCancelledError
  2888. if the Task has been scheduled or interrupted.
  2889. If `recursive=True`, all the child Tasks and actor Tasks
  2890. are cancelled.
  2891. Args:
  2892. ray_waitable: :class:`~ObjectRef` and
  2893. :class:`~ObjectRefGenerator`
  2894. returned by the task that should be canceled.
  2895. force: Whether to force-kill a running task by killing
  2896. the worker that is running the task.
  2897. recursive: Whether to try to cancel tasks submitted by the
  2898. task specified.
  2899. """
  2900. worker = ray._private.worker.global_worker
  2901. worker.check_connected()
  2902. if isinstance(ray_waitable, ray._raylet.ObjectRefGenerator):
  2903. assert hasattr(ray_waitable, "_generator_ref")
  2904. ray_waitable = ray_waitable._generator_ref
  2905. if not isinstance(ray_waitable, ray.ObjectRef):
  2906. raise TypeError(
  2907. "ray.cancel() only supported for object refs. "
  2908. f"For actors, try ray.kill(). Got: {type(ray_waitable)}."
  2909. )
  2910. return worker.core_worker.cancel_task(ray_waitable, force, recursive)
  2911. def _mode(worker=global_worker):
  2912. """This is a wrapper around worker.mode.
  2913. We use this wrapper so that in the remote decorator, we can call _mode()
  2914. instead of worker.mode. The difference is that when we attempt to
  2915. serialize remote functions, we don't attempt to serialize the worker
  2916. object, which cannot be serialized.
  2917. """
  2918. return worker.mode
  2919. def _make_remote(function_or_class, options):
  2920. if not function_or_class.__module__:
  2921. function_or_class.__module__ = "global"
  2922. if inspect.isfunction(function_or_class) or is_cython(function_or_class):
  2923. ray_option_utils.validate_task_options(options, in_options=False)
  2924. return ray.remote_function.RemoteFunction(
  2925. Language.PYTHON,
  2926. function_or_class,
  2927. None,
  2928. options,
  2929. )
  2930. if inspect.isclass(function_or_class):
  2931. ray_option_utils.validate_actor_options(options, in_options=False)
  2932. return ray.actor._make_actor(function_or_class, options)
  2933. raise TypeError(
  2934. "The @ray.remote decorator must be applied to either a function or a class."
  2935. )
class RemoteDecorator(Protocol):
    """Static type of the decorator returned by ``ray.remote(**options)``.

    Used only for type checking. Applying the decorator to a class yields an
    ``ActorClass``. Applying it to a function yields the ``RemoteFunction*``
    variant matching the function's number of positional parameters. The
    overloads below cover zero through ten parameters.
    """

    @overload
    def __call__(self, __t: Type[T]) -> ActorClass[T]:
        ...

    @overload
    def __call__(self, __function: Callable[[], R]) -> RemoteFunctionNoArgs[R]:
        ...

    @overload
    def __call__(self, __function: Callable[[T0], R]) -> RemoteFunction0[R, T0]:
        ...

    @overload
    def __call__(self, __function: Callable[[T0, T1], R]) -> RemoteFunction1[R, T0, T1]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2], R]
    ) -> RemoteFunction2[R, T0, T1, T2]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3], R]
    ) -> RemoteFunction3[R, T0, T1, T2, T3]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3, T4], R]
    ) -> RemoteFunction4[R, T0, T1, T2, T3, T4]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3, T4, T5], R]
    ) -> RemoteFunction5[R, T0, T1, T2, T3, T4, T5]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3, T4, T5, T6], R]
    ) -> RemoteFunction6[R, T0, T1, T2, T3, T4, T5, T6]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7], R]
    ) -> RemoteFunction7[R, T0, T1, T2, T3, T4, T5, T6, T7]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7, T8], R]
    ) -> RemoteFunction8[R, T0, T1, T2, T3, T4, T5, T6, T7, T8]:
        ...

    @overload
    def __call__(
        self, __function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9], R]
    ) -> RemoteFunction9[R, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]:
        ...
  2989. @overload
  2990. def remote(__t: Type[T]) -> ActorClass[T]:
  2991. ...
  2992. @overload
  2993. def remote(__function: Callable[[], R]) -> RemoteFunctionNoArgs[R]:
  2994. ...
  2995. @overload
  2996. def remote(__function: Callable[[T0], R]) -> RemoteFunction0[R, T0]:
  2997. ...
  2998. @overload
  2999. def remote(__function: Callable[[T0, T1], R]) -> RemoteFunction1[R, T0, T1]:
  3000. ...
  3001. @overload
  3002. def remote(__function: Callable[[T0, T1, T2], R]) -> RemoteFunction2[R, T0, T1, T2]:
  3003. ...
  3004. @overload
  3005. def remote(
  3006. __function: Callable[[T0, T1, T2, T3], R]
  3007. ) -> RemoteFunction3[R, T0, T1, T2, T3]:
  3008. ...
  3009. @overload
  3010. def remote(
  3011. __function: Callable[[T0, T1, T2, T3, T4], R]
  3012. ) -> RemoteFunction4[R, T0, T1, T2, T3, T4]:
  3013. ...
  3014. @overload
  3015. def remote(
  3016. __function: Callable[[T0, T1, T2, T3, T4, T5], R]
  3017. ) -> RemoteFunction5[R, T0, T1, T2, T3, T4, T5]:
  3018. ...
  3019. @overload
  3020. def remote(
  3021. __function: Callable[[T0, T1, T2, T3, T4, T5, T6], R]
  3022. ) -> RemoteFunction6[R, T0, T1, T2, T3, T4, T5, T6]:
  3023. ...
  3024. @overload
  3025. def remote(
  3026. __function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7], R]
  3027. ) -> RemoteFunction7[R, T0, T1, T2, T3, T4, T5, T6, T7]:
  3028. ...
  3029. @overload
  3030. def remote(
  3031. __function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7, T8], R]
  3032. ) -> RemoteFunction8[R, T0, T1, T2, T3, T4, T5, T6, T7, T8]:
  3033. ...
  3034. @overload
  3035. def remote(
  3036. __function: Callable[[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9], R]
  3037. ) -> RemoteFunction9[R, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]:
  3038. ...
  3039. # Passing options
  3040. @overload
  3041. def remote(
  3042. *,
  3043. num_returns: Union[int, Literal["streaming"]] = Undefined,
  3044. num_cpus: Union[int, float] = Undefined,
  3045. num_gpus: Union[int, float] = Undefined,
  3046. resources: Dict[str, float] = Undefined,
  3047. accelerator_type: str = Undefined,
  3048. memory: Union[int, float] = Undefined,
  3049. max_calls: int = Undefined,
  3050. max_restarts: int = Undefined,
  3051. max_task_retries: int = Undefined,
  3052. max_retries: int = Undefined,
  3053. runtime_env: Dict[str, Any] = Undefined,
  3054. retry_exceptions: bool = Undefined,
  3055. scheduling_strategy: Union[
  3056. None, Literal["DEFAULT"], Literal["SPREAD"], PlacementGroupSchedulingStrategy
  3057. ] = Undefined,
  3058. label_selector: Dict[str, str] = Undefined,
  3059. fallback_strategy: List[Dict[str, Any]] = Undefined,
  3060. ) -> RemoteDecorator:
  3061. ...
  3062. @PublicAPI
  3063. def remote(
  3064. *args, **kwargs
  3065. ) -> Union[ray.remote_function.RemoteFunction, ray.actor.ActorClass]:
  3066. """Defines a remote function or an actor class.
  3067. This function can be used as a decorator with no arguments
  3068. to define a remote function or actor as follows:
  3069. .. testcode::
  3070. import ray
  3071. @ray.remote
  3072. def f(a, b, c):
  3073. return a + b + c
  3074. object_ref = f.remote(1, 2, 3)
  3075. result = ray.get(object_ref)
  3076. assert result == (1 + 2 + 3)
  3077. @ray.remote
  3078. class Foo:
  3079. def __init__(self, arg):
  3080. self.x = arg
  3081. def method(self, a):
  3082. return self.x + a
  3083. actor_handle = Foo.remote(123)
  3084. object_ref = actor_handle.method.remote(321)
  3085. result = ray.get(object_ref)
  3086. assert result == (123 + 321)
  3087. Equivalently, use a function call to create a remote function or actor.
  3088. .. testcode::
  3089. def g(a, b, c):
  3090. return a + b + c
  3091. remote_g = ray.remote(g)
  3092. object_ref = remote_g.remote(1, 2, 3)
  3093. assert ray.get(object_ref) == (1 + 2 + 3)
  3094. class Bar:
  3095. def __init__(self, arg):
  3096. self.x = arg
  3097. def method(self, a):
  3098. return self.x + a
  3099. RemoteBar = ray.remote(Bar)
  3100. actor_handle = RemoteBar.remote(123)
  3101. object_ref = actor_handle.method.remote(321)
  3102. result = ray.get(object_ref)
  3103. assert result == (123 + 321)
  3104. It can also be used with specific keyword arguments as follows:
  3105. .. testcode::
  3106. @ray.remote(num_gpus=1, max_calls=1, num_returns=2)
  3107. def f():
  3108. return 1, 2
  3109. @ray.remote(num_cpus=2, resources={"CustomResource": 1})
  3110. class Foo:
  3111. def method(self):
  3112. return 1
  3113. Remote task and actor objects returned by @ray.remote can also be
  3114. dynamically modified with the same arguments as above using
  3115. ``.options()`` as follows:
  3116. .. testcode::
  3117. :hide:
  3118. ray.shutdown()
  3119. ray.init(num_cpus=5, num_gpus=5)
  3120. .. testcode::
  3121. @ray.remote(num_gpus=1, max_calls=1, num_returns=2)
  3122. def f():
  3123. return 1, 2
  3124. f_with_2_gpus = f.options(num_gpus=2)
  3125. object_refs = f_with_2_gpus.remote()
  3126. assert ray.get(object_refs) == [1, 2]
  3127. @ray.remote(num_cpus=2, resources={"CustomResource": 1})
  3128. class Foo:
  3129. def method(self):
  3130. return 1
  3131. Foo_with_no_resources = Foo.options(num_cpus=1, resources=None)
  3132. foo_actor = Foo_with_no_resources.remote()
  3133. assert ray.get(foo_actor.method.remote()) == 1
  3134. A remote actor will be terminated when all actor handle to it
  3135. in Python is deleted, which will cause them to complete any outstanding
  3136. work and then shut down. If you only have 1 reference to an actor handle,
  3137. calling ``del actor`` *could* trigger actor deletion. Note that your program
  3138. may have multiple references to the same ActorHandle, and actor termination
  3139. will not occur until the reference count goes to 0. See the Python
  3140. documentation for more context about object deletion.
  3141. https://docs.python.org/3.9/reference/datamodel.html#object.__del__
  3142. If you want to kill actors immediately, you can also call ``ray.kill(actor)``.
  3143. .. tip::
  3144. Avoid repeatedly passing in large arguments to remote task or method calls.
  3145. Instead, use ray.put to create a copy of the object in the object store.
  3146. See :ref:`more info here <ray-pass-large-arg-by-value>`.
  3147. Args:
  3148. num_returns: This is only for *remote functions*. It specifies
  3149. the number of object refs returned by the remote function
  3150. invocation. The default value is 1.
  3151. Pass "dynamic" to allow the task to decide how many
  3152. return values to return during execution, and the caller will
  3153. receive an ObjectRef[DynamicObjectRefGenerator].
  3154. See :ref:`dynamic generators <dynamic-generators>` for more details.
  3155. num_cpus: The quantity of CPU resources to reserve
  3156. for this task or for the lifetime of the actor.
  3157. By default, tasks use 1 CPU resource and actors use 1 CPU
  3158. for scheduling and 0 CPU for running
  3159. (This means, by default, actors cannot get scheduled on a zero-cpu node,
  3160. but an infinite number of them can run on any non-zero cpu node.
  3161. The default value for actors was chosen for historical reasons.
  3162. It's recommended to always explicitly set num_cpus for actors
  3163. to avoid any surprises.
  3164. If resources are specified explicitly,
  3165. they are required for both scheduling and running.)
  3166. See :ref:`specifying resource requirements <resource-requirements>`
  3167. for more details.
  3168. num_gpus: The quantity of GPU resources to reserve
  3169. for this task or for the lifetime of the actor.
  3170. The default value is 0.
  3171. See :ref:`Ray GPU support <gpu-support>` for more details.
  3172. resources (Dict[str, float]): The quantity of various
  3173. :ref:`custom resources <custom-resources>`
  3174. to reserve for this task or for the lifetime of the actor.
  3175. This is a dictionary mapping strings (resource names) to floats.
  3176. By default it is empty.
  3177. label_selector: [Experimental] If specified, the labels required for the node on
  3178. which this actor can be scheduled on. The label selector consist of key-value pairs,
  3179. where the keys are label names and the value are expressions consisting of an operator
  3180. with label values or just a value to indicate equality.
  3181. fallback_strategy: [Experimental] If specified, expresses soft constraints for scheduling
  3182. through a list of dicts of decorator options to fall back on when scheduling on a node.
  3183. Decorator options are evaluated together during scheduling. The first satisfied
  3184. dict of options is used. Currently only `label_selector` is a supported option.
  3185. accelerator_type: If specified, requires that the task or actor run
  3186. on a node with the specified type of accelerator.
  3187. See :ref:`accelerator types <accelerator_types>`.
  3188. memory: The heap memory request in bytes for this task/actor,
  3189. rounded down to the nearest integer.
  3190. max_calls: Only for *remote functions*. This specifies the
  3191. maximum number of times that a given worker can execute
  3192. the given remote function before it must exit
  3193. (this can be used to address :ref:`memory leaks <gpu-leak>` in third-party
  3194. libraries or to reclaim resources that cannot easily be
  3195. released, e.g., GPU memory that was acquired by TensorFlow).
  3196. By default this is infinite for CPU tasks and 1 for GPU tasks
  3197. (to force GPU tasks to release resources after finishing).
  3198. max_restarts: Only for *actors*. This specifies the maximum
  3199. number of times that the actor should be restarted when it dies
  3200. unexpectedly. The minimum valid value is 0 (default),
  3201. which indicates that the actor doesn't need to be restarted.
  3202. A value of -1 indicates that an actor should be restarted
  3203. indefinitely.
  3204. See :ref:`actor fault tolerance <fault-tolerance-actors>` for more details.
  3205. max_task_retries: Only for *actors*. How many times to
  3206. retry an actor task if the task fails due to a system error,
  3207. e.g., the actor has died. If set to -1, the system will
  3208. retry the failed task until the task succeeds, or the actor
  3209. has reached its max_restarts limit. If set to `n > 0`, the
  3210. system will retry the failed task up to n times, after which the
  3211. task will throw a `RayActorError` exception upon :obj:`ray.get`.
  3212. Note that Python exceptions are not considered system errors
  3213. and will not trigger retries.
  3214. The default value is 0.
  3215. See :ref:`actor fault tolerance <fault-tolerance-actors>` for more details.
  3216. max_retries: Only for *remote functions*. This specifies
  3217. the maximum number of times that the remote function
  3218. should be rerun when the worker process executing it
  3219. crashes unexpectedly. The minimum valid value is 0,
  3220. the default value is 3, and a value of -1 indicates
  3221. infinite retries.
  3222. See :ref:`task fault tolerance <fault-tolerance-tasks>` for more details.
  3223. allow_out_of_order_execution: Only for *actors*. Whether Ray executes actor
  3224. tasks out of order. If you're using multi-threaded (``max_concurrency > 1``)
  3225. or async actors, you can't set this to False. Defaults to True if you're
  3226. using multi-threaded or async actors, and False otherwise. Actor task
  3227. retries are always executed out of order.
  3228. runtime_env (Dict[str, Any]): Specifies the runtime environment for
  3229. this actor or task and its children. See
  3230. :ref:`runtime-environments` for detailed documentation.
  3231. retry_exceptions: Only for *remote functions*. This specifies whether
  3232. application-level errors should be retried up to max_retries times.
  3233. This can be a boolean or a list of exceptions that should be retried.
  3234. See :ref:`task fault tolerance <fault-tolerance-tasks>` for more details.
  3235. scheduling_strategy: Strategy about how to
  3236. schedule a remote function or actor. Possible values are
  3237. None: ray will figure out the scheduling strategy to use, it
  3238. will either be the PlacementGroupSchedulingStrategy using parent's
  3239. placement group if parent has one and has
  3240. placement_group_capture_child_tasks set to true,
  3241. or "DEFAULT";
  3242. "DEFAULT": default hybrid scheduling;
  3243. "SPREAD": best effort spread scheduling;
  3244. `PlacementGroupSchedulingStrategy`:
  3245. placement group based scheduling;
  3246. `NodeAffinitySchedulingStrategy`:
  3247. node id based affinity scheduling.
  3248. See :ref:`Ray scheduling strategies <ray-scheduling-strategies>`
  3249. for more details.
  3250. _labels: The key-value labels of a task or actor.
  3251. """
  3252. # "callable" returns true for both function and class.
  3253. if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
  3254. # This is the case where the decorator is just @ray.remote.
  3255. # "args[0]" is the class or function under the decorator.
  3256. return _make_remote(args[0], {})
  3257. assert len(args) == 0 and len(kwargs) > 0, ray_option_utils.remote_args_error_string
  3258. return functools.partial(_make_remote, options=kwargs)