| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486 |
- import inspect
- import logging
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Generic,
- List,
- Literal,
- Optional,
- Tuple,
- TypeVar,
- Union,
- overload,
- )
- try:
- from typing import Concatenate, ParamSpec
- except ImportError:
- from typing_extensions import Concatenate, ParamSpec
- import ray._common.signature as signature
- import ray._private.ray_constants as ray_constants
- import ray._raylet
- from ray import ActorClassID, Language, ObjectRef, cross_language
- from ray._common import ray_option_utils
- from ray._common.ray_constants import DEFAULT_MAX_CONCURRENCY_ASYNC
- from ray._common.ray_option_utils import _warn_if_using_deprecated_placement_group
- from ray._private.async_compat import has_async_methods
- from ray._private.auto_init_hook import wrap_auto_init
- from ray._private.client_mode_hook import (
- client_mode_convert_actor,
- client_mode_hook,
- client_mode_should_convert,
- )
- from ray._private.inspect_util import (
- is_class_method,
- is_function_or_method,
- is_static_method,
- )
- from ray._private.utils import get_runtime_env_info, parse_runtime_env_for_task_or_actor
- from ray._raylet import (
- STREAMING_GENERATOR_RETURN,
- ObjectRefGenerator,
- PythonFunctionDescriptor,
- raise_sys_exit_with_custom_error_message,
- )
- from ray.exceptions import ActorAlreadyExistsError, AsyncioActorExit
- from ray.util.annotations import DeveloperAPI, PublicAPI
- from ray.util.placement_group import _configure_placement_group_based_on_context
- from ray.util.scheduling_strategies import (
- PlacementGroupSchedulingStrategy,
- SchedulingStrategyT,
- )
- from ray.util.tracing.tracing_helper import (
- _inject_tracing_into_class,
- _tracing_actor_creation,
- _tracing_actor_method_invocation,
- )
- if TYPE_CHECKING:
- pass
- logger = logging.getLogger(__name__)
- # Hook to call with (actor, resources, strategy) on each local actor creation.
- _actor_launch_hook = None
- # TypeVar for generic ActorHandle
- T = TypeVar("T")
- # return type of ActorClass[T].remote()
- ActorProxy = Union["ActorHandle[T]", type[T]]
- _Ret = TypeVar("_Ret")
- _P = ParamSpec("_P")
- _T0 = TypeVar("_T0")
- _T1 = TypeVar("_T1")
- _T2 = TypeVar("_T2")
- _T3 = TypeVar("_T3")
- _T4 = TypeVar("_T4")
- _T5 = TypeVar("_T5")
- _T6 = TypeVar("_T6")
- _T7 = TypeVar("_T7")
- _T8 = TypeVar("_T8")
- _T9 = TypeVar("_T9")
class _RemoteMethodNoArgs(Generic[_Ret]):
    """Typing helper describing an actor method that takes no arguments.

    Used as the return type of the `method` overload that accepts a
    `Callable[[Any], _Ret]`; the bodies are stubs.
    """

    def remote(self) -> "ObjectRef[_Ret]":
        ...

    def bind(self) -> Any:
        ...
class _RemoteMethod0(Generic[_Ret, _T0]):
    """Typing helper describing an actor method that takes one argument.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(self, __arg0: "Union[_T0, ObjectRef[_T0]]") -> "ObjectRef[_Ret]":
        ...

    def bind(self, __arg0: _T0) -> Any:
        ...
class _RemoteMethod1(Generic[_Ret, _T0, _T1]):
    """Typing helper describing an actor method that takes two arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self, __arg0: "Union[_T0, ObjectRef[_T0]]", __arg1: "Union[_T1, ObjectRef[_T1]]"
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(self, __arg0: _T0, __arg1: _T1) -> Any:
        ...
class _RemoteMethod2(Generic[_Ret, _T0, _T1, _T2]):
    """Typing helper describing an actor method that takes three arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(self, __arg0: _T0, __arg1: _T1, __arg2: _T2) -> Any:
        ...
class _RemoteMethod3(Generic[_Ret, _T0, _T1, _T2, _T3]):
    """Typing helper describing an actor method that takes four arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(self, __arg0: _T0, __arg1: _T1, __arg2: _T2, __arg3: _T3) -> Any:
        ...
class _RemoteMethod4(Generic[_Ret, _T0, _T1, _T2, _T3, _T4]):
    """Typing helper describing an actor method that takes five arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
        __arg4: "Union[_T4, ObjectRef[_T4]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(
        self, __arg0: _T0, __arg1: _T1, __arg2: _T2, __arg3: _T3, __arg4: _T4
    ) -> Any:
        ...
class _RemoteMethod5(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5]):
    """Typing helper describing an actor method that takes six arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
        __arg4: "Union[_T4, ObjectRef[_T4]]",
        __arg5: "Union[_T5, ObjectRef[_T5]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(
        self,
        __arg0: _T0,
        __arg1: _T1,
        __arg2: _T2,
        __arg3: _T3,
        __arg4: _T4,
        __arg5: _T5,
    ) -> Any:
        ...
class _RemoteMethod6(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6]):
    """Typing helper describing an actor method that takes seven arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
        __arg4: "Union[_T4, ObjectRef[_T4]]",
        __arg5: "Union[_T5, ObjectRef[_T5]]",
        __arg6: "Union[_T6, ObjectRef[_T6]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(
        self,
        __arg0: _T0,
        __arg1: _T1,
        __arg2: _T2,
        __arg3: _T3,
        __arg4: _T4,
        __arg5: _T5,
        __arg6: _T6,
    ) -> Any:
        ...
class _RemoteMethod7(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]):
    """Typing helper describing an actor method that takes eight arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
        __arg4: "Union[_T4, ObjectRef[_T4]]",
        __arg5: "Union[_T5, ObjectRef[_T5]]",
        __arg6: "Union[_T6, ObjectRef[_T6]]",
        __arg7: "Union[_T7, ObjectRef[_T7]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(
        self,
        __arg0: _T0,
        __arg1: _T1,
        __arg2: _T2,
        __arg3: _T3,
        __arg4: _T4,
        __arg5: _T5,
        __arg6: _T6,
        __arg7: _T7,
    ) -> Any:
        ...
