actor.py 100 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486
  1. import inspect
  2. import logging
  3. from typing import (
  4. TYPE_CHECKING,
  5. Any,
  6. Callable,
  7. Dict,
  8. Generic,
  9. List,
  10. Literal,
  11. Optional,
  12. Tuple,
  13. TypeVar,
  14. Union,
  15. overload,
  16. )
  17. try:
  18. from typing import Concatenate, ParamSpec
  19. except ImportError:
  20. from typing_extensions import Concatenate, ParamSpec
  21. import ray._common.signature as signature
  22. import ray._private.ray_constants as ray_constants
  23. import ray._raylet
  24. from ray import ActorClassID, Language, ObjectRef, cross_language
  25. from ray._common import ray_option_utils
  26. from ray._common.ray_constants import DEFAULT_MAX_CONCURRENCY_ASYNC
  27. from ray._common.ray_option_utils import _warn_if_using_deprecated_placement_group
  28. from ray._private.async_compat import has_async_methods
  29. from ray._private.auto_init_hook import wrap_auto_init
  30. from ray._private.client_mode_hook import (
  31. client_mode_convert_actor,
  32. client_mode_hook,
  33. client_mode_should_convert,
  34. )
  35. from ray._private.inspect_util import (
  36. is_class_method,
  37. is_function_or_method,
  38. is_static_method,
  39. )
  40. from ray._private.utils import get_runtime_env_info, parse_runtime_env_for_task_or_actor
  41. from ray._raylet import (
  42. STREAMING_GENERATOR_RETURN,
  43. ObjectRefGenerator,
  44. PythonFunctionDescriptor,
  45. raise_sys_exit_with_custom_error_message,
  46. )
  47. from ray.exceptions import ActorAlreadyExistsError, AsyncioActorExit
  48. from ray.util.annotations import DeveloperAPI, PublicAPI
  49. from ray.util.placement_group import _configure_placement_group_based_on_context
  50. from ray.util.scheduling_strategies import (
  51. PlacementGroupSchedulingStrategy,
  52. SchedulingStrategyT,
  53. )
  54. from ray.util.tracing.tracing_helper import (
  55. _inject_tracing_into_class,
  56. _tracing_actor_creation,
  57. _tracing_actor_method_invocation,
  58. )
  59. if TYPE_CHECKING:
  60. pass
  61. logger = logging.getLogger(__name__)
  62. # Hook to call with (actor, resources, strategy) on each local actor creation.
  63. _actor_launch_hook = None
  64. # TypeVar for generic ActorHandle
  65. T = TypeVar("T")
  66. # return type of ActorClass[T].remote()
  67. ActorProxy = Union["ActorHandle[T]", type[T]]
  68. _Ret = TypeVar("_Ret")
  69. _P = ParamSpec("_P")
  70. _T0 = TypeVar("_T0")
  71. _T1 = TypeVar("_T1")
  72. _T2 = TypeVar("_T2")
  73. _T3 = TypeVar("_T3")
  74. _T4 = TypeVar("_T4")
  75. _T5 = TypeVar("_T5")
  76. _T6 = TypeVar("_T6")
  77. _T7 = TypeVar("_T7")
  78. _T8 = TypeVar("_T8")
  79. _T9 = TypeVar("_T9")
  80. class _RemoteMethodNoArgs(Generic[_Ret]):
  81. def remote(self) -> "ObjectRef[_Ret]":
  82. ...
  83. def bind(self) -> Any:
  84. ...
  85. class _RemoteMethod0(Generic[_Ret, _T0]):
  86. def remote(self, __arg0: "Union[_T0, ObjectRef[_T0]]") -> "ObjectRef[_Ret]":
  87. ...
  88. def bind(self, __arg0: _T0) -> Any:
  89. ...
  90. class _RemoteMethod1(Generic[_Ret, _T0, _T1]):
  91. def remote(
  92. self, __arg0: "Union[_T0, ObjectRef[_T0]]", __arg1: "Union[_T1, ObjectRef[_T1]]"
  93. ) -> "ObjectRef[_Ret]":
  94. ...
  95. def bind(self, __arg0: _T0, __arg1: _T1) -> Any:
  96. ...
  97. class _RemoteMethod2(Generic[_Ret, _T0, _T1, _T2]):
  98. def remote(
  99. self,
  100. __arg0: "Union[_T0, ObjectRef[_T0]]",
  101. __arg1: "Union[_T1, ObjectRef[_T1]]",
  102. __arg2: "Union[_T2, ObjectRef[_T2]]",
  103. ) -> "ObjectRef[_Ret]":
  104. ...
  105. def bind(self, __arg0: _T0, __arg1: _T1, __arg2: _T2) -> Any:
  106. ...
  107. class _RemoteMethod3(Generic[_Ret, _T0, _T1, _T2, _T3]):
  108. def remote(
  109. self,
  110. __arg0: "Union[_T0, ObjectRef[_T0]]",
  111. __arg1: "Union[_T1, ObjectRef[_T1]]",
  112. __arg2: "Union[_T2, ObjectRef[_T2]]",
  113. __arg3: "Union[_T3, ObjectRef[_T3]]",
  114. ) -> "ObjectRef[_Ret]":
  115. ...
  116. def bind(self, __arg0: _T0, __arg1: _T1, __arg2: _T2, __arg3: _T3) -> Any:
  117. ...
  118. class _RemoteMethod4(Generic[_Ret, _T0, _T1, _T2, _T3, _T4]):
  119. def remote(
  120. self,
  121. __arg0: "Union[_T0, ObjectRef[_T0]]",
  122. __arg1: "Union[_T1, ObjectRef[_T1]]",
  123. __arg2: "Union[_T2, ObjectRef[_T2]]",
  124. __arg3: "Union[_T3, ObjectRef[_T3]]",
  125. __arg4: "Union[_T4, ObjectRef[_T4]]",
  126. ) -> "ObjectRef[_Ret]":
  127. ...
  128. def bind(
  129. self, __arg0: _T0, __arg1: _T1, __arg2: _T2, __arg3: _T3, __arg4: _T4
  130. ) -> Any:
  131. ...
  132. class _RemoteMethod5(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5]):
  133. def remote(
  134. self,
  135. __arg0: "Union[_T0, ObjectRef[_T0]]",
  136. __arg1: "Union[_T1, ObjectRef[_T1]]",
  137. __arg2: "Union[_T2, ObjectRef[_T2]]",
  138. __arg3: "Union[_T3, ObjectRef[_T3]]",
  139. __arg4: "Union[_T4, ObjectRef[_T4]]",
  140. __arg5: "Union[_T5, ObjectRef[_T5]]",
  141. ) -> "ObjectRef[_Ret]":
  142. ...
  143. def bind(
  144. self,
  145. __arg0: _T0,
  146. __arg1: _T1,
  147. __arg2: _T2,
  148. __arg3: _T3,
  149. __arg4: _T4,
  150. __arg5: _T5,
  151. ) -> Any:
  152. ...
  153. class _RemoteMethod6(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6]):
  154. def remote(
  155. self,
  156. __arg0: "Union[_T0, ObjectRef[_T0]]",
  157. __arg1: "Union[_T1, ObjectRef[_T1]]",
  158. __arg2: "Union[_T2, ObjectRef[_T2]]",
  159. __arg3: "Union[_T3, ObjectRef[_T3]]",
  160. __arg4: "Union[_T4, ObjectRef[_T4]]",
  161. __arg5: "Union[_T5, ObjectRef[_T5]]",
  162. __arg6: "Union[_T6, ObjectRef[_T6]]",
  163. ) -> "ObjectRef[_Ret]":
  164. ...
  165. def bind(
  166. self,
  167. __arg0: _T0,
  168. __arg1: _T1,
  169. __arg2: _T2,
  170. __arg3: _T3,
  171. __arg4: _T4,
  172. __arg5: _T5,
  173. __arg6: _T6,
  174. ) -> Any:
  175. ...
  176. class _RemoteMethod7(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]):
  177. def remote(
  178. self,
  179. __arg0: "Union[_T0, ObjectRef[_T0]]",
  180. __arg1: "Union[_T1, ObjectRef[_T1]]",
  181. __arg2: "Union[_T2, ObjectRef[_T2]]",
  182. __arg3: "Union[_T3, ObjectRef[_T3]]",
  183. __arg4: "Union[_T4, ObjectRef[_T4]]",
  184. __arg5: "Union[_T5, ObjectRef[_T5]]",
  185. __arg6: "Union[_T6, ObjectRef[_T6]]",
  186. __arg7: "Union[_T7, ObjectRef[_T7]]",
  187. ) -> "ObjectRef[_Ret]":
  188. ...
  189. def bind(
  190. self,
  191. __arg0: _T0,
  192. __arg1: _T1,
  193. __arg2: _T2,
  194. __arg3: _T3,
  195. __arg4: _T4,
  196. __arg5: _T5,
  197. __arg6: _T6,
  198. __arg7: _T7,
  199. ) -> Any:
  200. ...
  201. class _RemoteMethod8(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8]):
  202. def remote(
  203. self,
  204. __arg0: "Union[_T0, ObjectRef[_T0]]",
  205. __arg1: "Union[_T1, ObjectRef[_T1]]",
  206. __arg2: "Union[_T2, ObjectRef[_T2]]",
  207. __arg3: "Union[_T3, ObjectRef[_T3]]",
  208. __arg4: "Union[_T4, ObjectRef[_T4]]",
  209. __arg5: "Union[_T5, ObjectRef[_T5]]",
  210. __arg6: "Union[_T6, ObjectRef[_T6]]",
  211. __arg7: "Union[_T7, ObjectRef[_T7]]",
  212. __arg8: "Union[_T8, ObjectRef[_T8]]",
  213. ) -> "ObjectRef[_Ret]":
  214. ...
  215. def bind(
  216. self,
  217. __arg0: _T0,
  218. __arg1: _T1,
  219. __arg2: _T2,
  220. __arg3: _T3,
  221. __arg4: _T4,
  222. __arg5: _T5,
  223. __arg6: _T6,
  224. __arg7: _T7,
  225. __arg8: _T8,
  226. ) -> Any:
  227. ...
  228. class _RemoteMethod9(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9]):
  229. def remote(
  230. self,
  231. __arg0: "Union[_T0, ObjectRef[_T0]]",
  232. __arg1: "Union[_T1, ObjectRef[_T1]]",
  233. __arg2: "Union[_T2, ObjectRef[_T2]]",
  234. __arg3: "Union[_T3, ObjectRef[_T3]]",
  235. __arg4: "Union[_T4, ObjectRef[_T4]]",
  236. __arg5: "Union[_T5, ObjectRef[_T5]]",
  237. __arg6: "Union[_T6, ObjectRef[_T6]]",
  238. __arg7: "Union[_T7, ObjectRef[_T7]]",
  239. __arg8: "Union[_T8, ObjectRef[_T8]]",
  240. __arg9: "Union[_T9, ObjectRef[_T9]]",
  241. ) -> "ObjectRef[_Ret]":
  242. ...
  243. def bind(
  244. self,
  245. __arg0: _T0,
  246. __arg1: _T1,
  247. __arg2: _T2,
  248. __arg3: _T3,
  249. __arg4: _T4,
  250. __arg5: _T5,
  251. __arg6: _T6,
  252. __arg7: _T7,
  253. __arg8: _T8,
  254. __arg9: _T9,
  255. ) -> Any:
  256. ...
  257. @overload
  258. def method(
  259. __method: Callable[[Any, _T0], _Ret],
  260. ) -> _RemoteMethod0[_Ret, _T0]:
  261. ...
  262. @overload
  263. def method(
  264. __method: Callable[[Any, _T0, _T1], _Ret],
  265. ) -> _RemoteMethod1[_Ret, _T0, _T1]:
  266. ...
  267. @overload
  268. def method(
  269. __method: Callable[[Any, _T0, _T1, _T2], _Ret],
  270. ) -> _RemoteMethod2[_Ret, _T0, _T1, _T2]:
  271. ...
  272. @overload
  273. def method(
  274. __method: Callable[[Any, _T0, _T1, _T2, _T3], _Ret],
  275. ) -> _RemoteMethod3[_Ret, _T0, _T1, _T2, _T3]:
  276. ...
  277. @overload
  278. def method(
  279. __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4], _Ret],
  280. ) -> _RemoteMethod4[_Ret, _T0, _T1, _T2, _T3, _T4]:
  281. ...
  282. @overload
  283. def method(
  284. __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5], _Ret],
  285. ) -> _RemoteMethod5[_Ret, _T0, _T1, _T2, _T3, _T4, _T5]:
  286. ...
  287. @overload
  288. def method(
  289. __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6], _Ret],
  290. ) -> _RemoteMethod6[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6]:
  291. ...
  292. @overload
  293. def method(
  294. __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7], _Ret],
  295. ) -> _RemoteMethod7[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]:
  296. ...
  297. @overload
  298. def method(
  299. __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8], _Ret],
  300. ) -> _RemoteMethod8[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8]:
  301. ...
  302. @overload
  303. def method(
  304. __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9], _Ret],
  305. ) -> _RemoteMethod9[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9]:
  306. ...
  307. @overload
  308. def method(
  309. __method: Callable[[Any], _Ret],
  310. ) -> _RemoteMethodNoArgs[_Ret]:
  311. ...
  312. @overload
  313. def method(
  314. *,
  315. num_returns: Optional[Union[int, Literal["streaming"]]] = None,
  316. concurrency_group: Optional[str] = None,
  317. max_task_retries: Optional[int] = None,
  318. retry_exceptions: Optional[Union[bool, list, tuple]] = None,
  319. _generator_backpressure_num_objects: Optional[int] = None,
  320. enable_task_events: Optional[bool] = None,
  321. tensor_transport: Optional[str] = None,
  322. ) -> Callable[[Callable[Concatenate[Any, _P], _Ret]], Any]:
  323. ...
  324. @PublicAPI
  325. @client_mode_hook
  326. def method(*args, **kwargs):
  327. """Annotate an actor method.
  328. .. code-block:: python
  329. @ray.remote
  330. class Foo:
  331. @ray.method(num_returns=2)
  332. def bar(self):
  333. return 1, 2
  334. f = Foo.remote()
  335. _, _ = f.bar.remote()
  336. Args:
  337. num_returns: The number of object refs that should be returned by
  338. invocations of this actor method. The default value is 1 for a
  339. normal actor task and "streaming" for an actor generator task (a
  340. function that yields objects instead of returning them).
  341. max_task_retries: How many times to retry an actor task if the task
  342. fails due to a runtime error, e.g., the actor has died. The
  343. default value is 0. If set to -1, the system will retry the
  344. failed task until the task succeeds, or the actor has reached
  345. its max_restarts limit. If set to `n > 0`, the system will retry
  346. the failed task up to n times, after which the task will throw a
  347. `RayActorError` exception upon :obj:`ray.get`. Note that Python
  348. exceptions may trigger retries
  349. *only if* `retry_exceptions` is set for the method, in that case
  350. when `max_task_retries` runs out the task will rethrow the
  351. exception from the task. You can override this number with the
  352. method's `max_task_retries` option in `@ray.method` decorator or
  353. in `.option()`.
  354. retry_exceptions: Boolean of whether to retry all Python
  355. exceptions, or a list of allowlist exceptions to retry. The default
  356. value is False (only retry tasks upon system failures and if
  357. max_task_retries is set)
  358. concurrency_group: The name of the concurrency group
  359. to use for the actor method. By default, the actor is
  360. single-threaded and runs all actor tasks on the same thread.
  361. See :ref:`Defining Concurrency Groups <defining-concurrency-groups>`.
  362. tensor_transport: [Alpha] The tensor transport protocol to
  363. use for the actor method. If a tensor transport is specified,
  364. Ray will store a *reference* instead of a copy of any torch.Tensors found inside
  365. values returned by this task, and the tensors will be sent directly
  366. to other tasks using the specified transport. The object store will be used
  367. when this is None (default). "NIXL", "NCCL", and "GLOO" (case-insensitive) are
  368. the three transports supported by default. The NCCL and GLOO transports
  369. require first creating a collective with the involved actors using
  370. :func:`ray.experimental.collective.create_collective_group`.
  371. See :ref:`Ray Direct Transport (RDT) <direct-transport>` for more
  372. details.
  373. """
  374. valid_kwargs = [
  375. "num_returns",
  376. "concurrency_group",
  377. "max_task_retries",
  378. "retry_exceptions",
  379. "_generator_backpressure_num_objects",
  380. "enable_task_events",
  381. "tensor_transport",
  382. ]
  383. def annotate_method(method: Callable[_P, _Ret]):
  384. if "num_returns" in kwargs:
  385. method.__ray_num_returns__ = kwargs["num_returns"]
  386. if "max_task_retries" in kwargs:
  387. method.__ray_max_task_retries__ = kwargs["max_task_retries"]
  388. if "retry_exceptions" in kwargs:
  389. method.__ray_retry_exceptions__ = kwargs["retry_exceptions"]
  390. if "concurrency_group" in kwargs:
  391. method.__ray_concurrency_group__ = kwargs["concurrency_group"]
  392. if "_generator_backpressure_num_objects" in kwargs:
  393. method.__ray_generator_backpressure_num_objects__ = kwargs[
  394. "_generator_backpressure_num_objects"
  395. ]
  396. if "enable_task_events" in kwargs and kwargs["enable_task_events"] is not None:
  397. method.__ray_enable_task_events__ = kwargs["enable_task_events"]
  398. if "tensor_transport" in kwargs:
  399. tensor_transport = kwargs["tensor_transport"]
  400. from ray.experimental.gpu_object_manager.util import (
  401. normalize_and_validate_tensor_transport,
  402. )
  403. tensor_transport = normalize_and_validate_tensor_transport(tensor_transport)
  404. method.__ray_tensor_transport__ = tensor_transport
  405. return method
  406. # Check if decorator is called without parentheses (args[0] would be the function)
  407. if len(args) == 1 and callable(args[0]) and len(kwargs) == 0:
  408. # Called as @ray.method (without parentheses)
  409. return annotate_method(args[0])
  410. # Called as @ray.method() or @ray.method(options...)
  411. error_string = (
  412. "The @ray.method decorator must be applied using no arguments or at "
  413. f"least one of the arguments in the list {valid_kwargs}, for example "
  414. "'@ray.method(num_returns=2)'."
  415. )
  416. assert len(args) == 0, error_string
  417. for key in kwargs:
  418. key_error_string = (
  419. f"Unexpected keyword argument to @ray.method: '{key}'. The "
  420. f"supported keyword arguments are {valid_kwargs}"
  421. )
  422. assert key in valid_kwargs, key_error_string
  423. return annotate_method
  424. class _ActorMethodMetadata:
  425. """A container for the metadata required to invoke an actor method.
  426. This class intentionally does *not* hold a reference to the `ActorHandle`, as that causes
  427. a circular reference that delays `ActorHandle` destruction until the Python GC runs.
  428. Instead, it can be used as a factory to lazily generate `ActorMethod` instances that can
  429. be used to submit actor tasks for this method.
  430. """
  431. def __init__(
  432. self,
  433. method_name: str,
  434. num_returns: Optional[Union[int, Literal["streaming"]]],
  435. max_task_retries: int,
  436. retry_exceptions: Union[bool, list, tuple],
  437. is_generator: bool,
  438. generator_backpressure_num_objects: int,
  439. enable_task_events: bool,
  440. decorator: Optional[Any] = None,
  441. signature: Optional[List[inspect.Parameter]] = None,
  442. tensor_transport: Optional[str] = None,
  443. ):
  444. """Initialize an _ActorMethodMetadata.
  445. Args:
  446. method_name: The name of the actor method.
  447. num_returns: The default number of return values that the method
  448. invocation should return. If None is given, it uses
  449. DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS for a normal actor task
  450. and "streaming" for a generator task (when `is_generator` is True).
  451. max_task_retries: Number of retries on method failure.
  452. retry_exceptions: Boolean or list/tuple of exceptions to retry.
  453. is_generator: True if the method is a generator.
  454. generator_backpressure_num_objects: Generator-only config for backpressure.
  455. enable_task_events: True if task events are enabled for this method.
  456. decorator: Optional decorator for the method invocation.
  457. signature: The signature of the actor method.
  458. tensor_transport: The tensor transport protocol to use for the actor method.
  459. """
  460. self._method_name = method_name
  461. # Default case.
  462. if num_returns is None:
  463. if is_generator:
  464. num_returns = "streaming"
  465. else:
  466. num_returns = ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS
  467. self._num_returns = num_returns
  468. self._max_task_retries = max_task_retries
  469. self._retry_exceptions = retry_exceptions
  470. self._is_generator = is_generator
  471. self._generator_backpressure_num_objects = generator_backpressure_num_objects
  472. self._enable_task_events = enable_task_events
  473. self._decorator = decorator
  474. self._signature = signature
  475. self._tensor_transport = tensor_transport
  476. def bind(self, actor_handle: "ActorHandle") -> "ActorMethod":
  477. """
  478. Produce a bound ActorMethod that holds a strong reference to actor_handle.
  479. """
  480. return ActorMethod(
  481. actor_handle,
  482. self._method_name,
  483. self._num_returns,
  484. self._max_task_retries,
  485. self._retry_exceptions,
  486. self._is_generator,
  487. self._generator_backpressure_num_objects,
  488. self._enable_task_events,
  489. decorator=self._decorator,
  490. signature=self._signature,
  491. tensor_transport=self._tensor_transport,
  492. )
  493. # Create objects to wrap method invocations. This is done so that we can
  494. # invoke methods with actor.method.remote() instead of actor.method().
  495. @PublicAPI
  496. class ActorMethod:
  497. """A class used to invoke an actor method.
  498. Note: This class should not be instantiated directly. Instead, it should
  499. only be used as a return value from the `@ray.method` decorator.
  500. """
  501. def __init__(
  502. self,
  503. actor,
  504. method_name,
  505. num_returns: Optional[Union[int, Literal["streaming"]]],
  506. max_task_retries: int,
  507. retry_exceptions: Union[bool, list, tuple],
  508. is_generator: bool,
  509. generator_backpressure_num_objects: int,
  510. enable_task_events: bool,
  511. decorator=None,
  512. signature: Optional[List[inspect.Parameter]] = None,
  513. tensor_transport: Optional[str] = None,
  514. ):
  515. """Initialize an ActorMethod.
  516. Args:
  517. actor: The actor instance this method belongs to.
  518. method_name: The name of the actor method.
  519. num_returns: The default number of return values that the method
  520. invocation should return. If None is given, it uses
  521. DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS for a normal actor task
  522. and "streaming" for a generator task (when `is_generator` is True).
  523. max_task_retries: Number of retries on method failure.
  524. retry_exceptions: Boolean of whether you want to retry all user-raised
  525. exceptions, or a list of allowlist exceptions to retry.
  526. is_generator: True if a given method is a Python generator.
  527. generator_backpressure_num_objects: Generator-only config.
  528. If a number of unconsumed objects reach this threshold,
  529. the actor task stops pausing.
  530. enable_task_events: True if task events is enabled, i.e., task events from
  531. the actor should be reported. Defaults to True.
  532. decorator: An optional decorator that should be applied to the actor
  533. method invocation.
  534. signature: The signature of the actor method. It is None only when cross
  535. language feature is used.
  536. tensor_transport: The tensor transport protocol to use for the actor method.
  537. """
  538. self._actor = actor
  539. self._method_name = method_name
  540. self._num_returns = num_returns
  541. # Default case.
  542. if self._num_returns is None:
  543. if is_generator:
  544. self._num_returns = "streaming"
  545. else:
  546. self._num_returns = ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS
  547. self._max_task_retries = max_task_retries
  548. self._retry_exceptions = retry_exceptions
  549. self._is_generator = is_generator
  550. self._generator_backpressure_num_objects = generator_backpressure_num_objects
  551. self._enable_task_events = enable_task_events
  552. self._signature = signature
  553. # This is a decorator that is used to wrap the function invocation (as
  554. # opposed to the function execution). The decorator must return a
  555. # function that takes in two arguments ("args" and "kwargs"). In most
  556. # cases, it should call the function that was passed into the decorator
  557. # and return the resulting ObjectRefs.
  558. self._decorator = decorator
  559. self._tensor_transport = tensor_transport
  560. def __call__(self, *args, **kwargs):
  561. raise TypeError(
  562. "Actor methods cannot be called directly. Instead "
  563. f"of running 'object.{self._method_name}()', try "
  564. f"'object.{self._method_name}.remote()'."
  565. )
  566. @DeveloperAPI
  567. def bind(self, *args, **kwargs):
  568. """
  569. Bind arguments to the actor method for Ray DAG building.
  570. This method generates and returns an intermediate representation (IR)
  571. node that indicates the actor method will be called with the given
  572. arguments at execution time.
  573. This method is used in both :ref:`Ray DAG <ray-dag-guide>` and
  574. :ref:`Ray Compiled Graph <ray-compiled-graph>` for building a DAG.
  575. """
  576. return self._bind(args, kwargs)
  577. def remote(self, *args, **kwargs):
  578. return self._remote(args, kwargs)
  def options(self, **options):
      """Return a wrapper that calls this actor method with extra options.

      Accepts the same keyword options as ``_remote`` and returns an object
      exposing ``.remote()`` and ``.bind()``; both forward the call
      arguments together with the captured options.

      A ``tensor_transport`` option that is not ``None`` is normalized and
      validated up front, and the normalized value replaces the one given.

      Examples:
          # The following two calls are equivalent.
          >>> actor.my_method._remote(args=[x, y], name="foo", num_returns=2)
          >>> actor.my_method.options(name="foo", num_returns=2).remote(x, y)
      """
      method = self

      requested_transport = options.get("tensor_transport")
      if requested_transport is not None:
          from ray.experimental.gpu_object_manager.util import (
              normalize_and_validate_tensor_transport,
          )

          options["tensor_transport"] = normalize_and_validate_tensor_transport(
              requested_transport
          )

      class FuncWrapper:
          def remote(self, *args, **kwargs):
              return method._remote(args=args, kwargs=kwargs, **options)

          @DeveloperAPI
          def bind(self, *args, **kwargs):
              return method._bind(args=args, kwargs=kwargs, **options)

      return FuncWrapper()
  @wrap_auto_init
  @_tracing_actor_method_invocation
  def _bind(
      self,
      args=None,
      kwargs=None,
      name="",
      num_returns=None,
      concurrency_group=None,
      _generator_backpressure_num_objects=None,
  ) -> Union["ray.dag.ClassMethodNode", Tuple["ray.dag.ClassMethodNode", ...]]:
      """Build the DAG node(s) representing a call to this actor method.

      Args:
          args: Positional arguments of the call. ``None`` means no
              positional arguments.
          kwargs: Keyword arguments of the call. ``None`` means no keyword
              arguments.
          name: Stored in the node options under ``"name"``.
          num_returns: Stored in the node options under ``"num_returns"``.
          concurrency_group: Stored in the node options under
              ``"concurrency_group"``.
          _generator_backpressure_num_objects: Stored in the node options
              under ``"_generator_backpressure_num_objects"``.

      Returns:
          The ``ClassMethodNode`` for the call, or, when that node reports
          more than one return value, a tuple with one output node per
          return value.

      Raises:
          RuntimeError: If the actor handle is no longer referenced.
          TypeError: If the arguments do not match the method signature.
      """
      from ray.dag.class_node import (
          BIND_INDEX_KEY,
          IS_CLASS_METHOD_OUTPUT_KEY,
          PARENT_CLASS_NODE_KEY,
          PREV_CLASS_METHOD_CALL_KEY,
          ClassMethodNode,
      )

      # The parameters default to None, but signature validation unpacks
      # them. Normalize omitted values (as `_remote` does) so the defaults
      # are usable instead of failing with a misleading signature error.
      if args is None:
          args = ()
      if kwargs is None:
          kwargs = {}

      # TODO(sang): unify option passing
      options = {
          "name": name,
          "num_returns": num_returns,
          "concurrency_group": concurrency_group,
          "_generator_backpressure_num_objects": _generator_backpressure_num_objects,
      }

      actor = self._actor
      if actor is None:
          # Ref is GC'ed. It happens when the actor handle is GC'ed
          # when bind is called.
          raise RuntimeError("Lost reference to actor")

      other_args_to_resolve = {
          PARENT_CLASS_NODE_KEY: actor,
          PREV_CLASS_METHOD_CALL_KEY: None,
          BIND_INDEX_KEY: actor._ray_dag_bind_index,
      }
      actor._ray_dag_bind_index += 1

      assert (
          self._signature is not None
      ), "self._signature should be set for .bind API."
      try:
          signature.validate_args(self._signature, args, kwargs)
      except TypeError as e:
          signature_copy = self._signature.copy()
          if len(signature_copy) > 0 and signature_copy[-1].name == "_ray_trace_ctx":
              # Remove the trace context arg for readability.
              signature_copy.pop(-1)
          signature_copy = inspect.Signature(parameters=signature_copy)
          raise TypeError(
              f"{str(e)}. The function `{self._method_name}` has a signature "
              f"`{signature_copy}`, but the given arguments to `bind` doesn't "
              f"match. args: {args}. kwargs: {kwargs}."
          ) from None

      node = ClassMethodNode(
          self._method_name,
          args,
          kwargs,
          options,
          other_args_to_resolve=other_args_to_resolve,
      )
      if node.num_returns <= 1:
          return node

      # One output node per return value, each pointing back at the call
      # node together with the index of the value it stands for.
      return tuple(
          ClassMethodNode(
              f"return_idx_{i}",
              (node, i),
              {},
              {},
              {IS_CLASS_METHOD_OUTPUT_KEY: True, PARENT_CLASS_NODE_KEY: actor},
          )
          for i in range(node.num_returns)
      )
  @wrap_auto_init
  @_tracing_actor_method_invocation
  def _remote(
      self,
      args=None,
      kwargs=None,
      name="",
      num_returns=None,
      max_task_retries=None,
      retry_exceptions=None,
      concurrency_group=None,
      _generator_backpressure_num_objects=None,
      enable_task_events=None,
      tensor_transport: Optional[str] = None,
  ):
      """Submit a call to this actor method.

      Every option left as ``None`` falls back to the value stored on this
      method object; ``max_task_retries`` finally falls back to 0.

      Args:
          args: Positional arguments of the call (``None`` means none).
          kwargs: Keyword arguments of the call (``None`` means none).
          name: Forwarded to the actor's ``_actor_method_call``.
          num_returns: Number of return values for this call.
          max_task_retries: Retry count forwarded with the call.
          retry_exceptions: Forwarded with the call.
          concurrency_group: Forwarded as ``concurrency_group_name``.
          _generator_backpressure_num_objects: Forwarded as
              ``generator_backpressure_num_objects``.
          enable_task_events: Forwarded with the call.
          tensor_transport: Tensor transport for the returned object. When
              set, exactly one return value is supported and the actor must
              have tensor transport enabled and available.

      Returns:
          Whatever the (optionally decorated) invocation returns.

      Raises:
          ValueError: If a tensor transport is requested but unsupported
              for this call or actor.
          RuntimeError: If the actor handle is no longer referenced.
      """
      lost_reference_message = (
          "Lost reference to actor. Actor handles must be stored as variables, e.g. `actor = MyActor.remote()` before calling methods."
      )

      if num_returns is None:
          num_returns = self._num_returns
      if max_task_retries is None:
          max_task_retries = self._max_task_retries
      if max_task_retries is None:
          max_task_retries = 0
      if retry_exceptions is None:
          retry_exceptions = self._retry_exceptions
      if enable_task_events is None:
          enable_task_events = self._enable_task_events
      if _generator_backpressure_num_objects is None:
          _generator_backpressure_num_objects = (
              self._generator_backpressure_num_objects
          )

      if tensor_transport is None:
          tensor_transport = self._tensor_transport

      # Resolved once and kept for the whole tensor-transport path so the
      # validation below and the bookkeeping after the call see the same actor.
      src_actor = None
      if tensor_transport is not None:
          if num_returns != 1:
              raise ValueError(
                  f"Currently, methods with tensor_transport={tensor_transport} only support 1 return value. "
                  "Please make sure the actor method is decorated with `@ray.method(num_returns=1)` (the default)."
              )
          src_actor = self._actor
          if src_actor is None:
              # Without this guard the attribute access below would fail
              # with an AttributeError on None instead of the usual error.
              raise RuntimeError(lost_reference_message)
          if not src_actor._ray_enable_tensor_transport:
              raise ValueError(
                  f'Currently, methods with .options(tensor_transport="{tensor_transport}") are not supported when enable_tensor_transport=False. '
                  "Please set @ray.remote(enable_tensor_transport=True) on the actor class definition."
              )
          gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
          if not gpu_object_manager.actor_has_tensor_transport(
              src_actor, tensor_transport
          ):
              raise ValueError(
                  f'{src_actor} does not have tensor transport {tensor_transport} available. If using a collective-based transport ("nccl" or "gloo"), please create a communicator with '
                  "`ray.experimental.collective.create_collective_group` "
                  "before calling actor tasks with non-default tensor_transport."
              )
          # Wait for source actor to have the transport registered.
          gpu_object_manager.wait_until_custom_transports_registered(src_actor)

      args = args or []
      kwargs = kwargs or {}

      def invocation(args, kwargs):
          dst_actor = self._actor
          if dst_actor is None:
              # See https://github.com/ray-project/ray/issues/6265 for more details.
              raise RuntimeError(lost_reference_message)

          gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
          gpu_object_manager.queue_or_trigger_out_of_band_tensor_transfer(
              dst_actor, args
          )

          return dst_actor._actor_method_call(
              self._method_name,
              args=args,
              kwargs=kwargs,
              name=name,
              num_returns=num_returns,
              max_task_retries=max_task_retries,
              retry_exceptions=retry_exceptions,
              concurrency_group_name=concurrency_group,
              generator_backpressure_num_objects=(
                  _generator_backpressure_num_objects
              ),
              enable_task_events=enable_task_events,
              tensor_transport=tensor_transport,
          )

      # Apply the decorator if there is one.
      if self._decorator is not None:
          invocation = self._decorator(invocation)

      object_refs = invocation(args, kwargs)
      if tensor_transport is not None:
          # Currently, we only support RDT when num_returns is 1.
          assert isinstance(object_refs, ObjectRef)
          object_ref = object_refs
          gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
          gpu_object_manager.add_gpu_object_ref(
              object_ref, src_actor, tensor_transport
          )

      return object_refs
  def __getstate__(self):
      """Return the picklable state of this actor method.

      The state is a plain dict holding the actor, the method name, the
      invocation decorator and the per-method call defaults. The method
      signature is not part of the state.
      """
      return dict(
          actor=self._actor,
          method_name=self._method_name,
          num_returns=self._num_returns,
          max_task_retries=self._max_task_retries,
          retry_exceptions=self._retry_exceptions,
          decorator=self._decorator,
          is_generator=self._is_generator,
          generator_backpressure_num_objects=self._generator_backpressure_num_objects,  # noqa
          enable_task_events=self._enable_task_events,
          _tensor_transport=self._tensor_transport,
      )
  def __setstate__(self, state):
      """Restore this actor method from a dict produced by ``__getstate__``.

      The object is rebuilt by re-running ``__init__`` with the saved values.

      Args:
          state: Mapping with the keys written by ``__getstate__``.
      """
      self.__init__(
          state["actor"],
          state["method_name"],
          state["num_returns"],
          state["max_task_retries"],
          state["retry_exceptions"],
          state["is_generator"],
          state["generator_backpressure_num_objects"],
          state["enable_task_events"],
          # The constructor also stores a `signature`, which is not part of
          # the saved state. Passing the optional values by keyword keeps
          # the tensor transport from being bound to a different optional
          # parameter, whatever order the constructor declares them in.
          # NOTE(review): the constructor header is outside this view;
          # confirm the parameter names `decorator` / `tensor_transport`.
          decorator=state["decorator"],
          tensor_transport=state["_tensor_transport"],
      )
  class _ActorClassMethodMetadata(object):
      """Per-method metadata collected from an actor class; cacheable.

      Instances are built with :meth:`create` and cached per actor creation
      function descriptor.

      Attributes:
          methods: The actor methods, keyed by name.
          decorators: Optional decorators that should be applied to the
              method invocation function before invoking the actor methods.
              These can be set by attaching the attribute
              "__ray_invocation_decorator__" to the actor method.
          signatures: The signatures of the methods.
          num_returns: The default number of return values for
              each actor method (``None`` when the method sets none).
          max_task_retries: Number of retries on method failure, for
              methods that set it.
          retry_exceptions: Boolean of whether you want to retry all
              user-raised exceptions, or a list of allowlist exceptions to
              retry, for each method that sets it.
          method_is_generator: Whether each method is a generator or async
              generator function.
          enable_task_events: True if tracing is enabled, i.e., task events
              from the actor should be reported. Defaults to True.
          generator_backpressure_num_objects: Per-method value of
              "__ray_generator_backpressure_num_objects__", where set.
          concurrency_group_for_methods: Per-method value of
              "__ray_concurrency_group__", where set.
          method_name_to_tensor_transport: Per-method value of
              "__ray_tensor_transport__", where set.
          has_tensor_transport_methods: Whether any method carries a
              "__ray_tensor_transport__" attribute that is not ``None``.
      """

      _cache = {}  # This cache will be cleared in ray._private.worker.disconnect()

      def __init__(self):
          own_name = type(self).__name__
          raise TypeError(
              f"{own_name} can not be constructed directly, "
              f"instead of running '{own_name}()', "
              f"try '{own_name}.create()'"
          )

      @classmethod
      def reset_cache(cls):
          """Drop every cached metadata instance."""
          cls._cache.clear()

      @classmethod
      def create(
          cls,
          modified_class,
          actor_creation_function_descriptor,
      ):
          """Return the metadata for ``modified_class``, building it if needed.

          The cache is keyed by ``actor_creation_function_descriptor``; a
          cached instance is returned as-is.
          """
          cached = cls._cache.get(actor_creation_function_descriptor)
          if cached is not None:
              return cached

          # Create an instance without __init__ called.
          meta = cls.__new__(cls)

          members = inspect.getmembers(modified_class, is_function_or_method)
          meta.methods = dict(members)

          # The signatures are used to catch calls made with inappropriate
          # arguments.
          meta.decorators = {}
          meta.signatures = {}
          meta.num_returns = {}
          meta.max_task_retries = {}
          meta.retry_exceptions = {}
          meta.method_is_generator = {}
          meta.enable_task_events = {}
          meta.generator_backpressure_num_objects = {}
          meta.concurrency_group_for_methods = {}
          meta.method_name_to_tensor_transport: Dict[str, str] = {}

          # Check whether any actor methods specify a non-default tensor transport.
          meta.has_tensor_transport_methods = any(
              getattr(member, "__ray_tensor_transport__", None) is not None
              for _, member in members
          )

          # Attributes that are only recorded for methods that define them.
          # `max_task_retries` is deliberately left unset for other methods:
          # it may also be given via `actor.method.options(...)` or on the
          # actor, and the method-level value is favored.
          optional_attributes = (
              ("__ray_max_task_retries__", meta.max_task_retries),
              ("__ray_retry_exceptions__", meta.retry_exceptions),
              ("__ray_invocation_decorator__", meta.decorators),
              ("__ray_concurrency_group__", meta.concurrency_group_for_methods),
              ("__ray_enable_task_events__", meta.enable_task_events),
              (
                  "__ray_generator_backpressure_num_objects__",
                  meta.generator_backpressure_num_objects,
              ),
              ("__ray_tensor_transport__", meta.method_name_to_tensor_transport),
          )

          for method_name, method in members:
              method = inspect.unwrap(method)

              # Class and static methods do not get their first argument
              # bound; instance methods do, so it is ignored for them.
              binds_no_first_arg = is_class_method(method) or is_static_method(
                  modified_class, method_name
              )
              meta.signatures[method_name] = signature.extract_signature(
                  method, ignore_first=not binds_no_first_arg
              )

              # Default number of return values; None when not specified.
              meta.num_returns[method_name] = getattr(
                  method, "__ray_num_returns__", None
              )

              for attribute, destination in optional_attributes:
                  if hasattr(method, attribute):
                      destination[method_name] = getattr(method, attribute)

              meta.method_is_generator[method_name] = inspect.isgeneratorfunction(
                  method
              ) or inspect.isasyncgenfunction(method)

          # Update cache.
          cls._cache[actor_creation_function_descriptor] = meta
          return meta
  class _ActorClassMetadata:
      """Metadata for an actor class.

      Attributes:
          language: The actor language, e.g. Python, Java.
          is_cross_language: True when ``language`` is not Python.
          modified_class: The original class that was decorated (with some
              additional methods added like __ray_terminate__).
          actor_creation_function_descriptor: The function descriptor for
              the actor creation task.
          class_name: The name of this class, taken from the descriptor.
          class_id: The ID of this actor class.
          method_meta: The actor method metadata.
          max_restarts: The maximum number of times the actor should be
              restarted when it dies unexpectedly.
          max_task_retries: How many times to retry an actor task that
              fails due to a runtime error.
          num_cpus: The default number of CPUs required by the actor creation
              task.
          num_gpus: The default number of GPUs required by the actor creation
              task.
          memory: The heap memory quota for this actor.
          object_store_memory: The object store memory request.
          resources: The default resources required by the actor creation task.
          label_selector: The labels required for the node on which this actor
              can be scheduled on. The label selector consist of key-value
              pairs, where the keys are label names and the value are
              expressions consisting of an operator with label values or just
              a value to indicate equality.
          fallback_strategy: If specified, expresses soft constraints through
              a list of decorator options to fall back on when scheduling on a
              node. Decorator options are evaluated together during
              scheduling. The first satisfied dict of options is used.
              Currently only `label_selector` is a supported option.
          accelerator_type: The specified type of accelerator required for the
              node on which this actor runs.
              See :ref:`accelerator types <accelerator_types>`.
          runtime_env: The runtime environment for this actor.
          concurrency_groups: The concurrency groups configured for this
              actor.
          scheduling_strategy: Strategy about how to schedule this actor.
          last_export_cluster_and_job: A pair of the last exported cluster
              and job to help us to know whether this function was exported.
              This is an imperfect mechanism used to determine if we need to
              export the remote function again. It is imperfect in the sense
              that the actor class definition could be exported multiple times
              by different workers. Starts out as ``None``.
          enable_tensor_transport: Whether to enable out-of-band tensor
              transport for this actor.
      """

      def __init__(
          self,
          language,
          modified_class,
          actor_creation_function_descriptor,
          class_id,
          method_meta,
          max_restarts,
          max_task_retries,
          num_cpus,
          num_gpus,
          memory,
          object_store_memory,
          resources,
          label_selector,
          fallback_strategy,
          accelerator_type,
          runtime_env,
          concurrency_groups,
          scheduling_strategy: SchedulingStrategyT,
          enable_tensor_transport: bool,
      ):
          # Identity of the class.
          self.language = language
          self.is_cross_language = language != Language.PYTHON
          self.modified_class = modified_class
          self.actor_creation_function_descriptor = actor_creation_function_descriptor
          self.class_name = actor_creation_function_descriptor.class_name
          self.class_id = class_id
          self.method_meta = method_meta

          # Fault-tolerance defaults.
          self.max_restarts = max_restarts
          self.max_task_retries = max_task_retries

          # Resource requirements.
          self.num_cpus = num_cpus
          self.num_gpus = num_gpus
          self.memory = memory
          self.object_store_memory = object_store_memory
          self.resources = resources
          self.accelerator_type = accelerator_type

          # Placement and scheduling.
          self.label_selector = label_selector
          self.fallback_strategy = fallback_strategy
          self.scheduling_strategy = scheduling_strategy

          # Execution environment.
          self.runtime_env = runtime_env
          self.concurrency_groups = concurrency_groups
          self.enable_tensor_transport = enable_tensor_transport

          # Nothing has been exported yet.
          self.last_export_cluster_and_job = None
  @PublicAPI
  class ActorClassInheritanceException(TypeError):
      """Raised when a class is defined with an actor class as a base."""
  def _process_option_dict(actor_options, has_tensor_transport_methods):
      """Build the option keyword arguments for ``_ActorClassMetadata``.

      Only options that are also parameters of ``_ActorClassMetadata.__init__``
      are kept; options missing from ``actor_options`` get their registered
      default value. The runtime environment is parsed, and tensor-transport
      settings are reconciled with the actor's methods.

      Args:
          actor_options: The options given for the actor class. This mapping
              and the values it holds are not modified.
          has_tensor_transport_methods: Whether any actor method declares a
              tensor transport.

      Returns:
          A new dict of filled-in options.

      Raises:
          ValueError: If methods declare a tensor transport while
              ``enable_tensor_transport`` was explicitly set to False.
      """
      accepted = set(inspect.getfullargspec(_ActorClassMetadata.__init__)[0])
      _filled_options = {
          key: actor_options.get(key, option.default_value)
          for key, option in ray_option_utils.actor_options.items()
          if key in accepted
      }
      _filled_options["runtime_env"] = parse_runtime_env_for_task_or_actor(
          _filled_options["runtime_env"]
      )

      # If any actor method has a non-default tensor transport, automatically
      # enable tensor transport, unless it was explicitly set to False by the
      # user.
      if has_tensor_transport_methods:
          if _filled_options["enable_tensor_transport"] is False:
              raise ValueError(
                  "Actor class has methods with @ray.method(tensor_transport=...) decorator but @ray.remote(enable_tensor_transport=False). "
                  "Either set enable_tensor_transport=True or remove the @ray.method(tensor_transport=...) decorator from the methods."
              )
          _filled_options["enable_tensor_transport"] = True

      # Ray GPU objects requires a background thread for data transfer. However,
      # currently by default the background thread will be blocked if the main
      # thread does not yield. For now, we explicitly create the background thread
      # if `@ray.remote(enable_tensor_transport=True)` or if any methods are
      # decorated with `@ray.method(tensor_transport=...)` and a non-default
      # tensor transport. This forces Ray to execute all tasks on background
      # threads instead of the main thread.
      # TODO(swang): Remove this code once
      # https://github.com/ray-project/ray/issues/54639 is fixed.
      if _filled_options.get("enable_tensor_transport", False):
          user_groups = _filled_options.get("concurrency_groups", None)
          # Work on a copy: the dict may be the very object the user passed
          # in, and the system groups must not leak into it.
          groups = {} if user_groups is None else dict(user_groups)
          groups["_ray_system"] = 1
          groups["_ray_system_error"] = 1
          _filled_options["concurrency_groups"] = groups

      return _filled_options
  1034. @PublicAPI
  1035. class ActorClass(Generic[T]):
  1036. """An actor class.
  1037. This is a decorated class. It can be used to create actors.
  1038. Attributes:
  1039. __ray_metadata__: Contains metadata for the actor.
  1040. """
  def __init__(cls, name, bases, attr):
      """Prevents users from directly inheriting from an ActorClass.

      This will be called when a class is defined with an ActorClass object
      as one of its base classes. To intentionally construct an ActorClass,
      use the '_ray_from_modified_class' classmethod.

      Args:
          name: Name of the class being defined.
          bases: Base classes of the class being defined.
          attr: Unused.

      Raises:
          ActorClassInheritanceException: When ActorClass is inherited.
          AssertionError: If ActorClassInheritanceException is not raised i.e.,
              conditions for raising it are not met in any
              iteration of the loop.
          TypeError: In all other cases.
      """
      for base in bases:
          if isinstance(base, ActorClass):
              raise ActorClassInheritanceException(
                  f"Attempted to define subclass '{name}' of actor "
                  f"class '{base.__ray_metadata__.class_name}'. "
                  "Inheriting from actor classes is "
                  "not currently supported. You can instead "
                  "inherit from a non-actor base class and make "
                  "the derived class an actor class (with "
                  "@ray.remote)."
              )

      # This shouldn't be reached because one of the base classes must be
      # an actor class if this was meant to be subclassed.
      # Raised explicitly rather than with `assert False`, which is removed
      # when Python runs with -O and would let this return silently.
      raise AssertionError(
          "ActorClass.__init__ should not be called. Please use "
          "the @ray.remote decorator instead."
      )
  def __call__(self, *args, **kwargs):
      """Prevents users from directly instantiating an ActorClass.

      This is called instead of __init__ when 'ActorClass()' is executed,
      because an ActorClass is an object rather than a metaobject. To
      properly instantiate a remote actor, use 'ActorClass.remote()'.

      Raises:
          TypeError: Always.
      """
      actor_name = self.__ray_metadata__.class_name
      raise TypeError(
          "Actors cannot be instantiated directly. "
          f"Instead of '{actor_name}()', "
          f"use '{actor_name}.remote()'."
      )
  @classmethod
  def _ray_from_modified_class(
      cls,
      modified_class,
      class_id,
      actor_options,
  ):
      """Build the actor class object that wraps ``modified_class``.

      Args:
          modified_class: The decorated user class.
          class_id: The ID stored in the resulting metadata.
          actor_options: Options for the actor class. The dict is kept as
              the default options; a "runtime_env" entry, if present, is
              replaced by the parsed runtime environment.

      Returns:
          An instance of a class derived from both ``cls`` and
          ``modified_class``, with ``__ray_metadata__`` populated.
      """
      reserved_attributes = (
          "remote",
          "_remote",
          "_ray_from_modified_class",
          "_ray_from_function_descriptor",
      )
      for attribute in reserved_attributes:
          if not hasattr(modified_class, attribute):
              continue
          logger.warning(
              "Creating an actor from class "
              f"{modified_class.__name__} overwrites "
              f"attribute {attribute} of that class"
          )

      # Make sure the actor class we are constructing inherits from the
      # original class so it retains all class properties.
      class DerivedActorClass(cls, modified_class):
          def __init__(self, *args, **kwargs):
              try:
                  cls.__init__(self, *args, **kwargs)
              except ActorClassInheritanceException:
                  # A TypeError subclass, but never delegated.
                  raise
              except TypeError:
                  # Any other TypeError from cls.__init__ means this is a
                  # regular construction: hand it to the wrapped class.
                  modified_class.__init__(self, *args, **kwargs)

      derived_name = f"ActorClass({modified_class.__name__})"
      DerivedActorClass.__module__ = modified_class.__module__
      DerivedActorClass.__name__ = derived_name
      DerivedActorClass.__qualname__ = derived_name

      # Construct the base object.
      actor_class = DerivedActorClass.__new__(DerivedActorClass)

      # Actor creation function descriptor.
      descriptor = PythonFunctionDescriptor.from_class(
          modified_class.__ray_actor_class__
      )
      method_meta = _ActorClassMethodMetadata.create(modified_class, descriptor)
      filled_options = _process_option_dict(
          actor_options, method_meta.has_tensor_transport_methods
      )
      actor_class.__ray_metadata__ = _ActorClassMetadata(
          Language.PYTHON,
          modified_class,
          descriptor,
          class_id,
          method_meta,
          **filled_options,
      )

      actor_class._default_options = actor_options
      if "runtime_env" in actor_options:
          actor_options["runtime_env"] = actor_class.__ray_metadata__.runtime_env

      return actor_class
  @classmethod
  def _ray_from_function_descriptor(
      cls,
      language,
      actor_creation_function_descriptor,
      actor_options,
  ):
      """Build an actor class object from a function descriptor alone.

      No Python class is attached: the metadata is created with
      ``modified_class`` and ``class_id`` both set to ``None``.

      Args:
          language: The actor language stored in the metadata.
          actor_creation_function_descriptor: Descriptor of the actor
              creation task.
          actor_options: Options for the actor class. The dict is kept as
              the default options; a "runtime_env" entry, if present, is
              replaced by the parsed runtime environment.

      Returns:
          The new ``ActorClass`` instance.
      """
      actor_class = ActorClass.__new__(ActorClass)

      method_meta = _ActorClassMethodMetadata.create(
          None,
          actor_creation_function_descriptor,
      )
      filled_options = _process_option_dict(
          actor_options, method_meta.has_tensor_transport_methods
      )
      actor_class.__ray_metadata__ = _ActorClassMetadata(
          language,
          None,
          actor_creation_function_descriptor,
          None,
          method_meta,
          **filled_options,
      )

      actor_class._default_options = actor_options
      if "runtime_env" in actor_options:
          actor_options["runtime_env"] = actor_class.__ray_metadata__.runtime_env

      return actor_class
  def remote(self, *args, **kwargs) -> ActorProxy[T]:
      """Create an actor using the class's default options.

      Args:
          args: These arguments are forwarded directly to the actor
              constructor.
          kwargs: These arguments are forwarded directly to the actor
              constructor.

      Returns:
          A handle to the newly created actor.
      """
      creation_options = self._default_options
      return self._remote(args=args, kwargs=kwargs, **creation_options)
  def options(self, **actor_options) -> "ActorClass[T]":
      """Configures and overrides the actor instantiation parameters.

      The arguments are the same as those that can be passed
      to :obj:`ray.remote`.

      Args:
          num_cpus: The quantity of CPU cores to reserve
              for this task or for the lifetime of the actor.
          num_gpus: The quantity of GPUs to reserve
              for this task or for the lifetime of the actor.
          resources (Dict[str, float]): The quantity of various custom resources
              to reserve for this task or for the lifetime of the actor.
              This is a dictionary mapping strings (resource names) to floats.
          label_selector (Dict[str, str]): If specified, requires that the actor run
              on a node which meets the specified label conditions (equals, in, not in, etc.).
          fallback_strategy (List[Dict[str, Any]]): If specified, expresses soft constraints
              through a list of decorator options to fall back on when scheduling on a node.
          accelerator_type: If specified, requires that the task or actor run
              on a node with the specified type of accelerator.
              See :ref:`accelerator types <accelerator_types>`.
          memory: The heap memory request in bytes for this task/actor,
              rounded down to the nearest integer.
          object_store_memory: The object store memory request for actors only.
          max_restarts: This specifies the maximum
              number of times that the actor should be restarted when it dies
              unexpectedly. The minimum valid value is 0 (default),
              which indicates that the actor doesn't need to be restarted.
              A value of -1 indicates that an actor should be restarted
              indefinitely.
          max_task_retries: How many times to retry an actor task if the task
              fails due to a runtime error, e.g., the actor has died. The
              default value is 0. If set to -1, the system will retry the
              failed task until the task succeeds, or the actor has reached
              its max_restarts limit. If set to `n > 0`, the system will retry
              the failed task up to n times, after which the task will throw a
              `RayActorError` exception upon :obj:`ray.get`. Note that Python
              exceptions may trigger retries
              *only if* `retry_exceptions` is set for the method, in that case
              when `max_task_retries` runs out the task will rethrow the
              exception from the task. You can override this number with the
              method's `max_task_retries` option in `@ray.method` decorator or
              in `.options()`.
          max_pending_calls: Set the max number of pending calls
              allowed on the actor handle. When this value is exceeded,
              PendingCallsLimitExceeded will be raised for further tasks.
              Note that this limit is counted per handle. -1 means that the
              number of pending calls is unlimited.
          max_concurrency: The max number of concurrent calls to allow for
              this actor. This only works with direct actor calls. The max
              concurrency defaults to 1 for threaded execution, and 1000 for
              asyncio execution. Note that the execution order is not
              guaranteed when max_concurrency > 1.
          allow_out_of_order_execution: Only for *actors*. Whether Ray executes actor
              tasks out of order. If you're using multi-threaded
              (``max_concurrency > 1``) or async actors, you can't set this to False.
              Defaults to True if you're using multi-threaded or async actors, and
              False otherwise. Actor task retries are always executed out of order.
          name: The globally unique name for the actor, which can be used
              to retrieve the actor via ray.get_actor(name) as long as the
              actor is still alive.
          namespace: Override the namespace to use for the actor. By default,
              actors are created in an anonymous namespace. The actor can
              be retrieved via ray.get_actor(name=name, namespace=namespace).
          lifetime: Either `None`, which defaults to the actor will fate
              share with its creator and will be deleted once its refcount
              drops to zero, or "detached", which means the actor will live
              as a global object independent of the creator.
          runtime_env (Dict[str, Any]): Specifies the runtime environment for
              this actor or task and its children. See
              :ref:`runtime-environments` for detailed documentation.
          scheduling_strategy: Strategy about how to
              schedule a remote function or actor. Possible values are
              None: ray will figure out the scheduling strategy to use, it
              will either be the PlacementGroupSchedulingStrategy using parent's
              placement group if parent has one and has
              placement_group_capture_child_tasks set to true,
              or "DEFAULT";
              "DEFAULT": default hybrid scheduling;
              "SPREAD": best effort spread scheduling;
              `PlacementGroupSchedulingStrategy`:
              placement group based scheduling;
              `NodeAffinitySchedulingStrategy`:
              node id based affinity scheduling.
          enable_task_events: True if tracing is enabled, i.e., task events from
              the actor should be reported. Defaults to True.

      Returns:
          A wrapper exposing ``.remote()`` and ``.bind()`` that use the
          merged options.

      Examples:

      .. code-block:: python

          @ray.remote(num_cpus=2, resources={"CustomResource": 1})
          class Foo:
              def method(self):
                  return 1
          # Class Bar will require 1 cpu instead of 2.
          # It will also require no custom resources.
          Bar = Foo.options(num_cpus=1, resources=None)
      """
      actor_cls = self

      # Start from the decorator defaults. "concurrency_groups" could not be
      # used in ".options()", so it is left out before merging options from
      # '@ray.remote'.
      base_options = {
          key: value
          for key, value in self._default_options.items()
          if key != "concurrency_groups"
      }
      updated_options = ray_option_utils.update_options(base_options, actor_options)
      ray_option_utils.validate_actor_options(updated_options, in_options=True)

      # only update runtime_env when ".options()" specifies new runtime_env
      if "runtime_env" in actor_options:
          parsed_env = parse_runtime_env_for_task_or_actor(
              updated_options["runtime_env"]
          )
          updated_options["runtime_env"] = parsed_env

      class ActorOptionWrapper:
          def remote(self, *args, **kwargs):
              return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)

          @DeveloperAPI
          def bind(self, *args, **kwargs):
              """
              For Ray DAG building that creates static graph from decorated
              class or functions.
              """
              from ray.dag.class_node import ClassNode

              wrapped_class = actor_cls.__ray_metadata__.modified_class
              return ClassNode(wrapped_class, args, kwargs, updated_options)

      return ActorOptionWrapper()
    @wrap_auto_init
    @_tracing_actor_creation
    def _remote(self, args=None, kwargs=None, **actor_options) -> ActorProxy[T]:
        """Create an actor.

        This method allows more flexibility than the remote method because
        resource requirements can be specified and override the defaults in the
        decorator.

        Args:
            args: The arguments to forward to the actor constructor.
            kwargs: The keyword arguments to forward to the actor constructor.
            **actor_options: Keyword arguments for configuring the actor options.
                See ``ActorClass.options`` for more details.

        Returns:
            A handle to the newly created actor. When ``get_if_exists`` is set
            and an actor with the given name already exists, the handle
            returned by ``ray.get_actor`` is returned instead.

        Raises:
            TypeError: If ``name`` is given but is not a string.
            ValueError: If ``name`` is an empty string, if ``lifetime`` is not
                one of ``None``, ``"detached"`` or ``"non_detached"``, or if
                ``allow_out_of_order_execution`` is falsy for an async actor
                or an actor with ``max_concurrency > 1``.
            ActorAlreadyExistsError: If ``name`` is already taken in
                ``namespace`` and ``get_if_exists`` is not set.
        """
        # Validate the optional actor name and namespace up front.
        name = actor_options.get("name")
        namespace = actor_options.get("namespace")
        if name is not None:
            if not isinstance(name, str):
                raise TypeError(f"name must be None or a string, got: '{type(name)}'.")
            elif name == "":
                raise ValueError("Actor name cannot be an empty string.")
        if namespace is not None:
            ray._private.utils.validate_namespace(namespace)

        # Handle the get-or-create case.
        if actor_options.get("get_if_exists"):
            try:
                return ray.get_actor(name, namespace=namespace)
            except ValueError:
                # Attempt to create it (may race with other attempts).
                updated_options = actor_options.copy()
                updated_options["get_if_exists"] = False  # prevent infinite loop
                try:
                    return self._remote(args, kwargs, **updated_options)
                except ActorAlreadyExistsError:
                    pass
                # The actor was created between the first and second get_actor calls.
                # Try to get it again to see if it's there.
                return ray.get_actor(name, namespace=namespace)

        # We pop the "concurrency_groups" coming from "@ray.remote" here. We no longer
        # need it in "_remote()".
        actor_options.pop("concurrency_groups", None)

        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        meta = self.__ray_metadata__
        is_asyncio = has_async_methods(meta.modified_class)

        # When max_concurrency is unset, the default depends on whether the
        # actor class has async methods.
        if actor_options.get("max_concurrency") is None:
            actor_options["max_concurrency"] = (
                DEFAULT_MAX_CONCURRENCY_ASYNC
                if is_asyncio
                else ray_constants.DEFAULT_MAX_CONCURRENCY_THREADED
            )

        # Delegate to client_mode_convert_actor when client_mode_should_convert()
        # says so; nothing below this point runs in that case.
        if client_mode_should_convert():
            return client_mode_convert_actor(self, args, kwargs, **actor_options)

        # fill actor required options
        for k, v in ray_option_utils.actor_options.items():
            actor_options[k] = actor_options.get(k, v.default_value)
        # "concurrency_groups" already takes effects and should not apply again.
        # Remove the default value here.
        actor_options.pop("concurrency_groups", None)

        # TODO(suquark): cleanup these fields
        max_concurrency = actor_options["max_concurrency"]
        lifetime = actor_options["lifetime"]
        runtime_env = actor_options["runtime_env"]
        placement_group = actor_options["placement_group"]
        placement_group_bundle_index = actor_options["placement_group_bundle_index"]
        placement_group_capture_child_tasks = actor_options[
            "placement_group_capture_child_tasks"
        ]
        scheduling_strategy = actor_options["scheduling_strategy"]
        max_restarts = actor_options["max_restarts"]
        max_task_retries = actor_options["max_task_retries"]
        max_pending_calls = actor_options["max_pending_calls"]

        # Override enable_task_events to default for actor if not specified (i.e. None)
        enable_task_events = actor_options.get("enable_task_events")

        # Only warn about the deprecated placement group options when the caller
        # did not pass an explicit PlacementGroupSchedulingStrategy.
        if scheduling_strategy is None or not isinstance(
            scheduling_strategy, PlacementGroupSchedulingStrategy
        ):
            _warn_if_using_deprecated_placement_group(actor_options, 3)

        worker = ray._private.worker.global_worker
        worker.check_connected()

        # Usage is recorded only when this process is not running in WORKER_MODE.
        if worker.mode != ray._private.worker.WORKER_MODE:
            from ray._common.usage import usage_lib

            usage_lib.record_library_usage("core")

        # Check whether the name is already taken.
        # TODO(edoakes): this check has a race condition because two drivers
        # could pass the check and then create the same named actor. We should
        # instead check this when we create the actor, but that's currently an
        # async call.
        if name is not None:
            try:
                ray.get_actor(name, namespace=namespace)
            except ValueError:  # Name is not taken.
                pass
            else:
                raise ActorAlreadyExistsError(
                    f"The name {name} (namespace={namespace}) is already "
                    "taken. Please use "
                    "a different name or get the existing actor using "
                    f"ray.get_actor('{name}', namespace='{namespace}')"
                )

        # Translate `lifetime` into the tri-state `detached` flag that is passed
        # to create_actor below (None / True / False).
        if lifetime is None:
            detached = None
        elif lifetime == "detached":
            detached = True
        elif lifetime == "non_detached":
            detached = False
        else:
            raise ValueError(
                "actor `lifetime` argument must be one of 'detached', "
                "'non_detached' and 'None'."
            )

        # LOCAL_MODE cannot handle cross_language
        if worker.mode == ray.LOCAL_MODE:
            assert (
                not meta.is_cross_language
            ), "Cross language ActorClass cannot be executed locally."

        # Export the actor.
        if not meta.is_cross_language and (
            meta.last_export_cluster_and_job != worker.current_cluster_and_job
        ):
            # If this actor class was not exported in this cluster and job,
            # we need to export this function again, because current GCS
            # doesn't have it.

            # After serialize / deserialize modified class, the __module__
            # of modified class will be ray.cloudpickle.cloudpickle.
            # So, here pass actor_creation_function_descriptor to make
            # sure export actor class correct.
            worker.function_actor_manager.export_actor_class(
                meta.modified_class,
                meta.actor_creation_function_descriptor,
                meta.method_meta.methods.keys(),
            )
            meta.last_export_cluster_and_job = worker.current_cluster_and_job

        resources = ray._common.utils.resources_from_ray_options(actor_options)
        # Set the actor's default resources if not already set. The check below
        # treats the actor as having no explicitly specified resources when the
        # only keys present in `resources` are "memory" and/or
        # "object_store_memory".
        # TODO(suquark): In the original code, memory is not considered as resources,
        # when deciding the default CPUs. It is strange, but we keep the original
        # semantics in case that it breaks user applications & tests.
        if not set(resources.keys()).difference({"memory", "object_store_memory"}):
            # In the default case, actors acquire no resources for
            # their lifetime, and actor methods will require 1 CPU.
            resources.setdefault("CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE)
            actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
        else:
            # If any resources are specified (here or in decorator), then
            # all resources are acquired for the actor's lifetime and no
            # resources are associated with methods.
            resources.setdefault(
                "CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
            )
            actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED

        # If the actor methods require CPU resources, then set the required
        # placement resources. If actor_placement_resources is empty, then
        # the required placement resources will be the same as resources.
        actor_placement_resources = {}
        assert actor_method_cpu in [0, 1]
        if actor_method_cpu == 1:
            actor_placement_resources = resources.copy()
            actor_placement_resources["CPU"] += 1
        # Flatten the constructor arguments into the form passed to create_actor.
        if meta.is_cross_language:
            creation_args = cross_language._format_args(worker, args, kwargs)
        else:
            function_signature = meta.method_meta.signatures["__init__"]
            creation_args = signature.flatten_args(function_signature, args, kwargs)

        use_placement_group = scheduling_strategy is not None and isinstance(
            scheduling_strategy, PlacementGroupSchedulingStrategy
        )
        # max_restarts == -1 is treated as restartable alongside positive values.
        is_restartable = max_restarts > 0 or max_restarts == -1
        if use_placement_group and detached and is_restartable:
            # TODO(kevin85421): Checking `max_restarts > 0` is because Ray Serve currently schedules detached actors with
            # placement groups. Adding the check avoids printing this warning for all Ray Serve applications. In the future,
            # we should consider raising an error instead of a warning, but this is a breaking change.
            logger.warning(
                "Scheduling a restartable detached actor with a placement group is not recommended "
                "because Ray will kill the actor when the placement group is removed and the actor will "
                "not be able to be restarted."
            )

        if scheduling_strategy is None or isinstance(
            scheduling_strategy, PlacementGroupSchedulingStrategy
        ):
            # TODO(jjyao) Clean this up once the
            # placement_group option is removed.
            # We should also consider pushing this logic down to c++
            # so that it can be reused by all languages.
            if isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy):
                # An explicit strategy overrides the standalone
                # placement_group* options read from actor_options above.
                placement_group = scheduling_strategy.placement_group
                placement_group_bundle_index = (
                    scheduling_strategy.placement_group_bundle_index
                )
                placement_group_capture_child_tasks = (
                    scheduling_strategy.placement_group_capture_child_tasks
                )

            if placement_group_capture_child_tasks is None:
                placement_group_capture_child_tasks = (
                    worker.should_capture_child_tasks_in_placement_group
                )
            placement_group = _configure_placement_group_based_on_context(
                placement_group_capture_child_tasks,
                placement_group_bundle_index,
                resources,
                actor_placement_resources,
                meta.class_name,
                placement_group=placement_group,
            )
            # Normalize to a concrete strategy: a placement-group strategy when
            # the resolved group is non-empty, otherwise "DEFAULT".
            if not placement_group.is_empty:
                scheduling_strategy = PlacementGroupSchedulingStrategy(
                    placement_group,
                    placement_group_bundle_index,
                    placement_group_capture_child_tasks,
                )
            else:
                scheduling_strategy = "DEFAULT"

        serialized_runtime_env_info = None
        if runtime_env is not None:
            serialized_runtime_env_info = get_runtime_env_info(
                runtime_env,
                is_job_runtime_env=False,
                serialize=True,
            )

        concurrency_groups_dict = {}
        # NOTE: a missing value is normalized on the shared class metadata
        # itself, not on a local copy.
        if meta.concurrency_groups is None:
            meta.concurrency_groups = []
        for cg_name in meta.concurrency_groups:
            concurrency_groups_dict[cg_name] = {
                "name": cg_name,
                "max_concurrency": meta.concurrency_groups[cg_name],
                "function_descriptors": [],
            }

        # Update methods
        for method_name in meta.method_meta.concurrency_group_for_methods:
            cg_name = meta.method_meta.concurrency_group_for_methods[method_name]
            assert cg_name in concurrency_groups_dict

            module_name = meta.actor_creation_function_descriptor.module_name
            class_name = meta.actor_creation_function_descriptor.class_name
            concurrency_groups_dict[cg_name]["function_descriptors"].append(
                PythonFunctionDescriptor(module_name, method_name, class_name)
            )

        # Update the creation descriptor based on number of arguments
        if meta.is_cross_language:
            func_name = "<init>"
            if meta.language == Language.CPP:
                func_name = meta.actor_creation_function_descriptor.function_name
            # NOTE: this reassigns the descriptor stored on the shared class
            # metadata, so the new descriptor is what later code on `meta` sees.
            meta.actor_creation_function_descriptor = (
                cross_language._get_function_descriptor_for_actor_method(
                    meta.language,
                    meta.actor_creation_function_descriptor,
                    func_name,
                    str(len(args) + len(kwargs)),
                )
            )

        allow_out_of_order_execution = actor_options.get("allow_out_of_order_execution")

        # If the actor is async or multi-threaded, default to out-of-order execution.
        if allow_out_of_order_execution is None:
            allow_out_of_order_execution = is_asyncio or max_concurrency > 1

        # An explicit falsy value is rejected for async / multi-threaded actors.
        if is_asyncio and not allow_out_of_order_execution:
            raise ValueError(
                "If you're using async actors, Ray can't execute actor tasks in order. "
                "Set `allow_out_of_order_execution=True` to allow out-of-order "
                "execution."
            )
        elif max_concurrency > 1 and not allow_out_of_order_execution:
            raise ValueError(
                "If you're using multi-threaded actors, Ray can't execute actor tasks "
                "in order. Set `allow_out_of_order_execution=True` to allow "
                "out-of-order execution."
            )

        actor_id = worker.core_worker.create_actor(
            meta.language,
            meta.actor_creation_function_descriptor,
            creation_args,
            max_restarts,
            max_task_retries,
            resources,
            actor_placement_resources,
            max_concurrency,
            detached,
            name if name is not None else "",
            namespace if namespace is not None else "",
            is_asyncio,
            # Store actor_method_cpu in actor handle's extension data.
            extension_data=str(actor_method_cpu),
            serialized_runtime_env_info=serialized_runtime_env_info or "{}",
            concurrency_groups_dict=concurrency_groups_dict or dict(),
            max_pending_calls=max_pending_calls,
            scheduling_strategy=scheduling_strategy,
            enable_task_events=enable_task_events,
            labels=actor_options.get("_labels"),
            label_selector=actor_options.get("label_selector"),
            fallback_strategy=actor_options.get("fallback_strategy"),
            allow_out_of_order_execution=allow_out_of_order_execution,
            enable_tensor_transport=meta.enable_tensor_transport,
        )

        # Optional module-level hook, invoked after the actor has been created.
        if _actor_launch_hook:
            _actor_launch_hook(
                meta.actor_creation_function_descriptor, resources, scheduling_strategy
            )

        actor_handle = ActorHandle(
            meta.language,
            actor_id,
            max_task_retries,
            enable_task_events,
            meta.method_meta.method_is_generator,
            meta.method_meta.decorators,
            meta.method_meta.signatures,
            meta.method_meta.num_returns,
            meta.method_meta.max_task_retries,
            meta.method_meta.retry_exceptions,
            meta.method_meta.generator_backpressure_num_objects,
            meta.method_meta.enable_task_events,
            meta.enable_tensor_transport,
            meta.method_meta.method_name_to_tensor_transport,
            actor_method_cpu,
            meta.actor_creation_function_descriptor,
            worker.current_cluster_and_job,
            original_handle=True,
            allow_out_of_order_execution=allow_out_of_order_execution,
        )
        if meta.enable_tensor_transport:
            gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
            gpu_object_manager.register_custom_transports_on_actor(actor_handle)

        return actor_handle
  1638. @DeveloperAPI
  1639. def bind(self, *args, **kwargs):
  1640. """
  1641. For Ray DAG building that creates static graph from decorated
  1642. class or functions.
  1643. """
  1644. from ray.dag.class_node import ClassNode
  1645. return ClassNode(
  1646. self.__ray_metadata__.modified_class, args, kwargs, self._default_options
  1647. )
  1648. @PublicAPI
  1649. class ActorHandle(Generic[T]):
  1650. """A handle to an actor.
  1651. The fields in this class are prefixed with _ray_ to hide them from the user
  1652. and to avoid collision with actor method names.
  1653. An ActorHandle can be created in three ways. First, by calling .remote() on
  1654. an ActorClass. Second, by passing an actor handle into a task (forking the
  1655. ActorHandle). Third, by directly serializing the ActorHandle (e.g., with
  1656. cloudpickle).
  1657. Attributes:
  1658. _ray_actor_language: The actor language.
  1659. _ray_actor_id: Actor ID.
  1660. _ray_enable_task_events: The default value of whether task events is
  1661. enabled, i.e., task events from the actor should be reported.
  1662. _ray_method_is_generator: Map of method name -> if it is a generator
  1663. method.
  1664. _ray_method_decorators: Optional decorators for the function
  1665. invocation. This can be used to change the behavior on the
  1666. invocation side, whereas a regular decorator can be used to change
  1667. the behavior on the execution side.
  1668. _ray_method_signatures: The signatures of the actor methods.
  1669. _ray_method_max_task_retries: Max number of retries on method failure.
  1670. _ray_method_num_returns: The default number of return values for
  1671. each method.
  1672. _ray_method_retry_exceptions: The default value of boolean of whether you want
  1673. to retry all user-raised exceptions, or a list of allowlist exceptions to
  1674. retry.
  1675. _ray_method_generator_backpressure_num_objects: Generator-only
  1676. config. The max number of objects to generate before it
  1677. starts pausing a generator.
  1678. _ray_method_enable_task_events: The value of whether task
  1679. tracing is enabled for the actor methods. This overrides the
  1680. actor's default value (`_ray_enable_task_events`).
  1681. _ray_method_name_to_tensor_transport: A dictionary mapping method names to their
  1682. tensor transport protocol.
  1683. _ray_actor_method_cpus: The number of CPUs required by actor methods.
  1684. _ray_original_handle: True if this is the original actor handle for a
  1685. given actor. If this is true, then the actor will be destroyed when
  1686. this handle goes out of scope.
  1687. _ray_weak_ref: True means that this handle does not count towards the
  1688. distributed ref count for the actor, i.e. the actor may be GCed
  1689. while this handle is still in scope. This is set to True if the
  1690. handle was created by getting an actor by name or by getting the
  1691. self handle. It is set to False if this is the original handle or
  1692. if it was created by passing the original handle through task args
  1693. and returns.
  1694. _ray_is_cross_language: Whether this actor is cross language.
  1695. _ray_actor_creation_function_descriptor: The function descriptor
  1696. of the actor creation task.
  1697. _ray_allow_out_of_order_execution: Whether the actor can execute tasks out of order.
  1698. _ray_enable_tensor_transport: Whether tensor transport is enabled for this actor.
  1699. """
  1700. def __init__(
  1701. self,
  1702. language,
  1703. actor_id,
  1704. max_task_retries: Optional[int],
  1705. enable_task_events: bool,
  1706. method_is_generator: Dict[str, bool],
  1707. method_decorators,
  1708. method_signatures,
  1709. method_num_returns: Dict[str, Union[int, Literal["streaming"]]],
  1710. method_max_task_retries: Dict[str, int],
  1711. method_retry_exceptions: Dict[str, Union[bool, list, tuple]],
  1712. method_generator_backpressure_num_objects: Dict[str, int],
  1713. method_enable_task_events: Dict[str, bool],
  1714. enable_tensor_transport: bool,
  1715. method_name_to_tensor_transport: Dict[str, str],
  1716. actor_method_cpus: int,
  1717. actor_creation_function_descriptor,
  1718. cluster_and_job,
  1719. original_handle=False,
  1720. weak_ref: bool = False,
  1721. allow_out_of_order_execution: Optional[bool] = None,
  1722. ):
  1723. """Initialize an ActorHandle.
  1724. Args:
  1725. language: The actor language.
  1726. actor_id: The ID of the actor.
  1727. max_task_retries: The maximum number of times to retry a task when it fails.
  1728. enable_task_events: Whether task events should be enabled for this actor.
  1729. method_is_generator: Dictionary mapping method names to whether they are generator methods.
  1730. method_decorators: Dictionary mapping method names to their decorators.
  1731. method_signatures: Dictionary mapping method names to their signatures.
  1732. method_num_returns: Dictionary mapping method names to their number of return values.
  1733. method_max_task_retries: Dictionary mapping method names to their maximum task retries.
  1734. method_retry_exceptions: Dictionary mapping method names to their retry exception settings.
  1735. method_generator_backpressure_num_objects: Dictionary mapping method names to their generator backpressure settings.
  1736. method_enable_task_events: Dictionary mapping method names to whether task events are enabled.
  1737. enable_tensor_transport: Whether tensor transport is enabled for
  1738. this actor. If True, then methods can be called with
  1739. .options(tensor_transport=...) to specify a non-default tensor
  1740. transport.
  1741. method_name_to_tensor_transport: Dictionary mapping method names to their tensor transport type.
  1742. actor_method_cpus: The number of CPUs required by actor methods.
  1743. actor_creation_function_descriptor: The function descriptor for actor creation.
  1744. cluster_and_job: The cluster and job information.
  1745. original_handle: Whether this is the original actor handle.
  1746. weak_ref: Whether this is a weak reference to the actor.
  1747. allow_out_of_order_execution: Whether the actor can execute tasks out of order.
  1748. """
  1749. self._ray_actor_language = language
  1750. self._ray_actor_id = actor_id
  1751. self._ray_max_task_retries = max_task_retries
  1752. self._ray_original_handle = original_handle
  1753. self._ray_weak_ref = weak_ref
  1754. self._ray_enable_task_events = enable_task_events
  1755. self._ray_allow_out_of_order_execution = allow_out_of_order_execution
  1756. self._ray_method_is_generator = method_is_generator
  1757. self._ray_method_decorators = method_decorators
  1758. self._ray_method_signatures = method_signatures
  1759. self._ray_method_num_returns = method_num_returns
  1760. self._ray_method_max_task_retries = method_max_task_retries
  1761. self._ray_method_retry_exceptions = method_retry_exceptions
  1762. self._ray_method_generator_backpressure_num_objects = (
  1763. method_generator_backpressure_num_objects
  1764. )
  1765. self._ray_method_enable_task_events = method_enable_task_events
  1766. self._ray_enable_tensor_transport = enable_tensor_transport
  1767. self._ray_method_name_to_tensor_transport = method_name_to_tensor_transport
  1768. self._ray_actor_method_cpus = actor_method_cpus
  1769. self._ray_cluster_and_job = cluster_and_job
  1770. self._ray_is_cross_language = language != Language.PYTHON
  1771. self._ray_actor_creation_function_descriptor = (
  1772. actor_creation_function_descriptor
  1773. )
  1774. self._ray_function_descriptor = {}
  1775. # This is incremented each time `bind()` is called on an actor handle
  1776. # (in Ray DAGs), therefore capturing the bind order of the actor methods.
  1777. # TODO: this does not work properly if the caller has two copies of the
  1778. # same actor handle, and needs to be fixed.
  1779. self._ray_dag_bind_index = 0
  1780. if not self._ray_is_cross_language:
  1781. assert isinstance(
  1782. actor_creation_function_descriptor, PythonFunctionDescriptor
  1783. )
  1784. module_name = actor_creation_function_descriptor.module_name
  1785. class_name = actor_creation_function_descriptor.class_name
  1786. for method_name in self._ray_method_signatures.keys():
  1787. function_descriptor = PythonFunctionDescriptor(
  1788. module_name, method_name, class_name
  1789. )
  1790. self._ray_function_descriptor[method_name] = function_descriptor
  1791. # Build an _ActorMethodMetadata per method to cache expensive parsing logic.
  1792. # The _ActorMethodMetadata doesn't take a reference to this ActorHandle to avoid a circular reference.
  1793. # Instead, we will lazily bind this ActorHandle to the _ActorMethodMetadata when a method is invoked.
  1794. self._method_shells = {}
  1795. for method_name, method_signature in self._ray_method_signatures.items():
  1796. self._method_shells[method_name] = _ActorMethodMetadata(
  1797. method_name=method_name,
  1798. num_returns=self._ray_method_num_returns.get(method_name, None),
  1799. max_task_retries=self._ray_method_max_task_retries.get(
  1800. method_name, self._ray_max_task_retries
  1801. )
  1802. or 0,
  1803. retry_exceptions=self._ray_method_retry_exceptions.get(method_name),
  1804. is_generator=self._ray_method_is_generator.get(method_name),
  1805. generator_backpressure_num_objects=self._ray_method_generator_backpressure_num_objects.get(
  1806. method_name
  1807. ),
  1808. enable_task_events=self._ray_method_enable_task_events.get(
  1809. method_name, self._ray_enable_task_events
  1810. ),
  1811. decorator=self._ray_method_decorators.get(method_name),
  1812. signature=method_signature,
  1813. tensor_transport=self._ray_method_name_to_tensor_transport.get(
  1814. method_name
  1815. ),
  1816. )
  1817. def __del__(self):
  1818. # Weak references don't count towards the distributed ref count, so no
  1819. # need to decrement the ref count.
  1820. if self._ray_weak_ref:
  1821. return
  1822. try:
  1823. # Mark that this actor handle has gone out of scope. Once all actor
  1824. # handles are out of scope, the actor will exit.
  1825. if ray._private.worker:
  1826. worker = ray._private.worker.global_worker
  1827. if worker.connected and hasattr(worker, "core_worker"):
  1828. worker.core_worker.remove_actor_handle_reference(self._ray_actor_id)
  1829. except AttributeError:
  1830. # Suppress the attribute error which is caused by
  1831. # python destruction ordering issue.
  1832. # It only happen when python exits.
  1833. pass
  1834. def _actor_method_call(
  1835. self,
  1836. method_name: str,
  1837. args: List[Any] = None,
  1838. kwargs: Dict[str, Any] = None,
  1839. name: str = "",
  1840. num_returns: Optional[Union[int, Literal["streaming"]]] = None,
  1841. max_task_retries: int = None,
  1842. retry_exceptions: Union[bool, list, tuple] = None,
  1843. concurrency_group_name: Optional[str] = None,
  1844. generator_backpressure_num_objects: Optional[int] = None,
  1845. enable_task_events: Optional[bool] = None,
  1846. tensor_transport: Optional[str] = None,
  1847. ):
  1848. """Method execution stub for an actor handle.
  1849. This is the function that executes when
  1850. `actor.method_name.remote(*args, **kwargs)` is called. Instead of
  1851. executing locally, the method is packaged as a task and scheduled
  1852. to the remote actor instance.
  1853. Args:
  1854. method_name: The name of the actor method to execute.
  1855. args: A list of arguments for the actor method.
  1856. kwargs: A dictionary of keyword arguments for the actor method.
  1857. name: The name to give the actor method call task.
  1858. num_returns: The number of return values for the method.
  1859. max_task_retries: Number of retries when method fails.
  1860. retry_exceptions: Boolean of whether you want to retry all user-raised
  1861. exceptions, or a list of allowlist exceptions to retry.
  1862. concurrency_group_name: The name of the concurrency group to use.
  1863. generator_backpressure_num_objects: The number of objects to generate
  1864. before applying backpressure.
  1865. enable_task_events: True if tracing is enabled, i.e., task events from
  1866. the actor should be reported.
  1867. tensor_transport: The tensor transport protocol to use for the actor method.
  1868. Returns:
  1869. object_refs: A list of object refs returned by the remote actor
  1870. method.
  1871. """
  1872. worker = ray._private.worker.global_worker
  1873. args = args or []
  1874. kwargs = kwargs or {}
  1875. if self._ray_is_cross_language:
  1876. list_args = cross_language._format_args(worker, args, kwargs)
  1877. function_descriptor = cross_language._get_function_descriptor_for_actor_method( # noqa: E501
  1878. self._ray_actor_language,
  1879. self._ray_actor_creation_function_descriptor,
  1880. method_name,
  1881. # The signature for xlang should be "{length_of_arguments}" to handle
  1882. # overloaded methods.
  1883. signature=str(len(args) + len(kwargs)),
  1884. )
  1885. else:
  1886. function_signature = self._ray_method_signatures[method_name]
  1887. if not args and not kwargs and not function_signature:
  1888. list_args = []
  1889. else:
  1890. list_args = signature.flatten_args(function_signature, args, kwargs)
  1891. function_descriptor = self._ray_function_descriptor[method_name]
  1892. if worker.mode == ray.LOCAL_MODE:
  1893. assert (
  1894. not self._ray_is_cross_language
  1895. ), "Cross language remote actor method cannot be executed locally."
  1896. if num_returns == "dynamic":
  1897. num_returns = -1
  1898. elif num_returns == "streaming":
  1899. # TODO(sang): This is a temporary private API.
  1900. # Remove it when we migrate to the streaming generator.
  1901. num_returns = ray._raylet.STREAMING_GENERATOR_RETURN
  1902. retry_exception_allowlist = None
  1903. if retry_exceptions is None:
  1904. retry_exceptions = False
  1905. elif isinstance(retry_exceptions, (list, tuple)):
  1906. retry_exception_allowlist = tuple(retry_exceptions)
  1907. retry_exceptions = True
  1908. assert isinstance(
  1909. retry_exceptions, bool
  1910. ), "retry_exceptions can either be \
  1911. boolean or list/tuple of exception types."
  1912. if generator_backpressure_num_objects is None:
  1913. generator_backpressure_num_objects = -1
  1914. object_refs = worker.core_worker.submit_actor_task(
  1915. self._ray_actor_language,
  1916. self._ray_actor_id,
  1917. function_descriptor,
  1918. list_args,
  1919. name,
  1920. num_returns,
  1921. max_task_retries,
  1922. retry_exceptions,
  1923. retry_exception_allowlist,
  1924. self._ray_actor_method_cpus,
  1925. concurrency_group_name if concurrency_group_name is not None else b"",
  1926. generator_backpressure_num_objects,
  1927. enable_task_events,
  1928. tensor_transport,
  1929. )
  1930. if num_returns == STREAMING_GENERATOR_RETURN:
  1931. # Streaming generator will return a single ref
  1932. # that is for the generator task.
  1933. assert len(object_refs) == 1
  1934. generator_ref = object_refs[0]
  1935. return ObjectRefGenerator(generator_ref, worker)
  1936. if len(object_refs) == 1:
  1937. object_refs = object_refs[0]
  1938. elif len(object_refs) == 0:
  1939. object_refs = None
  1940. return object_refs
  def __getattr__(self, item: str) -> Any:
      """Resolve an attribute that normal lookup did not find into an actor method.

      Resolution happens in three steps:

      1. A name registered in ``_method_shells`` is bound to this handle and
         returned.
      2. On a handle that is not cross-language, every other name is rejected
         with ``AttributeError``.
      3. On a cross-language handle the name cannot be validated client-side
         (the target language defines the methods), so an ``ActorMethod``
         with default options is built for it. The one exception is
         ``__ray_terminate__``, which yields a placeholder object whose
         ``remote()`` only logs a warning and whose direct call raises
         ``TypeError``.

      Args:
          item: The attribute/method name being accessed.

      Returns:
          The bound method shell, a default-configured ``ActorMethod``, or
          the ``__ray_terminate__`` placeholder described above.

      Raises:
          AttributeError: If the handle is not cross-language and ``item`` is
              not a registered method.
      """
      # Known remote method: bind the cached shell to this handle.
      shells = self._method_shells
      if item in shells:
          return shells[item].bind(self)

      # Handles that are not cross-language only expose registered methods.
      if not self._ray_is_cross_language:
          raise AttributeError(
              f"'{type(self).__name__}' object has no attribute '{item}'"
          )

      if item != "__ray_terminate__":
          # Arbitrary cross-language method name: pass it through with
          # default options.
          return ActorMethod(
              self,  # actor
              item,  # method_name
              ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS,
              0,  # max_task_retries
              False,  # retry_exceptions
              False,  # is_generator
              self._ray_method_generator_backpressure_num_objects.get(item, -1),
              self._ray_enable_task_events,  # enable_task_events
              # Currently, cross-lang actor method not support decorator
              decorator=None,
              signature=None,
          )

      # ``__ray_terminate__`` is not forwarded on a cross-language handle;
      # hand back a placeholder instead of a real ActorMethod.
      class FakeActorMethod:
          def __call__(self, *args, **kwargs):
              raise TypeError(
                  "Actor methods cannot be called directly. Instead "
                  f"of running 'object.{item}()', try 'object.{item}.remote()'."
              )

          def remote(self, *args, **kwargs):
              logger.warning(
                  f"Actor method {item} is not supported by cross language."
              )

      return FakeActorMethod()
  # Expose the actor's method names so that tab completion works.
  def __dir__(self):
      """Return the keys of ``_ray_method_signatures`` as the attribute listing."""
      method_signatures = self._ray_method_signatures
      return method_signatures.keys()
  def __repr__(self):
      """Return ``Actor(<class name>, <actor id in hex>)`` for this handle."""
      class_name = self._ray_actor_creation_function_descriptor.class_name
      actor_id_hex = self._actor_id.hex()
      return f"Actor({class_name}, {actor_id_hex})"
  def __hash__(self):
      """Hash the handle by its actor ID (read through ``_actor_id``)."""
      actor_id = self._actor_id
      return hash(actor_id)
  def __eq__(self, __value):
      """Compare this handle with another object by hash.

      Equality is defined as ``hash(self) == hash(__value)``, so any object
      whose hash matches this handle's hash compares equal.

      Args:
          __value: The object to compare against.

      Returns:
          ``True`` or ``False`` when ``__value`` is hashable. ``NotImplemented``
          when ``__value`` is unhashable (e.g. a list or dict), which lets
          Python fall back to its default comparison so that ``handle == []``
          evaluates to ``False`` instead of raising ``TypeError``.
      """
      own_hash = hash(self)
      try:
          other_hash = hash(__value)
      except TypeError:
          # An unhashable operand has no hash to match against; defer to
          # Python's default comparison rather than propagating the error.
          return NotImplemented
      return own_hash == other_hash
  @property
  def _actor_id(self):
      """The actor ID stored on this handle as ``_ray_actor_id``."""
      actor_id = self._ray_actor_id
      return actor_id
  def _get_local_state(self):
      """Ask the core worker for the local state of this actor.

      NOTE: the returned state is only accurate once a first actor method
      call has been made against this actor handle, due to
      https://github.com/ray-project/ray/pull/24600.

      Returns:
          ActorTableData.ActorState or None if the state is unknown.
      """
      global_worker = ray._private.worker.global_worker
      global_worker.check_connected()
      core_worker = global_worker.core_worker
      return core_worker.get_local_actor_state(self._ray_actor_id)
  def _serialization_helper(self):
      """Build the state needed to pickle this actor handle.

      In non-local mode (the global worker has a ``core_worker``) the state
      comes from ``core_worker.serialize_actor_handle``. In local mode it is
      a ``(dict, None)`` pair, where the dictionary holds the values that
      ``_deserialization_helper`` reads back to reconstruct the handle.

      Returns:
          The elements of the serialized state followed by this handle's
          ``_ray_weak_ref`` flag. ``__reduce__`` unpacks the result as three
          elements.
      """
      worker = ray._private.worker.global_worker
      worker.check_connected()

      if hasattr(worker, "core_worker"):
          # Non-local mode
          state = worker.core_worker.serialize_actor_handle(self._ray_actor_id)
      else:
          # Local mode
          state = (
              {
                  "actor_language": self._ray_actor_language,
                  "actor_id": self._ray_actor_id,
                  "max_task_retries": self._ray_max_task_retries,
                  # Use the `_ray_`-prefixed attribute, consistent with every
                  # other entry here and with `__getattr__`.
                  "enable_task_events": self._ray_enable_task_events,
                  "method_is_generator": self._ray_method_is_generator,
                  "method_decorators": self._ray_method_decorators,
                  "method_signatures": self._ray_method_signatures,
                  "method_num_returns": self._ray_method_num_returns,
                  "method_max_task_retries": self._ray_method_max_task_retries,
                  "method_retry_exceptions": self._ray_method_retry_exceptions,
                  "method_generator_backpressure_num_objects": (
                      self._ray_method_generator_backpressure_num_objects
                  ),
                  "method_enable_task_events": self._ray_method_enable_task_events,
                  "enable_tensor_transport": self._ray_enable_tensor_transport,
                  "method_name_to_tensor_transport": self._ray_method_name_to_tensor_transport,  # noqa: E501
                  "actor_method_cpus": self._ray_actor_method_cpus,
                  "actor_creation_function_descriptor": self._ray_actor_creation_function_descriptor,  # noqa: E501
                  # `_deserialization_helper` reads this key in local mode and
                  # asserts it equals the worker's current value; without it
                  # the local-mode round trip fails with KeyError.
                  "current_cluster_and_job": worker.current_cluster_and_job,
              },
              None,
          )

      return (*state, self._ray_weak_ref)
  @classmethod
  def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None):
      """Rebuild an actor handle from its serialized state (makes pickling work).

      In non-local mode the work is delegated to
      ``core_worker.deserialize_and_register_actor_handle``. In local mode
      the handle is constructed directly from the ``state`` dictionary;
      ``weak_ref`` and ``outer_object_ref`` are not used on that path.

      Args:
          state: The serialized state of the actor handle.
          weak_ref: Whether this was serialized from an actor handle with a
              weak ref to the actor.
          outer_object_ref: The ObjectRef that the serialized actor handle
              was contained in, if any. This is used for counting references
              to the actor handle.
      """
      worker = ray._private.worker.global_worker
      worker.check_connected()

      if hasattr(worker, "core_worker"):
          # Non-local mode
          register_handle = worker.core_worker.deserialize_and_register_actor_handle
          return register_handle(state, outer_object_ref, weak_ref)

      # Local mode
      assert worker.current_cluster_and_job == state["current_cluster_and_job"]

      # Keys of `state`, in the positional order in which they are passed to
      # the constructor.
      constructor_keys = (
          "actor_language",
          "actor_id",
          "max_task_retries",
          "enable_task_events",
          "method_is_generator",
          "method_decorators",
          "method_signatures",
          "method_num_returns",
          "method_max_task_retries",
          "method_retry_exceptions",
          "method_generator_backpressure_num_objects",
          "method_enable_task_events",
          "enable_tensor_transport",
          "method_name_to_tensor_transport",
          "actor_method_cpus",
          "actor_creation_function_descriptor",
          "current_cluster_and_job",
      )
      # TODO(swang): Accessing the worker's current task ID is not
      # thread-safe.
      return cls(*[state[key] for key in constructor_keys])
  def __reduce__(self):
      """Pickle support; this code path is used by pickling but not by Ray forking.

      Returns:
          A ``(callable, args)`` pair: ``ActorHandle._deserialization_helper``
          and the ``(serialized state, weak_ref, None)`` arguments for it.
      """
      payload, _unused, is_weak_ref = self._serialization_helper()
      # There is no outer object ref when the actor handle is
      # deserialized out-of-band using pickle.
      outer_object_ref = None
      rebuild_args = (payload, is_weak_ref, outer_object_ref)
      return ActorHandle._deserialization_helper, rebuild_args
  def _modify_class(cls):
      """Return a subclass of ``cls`` that carries the default actor methods.

      A class that already has ``__ray_actor_class__`` is returned untouched.
      Otherwise a subclass is created that records the original class in
      ``__ray_actor_class__``, adds ``__ray_ready__``, ``__ray_call__`` and
      ``__ray_terminate__``, and takes over ``cls``'s ``__module__`` and
      ``__name__``. If the resulting class has no function/method
      ``__init__`` (per ``is_function_or_method``), a no-op one is attached.

      Args:
          cls: The user-defined actor class.

      Returns:
          ``cls`` itself if it was already modified, else the new subclass.
      """
      already_modified = hasattr(cls, "__ray_actor_class__")
      if already_modified:
          return cls

      # Subclass the user's class to add the default methods.
      class Class(cls):
          __ray_actor_class__ = cls  # The original actor class

          def __ray_ready__(self):
              return True

          def __ray_call__(self, fn, *args, **kwargs):
              return fn(self, *args, **kwargs)

          def __ray_terminate__(self):
              if ray._private.worker.global_worker.mode != ray.LOCAL_MODE:
                  ray.actor.exit_actor()

      Class.__module__, Class.__name__ = cls.__module__, cls.__name__

      current_init = getattr(Class, "__init__", None)
      if is_function_or_method(current_init):
          return Class

      # No usable __init__ yet. Actor creation is executed together with
      # __init__, so attaching an empty one avoids many checks later on.
      def __init__(self):
          pass

      Class.__init__ = __init__
      return Class
  def _make_actor(cls, actor_options):
      """Turn a user class into an ``ActorClass``.

      The class is passed through ``_modify_class`` and
      ``_inject_tracing_into_class``. A ``max_restarts`` option other than -1
      is clamped in place to ``ray_constants.MAX_INT64_VALUE``.

      Args:
          cls: The user-defined actor class.
          actor_options: Actor options; ``max_restarts`` may be updated in
              place.

      Returns:
          The result of ``ActorClass._ray_from_modified_class`` for the
          modified class, a random ``ActorClassID`` and ``actor_options``.
      """
      modified_class = _modify_class(cls)
      _inject_tracing_into_class(modified_class)

      # -1 represents infinite restart and is left as is.
      if "max_restarts" in actor_options and actor_options["max_restarts"] != -1:
          # Make sure we don't pass too big of an int to C++, causing
          # an overflow.
          requested_restarts = actor_options["max_restarts"]
          actor_options["max_restarts"] = min(
              requested_restarts, ray_constants.MAX_INT64_VALUE
          )

      class_id = ActorClassID.from_random()
      return ActorClass._ray_from_modified_class(
          modified_class,
          class_id,
          actor_options,
      )
  @PublicAPI
  def exit_actor():
      """Intentionally exit the current actor.

      This API can be used only inside an actor. Use ray.kill
      API if you'd like to kill an actor using actor handle.

      When this API is called, an exception is raised and the actor
      will exit immediately. For asyncio actors, there may be a short
      delay before the actor exits if the API is called from a background
      task.

      Any queued methods will fail. Any ``atexit``
      handlers installed in the actor will be run.

      Raises:
          TypeError: An exception is raised if this is a driver or this
              worker is not an actor.
          AsyncioActorExit: Raised when the current actor is an asyncio
              actor, to tell the main thread to exit.
      """
      worker = ray._private.worker.global_worker
      if worker.mode == ray.WORKER_MODE and not worker.actor_id.is_nil():
          worker.core_worker.set_current_actor_should_exit()
          # In asyncio actor mode, we can't raise SystemExit because it will just
          # quit the asyncio event loop thread, not the main thread. Instead, we
          # raise a custom error to the main thread to tell it to exit.
          if worker.core_worker.current_actor_is_asyncio():
              raise AsyncioActorExit()

          # Set a flag to indicate this is an intentional actor exit. This
          # reduces log verbosity.
          raise_sys_exit_with_custom_error_message("exit_actor() is called.")
      else:
          # Each fragment ends with a space so the adjacent string literals
          # join into a readable sentence.
          raise TypeError(
              "exit_actor API is called on a non-actor worker, "
              f"{worker.mode}. Call this API inside an actor method "
              "if you'd like to exit the actor gracefully."
          )