class _RemoteMethod8(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8]):
    """Typing helper describing an actor method that takes nine arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
        __arg4: "Union[_T4, ObjectRef[_T4]]",
        __arg5: "Union[_T5, ObjectRef[_T5]]",
        __arg6: "Union[_T6, ObjectRef[_T6]]",
        __arg7: "Union[_T7, ObjectRef[_T7]]",
        __arg8: "Union[_T8, ObjectRef[_T8]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(
        self,
        __arg0: _T0,
        __arg1: _T1,
        __arg2: _T2,
        __arg3: _T3,
        __arg4: _T4,
        __arg5: _T5,
        __arg6: _T6,
        __arg7: _T7,
        __arg8: _T8,
    ) -> Any:
        ...
class _RemoteMethod9(Generic[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9]):
    """Typing helper describing an actor method that takes ten arguments.

    `remote` accepts each argument either by value or as an `ObjectRef`;
    `bind` accepts plain values. The bodies are stubs.
    """

    def remote(
        self,
        __arg0: "Union[_T0, ObjectRef[_T0]]",
        __arg1: "Union[_T1, ObjectRef[_T1]]",
        __arg2: "Union[_T2, ObjectRef[_T2]]",
        __arg3: "Union[_T3, ObjectRef[_T3]]",
        __arg4: "Union[_T4, ObjectRef[_T4]]",
        __arg5: "Union[_T5, ObjectRef[_T5]]",
        __arg6: "Union[_T6, ObjectRef[_T6]]",
        __arg7: "Union[_T7, ObjectRef[_T7]]",
        __arg8: "Union[_T8, ObjectRef[_T8]]",
        __arg9: "Union[_T9, ObjectRef[_T9]]",
    ) -> "ObjectRef[_Ret]":
        ...

    def bind(
        self,
        __arg0: _T0,
        __arg1: _T1,
        __arg2: _T2,
        __arg3: _T3,
        __arg4: _T4,
        __arg5: _T5,
        __arg6: _T6,
        __arg7: _T7,
        __arg8: _T8,
        __arg9: _T9,
    ) -> Any:
        ...
# Typing-only overloads for `method`. When the decorator is applied without
# parentheses to a method taking 0-10 positional arguments after `self`, the
# result is typed as the matching `_RemoteMethod*` helper. The keyword-only
# overload covers the `@ray.method(...)` form, which returns a decorator.
@overload
def method(
    __method: Callable[[Any, _T0], _Ret],
) -> _RemoteMethod0[_Ret, _T0]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1], _Ret],
) -> _RemoteMethod1[_Ret, _T0, _T1]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2], _Ret],
) -> _RemoteMethod2[_Ret, _T0, _T1, _T2]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3], _Ret],
) -> _RemoteMethod3[_Ret, _T0, _T1, _T2, _T3]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4], _Ret],
) -> _RemoteMethod4[_Ret, _T0, _T1, _T2, _T3, _T4]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5], _Ret],
) -> _RemoteMethod5[_Ret, _T0, _T1, _T2, _T3, _T4, _T5]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6], _Ret],
) -> _RemoteMethod6[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7], _Ret],
) -> _RemoteMethod7[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8], _Ret],
) -> _RemoteMethod8[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8]:
    ...


@overload
def method(
    __method: Callable[[Any, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9], _Ret],
) -> _RemoteMethod9[_Ret, _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9]:
    ...


@overload
def method(
    __method: Callable[[Any], _Ret],
) -> _RemoteMethodNoArgs[_Ret]:
    ...


@overload
def method(
    *,
    num_returns: Optional[Union[int, Literal["streaming"]]] = None,
    concurrency_group: Optional[str] = None,
    max_task_retries: Optional[int] = None,
    retry_exceptions: Optional[Union[bool, list, tuple]] = None,
    _generator_backpressure_num_objects: Optional[int] = None,
    enable_task_events: Optional[bool] = None,
    tensor_transport: Optional[str] = None,
) -> Callable[[Callable[Concatenate[Any, _P], _Ret]], Any]:
    ...
@PublicAPI
@client_mode_hook
def method(*args, **kwargs):
    """Annotate an actor method.

    .. code-block:: python

        @ray.remote
        class Foo:
            @ray.method(num_returns=2)
            def bar(self):
                return 1, 2

        f = Foo.remote()

        _, _ = f.bar.remote()

    The decorator may be applied bare (``@ray.method``) or with keyword
    options (``@ray.method(...)``). Each option that is supplied is recorded
    on the decorated function as a ``__ray_*__`` attribute.

    Args:
        num_returns: The number of object refs that should be returned by
            invocations of this actor method. The default value is 1 for a
            normal actor task and "streaming" for an actor generator task (a
            function that yields objects instead of returning them).
        max_task_retries: How many times to retry an actor task if the task
            fails due to a runtime error, e.g., the actor has died. The
            default value is 0. If set to -1, the system will retry the
            failed task until the task succeeds, or the actor has reached
            its max_restarts limit. If set to `n > 0`, the system will retry
            the failed task up to n times, after which the task will throw a
            `RayActorError` exception upon :obj:`ray.get`. Note that Python
            exceptions may trigger retries
            *only if* `retry_exceptions` is set for the method, in that case
            when `max_task_retries` runs out the task will rethrow the
            exception from the task. You can override this number with the
            method's `max_task_retries` option in `@ray.method` decorator or
            in `.options()`.
        retry_exceptions: Boolean of whether to retry all Python
            exceptions, or a list of allowlist exceptions to retry. The default
            value is False (only retry tasks upon system failures and if
            max_task_retries is set)
        concurrency_group: The name of the concurrency group
            to use for the actor method. By default, the actor is
            single-threaded and runs all actor tasks on the same thread.
            See :ref:`Defining Concurrency Groups <defining-concurrency-groups>`.
        _generator_backpressure_num_objects: Generator-only backpressure
            setting; stored on the method as given.
        enable_task_events: Whether task events are enabled for this method.
            A value of None leaves the method unannotated for this option.
        tensor_transport: [Alpha] The tensor transport protocol to
            use for the actor method. If a tensor transport is specified,
            Ray will store a *reference* instead of a copy of any torch.Tensors found inside
            values returned by this task, and the tensors will be sent directly
            to other tasks using the specified transport. The object store will be used
            when this is None (default). "NIXL", "NCCL", and "GLOO" (case-insensitive) are
            the three transports supported by default. The NCCL and GLOO transports
            require first creating a collective with the involved actors using
            :func:`ray.experimental.collective.create_collective_group`.
            See :ref:`Ray Direct Transport (RDT) <direct-transport>` for more
            details.

    Returns:
        The annotated function when applied bare, otherwise a decorator that
        annotates and returns the function it is applied to.

    Raises:
        AssertionError: If positional arguments are mixed with the
            keyword form, or an unsupported keyword argument is given.
    """
    valid_kwargs = [
        "num_returns",
        "concurrency_group",
        "max_task_retries",
        "retry_exceptions",
        "_generator_backpressure_num_objects",
        "enable_task_events",
        "tensor_transport",
    ]

    # Options copied verbatim (including an explicit None) onto the function.
    passthrough_attrs = (
        ("num_returns", "__ray_num_returns__"),
        ("max_task_retries", "__ray_max_task_retries__"),
        ("retry_exceptions", "__ray_retry_exceptions__"),
        ("concurrency_group", "__ray_concurrency_group__"),
        (
            "_generator_backpressure_num_objects",
            "__ray_generator_backpressure_num_objects__",
        ),
    )

    def annotate_method(method: Callable[_P, _Ret]):
        for option, attr in passthrough_attrs:
            if option in kwargs:
                setattr(method, attr, kwargs[option])
        # Unlike the options above, an explicit None is not recorded here.
        if kwargs.get("enable_task_events") is not None:
            method.__ray_enable_task_events__ = kwargs["enable_task_events"]
        if "tensor_transport" in kwargs:
            # Imported lazily, only when the option is actually used.
            from ray.experimental.gpu_object_manager.util import (
                normalize_and_validate_tensor_transport,
            )

            method.__ray_tensor_transport__ = normalize_and_validate_tensor_transport(
                kwargs["tensor_transport"]
            )
        return method

    # Called as @ray.method (without parentheses): args[0] is the function.
    if len(args) == 1 and callable(args[0]) and not kwargs:
        return annotate_method(args[0])

    # Called as @ray.method() or @ray.method(options...).
    # Raise explicitly rather than using `assert` so that validation still
    # runs when Python is started with -O; the exception type is unchanged.
    if args:
        raise AssertionError(
            "The @ray.method decorator must be applied using no arguments or at "
            f"least one of the arguments in the list {valid_kwargs}, for example "
            "'@ray.method(num_returns=2)'."
        )
    for key in kwargs:
        if key not in valid_kwargs:
            raise AssertionError(
                f"Unexpected keyword argument to @ray.method: '{key}'. The "
                f"supported keyword arguments are {valid_kwargs}"
            )
    return annotate_method
class _ActorMethodMetadata:
    """A container for the metadata required to invoke an actor method.

    This class intentionally does *not* hold a reference to the `ActorHandle`, as that causes
    a circular reference that delays `ActorHandle` destruction until the Python GC runs.

    Instead, it can be used as a factory to lazily generate `ActorMethod` instances that can
    be used to submit actor tasks for this method.
    """

    def __init__(
        self,
        method_name: str,
        num_returns: Optional[Union[int, Literal["streaming"]]],
        max_task_retries: int,
        retry_exceptions: Union[bool, list, tuple],
        is_generator: bool,
        generator_backpressure_num_objects: int,
        enable_task_events: bool,
        decorator: Optional[Any] = None,
        signature: Optional[List[inspect.Parameter]] = None,
        tensor_transport: Optional[str] = None,
    ):
        """Initialize an _ActorMethodMetadata.

        Args:
            method_name: The name of the actor method.
            num_returns: The default number of return values that the method
                invocation should return. If None is given, it uses
                DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS for a normal actor task
                and "streaming" for a generator task (when `is_generator` is True).
            max_task_retries: Number of retries on method failure.
            retry_exceptions: Boolean or list/tuple of exceptions to retry.
            is_generator: True if the method is a generator.
            generator_backpressure_num_objects: Generator-only config for backpressure.
            enable_task_events: True if task events are enabled for this method.
            decorator: Optional decorator for the method invocation.
            signature: The signature of the actor method.
            tensor_transport: The tensor transport protocol to use for the actor method.
        """
        # Resolve the default return count up front so the stored value is
        # never None.
        if num_returns is None:
            num_returns = (
                "streaming"
                if is_generator
                else ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS
            )

        self._method_name = method_name
        self._num_returns = num_returns
        self._max_task_retries = max_task_retries
        self._retry_exceptions = retry_exceptions
        self._is_generator = is_generator
        self._generator_backpressure_num_objects = generator_backpressure_num_objects
        self._enable_task_events = enable_task_events
        self._decorator = decorator
        self._signature = signature
        self._tensor_transport = tensor_transport

    def bind(self, actor_handle: "ActorHandle") -> "ActorMethod":
        """
        Produce a bound ActorMethod that holds a strong reference to actor_handle.
        """
        return ActorMethod(
            actor_handle,
            self._method_name,
            self._num_returns,
            self._max_task_retries,
            self._retry_exceptions,
            self._is_generator,
            self._generator_backpressure_num_objects,
            self._enable_task_events,
            decorator=self._decorator,
            signature=self._signature,
            tensor_transport=self._tensor_transport,
        )
- # Create objects to wrap method invocations. This is done so that we can
- # invoke methods with actor.method.remote() instead of actor.method().
- @PublicAPI
- class ActorMethod:
- """A class used to invoke an actor method.
- Note: This class should not be instantiated directly. Instead, it should
- only be used as a return value from the `@ray.method` decorator.
- """
def __init__(
    self,
    actor,
    method_name,
    num_returns: Optional[Union[int, Literal["streaming"]]],
    max_task_retries: int,
    retry_exceptions: Union[bool, list, tuple],
    is_generator: bool,
    generator_backpressure_num_objects: int,
    enable_task_events: bool,
    decorator=None,
    signature: Optional[List[inspect.Parameter]] = None,
    tensor_transport: Optional[str] = None,
):
    """Initialize an ActorMethod.

    Args:
        actor: The actor instance this method belongs to.
        method_name: The name of the actor method.
        num_returns: The default number of return values that the method
            invocation should return. If None is given, it uses
            DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS for a normal actor task
            and "streaming" for a generator task (when `is_generator` is True).
        max_task_retries: Number of retries on method failure.
        retry_exceptions: Boolean of whether you want to retry all user-raised
            exceptions, or a list of allowlist exceptions to retry.
        is_generator: True if a given method is a Python generator.
        generator_backpressure_num_objects: Generator-only config: the
            threshold on the number of unconsumed objects used for
            backpressure.
        enable_task_events: True if task events is enabled, i.e., task events from
            the actor should be reported.
        decorator: An optional decorator that should be applied to the actor
            method invocation.
        signature: The signature of the actor method. It is None only when cross
            language feature is used.
        tensor_transport: The tensor transport protocol to use for the actor method.
    """
    # Resolve the default return count so `_num_returns` is never None.
    if num_returns is None:
        num_returns = (
            "streaming"
            if is_generator
            else ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS
        )

    self._actor = actor
    self._method_name = method_name
    self._num_returns = num_returns
    self._max_task_retries = max_task_retries
    self._retry_exceptions = retry_exceptions
    self._is_generator = is_generator
    self._generator_backpressure_num_objects = generator_backpressure_num_objects
    self._enable_task_events = enable_task_events
    self._signature = signature
    # This is a decorator that is used to wrap the function invocation (as
    # opposed to the function execution). The decorator must return a
    # function that takes in two arguments ("args" and "kwargs"). In most
    # cases, it should call the function that was passed into the decorator
    # and return the resulting ObjectRefs.
    self._decorator = decorator
    self._tensor_transport = tensor_transport
def __call__(self, *args, **kwargs):
    """Reject direct invocation of an actor method.

    Raises:
        TypeError: Always; the message points the caller at ``.remote()``.
    """
    method_name = self._method_name
    raise TypeError(
        "Actor methods cannot be called directly. Instead "
        f"of running 'object.{method_name}()', try "
        f"'object.{method_name}.remote()'."
    )
@DeveloperAPI
def bind(self, *args, **kwargs):
    """
    Bind arguments to the actor method for Ray DAG building.
    This method generates and returns an intermediate representation (IR)
    node that indicates the actor method will be called with the given
    arguments at execution time.

    This method is used in both :ref:`Ray DAG <ray-dag-guide>` and
    :ref:`Ray Compiled Graph <ray-compiled-graph>` for building a DAG.
    """
    # Delegate to `_bind`, passing the collected positional and keyword
    # arguments as two plain containers.
    return self._bind(args, kwargs)
def remote(self, *args, **kwargs):
    """Invoke the actor method with the given arguments.

    Delegates to `_remote`, passing the positional arguments as a tuple and
    the keyword arguments as a dict, and returns its result.
    """
    return self._remote(args, kwargs)
def options(self, **options):
    """Convenience method for executing an actor method call with options.

    Same arguments as func._remote(), but returns a wrapped function
    that a non-underscore .remote() can be called on.

    A non-None ``tensor_transport`` option is normalized and validated
    immediately, before the wrapper is returned.

    Examples:
        # The following two calls are equivalent.
        >>> actor.my_method._remote(args=[x, y], name="foo", num_returns=2)
        >>> actor.my_method.options(name="foo", num_returns=2).remote(x, y)
    """
    actor_method = self

    requested_transport = options.get("tensor_transport", None)
    if requested_transport is not None:
        # Imported lazily, only when the option is actually used.
        from ray.experimental.gpu_object_manager.util import (
            normalize_and_validate_tensor_transport,
        )

        options["tensor_transport"] = normalize_and_validate_tensor_transport(
            requested_transport
        )

    class FuncWrapper:
        # Forwards calls to the wrapped ActorMethod with the captured options.
        def remote(self, *args, **kwargs):
            return actor_method._remote(args=args, kwargs=kwargs, **options)

        @DeveloperAPI
        def bind(self, *args, **kwargs):
            return actor_method._bind(args=args, kwargs=kwargs, **options)

    return FuncWrapper()
@wrap_auto_init
@_tracing_actor_method_invocation
def _bind(
    self,
    args=None,
    kwargs=None,
    name="",
    num_returns=None,
    concurrency_group=None,
    _generator_backpressure_num_objects=None,
) -> Union["ray.dag.ClassMethodNode", Tuple["ray.dag.ClassMethodNode", ...]]:
    """Build the DAG node(s) for calling this actor method.

    Args:
        args: Positional arguments to bind.
        kwargs: Keyword arguments to bind.
        name: Forwarded to the node as the "name" option.
        num_returns: Forwarded to the node as the "num_returns" option.
        concurrency_group: Forwarded as the "concurrency_group" option.
        _generator_backpressure_num_objects: Forwarded as the option of the
            same name.

    Returns:
        A single `ClassMethodNode`, or, when the node reports more than one
        return value, a tuple with one output node per return index.

    Raises:
        RuntimeError: If the actor reference has been lost.
        TypeError: If the arguments do not match the method signature.
    """
    from ray.dag.class_node import (
        BIND_INDEX_KEY,
        IS_CLASS_METHOD_OUTPUT_KEY,
        PARENT_CLASS_NODE_KEY,
        PREV_CLASS_METHOD_CALL_KEY,
        ClassMethodNode,
    )

    # TODO(sang): unify option passing
    options = {
        "name": name,
        "num_returns": num_returns,
        "concurrency_group": concurrency_group,
        "_generator_backpressure_num_objects": _generator_backpressure_num_objects,
    }

    actor = self._actor
    if actor is None:
        # Ref is GC'ed. It happens when the actor handle is GC'ed
        # when bind is called.
        raise RuntimeError("Lost reference to actor")

    other_args_to_resolve = {
        PARENT_CLASS_NODE_KEY: actor,
        PREV_CLASS_METHOD_CALL_KEY: None,
        BIND_INDEX_KEY: actor._ray_dag_bind_index,
    }
    # The bind index advances before argument validation, so a rejected
    # bind still consumes an index.
    actor._ray_dag_bind_index += 1

    assert (
        self._signature is not None
    ), "self._signature should be set for .bind API."
    try:
        signature.validate_args(self._signature, args, kwargs)
    except TypeError as e:
        shown_params = self._signature.copy()
        if shown_params and shown_params[-1].name == "_ray_trace_ctx":
            # Remove the trace context arg for readability.
            shown_params.pop()
        shown_signature = inspect.Signature(parameters=shown_params)
        raise TypeError(
            f"{str(e)}. The function `{self._method_name}` has a signature "
            f"`{shown_signature}`, but the given arguments to `bind` doesn't "
            f"match. args: {args}. kwargs: {kwargs}."
        ) from None

    node = ClassMethodNode(
        self._method_name,
        args,
        kwargs,
        options,
        other_args_to_resolve=other_args_to_resolve,
    )

    if node.num_returns > 1:
        # One output node per return index, each pointing at the parent node.
        return tuple(
            ClassMethodNode(
                f"return_idx_{i}",
                (node, i),
                dict(),
                dict(),
                {IS_CLASS_METHOD_OUTPUT_KEY: True, PARENT_CLASS_NODE_KEY: actor},
            )
            for i in range(node.num_returns)
        )
    return node
@wrap_auto_init
@_tracing_actor_method_invocation
def _remote(
    self,
    args=None,
    kwargs=None,
    name="",
    num_returns=None,
    max_task_retries=None,
    retry_exceptions=None,
    concurrency_group=None,
    _generator_backpressure_num_objects=None,
    enable_task_events=None,
    tensor_transport: Optional[str] = None,
):
    """Invoke this actor method with explicit per-call options.

    Every option passed as ``None`` is replaced by the value stored on this
    method object. ``max_task_retries`` additionally becomes 0 when the
    stored value is ``None`` as well.

    Args:
        args: Positional arguments for the method (empty list if falsy).
        kwargs: Keyword arguments for the method (empty dict if falsy).
        name: Forwarded as ``name`` to the actor method call.
        num_returns: Number of return values; must be 1 when a tensor
            transport is in effect.
        max_task_retries: Forwarded to the actor method call.
        retry_exceptions: Forwarded to the actor method call.
        concurrency_group: Forwarded as ``concurrency_group_name``.
        _generator_backpressure_num_objects: Forwarded as
            ``generator_backpressure_num_objects``.
        enable_task_events: Forwarded to the actor method call.
        tensor_transport: Name of the tensor transport to use, or ``None``.

    Returns:
        Whatever the (optionally decorated) invocation returns. When a
        tensor transport is in effect this is asserted to be an
        ``ObjectRef``.

    Raises:
        ValueError: If a tensor transport is requested together with
            ``num_returns != 1``, with an actor that has tensor transport
            disabled, or with a transport the actor does not have.
        RuntimeError: If the actor reference is gone at invocation time.
    """
    # Fill in every unspecified option from the defaults kept on this object.
    num_returns = self._num_returns if num_returns is None else num_returns
    if max_task_retries is None:
        stored_retries = self._max_task_retries
        max_task_retries = 0 if stored_retries is None else stored_retries
    retry_exceptions = (
        self._retry_exceptions if retry_exceptions is None else retry_exceptions
    )
    enable_task_events = (
        self._enable_task_events
        if enable_task_events is None
        else enable_task_events
    )
    if _generator_backpressure_num_objects is None:
        _generator_backpressure_num_objects = (
            self._generator_backpressure_num_objects
        )
    tensor_transport = (
        self._tensor_transport if tensor_transport is None else tensor_transport
    )

    # Validate the tensor transport request before submitting anything.
    if tensor_transport is not None:
        if num_returns != 1:
            raise ValueError(
                f"Currently, methods with tensor_transport={tensor_transport} only support 1 return value. "
                "Please make sure the actor method is decorated with `@ray.method(num_returns=1)` (the default)."
            )
        if not self._actor._ray_enable_tensor_transport:
            raise ValueError(
                f'Currently, methods with .options(tensor_transport="{tensor_transport}") are not supported when enable_tensor_transport=False. '
                "Please set @ray.remote(enable_tensor_transport=True) on the actor class definition."
            )
        transport_manager = ray._private.worker.global_worker.gpu_object_manager
        has_transport = transport_manager.actor_has_tensor_transport(
            self._actor, tensor_transport
        )
        if not has_transport:
            raise ValueError(
                f'{self._actor} does not have tensor transport {tensor_transport} available. If using a collective-based transport ("nccl" or "gloo"), please create a communicator with '
                "`ray.experimental.collective.create_collective_group` "
                "before calling actor tasks with non-default tensor_transport."
            )
        # Wait for source actor to have the transport registered.
        transport_manager.wait_until_custom_transports_registered(self._actor)

    if not args:
        args = []
    if not kwargs:
        kwargs = {}

    def invocation(args, kwargs):
        dst_actor = self._actor
        if dst_actor is None:
            # See https://github.com/ray-project/ray/issues/6265 for more details.
            raise RuntimeError(
                "Lost reference to actor. Actor handles must be stored as variables, e.g. `actor = MyActor.remote()` before calling methods."
            )

        manager = ray._private.worker.global_worker.gpu_object_manager
        manager.queue_or_trigger_out_of_band_tensor_transfer(dst_actor, args)

        return dst_actor._actor_method_call(
            self._method_name,
            args=args,
            kwargs=kwargs,
            name=name,
            num_returns=num_returns,
            max_task_retries=max_task_retries,
            retry_exceptions=retry_exceptions,
            concurrency_group_name=concurrency_group,
            generator_backpressure_num_objects=(
                _generator_backpressure_num_objects
            ),
            enable_task_events=enable_task_events,
            tensor_transport=tensor_transport,
        )

    # Wrap the invocation with the user-supplied decorator, if any.
    if self._decorator is not None:
        invocation = self._decorator(invocation)

    object_refs = invocation(args, kwargs)
    if tensor_transport is None:
        return object_refs

    # Currently, we only support RDT when num_returns is 1.
    assert isinstance(object_refs, ObjectRef)
    ref_manager = ray._private.worker.global_worker.gpu_object_manager
    ref_manager.add_gpu_object_ref(object_refs, self._actor, tensor_transport)
    return object_refs
def __getstate__(self):
    """Return the state needed to rebuild this actor method as a dict.

    The keys match the ones read back by ``__setstate__``.
    """
    return dict(
        actor=self._actor,
        method_name=self._method_name,
        num_returns=self._num_returns,
        max_task_retries=self._max_task_retries,
        retry_exceptions=self._retry_exceptions,
        decorator=self._decorator,
        is_generator=self._is_generator,
        generator_backpressure_num_objects=(
            self._generator_backpressure_num_objects
        ),
        enable_task_events=self._enable_task_events,
        _tensor_transport=self._tensor_transport,
    )
def __setstate__(self, state):
    """Restore this actor method by re-running ``__init__`` on saved state.

    Args:
        state: Mapping with the keys produced by ``__getstate__``.
    """
    # Keys listed in the positional order that __init__ is called with.
    init_argument_keys = (
        "actor",
        "method_name",
        "num_returns",
        "max_task_retries",
        "retry_exceptions",
        "is_generator",
        "generator_backpressure_num_objects",
        "enable_task_events",
        "decorator",
        "_tensor_transport",
    )
    self.__init__(*[state[key] for key in init_argument_keys])
class _ActorClassMethodMetadata(object):
    """Metadata for all methods in an actor class. This data can be cached.

    Instances are built through :meth:`create`; calling the constructor
    directly raises ``TypeError``.

    Attributes:
        methods: The actor methods, keyed by method name.
        decorators: Optional decorators that should be applied to the
            method invocation function before invoking the actor methods.
            These can be set by attaching the attribute
            "__ray_invocation_decorator__" to the actor method.
        signatures: The signatures of the methods.
        num_returns: The default number of return values for each actor
            method (``None`` when the method does not specify one).
        max_task_retries: Number of retries on method failure. Only holds
            entries for methods that carry "__ray_max_task_retries__".
        retry_exceptions: Boolean of whether you want to retry all
            user-raised exceptions, or a list of allowlist exceptions to
            retry, for each method.
        method_is_generator: Whether each method is a generator or async
            generator function.
        enable_task_events: True if tracing is enabled, i.e., task events
            from the actor should be reported. Defaults to True.
        generator_backpressure_num_objects: Per-method value of
            "__ray_generator_backpressure_num_objects__", when present.
        concurrency_group_for_methods: Per-method value of
            "__ray_concurrency_group__", when present.
        method_name_to_tensor_transport: Per-method value of
            "__ray_tensor_transport__", when present.
        has_tensor_transport_methods: Whether any method has a
            "__ray_tensor_transport__" attribute that is not ``None``.
    """

    # This cache will be cleared in ray._private.worker.disconnect()
    _cache = {}

    def __init__(self):
        class_name = type(self).__name__
        raise TypeError(
            f"{class_name} can not be constructed directly, "
            f"instead of running '{class_name}()', "
            f"try '{class_name}.create()'"
        )

    @classmethod
    def reset_cache(cls):
        """Remove every cached metadata instance."""
        cls._cache.clear()

    @classmethod
    def create(
        cls,
        modified_class,
        actor_creation_function_descriptor,
    ):
        """Return the method metadata for ``modified_class``.

        A cached instance is returned when one exists for
        ``actor_creation_function_descriptor``; otherwise a new one is
        built, stored in the cache under that descriptor and returned.
        """
        cached = cls._cache.get(actor_creation_function_descriptor)
        if cached is not None:
            return cached

        # Allocate without running __init__, which always raises.
        meta = cls.__new__(cls)

        members = inspect.getmembers(modified_class, is_function_or_method)
        meta.methods = dict(members)

        # Per-method tables, all keyed by method name.
        meta.decorators = {}
        meta.signatures = {}
        meta.num_returns = {}
        meta.max_task_retries = {}
        meta.retry_exceptions = {}
        meta.method_is_generator = {}
        meta.enable_task_events = {}
        meta.generator_backpressure_num_objects = {}
        meta.concurrency_group_for_methods = {}
        meta.method_name_to_tensor_transport: Dict[str, str] = {}

        # Check whether any actor methods specify a non-default tensor transport.
        meta.has_tensor_transport_methods = any(
            getattr(member, "__ray_tensor_transport__", None) is not None
            for _, member in members
        )

        # Optional method attributes and the table each one is copied into.
        # A table only gets an entry when the attribute is present, so e.g.
        # max_task_retries only contains values from
        # `@ray.method(max_task_retries=...)`.
        optional_attributes = (
            ("__ray_max_task_retries__", meta.max_task_retries),
            ("__ray_retry_exceptions__", meta.retry_exceptions),
            ("__ray_invocation_decorator__", meta.decorators),
            ("__ray_concurrency_group__", meta.concurrency_group_for_methods),
            ("__ray_enable_task_events__", meta.enable_task_events),
            (
                "__ray_generator_backpressure_num_objects__",
                meta.generator_backpressure_num_objects,
            ),
            ("__ray_tensor_transport__", meta.method_name_to_tensor_transport),
        )

        for method_name, member in members:
            method = inspect.unwrap(member)

            # Class and static methods keep their first argument; instance
            # methods have it dropped from the extracted signature.
            keeps_first_argument = is_class_method(method) or is_static_method(
                modified_class, method_name
            )
            meta.signatures[method_name] = signature.extract_signature(
                method, ignore_first=not keeps_first_argument
            )

            # Default number of return values (None when unspecified).
            meta.num_returns[method_name] = getattr(
                method, "__ray_num_returns__", None
            )

            for attribute_name, table in optional_attributes:
                if hasattr(method, attribute_name):
                    table[method_name] = getattr(method, attribute_name)

            meta.method_is_generator[method_name] = inspect.isgeneratorfunction(
                method
            ) or inspect.isasyncgenfunction(method)

        # Update cache.
        cls._cache[actor_creation_function_descriptor] = meta
        return meta
class _ActorClassMetadata:
    """Metadata for an actor class.

    Attributes:
        language: The actor language, e.g. Python, Java.
        modified_class: The original class that was decorated (with some
            additional methods added like __ray_terminate__).
        actor_creation_function_descriptor: The function descriptor for
            the actor creation task.
        class_id: The ID of this actor class.
        method_meta: The actor method metadata.
        class_name: The name of this class, taken from the creation
            function descriptor.
        is_cross_language: Whether ``language`` differs from
            ``Language.PYTHON``.
        max_restarts: The ``max_restarts`` option as passed in.
        max_task_retries: The ``max_task_retries`` option as passed in.
        num_cpus: The default number of CPUs required by the actor creation
            task.
        num_gpus: The default number of GPUs required by the actor creation
            task.
        memory: The heap memory quota for this actor.
        object_store_memory: The ``object_store_memory`` option as passed in.
        resources: The default resources required by the actor creation task.
        label_selector: The labels required for the node on which this actor
            can be scheduled on. The label selector consist of key-value
            pairs, where the keys are label names and the value are
            expressions consisting of an operator with label values or just
            a value to indicate equality.
        fallback_strategy: If specified, expresses soft constraints through
            a list of decorator options to fall back on when scheduling on a
            node. Decorator options are evaluated together during
            scheduling. The first satisfied dict of options is used.
            Currently only `label_selector` is a supported option.
        accelerator_type: The specified type of accelerator required for the
            node on which this actor runs.
            See :ref:`accelerator types <accelerator_types>`.
        runtime_env: The runtime environment for this actor.
        concurrency_groups: The ``concurrency_groups`` option as passed in.
        scheduling_strategy: Strategy about how to schedule this actor.
        last_export_cluster_and_job: A pair of the last exported cluster
            and job to help us to know whether this function was exported.
            This is an imperfect mechanism used to determine if we need to
            export the remote function again. It is imperfect in the sense
            that the actor class definition could be exported multiple times
            by different workers. Starts out as ``None``.
        enable_tensor_transport: Whether to enable out-of-band tensor
            transport for this actor.
    """

    def __init__(
        self,
        language,
        modified_class,
        actor_creation_function_descriptor,
        class_id,
        method_meta,
        max_restarts,
        max_task_retries,
        num_cpus,
        num_gpus,
        memory,
        object_store_memory,
        resources,
        label_selector,
        fallback_strategy,
        accelerator_type,
        runtime_env,
        concurrency_groups,
        scheduling_strategy: SchedulingStrategyT,
        enable_tensor_transport: bool,
    ):
        # Identity of the actor class.
        self.language = language
        self.is_cross_language = language != Language.PYTHON
        self.modified_class = modified_class
        self.actor_creation_function_descriptor = actor_creation_function_descriptor
        self.class_name = actor_creation_function_descriptor.class_name
        self.class_id = class_id
        self.method_meta = method_meta

        # Fault-tolerance options.
        self.max_restarts = max_restarts
        self.max_task_retries = max_task_retries

        # Resource requirements.
        self.num_cpus = num_cpus
        self.num_gpus = num_gpus
        self.memory = memory
        self.object_store_memory = object_store_memory
        self.resources = resources
        self.accelerator_type = accelerator_type

        # Placement and scheduling.
        self.label_selector = label_selector
        self.fallback_strategy = fallback_strategy
        self.scheduling_strategy = scheduling_strategy

        # Execution environment.
        self.runtime_env = runtime_env
        self.concurrency_groups = concurrency_groups
        self.enable_tensor_transport = enable_tensor_transport

        # No export has been recorded yet.
        self.last_export_cluster_and_job = None
@PublicAPI
class ActorClassInheritanceException(TypeError):
    """Raised when a class is defined with an actor class as a base class."""
def _process_option_dict(actor_options, has_tensor_transport_methods):
    """Build the option keyword arguments for ``_ActorClassMetadata``.

    Only options that are also parameters of ``_ActorClassMetadata.__init__``
    are kept; options missing from ``actor_options`` get their registered
    default value. The runtime env is parsed, and tensor transport is
    switched on when any actor method requests it.

    Args:
        actor_options: Options given for the actor class. This mapping and
            the values it holds are not modified.
        has_tensor_transport_methods: Whether any actor method specifies a
            non-default tensor transport.

    Returns:
        A new dict of options.

    Raises:
        ValueError: If a method requests a tensor transport while
            ``enable_tensor_transport`` was explicitly set to ``False``.
    """
    arg_names = set(inspect.getfullargspec(_ActorClassMetadata.__init__)[0])
    _filled_options = {
        k: actor_options.get(k, v.default_value)
        for k, v in ray_option_utils.actor_options.items()
        if k in arg_names
    }
    _filled_options["runtime_env"] = parse_runtime_env_for_task_or_actor(
        _filled_options["runtime_env"]
    )

    # If any actor method has a non-default tensor transport, automatically
    # enable tensor transport, unless it was explicitly set to False by the
    # user.
    if has_tensor_transport_methods:
        if _filled_options["enable_tensor_transport"] is False:
            raise ValueError(
                "Actor class has methods with @ray.method(tensor_transport=...) decorator but @ray.remote(enable_tensor_transport=False). "
                "Either set enable_tensor_transport=True or remove the @ray.method(tensor_transport=...) decorator from the methods."
            )
        _filled_options["enable_tensor_transport"] = True

    # Ray GPU objects requires a background thread for data transfer. However,
    # currently by default the background thread will be blocked if the main
    # thread does not yield. For now, we explicitly create the background thread
    # if `@ray.remote(enable_tensor_transport=True)` or if any methods are
    # decorated with `@ray.method(tensor_transport=...)` and a non-default
    # tensor transport. This forces Ray to execute all tasks on background
    # threads instead of the main thread.
    # TODO(swang): Remove this code once
    # https://github.com/ray-project/ray/issues/54639 is fixed.
    if _filled_options.get("enable_tensor_transport", False):
        concurrency_groups = _filled_options.get("concurrency_groups", None)
        if concurrency_groups is None:
            concurrency_groups = {}
        elif isinstance(concurrency_groups, dict):
            # The value is the very object the caller passed in; copy it so
            # the system groups are not written into the caller's dict.
            concurrency_groups = dict(concurrency_groups)
        concurrency_groups["_ray_system"] = 1
        concurrency_groups["_ray_system_error"] = 1
        _filled_options["concurrency_groups"] = concurrency_groups

    return _filled_options
- @PublicAPI
- class ActorClass(Generic[T]):
- """An actor class.
- This is a decorated class. It can be used to create actors.
- Attributes:
- __ray_metadata__: Contains metadata for the actor.
- """
def __init__(cls, name, bases, attr):
    """Prevents users from directly inheriting from an ActorClass.

    This will be called when a class is defined with an ActorClass object
    as one of its base classes. To intentionally construct an ActorClass,
    use the '_ray_from_modified_class' classmethod.

    Args:
        name: Name of the class being defined.
        bases: Base classes of the class being defined.
        attr: Unused.

    Raises:
        ActorClassInheritanceException: When one of ``bases`` is an
            ActorClass.
        AssertionError: When none of ``bases`` is an ActorClass.
        TypeError: In all other cases (e.g. a call that does not match this
            signature).
    """
    for base in bases:
        if isinstance(base, ActorClass):
            raise ActorClassInheritanceException(
                f"Attempted to define subclass '{name}' of actor "
                f"class '{base.__ray_metadata__.class_name}'. "
                "Inheriting from actor classes is "
                "not currently supported. You can instead "
                "inherit from a non-actor base class and make "
                "the derived class an actor class (with "
                "@ray.remote)."
            )

    # This shouldn't be reached because one of the base classes must be
    # an actor class if this was meant to be subclassed. Raise explicitly
    # rather than `assert False`, which is removed when Python runs with -O.
    raise AssertionError(
        "ActorClass.__init__ should not be called. Please use "
        "the @ray.remote decorator instead."
    )
def __call__(self, *args, **kwargs):
    """Reject direct instantiation of an actor class.

    Calling an ActorClass object like a regular class always fails; use
    'ActorClass.remote()' to create a remote actor instead.

    Raises:
        TypeError: Always.
    """
    class_name = self.__ray_metadata__.class_name
    message = (
        "Actors cannot be instantiated directly. "
        f"Instead of '{class_name}()', "
        f"use '{class_name}.remote()'."
    )
    raise TypeError(message)
@classmethod
def _ray_from_modified_class(
    cls,
    modified_class,
    class_id,
    actor_options,
):
    """Construct an ActorClass object from a decorated Python class.

    Args:
        modified_class: The class to wrap; it must expose
            ``__ray_actor_class__``.
        class_id: The ID stored in the actor class metadata.
        actor_options: Options for the actor class. The same dict is kept
            as the default options of the returned object.

    Returns:
        An instance of a generated class deriving from both ``cls`` and
        ``modified_class``, with ``__ray_metadata__`` populated.
    """
    overwritten_attributes = (
        "remote",
        "_remote",
        "_ray_from_modified_class",
        "_ray_from_function_descriptor",
    )
    for attribute in overwritten_attributes:
        if not hasattr(modified_class, attribute):
            continue
        logger.warning(
            "Creating an actor from class "
            f"{modified_class.__name__} overwrites "
            f"attribute {attribute} of that class"
        )

    # Make sure the actor class we are constructing inherits from the
    # original class so it retains all class properties.
    class DerivedActorClass(cls, modified_class):
        def __init__(self, *args, **kwargs):
            try:
                cls.__init__(self, *args, **kwargs)
            except ActorClassInheritanceException:
                # Inheritance errors are TypeErrors too, but they must
                # reach the caller unchanged.
                raise
            except TypeError:
                # Any other TypeError from cls.__init__ delegates the call
                # to modified_class.__init__. Every other exception
                # propagates untouched.
                modified_class.__init__(self, *args, **kwargs)

    derived_name = f"ActorClass({modified_class.__name__})"
    DerivedActorClass.__module__ = modified_class.__module__
    DerivedActorClass.__name__ = derived_name
    DerivedActorClass.__qualname__ = derived_name

    # Construct the base object without running __init__.
    self = DerivedActorClass.__new__(DerivedActorClass)

    # Actor creation function descriptor.
    creation_descriptor = PythonFunctionDescriptor.from_class(
        modified_class.__ray_actor_class__
    )
    method_meta = _ActorClassMethodMetadata.create(
        modified_class,
        creation_descriptor,
    )
    metadata_options = _process_option_dict(
        actor_options, method_meta.has_tensor_transport_methods
    )
    self.__ray_metadata__ = _ActorClassMetadata(
        Language.PYTHON,
        modified_class,
        creation_descriptor,
        class_id,
        method_meta,
        **metadata_options,
    )

    self._default_options = actor_options
    # Store the parsed runtime env in place of the raw one.
    if "runtime_env" in self._default_options:
        self._default_options["runtime_env"] = self.__ray_metadata__.runtime_env

    return self
@classmethod
def _ray_from_function_descriptor(
    cls,
    language,
    actor_creation_function_descriptor,
    actor_options,
):
    """Construct an ActorClass object from a function descriptor alone.

    No Python class is attached: the metadata is created with
    ``modified_class`` and ``class_id`` both set to ``None``.

    Args:
        language: The actor language stored in the metadata.
        actor_creation_function_descriptor: The function descriptor for the
            actor creation task.
        actor_options: Options for the actor class. The same dict is kept
            as the default options of the returned object.

    Returns:
        A new ``ActorClass`` instance with ``__ray_metadata__`` populated.
    """
    self = ActorClass.__new__(ActorClass)

    modified_class = None
    method_meta = _ActorClassMethodMetadata.create(
        modified_class,
        actor_creation_function_descriptor,
    )
    metadata_options = _process_option_dict(
        actor_options, method_meta.has_tensor_transport_methods
    )
    self.__ray_metadata__ = _ActorClassMetadata(
        language,
        modified_class,
        actor_creation_function_descriptor,
        None,
        method_meta,
        **metadata_options,
    )

    self._default_options = actor_options
    # Store the parsed runtime env in place of the raw one.
    if "runtime_env" in self._default_options:
        self._default_options["runtime_env"] = self.__ray_metadata__.runtime_env

    return self
def remote(self, *args, **kwargs) -> ActorProxy[T]:
    """Create an actor using this class's default options.

    Args:
        args: These arguments are forwarded directly to the actor
            constructor.
        kwargs: These arguments are forwarded directly to the actor
            constructor.

    Returns:
        A handle to the newly created actor.
    """
    default_options = self._default_options
    return self._remote(args=args, kwargs=kwargs, **default_options)
def options(self, **actor_options) -> "ActorClass[T]":
    """Configures and overrides the actor instantiation parameters.

    The arguments are the same as those that can be passed
    to :obj:`ray.remote`.

    Args:
        num_cpus: The quantity of CPU cores to reserve
            for this task or for the lifetime of the actor.
        num_gpus: The quantity of GPUs to reserve
            for this task or for the lifetime of the actor.
        resources (Dict[str, float]): The quantity of various custom
            resources to reserve for this task or for the lifetime of the
            actor. This is a dictionary mapping strings (resource names) to
            floats.
        label_selector (Dict[str, str]): If specified, requires that the
            actor run on a node which meets the specified label conditions
            (equals, in, not in, etc.).
        fallback_strategy (List[Dict[str, Any]]): If specified, expresses
            soft constraints through a list of decorator options to fall
            back on when scheduling on a node.
        accelerator_type: If specified, requires that the task or actor run
            on a node with the specified type of accelerator.
            See :ref:`accelerator types <accelerator_types>`.
        memory: The heap memory request in bytes for this task/actor,
            rounded down to the nearest integer.
        object_store_memory: The object store memory request for actors
            only.
        max_restarts: This specifies the maximum
            number of times that the actor should be restarted when it dies
            unexpectedly. The minimum valid value is 0 (default),
            which indicates that the actor doesn't need to be restarted.
            A value of -1 indicates that an actor should be restarted
            indefinitely.
        max_task_retries: How many times to retry an actor task if the task
            fails due to a runtime error, e.g., the actor has died. The
            default value is 0. If set to -1, the system will retry the
            failed task until the task succeeds, or the actor has reached
            its max_restarts limit. If set to `n > 0`, the system will retry
            the failed task up to n times, after which the task will throw a
            `RayActorError` exception upon :obj:`ray.get`. Note that Python
            exceptions may trigger retries
            *only if* `retry_exceptions` is set for the method, in that case
            when `max_task_retries` runs out the task will rethrow the
            exception from the task. You can override this number with the
            method's `max_task_retries` option in `@ray.method` decorator or
            in `.option()`.
        max_pending_calls: Set the max number of pending calls
            allowed on the actor handle. When this value is exceeded,
            PendingCallsLimitExceeded will be raised for further tasks.
            Note that this limit is counted per handle. -1 means that the
            number of pending calls is unlimited.
        max_concurrency: The max number of concurrent calls to allow for
            this actor. This only works with direct actor calls. The max
            concurrency defaults to 1 for threaded execution, and 1000 for
            asyncio execution. Note that the execution order is not
            guaranteed when max_concurrency > 1.
        allow_out_of_order_execution: Only for *actors*. Whether Ray
            executes actor tasks out of order. If you're using
            multi-threaded (``max_concurrency > 1``) or async actors, you
            can't set this to False. Defaults to True if you're using
            multi-threaded or async actors, and False otherwise. Actor task
            retries are always executed out of order.
        name: The globally unique name for the actor, which can be used
            to retrieve the actor via ray.get_actor(name) as long as the
            actor is still alive.
        namespace: Override the namespace to use for the actor. By default,
            actors are created in an anonymous namespace. The actor can
            be retrieved via ray.get_actor(name=name, namespace=namespace).
        lifetime: Either `None`, which defaults to the actor will fate
            share with its creator and will be deleted once its refcount
            drops to zero, or "detached", which means the actor will live
            as a global object independent of the creator.
        runtime_env (Dict[str, Any]): Specifies the runtime environment for
            this actor or task and its children. See
            :ref:`runtime-environments` for detailed documentation.
        scheduling_strategy: Strategy about how to
            schedule a remote function or actor. Possible values are
            None: ray will figure out the scheduling strategy to use, it
            will either be the PlacementGroupSchedulingStrategy using
            parent's placement group if parent has one and has
            placement_group_capture_child_tasks set to true,
            or "DEFAULT";
            "DEFAULT": default hybrid scheduling;
            "SPREAD": best effort spread scheduling;
            `PlacementGroupSchedulingStrategy`:
            placement group based scheduling;
            `NodeAffinitySchedulingStrategy`:
            node id based affinity scheduling.
        enable_task_events: True if tracing is enabled, i.e., task events
            from the actor should be reported. Defaults to True.

    Returns:
        A wrapper object exposing ``remote`` and ``bind``, both of which use
        the merged options.

    Examples:

    .. code-block:: python

        @ray.remote(num_cpus=2, resources={"CustomResource": 1})
        class Foo:
            def method(self):
                return 1
        # Class Bar will require 1 cpu instead of 2.
        # It will also require no custom resources.
        Bar = Foo.options(num_cpus=1, resources=None)
    """
    actor_cls = self

    # Start from the decorator-level defaults. "concurrency_groups" could
    # not be used in ".options()", so it is left out before merging options
    # from '@ray.remote'.
    base_options = {
        key: value
        for key, value in self._default_options.items()
        if key != "concurrency_groups"
    }
    merged_options = ray_option_utils.update_options(base_options, actor_options)
    ray_option_utils.validate_actor_options(merged_options, in_options=True)

    # only update runtime_env when ".options()" specifies new runtime_env
    if "runtime_env" in actor_options:
        raw_runtime_env = merged_options["runtime_env"]
        merged_options["runtime_env"] = parse_runtime_env_for_task_or_actor(
            raw_runtime_env
        )

    class ActorOptionWrapper:
        def remote(self, *args, **kwargs):
            return actor_cls._remote(args=args, kwargs=kwargs, **merged_options)

        @DeveloperAPI
        def bind(self, *args, **kwargs):
            """
            For Ray DAG building that creates static graph from decorated
            class or functions.
            """
            from ray.dag.class_node import ClassNode

            return ClassNode(
                actor_cls.__ray_metadata__.modified_class,
                args,
                kwargs,
                merged_options,
            )

    return ActorOptionWrapper()
- @wrap_auto_init
- @_tracing_actor_creation
- def _remote(self, args=None, kwargs=None, **actor_options) -> ActorProxy[T]:
- """Create an actor.
- This method allows more flexibility than the remote method because
- resource requirements can be specified and override the defaults in the
- decorator.
- Args:
- args: The arguments to forward to the actor constructor.
- kwargs: The keyword arguments to forward to the actor constructor.
- **actor_options: Keyword arguments for configuring the actor options.
- See ``ActorClass.options`` for more details.
- Returns:
- A handle to the newly created actor.
- """
- name = actor_options.get("name")
- namespace = actor_options.get("namespace")
- if name is not None:
- if not isinstance(name, str):
- raise TypeError(f"name must be None or a string, got: '{type(name)}'.")
- elif name == "":
- raise ValueError("Actor name cannot be an empty string.")
- if namespace is not None:
- ray._private.utils.validate_namespace(namespace)
- # Handle the get-or-create case.
- if actor_options.get("get_if_exists"):
- try:
- return ray.get_actor(name, namespace=namespace)
- except ValueError:
- # Attempt to create it (may race with other attempts).
- updated_options = actor_options.copy()
- updated_options["get_if_exists"] = False # prevent infinite loop
- try:
- return self._remote(args, kwargs, **updated_options)
- except ActorAlreadyExistsError:
- pass
- # The actor was created between the first and second get_actor calls.
- # Try to get it again to see if it's there.
- return ray.get_actor(name, namespace=namespace)
- # We pop the "concurrency_groups" coming from "@ray.remote" here. We no longer
- # need it in "_remote()".
- actor_options.pop("concurrency_groups", None)
- if args is None:
- args = []
- if kwargs is None:
- kwargs = {}
- meta = self.__ray_metadata__
- is_asyncio = has_async_methods(meta.modified_class)
- if actor_options.get("max_concurrency") is None:
- actor_options["max_concurrency"] = (
- DEFAULT_MAX_CONCURRENCY_ASYNC
- if is_asyncio
- else ray_constants.DEFAULT_MAX_CONCURRENCY_THREADED
- )
- if client_mode_should_convert():
- return client_mode_convert_actor(self, args, kwargs, **actor_options)
- # fill actor required options
- for k, v in ray_option_utils.actor_options.items():
- actor_options[k] = actor_options.get(k, v.default_value)
- # "concurrency_groups" already takes effects and should not apply again.
- # Remove the default value here.
- actor_options.pop("concurrency_groups", None)
- # TODO(suquark): cleanup these fields
- max_concurrency = actor_options["max_concurrency"]
- lifetime = actor_options["lifetime"]
- runtime_env = actor_options["runtime_env"]
- placement_group = actor_options["placement_group"]
- placement_group_bundle_index = actor_options["placement_group_bundle_index"]
- placement_group_capture_child_tasks = actor_options[
- "placement_group_capture_child_tasks"
- ]
- scheduling_strategy = actor_options["scheduling_strategy"]
- max_restarts = actor_options["max_restarts"]
- max_task_retries = actor_options["max_task_retries"]
- max_pending_calls = actor_options["max_pending_calls"]
- # Override enable_task_events to default for actor if not specified (i.e. None)
- enable_task_events = actor_options.get("enable_task_events")
- if scheduling_strategy is None or not isinstance(
- scheduling_strategy, PlacementGroupSchedulingStrategy
- ):
- _warn_if_using_deprecated_placement_group(actor_options, 3)
- worker = ray._private.worker.global_worker
- worker.check_connected()
- if worker.mode != ray._private.worker.WORKER_MODE:
- from ray._common.usage import usage_lib
- usage_lib.record_library_usage("core")
- # Check whether the name is already taken.
- # TODO(edoakes): this check has a race condition because two drivers
- # could pass the check and then create the same named actor. We should
- # instead check this when we create the actor, but that's currently an
- # async call.
- if name is not None:
- try:
- ray.get_actor(name, namespace=namespace)
- except ValueError: # Name is not taken.
- pass
- else:
- raise ActorAlreadyExistsError(
- f"The name {name} (namespace={namespace}) is already "
- "taken. Please use "
- "a different name or get the existing actor using "
- f"ray.get_actor('{name}', namespace='{namespace}')"
- )
- if lifetime is None:
- detached = None
- elif lifetime == "detached":
- detached = True
- elif lifetime == "non_detached":
- detached = False
- else:
- raise ValueError(
- "actor `lifetime` argument must be one of 'detached', "
- "'non_detached' and 'None'."
- )
- # LOCAL_MODE cannot handle cross_language
- if worker.mode == ray.LOCAL_MODE:
- assert (
- not meta.is_cross_language
- ), "Cross language ActorClass cannot be executed locally."
- # Export the actor.
- if not meta.is_cross_language and (
- meta.last_export_cluster_and_job != worker.current_cluster_and_job
- ):
- # If this actor class was not exported in this cluster and job,
- # we need to export this function again, because current GCS
- # doesn't have it.
- # After serialize / deserialize modified class, the __module__
- # of modified class will be ray.cloudpickle.cloudpickle.
- # So, here pass actor_creation_function_descriptor to make
- # sure export actor class correct.
- worker.function_actor_manager.export_actor_class(
- meta.modified_class,
- meta.actor_creation_function_descriptor,
- meta.method_meta.methods.keys(),
- )
- meta.last_export_cluster_and_job = worker.current_cluster_and_job
- resources = ray._common.utils.resources_from_ray_options(actor_options)
- # Set the actor's default resources if not already set. First three
- # conditions are to check that no resources were specified in the
- # decorator. Last three conditions are to check that no resources were
- # specified when _remote() was called.
- # TODO(suquark): In the original code, memory is not considered as resources,
- # when deciding the default CPUs. It is strange, but we keep the original
- # semantics in case that it breaks user applications & tests.
- if not set(resources.keys()).difference({"memory", "object_store_memory"}):
- # In the default case, actors acquire no resources for
- # their lifetime, and actor methods will require 1 CPU.
- resources.setdefault("CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE)
- actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
- else:
- # If any resources are specified (here or in decorator), then
- # all resources are acquired for the actor's lifetime and no
- # resources are associated with methods.
- resources.setdefault(
- "CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
- )
- actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
- # If the actor methods require CPU resources, then set the required
- # placement resources. If actor_placement_resources is empty, then
- # the required placement resources will be the same as resources.
- actor_placement_resources = {}
- assert actor_method_cpu in [0, 1]
- if actor_method_cpu == 1:
- actor_placement_resources = resources.copy()
- actor_placement_resources["CPU"] += 1
- if meta.is_cross_language:
- creation_args = cross_language._format_args(worker, args, kwargs)
- else:
- function_signature = meta.method_meta.signatures["__init__"]
- creation_args = signature.flatten_args(function_signature, args, kwargs)
- use_placement_group = scheduling_strategy is not None and isinstance(
- scheduling_strategy, PlacementGroupSchedulingStrategy
- )
- is_restartable = max_restarts > 0 or max_restarts == -1
- if use_placement_group and detached and is_restartable:
- # TODO(kevin85421): Checking `max_restarts > 0` is because Ray Serve currently schedules detached actors with
- # placement groups. Adding the check avoids printing this warning for all Ray Serve applications. In the future,
- # we should consider raising an error instead of a warning, but this is a breaking change.
- logger.warning(
- "Scheduling a restartable detached actor with a placement group is not recommended "
- "because Ray will kill the actor when the placement group is removed and the actor will "
- "not be able to be restarted."
- )
- if scheduling_strategy is None or isinstance(
- scheduling_strategy, PlacementGroupSchedulingStrategy
- ):
- # TODO(jjyao) Clean this up once the
- # placement_group option is removed.
- # We should also consider pushing this logic down to c++
- # so that it can be reused by all languages.
- if isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy):
- placement_group = scheduling_strategy.placement_group
- placement_group_bundle_index = (
- scheduling_strategy.placement_group_bundle_index
- )
- placement_group_capture_child_tasks = (
- scheduling_strategy.placement_group_capture_child_tasks
- )
- if placement_group_capture_child_tasks is None:
- placement_group_capture_child_tasks = (
- worker.should_capture_child_tasks_in_placement_group
- )
- placement_group = _configure_placement_group_based_on_context(
- placement_group_capture_child_tasks,
- placement_group_bundle_index,
- resources,
- actor_placement_resources,
- meta.class_name,
- placement_group=placement_group,
- )
- if not placement_group.is_empty:
- scheduling_strategy = PlacementGroupSchedulingStrategy(
- placement_group,
- placement_group_bundle_index,
- placement_group_capture_child_tasks,
- )
- else:
- scheduling_strategy = "DEFAULT"
- serialized_runtime_env_info = None
- if runtime_env is not None:
- serialized_runtime_env_info = get_runtime_env_info(
- runtime_env,
- is_job_runtime_env=False,
- serialize=True,
- )
- concurrency_groups_dict = {}
- if meta.concurrency_groups is None:
- meta.concurrency_groups = []
- for cg_name in meta.concurrency_groups:
- concurrency_groups_dict[cg_name] = {
- "name": cg_name,
- "max_concurrency": meta.concurrency_groups[cg_name],
- "function_descriptors": [],
- }
- # Update methods
- for method_name in meta.method_meta.concurrency_group_for_methods:
- cg_name = meta.method_meta.concurrency_group_for_methods[method_name]
- assert cg_name in concurrency_groups_dict
- module_name = meta.actor_creation_function_descriptor.module_name
- class_name = meta.actor_creation_function_descriptor.class_name
- concurrency_groups_dict[cg_name]["function_descriptors"].append(
- PythonFunctionDescriptor(module_name, method_name, class_name)
- )
- # Update the creation descriptor based on number of arguments
- if meta.is_cross_language:
- func_name = "<init>"
- if meta.language == Language.CPP:
- func_name = meta.actor_creation_function_descriptor.function_name
- meta.actor_creation_function_descriptor = (
- cross_language._get_function_descriptor_for_actor_method(
- meta.language,
- meta.actor_creation_function_descriptor,
- func_name,
- str(len(args) + len(kwargs)),
- )
- )
- allow_out_of_order_execution = actor_options.get("allow_out_of_order_execution")
- # If the actor is async or multi-threaded, default to out-of-order execution.
- if allow_out_of_order_execution is None:
- allow_out_of_order_execution = is_asyncio or max_concurrency > 1
- if is_asyncio and not allow_out_of_order_execution:
- raise ValueError(
- "If you're using async actors, Ray can't execute actor tasks in order. "
- "Set `allow_out_of_order_execution=True` to allow out-of-order "
- "execution."
- )
- elif max_concurrency > 1 and not allow_out_of_order_execution:
- raise ValueError(
- "If you're using multi-threaded actors, Ray can't execute actor tasks "
- "in order. Set `allow_out_of_order_execution=True` to allow "
- "out-of-order execution."
- )
- actor_id = worker.core_worker.create_actor(
- meta.language,
- meta.actor_creation_function_descriptor,
- creation_args,
- max_restarts,
- max_task_retries,
- resources,
- actor_placement_resources,
- max_concurrency,
- detached,
- name if name is not None else "",
- namespace if namespace is not None else "",
- is_asyncio,
- # Store actor_method_cpu in actor handle's extension data.
- extension_data=str(actor_method_cpu),
- serialized_runtime_env_info=serialized_runtime_env_info or "{}",
- concurrency_groups_dict=concurrency_groups_dict or dict(),
- max_pending_calls=max_pending_calls,
- scheduling_strategy=scheduling_strategy,
- enable_task_events=enable_task_events,
- labels=actor_options.get("_labels"),
- label_selector=actor_options.get("label_selector"),
- fallback_strategy=actor_options.get("fallback_strategy"),
- allow_out_of_order_execution=allow_out_of_order_execution,
- enable_tensor_transport=meta.enable_tensor_transport,
- )
- if _actor_launch_hook:
- _actor_launch_hook(
- meta.actor_creation_function_descriptor, resources, scheduling_strategy
- )
- actor_handle = ActorHandle(
- meta.language,
- actor_id,
- max_task_retries,
- enable_task_events,
- meta.method_meta.method_is_generator,
- meta.method_meta.decorators,
- meta.method_meta.signatures,
- meta.method_meta.num_returns,
- meta.method_meta.max_task_retries,
- meta.method_meta.retry_exceptions,
- meta.method_meta.generator_backpressure_num_objects,
- meta.method_meta.enable_task_events,
- meta.enable_tensor_transport,
- meta.method_meta.method_name_to_tensor_transport,
- actor_method_cpu,
- meta.actor_creation_function_descriptor,
- worker.current_cluster_and_job,
- original_handle=True,
- allow_out_of_order_execution=allow_out_of_order_execution,
- )
- if meta.enable_tensor_transport:
- gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
- gpu_object_manager.register_custom_transports_on_actor(actor_handle)
- return actor_handle
@DeveloperAPI
def bind(self, *args, **kwargs):
    """Return a Ray DAG ``ClassNode`` describing this actor class.

    Used for Ray DAG building, which creates a static graph from decorated
    classes or functions.

    Args:
        *args: Positional arguments recorded on the node.
        **kwargs: Keyword arguments recorded on the node.

    Returns:
        A ``ClassNode`` built from the modified class stored in this actor
        class's metadata, the given arguments, and the default options.
    """
    from ray.dag.class_node import ClassNode

    modified_class = self.__ray_metadata__.modified_class
    node = ClassNode(modified_class, args, kwargs, self._default_options)
    return node
@PublicAPI
class ActorHandle(Generic[T]):
    """A handle to an actor.

    The fields in this class are prefixed with _ray_ to hide them from the user
    and to avoid collision with actor method names.

    An ActorHandle can be created in three ways. First, by calling .remote() on
    an ActorClass. Second, by passing an actor handle into a task (forking the
    ActorHandle). Third, by directly serializing the ActorHandle (e.g., with
    cloudpickle).

    Attributes:
        _ray_actor_language: The actor language.
        _ray_actor_id: Actor ID.
        _ray_max_task_retries: Actor-level default for the number of task
            retries, used for methods without a per-method setting.
        _ray_enable_task_events: The default value of whether task events is
            enabled, i.e., task events from the actor should be reported.
        _ray_method_is_generator: Map of method name -> if it is a generator
            method.
        _ray_method_decorators: Optional decorators for the function
            invocation. This can be used to change the behavior on the
            invocation side, whereas a regular decorator can be used to change
            the behavior on the execution side.
        _ray_method_signatures: The signatures of the actor methods.
        _ray_method_max_task_retries: Max number of retries on method failure.
        _ray_method_num_returns: The default number of return values for
            each method.
        _ray_method_retry_exceptions: The default value of boolean of whether you want
            to retry all user-raised exceptions, or a list of allowlist exceptions to
            retry.
        _ray_method_generator_backpressure_num_objects: Generator-only
            config. The max number of objects to generate before it
            starts pausing a generator.
        _ray_method_enable_task_events: The value of whether task
            tracing is enabled for the actor methods. This overrides the
            actor's default value (`_ray_enable_task_events`).
        _ray_method_name_to_tensor_transport: A dictionary mapping method names to their
            tensor transport protocol.
        _ray_actor_method_cpus: The number of CPUs required by actor methods.
        _ray_cluster_and_job: The cluster and job information passed in at
            construction time.
        _ray_original_handle: True if this is the original actor handle for a
            given actor. If this is true, then the actor will be destroyed when
            this handle goes out of scope.
        _ray_weak_ref: True means that this handle does not count towards the
            distributed ref count for the actor, i.e. the actor may be GCed
            while this handle is still in scope. This is set to True if the
            handle was created by getting an actor by name or by getting the
            self handle. It is set to False if this is the original handle or
            if it was created by passing the original handle through task args
            and returns.
        _ray_is_cross_language: Whether this actor is cross language.
        _ray_actor_creation_function_descriptor: The function descriptor
            of the actor creation task.
        _ray_allow_out_of_order_execution: Whether the actor can execute tasks out of order.
        _ray_enable_tensor_transport: Whether tensor transport is enabled for this actor.
    """

    def __init__(
        self,
        language,
        actor_id,
        max_task_retries: Optional[int],
        enable_task_events: bool,
        method_is_generator: Dict[str, bool],
        method_decorators,
        method_signatures,
        method_num_returns: Dict[str, Union[int, Literal["streaming"]]],
        method_max_task_retries: Dict[str, int],
        method_retry_exceptions: Dict[str, Union[bool, list, tuple]],
        method_generator_backpressure_num_objects: Dict[str, int],
        method_enable_task_events: Dict[str, bool],
        enable_tensor_transport: bool,
        method_name_to_tensor_transport: Dict[str, str],
        actor_method_cpus: int,
        actor_creation_function_descriptor,
        cluster_and_job,
        original_handle=False,
        weak_ref: bool = False,
        allow_out_of_order_execution: Optional[bool] = None,
    ):
        """Initialize an ActorHandle.

        Args:
            language: The actor language.
            actor_id: The ID of the actor.
            max_task_retries: The maximum number of times to retry a task when it fails.
            enable_task_events: Whether task events should be enabled for this actor.
            method_is_generator: Dictionary mapping method names to whether they are generator methods.
            method_decorators: Dictionary mapping method names to their decorators.
            method_signatures: Dictionary mapping method names to their signatures.
            method_num_returns: Dictionary mapping method names to their number of return values.
            method_max_task_retries: Dictionary mapping method names to their maximum task retries.
            method_retry_exceptions: Dictionary mapping method names to their retry exception settings.
            method_generator_backpressure_num_objects: Dictionary mapping method names to their generator backpressure settings.
            method_enable_task_events: Dictionary mapping method names to whether task events are enabled.
            enable_tensor_transport: Whether tensor transport is enabled for
                this actor. If True, then methods can be called with
                .options(tensor_transport=...) to specify a non-default tensor
                transport.
            method_name_to_tensor_transport: Dictionary mapping method names to their tensor transport type.
            actor_method_cpus: The number of CPUs required by actor methods.
            actor_creation_function_descriptor: The function descriptor for actor creation.
            cluster_and_job: The cluster and job information.
            original_handle: Whether this is the original actor handle.
            weak_ref: Whether this is a weak reference to the actor.
            allow_out_of_order_execution: Whether the actor can execute tasks out of order.
        """
        self._ray_actor_language = language
        self._ray_actor_id = actor_id
        self._ray_max_task_retries = max_task_retries
        self._ray_original_handle = original_handle
        self._ray_weak_ref = weak_ref
        self._ray_enable_task_events = enable_task_events
        self._ray_allow_out_of_order_execution = allow_out_of_order_execution

        self._ray_method_is_generator = method_is_generator
        self._ray_method_decorators = method_decorators
        self._ray_method_signatures = method_signatures
        self._ray_method_num_returns = method_num_returns
        self._ray_method_max_task_retries = method_max_task_retries
        self._ray_method_retry_exceptions = method_retry_exceptions
        self._ray_method_generator_backpressure_num_objects = (
            method_generator_backpressure_num_objects
        )
        self._ray_method_enable_task_events = method_enable_task_events
        self._ray_enable_tensor_transport = enable_tensor_transport
        self._ray_method_name_to_tensor_transport = method_name_to_tensor_transport
        self._ray_actor_method_cpus = actor_method_cpus
        self._ray_cluster_and_job = cluster_and_job
        self._ray_is_cross_language = language != Language.PYTHON
        self._ray_actor_creation_function_descriptor = (
            actor_creation_function_descriptor
        )
        self._ray_function_descriptor = {}
        # This is incremented each time `bind()` is called on an actor handle
        # (in Ray DAGs), therefore capturing the bind order of the actor methods.
        # TODO: this does not work properly if the caller has two copies of the
        # same actor handle, and needs to be fixed.
        self._ray_dag_bind_index = 0

        if not self._ray_is_cross_language:
            assert isinstance(
                actor_creation_function_descriptor, PythonFunctionDescriptor
            )
            module_name = actor_creation_function_descriptor.module_name
            class_name = actor_creation_function_descriptor.class_name
            # Pre-build one function descriptor per known Python method.
            for method_name in self._ray_method_signatures.keys():
                function_descriptor = PythonFunctionDescriptor(
                    module_name, method_name, class_name
                )
                self._ray_function_descriptor[method_name] = function_descriptor

        # Build an _ActorMethodMetadata per method to cache expensive parsing logic.
        # The _ActorMethodMetadata doesn't take a reference to this ActorHandle to avoid a circular reference.
        # Instead, we will lazily bind this ActorHandle to the _ActorMethodMetadata when a method is invoked.
        # NOTE: `_method_shells` is assigned last; `__getattr__` and `__del__`
        # treat its absence as "this handle is not fully constructed".
        self._method_shells = {}
        for method_name, method_signature in self._ray_method_signatures.items():
            self._method_shells[method_name] = _ActorMethodMetadata(
                method_name=method_name,
                num_returns=self._ray_method_num_returns.get(method_name, None),
                # Per-method setting wins, then the actor-level default, then 0.
                max_task_retries=self._ray_method_max_task_retries.get(
                    method_name, self._ray_max_task_retries
                )
                or 0,
                retry_exceptions=self._ray_method_retry_exceptions.get(method_name),
                is_generator=self._ray_method_is_generator.get(method_name),
                generator_backpressure_num_objects=self._ray_method_generator_backpressure_num_objects.get(
                    method_name
                ),
                enable_task_events=self._ray_method_enable_task_events.get(
                    method_name, self._ray_enable_task_events
                ),
                decorator=self._ray_method_decorators.get(method_name),
                signature=method_signature,
                tensor_transport=self._ray_method_name_to_tensor_transport.get(
                    method_name
                ),
            )

    def __del__(self):
        """Drop this handle's reference to the actor when it is finalized.

        Does nothing for weak-reference handles. ``AttributeError`` is
        suppressed so that finalizing a partially constructed handle, or
        finalizing during interpreter shutdown, stays silent.
        """
        try:
            # Weak references don't count towards the distributed ref count, so no
            # need to decrement the ref count. This lookup lives inside the `try`
            # because the attribute is missing if `__init__` did not complete.
            if self._ray_weak_ref:
                return
            # Mark that this actor handle has gone out of scope. Once all actor
            # handles are out of scope, the actor will exit.
            if ray._private.worker:
                worker = ray._private.worker.global_worker
                if worker.connected and hasattr(worker, "core_worker"):
                    worker.core_worker.remove_actor_handle_reference(
                        self._ray_actor_id
                    )
        except AttributeError:
            # Suppress the attribute error which is caused by
            # python destruction ordering issue.
            # It only happen when python exits.
            pass

    def _actor_method_call(
        self,
        method_name: str,
        args: Optional[List[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
        name: str = "",
        num_returns: Optional[Union[int, Literal["streaming"]]] = None,
        max_task_retries: Optional[int] = None,
        retry_exceptions: Optional[Union[bool, list, tuple]] = None,
        concurrency_group_name: Optional[str] = None,
        generator_backpressure_num_objects: Optional[int] = None,
        enable_task_events: Optional[bool] = None,
        tensor_transport: Optional[str] = None,
    ):
        """Method execution stub for an actor handle.

        This is the function that executes when
        `actor.method_name.remote(*args, **kwargs)` is called. Instead of
        executing locally, the method is packaged as a task and scheduled
        to the remote actor instance.

        Args:
            method_name: The name of the actor method to execute.
            args: A list of arguments for the actor method.
            kwargs: A dictionary of keyword arguments for the actor method.
            name: The name to give the actor method call task.
            num_returns: The number of return values for the method.
            max_task_retries: Number of retries when method fails.
            retry_exceptions: Boolean of whether you want to retry all user-raised
                exceptions, or a list of allowlist exceptions to retry.
            concurrency_group_name: The name of the concurrency group to use.
            generator_backpressure_num_objects: The number of objects to generate
                before applying backpressure.
            enable_task_events: True if tracing is enabled, i.e., task events from
                the actor should be reported.
            tensor_transport: The tensor transport protocol to use for the actor method.

        Returns:
            An ``ObjectRefGenerator`` for streaming calls; otherwise a single
            object ref when the task produced exactly one, ``None`` when it
            produced none, and the list of object refs in every other case.
        """
        worker = ray._private.worker.global_worker

        args = args or []
        kwargs = kwargs or {}
        if self._ray_is_cross_language:
            list_args = cross_language._format_args(worker, args, kwargs)
            function_descriptor = cross_language._get_function_descriptor_for_actor_method(  # noqa: E501
                self._ray_actor_language,
                self._ray_actor_creation_function_descriptor,
                method_name,
                # The signature for xlang should be "{length_of_arguments}" to handle
                # overloaded methods.
                signature=str(len(args) + len(kwargs)),
            )
        else:
            function_signature = self._ray_method_signatures[method_name]

            # Nothing to flatten when there are no arguments and no signature.
            if not args and not kwargs and not function_signature:
                list_args = []
            else:
                list_args = signature.flatten_args(function_signature, args, kwargs)
            function_descriptor = self._ray_function_descriptor[method_name]

        if worker.mode == ray.LOCAL_MODE:
            assert (
                not self._ray_is_cross_language
            ), "Cross language remote actor method cannot be executed locally."

        if num_returns == "dynamic":
            num_returns = -1
        elif num_returns == "streaming":
            # TODO(sang): This is a temporary private API.
            # Remove it when we migrate to the streaming generator.
            num_returns = ray._raylet.STREAMING_GENERATOR_RETURN

        # Normalize `retry_exceptions` into a bool plus an optional allowlist.
        retry_exception_allowlist = None
        if retry_exceptions is None:
            retry_exceptions = False
        elif isinstance(retry_exceptions, (list, tuple)):
            retry_exception_allowlist = tuple(retry_exceptions)
            retry_exceptions = True
        assert isinstance(retry_exceptions, bool), (
            "retry_exceptions can either be "
            "boolean or list/tuple of exception types."
        )

        if generator_backpressure_num_objects is None:
            generator_backpressure_num_objects = -1

        object_refs = worker.core_worker.submit_actor_task(
            self._ray_actor_language,
            self._ray_actor_id,
            function_descriptor,
            list_args,
            name,
            num_returns,
            max_task_retries,
            retry_exceptions,
            retry_exception_allowlist,
            self._ray_actor_method_cpus,
            concurrency_group_name if concurrency_group_name is not None else b"",
            generator_backpressure_num_objects,
            enable_task_events,
            tensor_transport,
        )

        if num_returns == STREAMING_GENERATOR_RETURN:
            # Streaming generator will return a single ref
            # that is for the generator task.
            assert len(object_refs) == 1
            generator_ref = object_refs[0]
            return ObjectRefGenerator(generator_ref, worker)
        if len(object_refs) == 1:
            object_refs = object_refs[0]
        elif len(object_refs) == 0:
            object_refs = None

        return object_refs

    def __getattr__(self, item: str) -> Any:
        """Handle dynamic attribute access for actor methods.

        This method is called when accessing attributes that don't exist as direct
        instance attributes. It's the core mechanism for actor method invocation.

        For Python actors:
        - We use strict validation: only methods in _method_shells are allowed
        - This prevents typos and provides clear error messages
        - Returns a bound ActorMethod created from the cached _ActorMethodMetadata

        For cross-language actors:
        - We can't validate method names client-side (the target language defines them)
        - We allow arbitrary method calls to pass through
        - Some Python-specific methods like `__ray_terminate__` are blocked with warnings

        Args:
            item: The attribute/method name being accessed

        Returns:
            ActorMethod: A bound method ready for .remote() calls

        Raises:
            AttributeError: For Python actors when accessing non-existent methods,
                and for any handle whose construction did not complete.
        """
        missing_attribute_message = (
            f"'{type(self).__name__}' object has no attribute '{item}'"
        )

        # Read `_method_shells` straight from the instance dict: going through
        # normal attribute access would re-enter `__getattr__` and recurse
        # forever when the handle is only partially constructed.
        method_shells = self.__dict__.get("_method_shells")
        if method_shells is None:
            raise AttributeError(missing_attribute_message)

        # If this name matches a remote method, bind and return it.
        if item in method_shells:
            return method_shells[item].bind(self)

        if not self._ray_is_cross_language:
            raise AttributeError(missing_attribute_message)

        if item in ["__ray_terminate__"]:

            class FakeActorMethod(object):
                def __call__(self, *args, **kwargs):
                    raise TypeError(
                        "Actor methods cannot be called directly. Instead "
                        "of running 'object.{}()', try 'object.{}.remote()'.".format(
                            item, item
                        )
                    )

                def remote(self, *args, **kwargs):
                    logger.warning(
                        f"Actor method {item} is not supported by cross language."
                    )

            return FakeActorMethod()

        return ActorMethod(
            self,  # actor
            item,  # method_name
            ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS,
            0,  # max_task_retries
            False,  # retry_exceptions
            False,  # is_generator
            self._ray_method_generator_backpressure_num_objects.get(item, -1),
            self._ray_enable_task_events,  # enable_task_events
            # Currently, cross-lang actor method not support decorator
            decorator=None,
            signature=None,
        )

    # Make tab completion work.
    def __dir__(self):
        """Return the known actor method names."""
        return self._ray_method_signatures.keys()

    def __repr__(self):
        """Return ``Actor(<class name>, <actor id hex>)``."""
        return (
            "Actor("
            f"{self._ray_actor_creation_function_descriptor.class_name}, "
            f"{self._actor_id.hex()})"
        )

    def __hash__(self):
        """Hash the handle by its actor ID."""
        return hash(self._actor_id)

    def __eq__(self, __value):
        """Return True when the other object is a handle to the same actor.

        Comparing against anything that is not an ``ActorHandle`` yields
        ``NotImplemented``, so unhashable operands no longer raise and
        unrelated objects that merely share a hash are not reported equal.
        """
        if not isinstance(__value, ActorHandle):
            return NotImplemented
        return self._actor_id == __value._actor_id

    @property
    def _actor_id(self):
        """The ID of the actor this handle refers to."""
        return self._ray_actor_id

    def _get_local_state(self):
        """Get the local actor state.

        NOTE: this method only returns accurate actor state
        after a first actor method call is made against
        this actor handle due to https://github.com/ray-project/ray/pull/24600.

        Returns:
           ActorTableData.ActorState or None if the state is unknown.
        """
        worker = ray._private.worker.global_worker
        worker.check_connected()
        return worker.core_worker.get_local_actor_state(self._ray_actor_id)

    def _serialization_helper(self):
        """This is defined in order to make pickling work.

        Returns:
            The serialized state followed by this handle's weak-ref flag. In
            local mode the state is ``(state_dict, None)``, where
            ``state_dict`` holds the information needed to reconstruct the
            object; otherwise it is whatever the core worker returns from
            ``serialize_actor_handle``.
        """
        worker = ray._private.worker.global_worker
        worker.check_connected()

        if hasattr(worker, "core_worker"):
            # Non-local mode
            state = worker.core_worker.serialize_actor_handle(self._ray_actor_id)
        else:
            # Local mode. The keys must match what `_deserialization_helper`
            # reads back, including `current_cluster_and_job`.
            state = (
                {
                    "actor_language": self._ray_actor_language,
                    "actor_id": self._ray_actor_id,
                    "max_task_retries": self._ray_max_task_retries,
                    "enable_task_events": self._ray_enable_task_events,
                    "method_is_generator": self._ray_method_is_generator,
                    "method_decorators": self._ray_method_decorators,
                    "method_signatures": self._ray_method_signatures,
                    "method_num_returns": self._ray_method_num_returns,
                    "method_max_task_retries": self._ray_method_max_task_retries,
                    "method_retry_exceptions": self._ray_method_retry_exceptions,
                    "method_generator_backpressure_num_objects": (
                        self._ray_method_generator_backpressure_num_objects
                    ),
                    "method_enable_task_events": self._ray_method_enable_task_events,
                    "enable_tensor_transport": self._ray_enable_tensor_transport,
                    "method_name_to_tensor_transport": self._ray_method_name_to_tensor_transport,
                    "actor_method_cpus": self._ray_actor_method_cpus,
                    "actor_creation_function_descriptor": self._ray_actor_creation_function_descriptor,  # noqa: E501
                    "current_cluster_and_job": self._ray_cluster_and_job,
                },
                None,
            )

        return (*state, self._ray_weak_ref)

    @classmethod
    def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None):
        """This is defined in order to make pickling work.

        Args:
            state: The serialized state of the actor handle.
            weak_ref: Whether this was serialized from an actor handle with a
                weak ref to the actor.
            outer_object_ref: The ObjectRef that the serialized actor handle
                was contained in, if any. This is used for counting references
                to the actor handle.

        Returns:
            The reconstructed actor handle.
        """
        worker = ray._private.worker.global_worker
        worker.check_connected()

        if hasattr(worker, "core_worker"):
            # Non-local mode
            return worker.core_worker.deserialize_and_register_actor_handle(
                state,
                outer_object_ref,
                weak_ref,
            )
        else:
            # Local mode
            assert worker.current_cluster_and_job == state["current_cluster_and_job"]
            return cls(
                # TODO(swang): Accessing the worker's current task ID is not
                # thread-safe.
                state["actor_language"],
                state["actor_id"],
                state["max_task_retries"],
                state["enable_task_events"],
                state["method_is_generator"],
                state["method_decorators"],
                state["method_signatures"],
                state["method_num_returns"],
                state["method_max_task_retries"],
                state["method_retry_exceptions"],
                state["method_generator_backpressure_num_objects"],
                state["method_enable_task_events"],
                state["enable_tensor_transport"],
                state["method_name_to_tensor_transport"],
                state["actor_method_cpus"],
                state["actor_creation_function_descriptor"],
                state["current_cluster_and_job"],
                # Keep the weak-ref flag of the handle that was serialized.
                weak_ref=weak_ref,
            )

    def __reduce__(self):
        """This code path is used by pickling but not by Ray forking."""
        (serialized, _, weak_ref) = self._serialization_helper()
        # There is no outer object ref when the actor handle is
        # deserialized out-of-band using pickle.
        return ActorHandle._deserialization_helper, (serialized, weak_ref, None)
- def _modify_class(cls):
- # cls has been modified.
- if hasattr(cls, "__ray_actor_class__"):
- return cls
- # Modify the class to have additional default methods.
- class Class(cls):
- __ray_actor_class__ = cls # The original actor class
- def __ray_ready__(self):
- return True
- def __ray_call__(self, fn, *args, **kwargs):
- return fn(self, *args, **kwargs)
- def __ray_terminate__(self):
- worker = ray._private.worker.global_worker
- if worker.mode != ray.LOCAL_MODE:
- ray.actor.exit_actor()
- Class.__module__ = cls.__module__
- Class.__name__ = cls.__name__
- if not is_function_or_method(getattr(Class, "__init__", None)):
- # Add __init__ if it does not exist.
- # Actor creation will be executed with __init__ together.
- # Assign an __init__ function will avoid many checks later on.
- def __init__(self):
- pass
- Class.__init__ = __init__
- return Class
def _make_actor(cls, actor_options):
    """Build an ``ActorClass`` from a plain class and its actor options.

    The class is passed through ``_modify_class`` and
    ``_inject_tracing_into_class``. A ``max_restarts`` option other than -1
    is capped at ``ray_constants.MAX_INT64_VALUE``, updating
    ``actor_options`` in place.

    Args:
        cls: The class to turn into an actor class.
        actor_options: Dictionary of actor options; may be mutated.

    Returns:
        The result of ``ActorClass._ray_from_modified_class`` for the
        modified class, a random ``ActorClassID`` and the options.
    """
    modified_cls = _modify_class(cls)
    _inject_tracing_into_class(modified_cls)

    # -1 represents infinite restart and is left untouched; a missing option
    # is treated the same way here.
    requested_restarts = actor_options.get("max_restarts", -1)
    if requested_restarts != -1:
        # Make sure we don't pass too big of an int to C++, causing
        # an overflow.
        actor_options["max_restarts"] = min(
            requested_restarts, ray_constants.MAX_INT64_VALUE
        )

    actor_class_id = ActorClassID.from_random()
    return ActorClass._ray_from_modified_class(
        modified_cls, actor_class_id, actor_options
    )
@PublicAPI
def exit_actor():
    """Intentionally exit the current actor.

    This API can be used only inside an actor. Use ray.kill
    API if you'd like to kill an actor using actor handle.

    When this API is called, an exception is raised and the actor
    will exit immediately. For asyncio actors, there may be a short
    delay before the actor exits if the API is called from a background
    task.

    Any queued methods will fail. Any ``atexit``
    handlers installed in the actor will be run.

    Raises:
        TypeError: An exception is raised if this is a driver or this
            worker is not an actor.
    """
    worker = ray._private.worker.global_worker
    if worker.mode == ray.WORKER_MODE and not worker.actor_id.is_nil():
        worker.core_worker.set_current_actor_should_exit()
        # In asyncio actor mode, we can't raise SystemExit because it will just
        # quit the asyncio event loop thread, not the main thread. Instead, we
        # raise a custom error to the main thread to tell it to exit.
        if worker.core_worker.current_actor_is_asyncio():
            raise AsyncioActorExit()

        # Set a flag to indicate this is an intentional actor exit. This
        # reduces log verbosity.
        raise_sys_exit_with_custom_error_message("exit_actor() is called.")
    else:
        # The adjacent literals are concatenated, so each piece must carry its
        # own trailing space.
        raise TypeError(
            "exit_actor API is called on a non-actor worker, "
            f"{worker.mode}. Call this API inside an actor method "
            "if you'd like to exit the actor gracefully."
        )
|