_asyncio.py 97 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996
  1. from __future__ import annotations
  2. import array
  3. import asyncio
  4. import concurrent.futures
  5. import contextvars
  6. import math
  7. import os
  8. import socket
  9. import sys
  10. import threading
  11. import weakref
  12. from asyncio import (
  13. AbstractEventLoop,
  14. CancelledError,
  15. all_tasks,
  16. create_task,
  17. current_task,
  18. get_running_loop,
  19. sleep,
  20. )
  21. from asyncio.base_events import _run_until_complete_cb # type: ignore[attr-defined]
  22. from collections import OrderedDict, deque
  23. from collections.abc import (
  24. AsyncGenerator,
  25. AsyncIterator,
  26. Awaitable,
  27. Callable,
  28. Collection,
  29. Coroutine,
  30. Iterable,
  31. Sequence,
  32. )
  33. from concurrent.futures import Future
  34. from contextlib import AbstractContextManager, suppress
  35. from contextvars import Context, copy_context
  36. from dataclasses import dataclass, field
  37. from functools import partial, wraps
  38. from inspect import (
  39. CORO_RUNNING,
  40. CORO_SUSPENDED,
  41. getcoroutinestate,
  42. iscoroutine,
  43. )
  44. from io import IOBase
  45. from os import PathLike
  46. from queue import Queue
  47. from signal import Signals
  48. from socket import AddressFamily, SocketKind
  49. from threading import Thread
  50. from types import CodeType, TracebackType
  51. from typing import (
  52. IO,
  53. TYPE_CHECKING,
  54. Any,
  55. ParamSpec,
  56. TypeVar,
  57. cast,
  58. )
  59. from weakref import WeakKeyDictionary
  60. from .. import (
  61. CapacityLimiterStatistics,
  62. EventStatistics,
  63. LockStatistics,
  64. TaskInfo,
  65. abc,
  66. )
  67. from .._core._eventloop import (
  68. claim_worker_thread,
  69. set_current_async_library,
  70. threadlocals,
  71. )
  72. from .._core._exceptions import (
  73. BrokenResourceError,
  74. BusyResourceError,
  75. ClosedResourceError,
  76. EndOfStream,
  77. RunFinishedError,
  78. WouldBlock,
  79. )
  80. from .._core._sockets import convert_ipv6_sockaddr
  81. from .._core._streams import create_memory_object_stream
  82. from .._core._synchronization import (
  83. CapacityLimiter as BaseCapacityLimiter,
  84. )
  85. from .._core._synchronization import Event as BaseEvent
  86. from .._core._synchronization import Lock as BaseLock
  87. from .._core._synchronization import (
  88. ResourceGuard,
  89. SemaphoreStatistics,
  90. )
  91. from .._core._synchronization import Semaphore as BaseSemaphore
  92. from .._core._tasks import CancelScope as BaseCancelScope
  93. from ..abc import (
  94. AsyncBackend,
  95. IPSockAddrType,
  96. SocketListener,
  97. UDPPacketType,
  98. UNIXDatagramPacketType,
  99. )
  100. from ..abc._eventloop import StrOrBytesPath
  101. from ..lowlevel import RunVar
  102. from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
  103. if TYPE_CHECKING:
  104. from _typeshed import FileDescriptorLike
  105. else:
  106. FileDescriptorLike = object
  107. if sys.version_info >= (3, 11):
  108. from asyncio import Runner
  109. from typing import TypeVarTuple, Unpack
  110. else:
  111. import contextvars
  112. import enum
  113. import signal
  114. from asyncio import coroutines, events, exceptions, tasks
  115. from exceptiongroup import BaseExceptionGroup
  116. from typing_extensions import TypeVarTuple, Unpack
class _State(enum.Enum):
    """Lifecycle states tracked by the backported :class:`Runner`."""

    # Initial state set in Runner.__init__(); no event loop exists yet
    CREATED = "created"
    # Set at the end of Runner._lazy_init(), once the loop and context exist
    INITIALIZED = "initialized"
    # Set by Runner.close(); _lazy_init() raises RuntimeError in this state
    CLOSED = "closed"
class Runner:
    """
    Own a single event loop and run coroutines in it.

    This definition lives in the fallback branch taken when ``asyncio.Runner``
    cannot be imported (Python older than 3.11). The event loop is created
    lazily on first use and torn down by :meth:`close`.
    """

    # Copied from CPython 3.11
    def __init__(
        self,
        *,
        debug: bool | None = None,
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
    ):
        """
        :param debug: if not ``None``, passed to ``loop.set_debug()`` when the loop
            is created
        :param loop_factory: callable used to create the event loop; when ``None``,
            ``events.new_event_loop()`` is used and the new loop is also installed
            as the current event loop
        """
        self._state = _State.CREATED
        self._debug = debug
        self._loop_factory = loop_factory
        self._loop: AbstractEventLoop | None = None
        self._context = None
        # Number of SIGINTs received during the current run() call
        self._interrupt_count = 0
        # True once this runner has called events.set_event_loop() with its loop
        self._set_event_loop = False

    def __enter__(self) -> Runner:
        """Create the event loop (if not done yet) and return the runner."""
        self._lazy_init()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the runner; exceptions from the ``with`` body are not suppressed."""
        self.close()

    def close(self) -> None:
        """Shutdown and close event loop."""
        loop = self._loop
        # Nothing to do if the loop was never created or was already closed
        if self._state is not _State.INITIALIZED or loop is None:
            return
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            # Use the loop's own executor shutdown when it has one, otherwise the
            # module-level fallback
            if hasattr(loop, "shutdown_default_executor"):
                loop.run_until_complete(loop.shutdown_default_executor())
            else:
                loop.run_until_complete(_shutdown_default_executor(loop))
        finally:
            # Only unset the current event loop if this runner installed it
            if self._set_event_loop:
                events.set_event_loop(None)
            loop.close()
            self._loop = None
            self._state = _State.CLOSED

    def get_loop(self) -> AbstractEventLoop:
        """Return embedded event loop."""
        self._lazy_init()
        return self._loop

    def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
        """Run a coroutine inside the embedded event loop."""
        if not coroutines.iscoroutine(coro):
            raise ValueError(f"a coroutine was expected, got {coro!r}")

        if events._get_running_loop() is not None:
            # fail fast with short traceback
            raise RuntimeError(
                "Runner.run() cannot be called from a running event loop"
            )

        self._lazy_init()

        # Fall back to the context captured in _lazy_init() and create the task
        # from within that context
        if context is None:
            context = self._context
        task = context.run(self._loop.create_task, coro)

        # Install our own SIGINT handler only in the main thread, and only if the
        # default handler is the one currently installed
        if (
            threading.current_thread() is threading.main_thread()
            and signal.getsignal(signal.SIGINT) is signal.default_int_handler
        ):
            sigint_handler = partial(self._on_sigint, main_task=task)
            try:
                signal.signal(signal.SIGINT, sigint_handler)
            except ValueError:
                # `signal.signal` may throw if `threading.main_thread` does
                # not support signals (e.g. embedded interpreter with signals
                # not registered - see gh-91880)
                sigint_handler = None
        else:
            sigint_handler = None

        self._interrupt_count = 0
        try:
            return self._loop.run_until_complete(task)
        except exceptions.CancelledError:
            # If the cancellation was triggered by SIGINT and the task reports no
            # remaining cancellation requests, surface it as KeyboardInterrupt.
            # Tasks without uncancel() re-raise CancelledError instead.
            if self._interrupt_count > 0:
                uncancel = getattr(task, "uncancel", None)
                if uncancel is not None and uncancel() == 0:
                    raise KeyboardInterrupt  # noqa: B904
            raise  # CancelledError
        finally:
            # Restore the default handler, but only if ours is still installed
            if (
                sigint_handler is not None
                and signal.getsignal(signal.SIGINT) is sigint_handler
            ):
                signal.signal(signal.SIGINT, signal.default_int_handler)

    def _lazy_init(self) -> None:
        """
        Create the event loop and capture the context on first use.

        :raises RuntimeError: if the runner has already been closed
        """
        if self._state is _State.CLOSED:
            raise RuntimeError("Runner is closed")
        if self._state is _State.INITIALIZED:
            return
        if self._loop_factory is None:
            self._loop = events.new_event_loop()
            if not self._set_event_loop:
                # Call set_event_loop only once to avoid calling
                # attach_loop multiple times on child watchers
                events.set_event_loop(self._loop)
                self._set_event_loop = True
        else:
            self._loop = self._loop_factory()
        if self._debug is not None:
            self._loop.set_debug(self._debug)
        self._context = contextvars.copy_context()
        self._state = _State.INITIALIZED

    def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
        """
        SIGINT handler installed by :meth:`run`.

        The first interrupt cancels ``main_task`` if it is still running; any other
        call raises :exc:`KeyboardInterrupt` directly.
        """
        self._interrupt_count += 1
        if self._interrupt_count == 1 and not main_task.done():
            main_task.cancel()
            # wakeup loop if it is blocked by select() with long timeout
            self._loop.call_soon_threadsafe(lambda: None)
            return
        raise KeyboardInterrupt()
  236. def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
  237. to_cancel = tasks.all_tasks(loop)
  238. if not to_cancel:
  239. return
  240. for task in to_cancel:
  241. task.cancel()
  242. loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  243. for task in to_cancel:
  244. if task.cancelled():
  245. continue
  246. if task.exception() is not None:
  247. loop.call_exception_handler(
  248. {
  249. "message": "unhandled exception during asyncio.run() shutdown",
  250. "exception": task.exception(),
  251. "task": task,
  252. }
  253. )
  254. async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
  255. """Schedule the shutdown of the default executor."""
  256. def _do_shutdown(future: asyncio.futures.Future) -> None:
  257. try:
  258. loop._default_executor.shutdown(wait=True) # type: ignore[attr-defined]
  259. loop.call_soon_threadsafe(future.set_result, None)
  260. except Exception as ex:
  261. loop.call_soon_threadsafe(future.set_exception, ex)
  262. loop._executor_shutdown_called = True
  263. if loop._default_executor is None:
  264. return
  265. future = loop.create_future()
  266. thread = threading.Thread(target=_do_shutdown, args=(future,))
  267. thread.start()
  268. try:
  269. await future
  270. finally:
  271. thread.join()
# Type variables shared by the annotations in this module
# (T_Retval: return value of a wrapped coroutine/callable, e.g. Runner.run())
T_Retval = TypeVar("T_Retval")
T_contra = TypeVar("T_contra", contravariant=True)
PosArgsT = TypeVarTuple("PosArgsT")
P = ParamSpec("P")

# Cache for the task located by find_root_task(), which reads it with .get(None)
# and stores a newly found root task with .set()
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
  277. def find_root_task() -> asyncio.Task:
  278. root_task = _root_task.get(None)
  279. if root_task is not None and not root_task.done():
  280. return root_task
  281. # Look for a task that has been started via run_until_complete()
  282. for task in all_tasks():
  283. if task._callbacks and not task.done():
  284. callbacks = [cb for cb, context in task._callbacks]
  285. for cb in callbacks:
  286. if (
  287. cb is _run_until_complete_cb
  288. or getattr(cb, "__module__", None) == "uvloop.loop"
  289. ):
  290. _root_task.set(task)
  291. return task
  292. # Look up the topmost task in the AnyIO task tree, if possible
  293. task = cast(asyncio.Task, current_task())
  294. state = _task_states.get(task)
  295. if state:
  296. cancel_scope = state.cancel_scope
  297. while cancel_scope and cancel_scope._parent_scope is not None:
  298. cancel_scope = cancel_scope._parent_scope
  299. if cancel_scope is not None:
  300. return cast(asyncio.Task, cancel_scope._host_task)
  301. return task
  302. def get_callable_name(func: Callable) -> str:
  303. module = getattr(func, "__module__", None)
  304. qualname = getattr(func, "__qualname__", None)
  305. return ".".join([x for x in (module, qualname) if x])
  306. #
  307. # Event loop
  308. #
# Per-event-loop storage, keyed weakly so an entry does not keep its loop alive.
# NOTE(review): presumably the backing store for RunVar values - the consumer is
# not visible in this section, confirm.
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
  310. def _task_started(task: asyncio.Task) -> bool:
  311. """Return ``True`` if the task has been started and has not finished."""
  312. # The task coro should never be None here, as we never add finished tasks to the
  313. # task list
  314. coro = task.get_coro()
  315. assert coro is not None
  316. try:
  317. return getcoroutinestate(coro) in (CORO_RUNNING, CORO_SUSPENDED)
  318. except AttributeError:
  319. # task coro is async_genenerator_asend https://bugs.python.org/issue37771
  320. raise Exception(f"Cannot determine if task {task} has started or not") from None
  321. #
  322. # Timeouts and cancellation
  323. #
  324. def is_anyio_cancellation(exc: CancelledError) -> bool:
  325. # Sometimes third party frameworks catch a CancelledError and raise a new one, so as
  326. # a workaround we have to look at the previous ones in __context__ too for a
  327. # matching cancel message
  328. while True:
  329. if (
  330. exc.args
  331. and isinstance(exc.args[0], str)
  332. and exc.args[0].startswith("Cancelled via cancel scope ")
  333. ):
  334. return True
  335. if isinstance(exc.__context__, CancelledError):
  336. exc = exc.__context__
  337. continue
  338. return False
class CancelScope(BaseCancelScope):
    """
    Cancel scope implementation for the asyncio backend.

    A scope tracks the tasks directly contained in it (``_tasks``), its nested
    scopes (``_child_scopes``) and its parent scope, and delivers cancellation to
    them by calling ``Task.cancel()`` repeatedly until no eligible tasks remain.
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        # Allocate directly through object.__new__ instead of the base class
        # NOTE(review): presumably because BaseCancelScope.__new__ dispatches to the
        # backend-specific class - confirm against the base class
        return object.__new__(cls)

    def __init__(self, deadline: float = math.inf, shield: bool = False):
        """
        :param deadline: event loop time at which the scope cancels itself
            (``math.inf`` disables the timeout)
        :param shield: initial value of the ``shield`` property
        """
        self._deadline = deadline
        self._shield = shield
        self._parent_scope: CancelScope | None = None
        self._child_scopes: set[CancelScope] = set()
        self._cancel_called = False
        self._cancel_reason: str | None = None
        self._cancelled_caught = False
        self._active = False
        # Handle of the pending call_at() callback scheduled by _timeout()
        self._timeout_handle: asyncio.TimerHandle | None = None
        # Handle of the pending call_soon() retry scheduled by _deliver_cancellation()
        self._cancel_handle: asyncio.Handle | None = None
        self._tasks: set[asyncio.Task] = set()
        self._host_task: asyncio.Task | None = None
        # Number of Task.cancel() calls made on the host task that still need a
        # matching uncancel(); None on Python versions older than 3.11
        if sys.version_info >= (3, 11):
            self._pending_uncancellations: int | None = 0
        else:
            self._pending_uncancellations = None

    def __enter__(self) -> CancelScope:
        """
        Make this scope the current cancel scope of the current task.

        :raises RuntimeError: if the scope is already active
        """
        if self._active:
            raise RuntimeError(
                "Each CancelScope may only be used for a single 'with' block"
            )

        self._host_task = host_task = cast(asyncio.Task, current_task())
        self._tasks.add(host_task)
        try:
            task_state = _task_states[host_task]
        except KeyError:
            # First scope entered in this task: create its task state
            task_state = TaskState(None, self)
            _task_states[host_task] = task_state
        else:
            # Nest this scope under the task's current scope
            self._parent_scope = task_state.cancel_scope
            task_state.cancel_scope = self
            if self._parent_scope is not None:
                # If using an eager task factory, the parent scope may not even contain
                # the host task
                self._parent_scope._child_scopes.add(self)
                self._parent_scope._tasks.discard(host_task)

        self._timeout()
        self._active = True

        # Start cancelling the host task if the scope was cancelled before entering
        if self._cancel_called:
            self._deliver_cancellation(self)

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        Leave the scope and decide whether to swallow the propagating exception.

        :return: ``True`` if the exception was a cancellation originating from an
            AnyIO cancel scope and was swallowed, ``False`` otherwise
        :raises RuntimeError: if the scope is not active, is exited from a different
            task than it was entered in, or is not the task's current cancel scope
        """
        del exc_tb

        if not self._active:
            raise RuntimeError("This cancel scope is not active")
        if current_task() is not self._host_task:
            raise RuntimeError(
                "Attempted to exit cancel scope in a different task than it was "
                "entered in"
            )

        assert self._host_task is not None
        host_task_state = _task_states.get(self._host_task)
        if host_task_state is None or host_task_state.cancel_scope is not self:
            raise RuntimeError(
                "Attempted to exit a cancel scope that isn't the current tasks's "
                "current cancel scope"
            )

        try:
            self._active = False
            if self._timeout_handle:
                self._timeout_handle.cancel()
                self._timeout_handle = None

            # Hand the host task back to the parent scope
            self._tasks.remove(self._host_task)
            if self._parent_scope is not None:
                self._parent_scope._child_scopes.remove(self)
                self._parent_scope._tasks.add(self._host_task)

            host_task_state.cancel_scope = self._parent_scope

            # Restart the cancellation effort in the closest visible, cancelled parent
            # scope if necessary
            self._restart_cancellation_in_parent()

            # We only swallow the exception iff it was an AnyIO CancelledError, either
            # directly as exc_val or inside an exception group and there are no cancelled
            # parent cancel scopes visible to us here
            if self._cancel_called and not self._parent_cancellation_is_visible_to_us:
                # For each level-cancel() call made on the host task, call uncancel()
                while self._pending_uncancellations:
                    self._host_task.uncancel()
                    self._pending_uncancellations -= 1

                # Update cancelled_caught and check for exceptions we must not swallow
                if isinstance(exc_val, BaseExceptionGroup):
                    # Split the group into AnyIO cancellations and everything else
                    cancelleds_caught, remaining = exc_val.split(
                        lambda exc: (
                            isinstance(exc, CancelledError)
                            and is_anyio_cancellation(exc)
                        )
                    )
                    if cancelleds_caught is None:
                        return False

                    self._cancelled_caught = True
                    if remaining is None:
                        return True

                    # Re-raise only the exceptions that were not AnyIO cancellations
                    context = remaining.__context__
                    try:
                        # Preserve __cause__ and __suppress_context__ by avoiding `raise
                        # ... from ...`
                        raise remaining
                    finally:
                        # Preserve __context__
                        remaining.__context__ = context
                        del context
                else:
                    if isinstance(exc_val, CancelledError) and is_anyio_cancellation(
                        exc_val
                    ):
                        self._cancelled_caught = True
                        return True
                    else:
                        return False
            else:
                # Not swallowing here: transfer the outstanding uncancel() count to
                # the parent scope
                if self._pending_uncancellations:
                    assert self._parent_scope is not None
                    assert self._parent_scope._pending_uncancellations is not None
                    self._parent_scope._pending_uncancellations += (
                        self._pending_uncancellations
                    )
                    self._pending_uncancellations = 0

                return False
        finally:
            self._host_task = None
            del exc_val

    @property
    def _effectively_cancelled(self) -> bool:
        """
        Return ``True`` if this scope, or an ancestor reachable without passing
        through a shielded scope, has had ``cancel()`` called on it.
        """
        cancel_scope: CancelScope | None = self
        while cancel_scope is not None:
            if cancel_scope._cancel_called:
                return True

            # A shielded scope hides the cancellation state of its ancestors
            if cancel_scope.shield:
                return False

            cancel_scope = cancel_scope._parent_scope

        return False

    @property
    def _parent_cancellation_is_visible_to_us(self) -> bool:
        """
        Return ``True`` if there is a parent scope, this scope is not shielded and
        the parent scope is effectively cancelled.
        """
        return (
            self._parent_scope is not None
            and not self.shield
            and self._parent_scope._effectively_cancelled
        )

    def _timeout(self) -> None:
        """Cancel the scope if its deadline has passed, else schedule a re-check."""
        if self._deadline != math.inf:
            loop = get_running_loop()
            if loop.time() >= self._deadline:
                self.cancel("deadline exceeded")
            else:
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)

    def _deliver_cancellation(self, origin: CancelScope) -> bool:
        """
        Deliver cancellation to directly contained tasks and nested cancel scopes.

        Schedule another run at the end if we still have tasks eligible for
        cancellation.

        :param origin: the cancel scope that originated the cancellation
        :return: ``True`` if the delivery needs to be retried on the next cycle

        """
        should_retry = False
        current = current_task()
        for task in self._tasks:
            # Any task still contained in the scope warrants another round
            should_retry = True
            # Skip tasks that already have a cancellation pending
            if task._must_cancel:  # type: ignore[attr-defined]
                continue

            # The task is eligible for cancellation if it has started
            if task is not current and (task is self._host_task or _task_started(task)):
                waiter = task._fut_waiter  # type: ignore[attr-defined]
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
                    task.cancel(origin._cancel_reason)
                    # Count cancel() calls on the origin's host task so __exit__ can
                    # issue matching uncancel() calls
                    if (
                        task is origin._host_task
                        and origin._pending_uncancellations is not None
                    ):
                        origin._pending_uncancellations += 1

        # Deliver cancellation to child scopes that aren't shielded or running their own
        # cancellation callbacks
        for scope in self._child_scopes:
            if not scope._shield and not scope.cancel_called:
                should_retry = scope._deliver_cancellation(origin) or should_retry

        # Schedule another callback if there are still tasks left
        if origin is self:
            if should_retry:
                self._cancel_handle = get_running_loop().call_soon(
                    self._deliver_cancellation, origin
                )
            else:
                self._cancel_handle = None

        return should_retry

    def _restart_cancellation_in_parent(self) -> None:
        """
        Restart the cancellation effort in the closest directly cancelled parent scope.

        """
        scope = self._parent_scope
        while scope is not None:
            if scope._cancel_called:
                # Only restart if no delivery callback is currently scheduled
                if scope._cancel_handle is None:
                    scope._deliver_cancellation(scope)

                break

            # No point in looking beyond any shielded scope
            if scope._shield:
                break

            scope = scope._parent_scope

    def cancel(self, reason: str | None = None) -> None:
        """
        Cancel the scope; calls after the first one have no effect.

        :param reason: optional text appended to the cancellation message
        """
        if not self._cancel_called:
            if self._timeout_handle:
                self._timeout_handle.cancel()
                self._timeout_handle = None

            self._cancel_called = True
            # is_anyio_cancellation() recognizes cancellations by this message prefix
            self._cancel_reason = f"Cancelled via cancel scope {id(self):x}"
            if task := current_task():
                self._cancel_reason += f" by {task}"

            if reason:
                self._cancel_reason += f"; reason: {reason}"

            # If the scope has not been entered yet, delivery starts in __enter__()
            if self._host_task is not None:
                self._deliver_cancellation(self)

    @property
    def deadline(self) -> float:
        """The event loop time at which the scope cancels itself."""
        return self._deadline

    @deadline.setter
    def deadline(self, value: float) -> None:
        self._deadline = float(value)
        # Drop the timer scheduled for the old deadline
        if self._timeout_handle is not None:
            self._timeout_handle.cancel()
            self._timeout_handle = None

        if self._active and not self._cancel_called:
            self._timeout()

    @property
    def cancel_called(self) -> bool:
        """``True`` once :meth:`cancel` has been called on this scope."""
        return self._cancel_called

    @property
    def cancelled_caught(self) -> bool:
        """``True`` if ``__exit__()`` caught an AnyIO cancellation error."""
        return self._cancelled_caught

    @property
    def shield(self) -> bool:
        """Whether the scope is shielded from cancellation of its parent scopes."""
        return self._shield

    @shield.setter
    def shield(self, value: bool) -> None:
        if self._shield != value:
            self._shield = value
            # Removing the shield exposes the scope to a cancelled parent again
            if not value:
                self._restart_cancellation_in_parent()
  586. #
  587. # Task states
  588. #
  589. class TaskState:
  590. """
  591. Encapsulates auxiliary task information that cannot be added to the Task instance
  592. itself because there are no guarantees about its implementation.
  593. """
  594. __slots__ = "parent_id", "cancel_scope", "__weakref__"
  595. def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
  596. self.parent_id = parent_id
  597. self.cancel_scope = cancel_scope
# Module-level table associating each asyncio task with its TaskState. The keys are
# weakly referenced, so an entry does not keep its task alive.
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
  599. #
  600. # Task groups
  601. #
  602. class _AsyncioTaskStatus(abc.TaskStatus):
  603. def __init__(self, future: asyncio.Future, parent_id: int):
  604. self._future = future
  605. self._parent_id = parent_id
  606. def started(self, value: T_contra | None = None) -> None:
  607. try:
  608. self._future.set_result(value)
  609. except asyncio.InvalidStateError:
  610. if not self._future.cancelled():
  611. raise RuntimeError(
  612. "called 'started' twice on the same task status"
  613. ) from None
  614. task = cast(asyncio.Task, current_task())
  615. _task_states[task].parent_id = self._parent_id
# Code object of asyncio's eager task factory, or None on Python versions older than
# 3.12. TaskGroup._spawn() compares it against the ``__code__`` of the running loop's
# task factory.
if sys.version_info >= (3, 12):
    _eager_task_factory_code: CodeType | None = asyncio.eager_task_factory.__code__
else:
    _eager_task_factory_code = None
class TaskGroup(abc.TaskGroup):
    """
    Task group implemented on top of asyncio tasks.

    Child tasks are attached to ``self.cancel_scope``. Leaving the ``async with``
    block waits for all child tasks to finish; exceptions collected from the host
    block and from the child tasks are then raised together as a
    ``BaseExceptionGroup``.
    """

    def __init__(self) -> None:
        # Cancel scope entered in __aenter__() and inherited by every spawned task
        self.cancel_scope: CancelScope = CancelScope()
        # True from __aenter__() until __aexit__() has finished waiting for the child
        # tasks; _spawn() refuses to start tasks while this is False
        self._active = False
        # Non-cancellation exceptions raised by the host block or by child tasks
        self._exceptions: list[BaseException] = []
        # Child tasks that have not finished yet
        self._tasks: set[asyncio.Task] = set()
        # Future awaited by __aexit__(); resolved by the done callback of the last
        # remaining child task
        self._on_completed_fut: asyncio.Future[None] | None = None

    async def __aenter__(self) -> TaskGroup:
        """Enter the task group's cancel scope and mark the group as active."""
        self.cancel_scope.__enter__()
        self._active = True
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        Wait for all child tasks to finish, then exit the cancel scope.

        If the host block raised, the cancel scope is cancelled first and the
        exception (unless it is a ``CancelledError``) is recorded.

        :return: the return value of the cancel scope's ``__exit__()``
        :raises BaseExceptionGroup: if any non-cancellation exceptions were recorded
            (and the cancel scope did not swallow the exception group)
        """
        try:
            if exc_val is not None:
                self.cancel_scope.cancel()
                if not isinstance(exc_val, CancelledError):
                    self._exceptions.append(exc_val)

            loop = get_running_loop()
            try:
                if self._tasks:
                    with CancelScope() as wait_scope:
                        while self._tasks:
                            # A fresh future for every round; task_done() resolves it
                            # once the set of child tasks becomes empty
                            self._on_completed_fut = loop.create_future()

                            try:
                                await self._on_completed_fut
                            except CancelledError as exc:
                                # Shield the scope against further cancellation attempts,
                                # as they're not productive (#695)
                                wait_scope.shield = True
                                self.cancel_scope.cancel()

                                # Set exc_val from the cancellation exception if it was
                                # previously unset. However, we should not replace a native
                                # cancellation exception with one raised by a cancel scope.
                                if exc_val is None or (
                                    isinstance(exc_val, CancelledError)
                                    and not is_anyio_cancellation(exc)
                                ):
                                    exc_val = exc

                            self._on_completed_fut = None
                else:
                    # If there are no child tasks to wait on, run at least one checkpoint
                    # anyway
                    await AsyncIOBackend.cancel_shielded_checkpoint()

                self._active = False
                if self._exceptions:
                    # The exception that got us here should already have been
                    # added to self._exceptions so it's ok to break exception
                    # chaining and avoid adding a "During handling of above..."
                    # for each nesting level.
                    raise BaseExceptionGroup(
                        "unhandled errors in a TaskGroup", self._exceptions
                    ) from None
                elif exc_val:
                    raise exc_val
            except BaseException as exc:
                # Give the cancel scope the chance to swallow the exception
                if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__):
                    return True

                raise

            return self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
        finally:
            # Drop the references to the exception objects (presumably to avoid
            # reference cycles through their tracebacks). Note that this also removes
            # the _exceptions attribute from the instance.
            del exc_val, exc_tb, self._exceptions

    def _spawn(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        args: tuple[Unpack[PosArgsT]],
        name: object,
        task_status_future: asyncio.Future | None = None,
    ) -> asyncio.Task:
        """
        Create a child task running ``func(*args)`` and register it in this group.

        :param func: callable returning a coroutine object
        :param args: positional arguments passed to ``func``
        :param name: task name; if ``None``, a name is derived from ``func``
        :param task_status_future: future used by :meth:`start` to receive the start
            value; when given, ``func`` is also passed a ``task_status`` keyword
            argument
        :return: the newly created task
        :raises RuntimeError: if the task group is not active
        :raises TypeError: if ``func`` did not return a coroutine object
        """

        def task_done(_task: asyncio.Task) -> None:
            # Done callback of the child task: unregisters the task and propagates
            # its outcome to the task group (or to the start() future)
            if sys.version_info >= (3, 14) and self.cancel_scope._host_task is not None:
                asyncio.future_discard_from_awaited_by(
                    _task, self.cancel_scope._host_task
                )

            task_state = _task_states[_task]
            assert task_state.cancel_scope is not None
            assert _task in task_state.cancel_scope._tasks
            task_state.cancel_scope._tasks.remove(_task)
            # "task" is the closure variable bound further below; it refers to the
            # same task this callback was registered on
            self._tasks.remove(task)
            del _task_states[_task]

            # Wake up __aexit__() once the last child task has finished
            if self._on_completed_fut is not None and not self._tasks:
                try:
                    self._on_completed_fut.set_result(None)
                except asyncio.InvalidStateError:
                    pass

            try:
                exc = _task.exception()
            except CancelledError as e:
                # Follow the context chain to the innermost CancelledError
                while isinstance(e.__context__, CancelledError):
                    e = e.__context__

                exc = e

            if exc is not None:
                # The future can only be in the cancelled state if the host task was
                # cancelled, so return immediately instead of adding one more
                # CancelledError to the exceptions list
                if task_status_future is not None and task_status_future.cancelled():
                    return

                if task_status_future is None or task_status_future.done():
                    # Either a plain child task, or one that already delivered its
                    # start value: the failure belongs to the task group
                    if not isinstance(exc, CancelledError):
                        self._exceptions.append(exc)

                    if not self.cancel_scope._effectively_cancelled:
                        self.cancel_scope.cancel()
                else:
                    # The task failed before calling started(): report to start()
                    task_status_future.set_exception(exc)
            elif task_status_future is not None and not task_status_future.done():
                task_status_future.set_exception(
                    RuntimeError("Child exited without calling task_status.started()")
                )

        if not self._active:
            raise RuntimeError(
                "This task group is not active; no new tasks can be started."
            )

        kwargs = {}
        if task_status_future:
            # Until started() is called, the parent is the task calling start();
            # started() then switches the parent to the task group's host task
            parent_id = id(current_task())
            kwargs["task_status"] = _AsyncioTaskStatus(
                task_status_future, id(self.cancel_scope._host_task)
            )
        else:
            parent_id = id(self.cancel_scope._host_task)

        coro = func(*args, **kwargs)
        if not iscoroutine(coro):
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
            raise TypeError(
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
                f"the return value ({coro!r}) is not a coroutine object"
            )

        name = get_callable_name(func) if name is None else str(name)
        loop = asyncio.get_running_loop()
        # If the loop uses a factory sharing its code object with asyncio's eager task
        # factory, call the task constructor stored in the factory's closure directly
        # instead of going through create_task()
        if (
            (factory := loop.get_task_factory())
            and getattr(factory, "__code__", None) is _eager_task_factory_code
            and (closure := getattr(factory, "__closure__", None))
        ):
            custom_task_constructor = closure[0].cell_contents
            task = custom_task_constructor(coro, loop=loop, name=name)
        else:
            task = create_task(coro, name=name)

        # Make the spawned task inherit the task group's cancel scope
        _task_states[task] = TaskState(
            parent_id=parent_id, cancel_scope=self.cancel_scope
        )
        self.cancel_scope._tasks.add(task)
        self._tasks.add(task)

        if sys.version_info >= (3, 14) and self.cancel_scope._host_task is not None:
            asyncio.future_add_to_awaited_by(task, self.cancel_scope._host_task)

        task.add_done_callback(task_done)
        return task

    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> None:
        """
        Start a new child task without waiting for it.

        :param func: callable returning a coroutine object
        :param args: positional arguments passed to ``func``
        :param name: optional task name
        """
        self._spawn(func, args, name)

    async def start(
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
    ) -> Any:
        """
        Start a new child task and wait until it reports that it has started.

        :param func: callable returning a coroutine object; it receives a
            ``task_status`` keyword argument
        :param args: positional arguments passed to ``func``
        :param name: optional task name
        :return: the value the task passed to ``task_status.started()``
        """
        future: asyncio.Future = asyncio.Future()
        task = self._spawn(func, args, name, future)

        # If the task raises an exception after sending a start value without a switch
        # point between, the task group is cancelled and this method never proceeds to
        # process the completed future. That's why we have to have a shielded cancel
        # scope here.
        try:
            return await future
        except CancelledError:
            # Cancel the task and wait for it to exit before returning
            task.cancel()
            with CancelScope(shield=True), suppress(CancelledError):
                await task

            raise
  796. #
  797. # Threads
  798. #
# Alias for a (return value, exception) pair in which either member may be None.
# NOTE(review): no use of this alias is visible in this part of the file.
_Retval_Queue_Type = tuple[T_Retval | None, BaseException | None]
  800. class WorkerThread(Thread):
  801. MAX_IDLE_TIME = 10 # seconds
  802. def __init__(
  803. self,
  804. root_task: asyncio.Task,
  805. workers: set[WorkerThread],
  806. idle_workers: deque[WorkerThread],
  807. ):
  808. super().__init__(name="AnyIO worker thread")
  809. self.root_task = root_task
  810. self.workers = workers
  811. self.idle_workers = idle_workers
  812. self.loop = root_task._loop
  813. self.queue: Queue[
  814. tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
  815. ] = Queue(2)
  816. self.idle_since = AsyncIOBackend.current_time()
  817. self.stopping = False
  818. def _report_result(
  819. self, future: asyncio.Future, result: Any, exc: BaseException | None
  820. ) -> None:
  821. self.idle_since = AsyncIOBackend.current_time()
  822. if not self.stopping:
  823. self.idle_workers.append(self)
  824. if not future.cancelled():
  825. if exc is not None:
  826. if isinstance(exc, StopIteration):
  827. new_exc = RuntimeError("coroutine raised StopIteration")
  828. new_exc.__cause__ = exc
  829. exc = new_exc
  830. future.set_exception(exc)
  831. else:
  832. future.set_result(result)
  833. def run(self) -> None:
  834. with claim_worker_thread(AsyncIOBackend, self.loop):
  835. while True:
  836. item = self.queue.get()
  837. if item is None:
  838. # Shutdown command received
  839. return
  840. context, func, args, future, cancel_scope = item
  841. if not future.cancelled():
  842. result = None
  843. exception: BaseException | None = None
  844. threadlocals.current_cancel_scope = cancel_scope
  845. try:
  846. result = context.run(func, *args)
  847. except BaseException as exc:
  848. exception = exc
  849. finally:
  850. del threadlocals.current_cancel_scope
  851. if not self.loop.is_closed():
  852. self.loop.call_soon_threadsafe(
  853. self._report_result, future, result, exception
  854. )
  855. del result, exception
  856. self.queue.task_done()
  857. del item, context, func, args, future, cancel_scope
  858. def stop(self, f: asyncio.Task | None = None) -> None:
  859. self.stopping = True
  860. self.queue.put_nowait(None)
  861. self.workers.discard(self)
  862. try:
  863. self.idle_workers.remove(self)
  864. except ValueError:
  865. pass
# RunVars for the worker thread bookkeeping; judging by the type parameters, they hold
# the collection of idle workers and the set of all workers.
# NOTE(review): the code reading and writing these is outside this part of the file.
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
    "_threadpool_idle_workers"
)
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
  870. #
  871. # Subprocesses
  872. #
  873. @dataclass(eq=False)
  874. class StreamReaderWrapper(abc.ByteReceiveStream):
  875. _stream: asyncio.StreamReader
  876. async def receive(self, max_bytes: int = 65536) -> bytes:
  877. data = await self._stream.read(max_bytes)
  878. if data:
  879. return data
  880. else:
  881. raise EndOfStream
  882. async def aclose(self) -> None:
  883. self._stream.set_exception(ClosedResourceError())
  884. await AsyncIOBackend.checkpoint()
  885. @dataclass(eq=False)
  886. class StreamWriterWrapper(abc.ByteSendStream):
  887. _stream: asyncio.StreamWriter
  888. _closed: bool = field(init=False, default=False)
  889. async def send(self, item: bytes) -> None:
  890. await AsyncIOBackend.checkpoint_if_cancelled()
  891. stream_paused = self._stream._protocol._paused # type: ignore[attr-defined]
  892. try:
  893. self._stream.write(item)
  894. await self._stream.drain()
  895. except (ConnectionResetError, BrokenPipeError, RuntimeError) as exc:
  896. # If closed by us and/or the peer:
  897. # * on stdlib, drain() raises ConnectionResetError or BrokenPipeError
  898. # * on uvloop and Winloop, write() eventually starts raising RuntimeError
  899. if self._closed:
  900. raise ClosedResourceError from exc
  901. elif self._stream.is_closing():
  902. raise BrokenResourceError from exc
  903. raise
  904. if not stream_paused:
  905. await AsyncIOBackend.cancel_shielded_checkpoint()
  906. async def aclose(self) -> None:
  907. self._closed = True
  908. self._stream.close()
  909. await AsyncIOBackend.checkpoint()
  910. @dataclass(eq=False)
  911. class Process(abc.Process):
  912. _process: asyncio.subprocess.Process
  913. _stdin: StreamWriterWrapper | None
  914. _stdout: StreamReaderWrapper | None
  915. _stderr: StreamReaderWrapper | None
  916. async def aclose(self) -> None:
  917. with CancelScope(shield=True) as scope:
  918. if self._stdin:
  919. await self._stdin.aclose()
  920. if self._stdout:
  921. await self._stdout.aclose()
  922. if self._stderr:
  923. await self._stderr.aclose()
  924. scope.shield = False
  925. try:
  926. await self.wait()
  927. except BaseException:
  928. scope.shield = True
  929. self.kill()
  930. await self.wait()
  931. raise
  932. async def wait(self) -> int:
  933. return await self._process.wait()
  934. def terminate(self) -> None:
  935. self._process.terminate()
  936. def kill(self) -> None:
  937. self._process.kill()
  938. def send_signal(self, signal: int) -> None:
  939. self._process.send_signal(signal)
  940. @property
  941. def pid(self) -> int:
  942. return self._process.pid
  943. @property
  944. def returncode(self) -> int | None:
  945. return self._process.returncode
  946. @property
  947. def stdin(self) -> abc.ByteSendStream | None:
  948. return self._stdin
  949. @property
  950. def stdout(self) -> abc.ByteReceiveStream | None:
  951. return self._stdout
  952. @property
  953. def stderr(self) -> abc.ByteReceiveStream | None:
  954. return self._stderr
  955. def _forcibly_shutdown_process_pool_on_exit(
  956. workers: set[Process], _task: object
  957. ) -> None:
  958. """
  959. Forcibly shuts down worker processes belonging to this event loop."""
  960. child_watcher: asyncio.AbstractChildWatcher | None = None # type: ignore[name-defined]
  961. if sys.version_info < (3, 12):
  962. try:
  963. child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
  964. except NotImplementedError:
  965. pass
  966. # Close as much as possible (w/o async/await) to avoid warnings
  967. for process in workers.copy():
  968. if process.returncode is not None:
  969. continue
  970. process._stdin._stream._transport.close() # type: ignore[union-attr]
  971. process._stdout._stream._transport.close() # type: ignore[union-attr]
  972. process._stderr._stream._transport.close() # type: ignore[union-attr]
  973. process.kill()
  974. if child_watcher:
  975. child_watcher.remove_child_handler(process.pid)
  976. async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
  977. """
  978. Shuts down worker processes belonging to this event loop.
  979. NOTE: this only works when the event loop was started using asyncio.run() or
  980. anyio.run().
  981. """
  982. process: abc.Process
  983. try:
  984. await sleep(math.inf)
  985. except asyncio.CancelledError:
  986. workers = workers.copy()
  987. for process in workers:
  988. if process.returncode is None:
  989. process.kill()
  990. for process in workers:
  991. await process.aclose()
  992. #
  993. # Sockets and networking
  994. #
  995. class StreamProtocol(asyncio.Protocol):
  996. read_queue: deque[bytes]
  997. read_event: asyncio.Event
  998. write_event: asyncio.Event
  999. exception: Exception | None = None
  1000. is_at_eof: bool = False
  1001. def connection_made(self, transport: asyncio.BaseTransport) -> None:
  1002. self.read_queue = deque()
  1003. self.read_event = asyncio.Event()
  1004. self.write_event = asyncio.Event()
  1005. self.write_event.set()
  1006. cast(asyncio.Transport, transport).set_write_buffer_limits(0)
  1007. def connection_lost(self, exc: Exception | None) -> None:
  1008. if exc:
  1009. self.exception = exc
  1010. self.read_event.set()
  1011. self.write_event.set()
  1012. def data_received(self, data: bytes) -> None:
  1013. # ProactorEventloop sometimes sends bytearray instead of bytes
  1014. self.read_queue.append(bytes(data))
  1015. self.read_event.set()
  1016. def eof_received(self) -> bool | None:
  1017. self.is_at_eof = True
  1018. self.read_event.set()
  1019. return True
  1020. def pause_writing(self) -> None:
  1021. self.write_event = asyncio.Event()
  1022. def resume_writing(self) -> None:
  1023. self.write_event.set()
  1024. class DatagramProtocol(asyncio.DatagramProtocol):
  1025. read_queue: deque[tuple[bytes, IPSockAddrType]]
  1026. read_event: asyncio.Event
  1027. write_event: asyncio.Event
  1028. exception: Exception | None = None
  1029. def connection_made(self, transport: asyncio.BaseTransport) -> None:
  1030. self.read_queue = deque(maxlen=100) # arbitrary value
  1031. self.read_event = asyncio.Event()
  1032. self.write_event = asyncio.Event()
  1033. self.write_event.set()
  1034. def connection_lost(self, exc: Exception | None) -> None:
  1035. self.read_event.set()
  1036. self.write_event.set()
  1037. def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
  1038. addr = convert_ipv6_sockaddr(addr)
  1039. self.read_queue.append((data, addr))
  1040. self.read_event.set()
  1041. def error_received(self, exc: Exception) -> None:
  1042. self.exception = exc
  1043. def pause_writing(self) -> None:
  1044. self.write_event.clear()
  1045. def resume_writing(self) -> None:
  1046. self.write_event.set()
  1047. class SocketStream(abc.SocketStream):
  1048. def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
  1049. self._transport = transport
  1050. self._protocol = protocol
  1051. self._receive_guard = ResourceGuard("reading from")
  1052. self._send_guard = ResourceGuard("writing to")
  1053. self._closed = False
  1054. @property
  1055. def _raw_socket(self) -> socket.socket:
  1056. return self._transport.get_extra_info("socket")
  1057. async def receive(self, max_bytes: int = 65536) -> bytes:
  1058. with self._receive_guard:
  1059. if (
  1060. not self._protocol.read_event.is_set()
  1061. and not self._transport.is_closing()
  1062. and not self._protocol.is_at_eof
  1063. ):
  1064. self._transport.resume_reading()
  1065. await self._protocol.read_event.wait()
  1066. self._transport.pause_reading()
  1067. else:
  1068. await AsyncIOBackend.checkpoint()
  1069. try:
  1070. chunk = self._protocol.read_queue.popleft()
  1071. except IndexError:
  1072. if self._closed:
  1073. raise ClosedResourceError from None
  1074. elif self._protocol.exception:
  1075. raise BrokenResourceError from self._protocol.exception
  1076. else:
  1077. raise EndOfStream from None
  1078. if len(chunk) > max_bytes:
  1079. # Split the oversized chunk
  1080. chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
  1081. self._protocol.read_queue.appendleft(leftover)
  1082. # If the read queue is empty, clear the flag so that the next call will
  1083. # block until data is available
  1084. if not self._protocol.read_queue:
  1085. self._protocol.read_event.clear()
  1086. return chunk
  1087. async def send(self, item: bytes) -> None:
  1088. with self._send_guard:
  1089. await AsyncIOBackend.checkpoint()
  1090. if self._closed:
  1091. raise ClosedResourceError
  1092. elif self._protocol.exception is not None:
  1093. raise BrokenResourceError from self._protocol.exception
  1094. try:
  1095. self._transport.write(item)
  1096. except RuntimeError as exc:
  1097. if self._transport.is_closing():
  1098. raise BrokenResourceError from exc
  1099. else:
  1100. raise
  1101. await self._protocol.write_event.wait()
  1102. async def send_eof(self) -> None:
  1103. try:
  1104. self._transport.write_eof()
  1105. except OSError:
  1106. pass
  1107. async def aclose(self) -> None:
  1108. self._closed = True
  1109. if not self._transport.is_closing():
  1110. try:
  1111. self._transport.write_eof()
  1112. except OSError:
  1113. pass
  1114. self._transport.close()
  1115. await sleep(0)
  1116. self._transport.abort()
  1117. class _RawSocketMixin:
  1118. _receive_future: asyncio.Future | None = None
  1119. _send_future: asyncio.Future | None = None
  1120. _closing = False
  1121. def __init__(self, raw_socket: socket.socket):
  1122. self.__raw_socket = raw_socket
  1123. self._receive_guard = ResourceGuard("reading from")
  1124. self._send_guard = ResourceGuard("writing to")
  1125. @property
  1126. def _raw_socket(self) -> socket.socket:
  1127. return self.__raw_socket
  1128. def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
  1129. def callback(f: object) -> None:
  1130. del self._receive_future
  1131. loop.remove_reader(self.__raw_socket)
  1132. f = self._receive_future = asyncio.Future()
  1133. loop.add_reader(self.__raw_socket, f.set_result, None)
  1134. f.add_done_callback(callback)
  1135. return f
  1136. def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
  1137. def callback(f: object) -> None:
  1138. del self._send_future
  1139. loop.remove_writer(self.__raw_socket)
  1140. f = self._send_future = asyncio.Future()
  1141. loop.add_writer(self.__raw_socket, f.set_result, None)
  1142. f.add_done_callback(callback)
  1143. return f
  1144. async def aclose(self) -> None:
  1145. if not self._closing:
  1146. self._closing = True
  1147. if self.__raw_socket.fileno() != -1:
  1148. self.__raw_socket.close()
  1149. if self._receive_future:
  1150. self._receive_future.set_result(None)
  1151. if self._send_future:
  1152. self._send_future.set_result(None)
  1153. class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
  1154. async def send_eof(self) -> None:
  1155. with self._send_guard:
  1156. self._raw_socket.shutdown(socket.SHUT_WR)
  1157. async def receive(self, max_bytes: int = 65536) -> bytes:
  1158. loop = get_running_loop()
  1159. await AsyncIOBackend.checkpoint()
  1160. with self._receive_guard:
  1161. while True:
  1162. try:
  1163. data = self._raw_socket.recv(max_bytes)
  1164. except BlockingIOError:
  1165. await self._wait_until_readable(loop)
  1166. except OSError as exc:
  1167. if self._closing:
  1168. raise ClosedResourceError from None
  1169. else:
  1170. raise BrokenResourceError from exc
  1171. else:
  1172. if not data:
  1173. raise EndOfStream
  1174. return data
  1175. async def send(self, item: bytes) -> None:
  1176. loop = get_running_loop()
  1177. await AsyncIOBackend.checkpoint()
  1178. with self._send_guard:
  1179. view = memoryview(item)
  1180. while view:
  1181. try:
  1182. bytes_sent = self._raw_socket.send(view)
  1183. except BlockingIOError:
  1184. await self._wait_until_writable(loop)
  1185. except OSError as exc:
  1186. if self._closing:
  1187. raise ClosedResourceError from None
  1188. else:
  1189. raise BrokenResourceError from exc
  1190. else:
  1191. view = view[bytes_sent:]
  1192. async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
  1193. if not isinstance(msglen, int) or msglen < 0:
  1194. raise ValueError("msglen must be a non-negative integer")
  1195. if not isinstance(maxfds, int) or maxfds < 1:
  1196. raise ValueError("maxfds must be a positive integer")
  1197. loop = get_running_loop()
  1198. fds = array.array("i")
  1199. await AsyncIOBackend.checkpoint()
  1200. with self._receive_guard:
  1201. while True:
  1202. try:
  1203. message, ancdata, flags, addr = self._raw_socket.recvmsg(
  1204. msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
  1205. )
  1206. except BlockingIOError:
  1207. await self._wait_until_readable(loop)
  1208. except OSError as exc:
  1209. if self._closing:
  1210. raise ClosedResourceError from None
  1211. else:
  1212. raise BrokenResourceError from exc
  1213. else:
  1214. if not message and not ancdata:
  1215. raise EndOfStream
  1216. break
  1217. for cmsg_level, cmsg_type, cmsg_data in ancdata:
  1218. if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
  1219. raise RuntimeError(
  1220. f"Received unexpected ancillary data; message = {message!r}, "
  1221. f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
  1222. )
  1223. fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
  1224. return message, list(fds)
  1225. async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
  1226. if not message:
  1227. raise ValueError("message must not be empty")
  1228. if not fds:
  1229. raise ValueError("fds must not be empty")
  1230. loop = get_running_loop()
  1231. filenos: list[int] = []
  1232. for fd in fds:
  1233. if isinstance(fd, int):
  1234. filenos.append(fd)
  1235. elif isinstance(fd, IOBase):
  1236. filenos.append(fd.fileno())
  1237. fdarray = array.array("i", filenos)
  1238. await AsyncIOBackend.checkpoint()
  1239. with self._send_guard:
  1240. while True:
  1241. try:
  1242. # The ignore can be removed after mypy picks up
  1243. # https://github.com/python/typeshed/pull/5545
  1244. self._raw_socket.sendmsg(
  1245. [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
  1246. )
  1247. break
  1248. except BlockingIOError:
  1249. await self._wait_until_writable(loop)
  1250. except OSError as exc:
  1251. if self._closing:
  1252. raise ClosedResourceError from None
  1253. else:
  1254. raise BrokenResourceError from exc
  1255. class TCPSocketListener(abc.SocketListener):
  1256. _accept_scope: CancelScope | None = None
  1257. _closed = False
  1258. def __init__(self, raw_socket: socket.socket):
  1259. self.__raw_socket = raw_socket
  1260. self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
  1261. self._accept_guard = ResourceGuard("accepting connections from")
  1262. @property
  1263. def _raw_socket(self) -> socket.socket:
  1264. return self.__raw_socket
  1265. async def accept(self) -> abc.SocketStream:
  1266. if self._closed:
  1267. raise ClosedResourceError
  1268. with self._accept_guard:
  1269. await AsyncIOBackend.checkpoint()
  1270. with CancelScope() as self._accept_scope:
  1271. try:
  1272. client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
  1273. except asyncio.CancelledError:
  1274. # Workaround for https://bugs.python.org/issue41317
  1275. try:
  1276. self._loop.remove_reader(self._raw_socket)
  1277. except (ValueError, NotImplementedError):
  1278. pass
  1279. if self._closed:
  1280. raise ClosedResourceError from None
  1281. raise
  1282. finally:
  1283. self._accept_scope = None
  1284. client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  1285. transport, protocol = await self._loop.connect_accepted_socket(
  1286. StreamProtocol, client_sock
  1287. )
  1288. return SocketStream(transport, protocol)
  1289. async def aclose(self) -> None:
  1290. if self._closed:
  1291. return
  1292. self._closed = True
  1293. if self._accept_scope:
  1294. # Workaround for https://bugs.python.org/issue41317
  1295. try:
  1296. self._loop.remove_reader(self._raw_socket)
  1297. except (ValueError, NotImplementedError):
  1298. pass
  1299. self._accept_scope.cancel()
  1300. await sleep(0)
  1301. self._raw_socket.close()
  1302. class UNIXSocketListener(abc.SocketListener):
  1303. def __init__(self, raw_socket: socket.socket):
  1304. self.__raw_socket = raw_socket
  1305. self._loop = get_running_loop()
  1306. self._accept_guard = ResourceGuard("accepting connections from")
  1307. self._closed = False
  1308. async def accept(self) -> abc.SocketStream:
  1309. await AsyncIOBackend.checkpoint()
  1310. with self._accept_guard:
  1311. while True:
  1312. try:
  1313. client_sock, _ = self.__raw_socket.accept()
  1314. client_sock.setblocking(False)
  1315. return UNIXSocketStream(client_sock)
  1316. except BlockingIOError:
  1317. f: asyncio.Future = asyncio.Future()
  1318. self._loop.add_reader(self.__raw_socket, f.set_result, None)
  1319. f.add_done_callback(
  1320. lambda _: self._loop.remove_reader(self.__raw_socket)
  1321. )
  1322. await f
  1323. except OSError as exc:
  1324. if self._closed:
  1325. raise ClosedResourceError from None
  1326. else:
  1327. raise BrokenResourceError from exc
  1328. async def aclose(self) -> None:
  1329. self._closed = True
  1330. self.__raw_socket.close()
  1331. @property
  1332. def _raw_socket(self) -> socket.socket:
  1333. return self.__raw_socket
  1334. class UDPSocket(abc.UDPSocket):
  1335. def __init__(
  1336. self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
  1337. ):
  1338. self._transport = transport
  1339. self._protocol = protocol
  1340. self._receive_guard = ResourceGuard("reading from")
  1341. self._send_guard = ResourceGuard("writing to")
  1342. self._closed = False
  1343. @property
  1344. def _raw_socket(self) -> socket.socket:
  1345. return self._transport.get_extra_info("socket")
  1346. async def aclose(self) -> None:
  1347. self._closed = True
  1348. if not self._transport.is_closing():
  1349. self._transport.close()
  1350. async def receive(self) -> tuple[bytes, IPSockAddrType]:
  1351. with self._receive_guard:
  1352. await AsyncIOBackend.checkpoint()
  1353. # If the buffer is empty, ask for more data
  1354. if not self._protocol.read_queue and not self._transport.is_closing():
  1355. self._protocol.read_event.clear()
  1356. await self._protocol.read_event.wait()
  1357. try:
  1358. return self._protocol.read_queue.popleft()
  1359. except IndexError:
  1360. if self._closed:
  1361. raise ClosedResourceError from None
  1362. else:
  1363. raise BrokenResourceError from None
  1364. async def send(self, item: UDPPacketType) -> None:
  1365. with self._send_guard:
  1366. await AsyncIOBackend.checkpoint()
  1367. await self._protocol.write_event.wait()
  1368. if self._closed:
  1369. raise ClosedResourceError
  1370. elif self._transport.is_closing():
  1371. raise BrokenResourceError
  1372. else:
  1373. self._transport.sendto(*item)
  1374. class ConnectedUDPSocket(abc.ConnectedUDPSocket):
  1375. def __init__(
  1376. self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
  1377. ):
  1378. self._transport = transport
  1379. self._protocol = protocol
  1380. self._receive_guard = ResourceGuard("reading from")
  1381. self._send_guard = ResourceGuard("writing to")
  1382. self._closed = False
  1383. @property
  1384. def _raw_socket(self) -> socket.socket:
  1385. return self._transport.get_extra_info("socket")
  1386. async def aclose(self) -> None:
  1387. self._closed = True
  1388. if not self._transport.is_closing():
  1389. self._transport.close()
  1390. async def receive(self) -> bytes:
  1391. with self._receive_guard:
  1392. await AsyncIOBackend.checkpoint()
  1393. # If the buffer is empty, ask for more data
  1394. if not self._protocol.read_queue and not self._transport.is_closing():
  1395. self._protocol.read_event.clear()
  1396. await self._protocol.read_event.wait()
  1397. try:
  1398. packet = self._protocol.read_queue.popleft()
  1399. except IndexError:
  1400. if self._closed:
  1401. raise ClosedResourceError from None
  1402. else:
  1403. raise BrokenResourceError from None
  1404. return packet[0]
  1405. async def send(self, item: bytes) -> None:
  1406. with self._send_guard:
  1407. await AsyncIOBackend.checkpoint()
  1408. await self._protocol.write_event.wait()
  1409. if self._closed:
  1410. raise ClosedResourceError
  1411. elif self._transport.is_closing():
  1412. raise BrokenResourceError
  1413. else:
  1414. self._transport.sendto(item)
  1415. class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
  1416. async def receive(self) -> UNIXDatagramPacketType:
  1417. loop = get_running_loop()
  1418. await AsyncIOBackend.checkpoint()
  1419. with self._receive_guard:
  1420. while True:
  1421. try:
  1422. data = self._raw_socket.recvfrom(65536)
  1423. except BlockingIOError:
  1424. await self._wait_until_readable(loop)
  1425. except OSError as exc:
  1426. if self._closing:
  1427. raise ClosedResourceError from None
  1428. else:
  1429. raise BrokenResourceError from exc
  1430. else:
  1431. return data
  1432. async def send(self, item: UNIXDatagramPacketType) -> None:
  1433. loop = get_running_loop()
  1434. await AsyncIOBackend.checkpoint()
  1435. with self._send_guard:
  1436. while True:
  1437. try:
  1438. self._raw_socket.sendto(*item)
  1439. except BlockingIOError:
  1440. await self._wait_until_writable(loop)
  1441. except OSError as exc:
  1442. if self._closing:
  1443. raise ClosedResourceError from None
  1444. else:
  1445. raise BrokenResourceError from exc
  1446. else:
  1447. return
  1448. class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
  1449. async def receive(self) -> bytes:
  1450. loop = get_running_loop()
  1451. await AsyncIOBackend.checkpoint()
  1452. with self._receive_guard:
  1453. while True:
  1454. try:
  1455. data = self._raw_socket.recv(65536)
  1456. except BlockingIOError:
  1457. await self._wait_until_readable(loop)
  1458. except OSError as exc:
  1459. if self._closing:
  1460. raise ClosedResourceError from None
  1461. else:
  1462. raise BrokenResourceError from exc
  1463. else:
  1464. return data
  1465. async def send(self, item: bytes) -> None:
  1466. loop = get_running_loop()
  1467. await AsyncIOBackend.checkpoint()
  1468. with self._send_guard:
  1469. while True:
  1470. try:
  1471. self._raw_socket.send(item)
  1472. except BlockingIOError:
  1473. await self._wait_until_writable(loop)
  1474. except OSError as exc:
  1475. if self._closing:
  1476. raise ClosedResourceError from None
  1477. else:
  1478. raise BrokenResourceError from exc
  1479. else:
  1480. return
# Run-scoped registries used by AsyncIOBackend.wait_readable() and
# AsyncIOBackend.wait_writable() respectively. wait_readable() stores, under the
# file descriptor number, the future it is awaiting for that descriptor;
# wait_writable() looks up and deletes entries the same way (the part of it that
# stores the future is presumably symmetrical -- confirm against its full body).
_read_events: RunVar[dict[int, asyncio.Future[bool]]] = RunVar("read_events")
_write_events: RunVar[dict[int, asyncio.Future[bool]]] = RunVar("write_events")
  1483. #
  1484. # Synchronization
  1485. #
  1486. class Event(BaseEvent):
  1487. def __new__(cls) -> Event:
  1488. return object.__new__(cls)
  1489. def __init__(self) -> None:
  1490. self._event = asyncio.Event()
  1491. def set(self) -> None:
  1492. self._event.set()
  1493. def is_set(self) -> bool:
  1494. return self._event.is_set()
  1495. async def wait(self) -> None:
  1496. if self.is_set():
  1497. await AsyncIOBackend.checkpoint()
  1498. else:
  1499. await self._event.wait()
  1500. def statistics(self) -> EventStatistics:
  1501. return EventStatistics(len(self._event._waiters))
class Lock(BaseLock):
    """
    Lock that tracks its owner task and a first-in-first-out queue of waiters.

    :param fast_acquire: if ``True``, an uncontended :meth:`acquire` skips the
        cancel-shielded checkpoint that otherwise follows taking the lock

    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        # Instantiate directly through object.__new__()
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False) -> None:
        self._fast_acquire = fast_acquire
        # The task currently holding the lock, or None when the lock is free
        self._owner_task: asyncio.Task | None = None
        # (task, future) pairs of the tasks waiting for the lock, oldest first
        self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque()

    async def acquire(self) -> None:
        """
        Acquire the lock, waiting in line if it is held or others are waiting.

        :raises RuntimeError: if the current task already holds the lock

        """
        task = cast(asyncio.Task, current_task())
        if self._owner_task is None and not self._waiters:
            await AsyncIOBackend.checkpoint_if_cancelled()
            self._owner_task = task

            # Unless on the "fast path", yield control of the event loop so that other
            # tasks can run too
            if not self._fast_acquire:
                try:
                    await AsyncIOBackend.cancel_shielded_checkpoint()
                except CancelledError:
                    # Give the lock back before propagating the cancellation
                    self.release()
                    raise

            return

        if self._owner_task == task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        # Contended path: queue up and wait for release() to resolve the future
        fut: asyncio.Future[None] = asyncio.Future()
        item = task, fut
        self._waiters.append(item)
        try:
            await fut
        except CancelledError:
            self._waiters.remove(item)
            # release() may already have made this task the owner before the
            # cancellation was delivered; in that case pass the lock on
            if self._owner_task is task:
                self.release()

            raise

        # release() has already set this task as the owner; just leave the queue
        self._waiters.remove(item)

    def acquire_nowait(self) -> None:
        """
        Acquire the lock without waiting.

        :raises RuntimeError: if the current task already holds the lock
        :raises WouldBlock: if the lock is held by another task or there are
            tasks waiting for it

        """
        task = cast(asyncio.Task, current_task())
        if self._owner_task is None and not self._waiters:
            self._owner_task = task
            return

        if self._owner_task is task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        raise WouldBlock

    def locked(self) -> bool:
        """Return ``True`` if some task currently owns the lock."""
        return self._owner_task is not None

    def release(self) -> None:
        """
        Release the lock, handing it to the first waiter that is not cancelled.

        :raises RuntimeError: if the current task does not hold the lock

        """
        if self._owner_task != current_task():
            raise RuntimeError("The current task is not holding this lock")

        for task, fut in self._waiters:
            if not fut.cancelled():
                # Transfer ownership directly; the waiter removes itself from
                # the queue once it resumes
                self._owner_task = task
                fut.set_result(None)
                return

        # Nobody eligible is waiting, so the lock becomes free
        self._owner_task = None

    def statistics(self) -> LockStatistics:
        """Return the locked state, owner task info and number of waiters."""
        task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None
        return LockStatistics(self.locked(), task_info, len(self._waiters))
class Semaphore(BaseSemaphore):
    """
    Counting semaphore with a first-in-first-out queue of waiting futures.

    :param initial_value: the starting value of the counter
    :param max_value: if not ``None``, :meth:`release` refuses to go past it
    :param fast_acquire: if ``True``, an uncontended :meth:`acquire` skips the
        cancel-shielded checkpoint that otherwise follows the decrement

    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        # Instantiate directly through object.__new__()
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        super().__init__(initial_value, max_value=max_value)
        self._value = initial_value
        self._max_value = max_value
        self._fast_acquire = fast_acquire
        # Futures of the tasks waiting in acquire(), oldest first
        self._waiters: deque[asyncio.Future[None]] = deque()

    async def acquire(self) -> None:
        """Decrement the counter, waiting in line if it cannot be done now."""
        if self._value > 0 and not self._waiters:
            await AsyncIOBackend.checkpoint_if_cancelled()
            self._value -= 1

            # Unless on the "fast path", yield control of the event loop so that other
            # tasks can run too
            if not self._fast_acquire:
                try:
                    await AsyncIOBackend.cancel_shielded_checkpoint()
                except CancelledError:
                    # Undo the decrement before propagating the cancellation
                    self.release()
                    raise

            return

        # Contended path: queue up and wait for release() to resolve the future
        fut: asyncio.Future[None] = asyncio.Future()
        self._waiters.append(fut)
        try:
            await fut
        except CancelledError:
            try:
                self._waiters.remove(fut)
            except ValueError:
                # The future is no longer queued, meaning release() already
                # handed this task its slot; pass it on
                self.release()

            raise

    def acquire_nowait(self) -> None:
        """
        Decrement the counter without waiting.

        :raises WouldBlock: if the counter is zero

        """
        if self._value == 0:
            raise WouldBlock

        self._value -= 1

    def release(self) -> None:
        """
        Wake up the first waiter that is not cancelled, or increment the counter.

        :raises ValueError: if the counter is already at ``max_value``

        """
        if self._max_value is not None and self._value == self._max_value:
            raise ValueError("semaphore released too many times")

        for fut in self._waiters:
            if not fut.cancelled():
                # Hand the slot straight to the waiter; the counter is untouched
                fut.set_result(None)
                self._waiters.remove(fut)
                return

        self._value += 1

    @property
    def value(self) -> int:
        """The current value of the counter."""
        return self._value

    @property
    def max_value(self) -> int | None:
        """The configured maximum value, or ``None`` if there is none."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """Return statistics built from the number of queued waiters."""
        return SemaphoreStatistics(len(self._waiters))
  1623. class CapacityLimiter(BaseCapacityLimiter):
  1624. _total_tokens: float = 0
  1625. def __new__(cls, total_tokens: float) -> CapacityLimiter:
  1626. return object.__new__(cls)
  1627. def __init__(self, total_tokens: float):
  1628. self._borrowers: set[Any] = set()
  1629. self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
  1630. self.total_tokens = total_tokens
  1631. async def __aenter__(self) -> None:
  1632. await self.acquire()
  1633. async def __aexit__(
  1634. self,
  1635. exc_type: type[BaseException] | None,
  1636. exc_val: BaseException | None,
  1637. exc_tb: TracebackType | None,
  1638. ) -> None:
  1639. self.release()
  1640. @property
  1641. def total_tokens(self) -> float:
  1642. return self._total_tokens
  1643. @total_tokens.setter
  1644. def total_tokens(self, value: float) -> None:
  1645. if not isinstance(value, int) and not math.isinf(value):
  1646. raise TypeError("total_tokens must be an int or math.inf")
  1647. if value < 0:
  1648. raise ValueError("total_tokens must be >= 0")
  1649. waiters_to_notify = max(value - self._total_tokens, 0)
  1650. self._total_tokens = value
  1651. # Notify waiting tasks that they have acquired the limiter
  1652. while self._wait_queue and waiters_to_notify:
  1653. event = self._wait_queue.popitem(last=False)[1]
  1654. event.set()
  1655. waiters_to_notify -= 1
  1656. @property
  1657. def borrowed_tokens(self) -> int:
  1658. return len(self._borrowers)
  1659. @property
  1660. def available_tokens(self) -> float:
  1661. return self._total_tokens - len(self._borrowers)
  1662. def _notify_next_waiter(self) -> None:
  1663. """Notify the next task in line if this limiter has free capacity now."""
  1664. if self._wait_queue and len(self._borrowers) < self._total_tokens:
  1665. event = self._wait_queue.popitem(last=False)[1]
  1666. event.set()
  1667. def acquire_nowait(self) -> None:
  1668. self.acquire_on_behalf_of_nowait(current_task())
  1669. def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
  1670. if borrower in self._borrowers:
  1671. raise RuntimeError(
  1672. "this borrower is already holding one of this CapacityLimiter's tokens"
  1673. )
  1674. if self._wait_queue or len(self._borrowers) >= self._total_tokens:
  1675. raise WouldBlock
  1676. self._borrowers.add(borrower)
  1677. async def acquire(self) -> None:
  1678. return await self.acquire_on_behalf_of(current_task())
  1679. async def acquire_on_behalf_of(self, borrower: object) -> None:
  1680. await AsyncIOBackend.checkpoint_if_cancelled()
  1681. try:
  1682. self.acquire_on_behalf_of_nowait(borrower)
  1683. except WouldBlock:
  1684. event = asyncio.Event()
  1685. self._wait_queue[borrower] = event
  1686. try:
  1687. await event.wait()
  1688. except BaseException:
  1689. self._wait_queue.pop(borrower, None)
  1690. if event.is_set():
  1691. self._notify_next_waiter()
  1692. raise
  1693. self._borrowers.add(borrower)
  1694. else:
  1695. try:
  1696. await AsyncIOBackend.cancel_shielded_checkpoint()
  1697. except BaseException:
  1698. self.release()
  1699. raise
  1700. def release(self) -> None:
  1701. self.release_on_behalf_of(current_task())
  1702. def release_on_behalf_of(self, borrower: object) -> None:
  1703. try:
  1704. self._borrowers.remove(borrower)
  1705. except KeyError:
  1706. raise RuntimeError(
  1707. "this borrower isn't holding any of this CapacityLimiter's tokens"
  1708. ) from None
  1709. self._notify_next_waiter()
  1710. def statistics(self) -> CapacityLimiterStatistics:
  1711. return CapacityLimiterStatistics(
  1712. self.borrowed_tokens,
  1713. self.total_tokens,
  1714. tuple(self._borrowers),
  1715. len(self._wait_queue),
  1716. )
# Run-scoped variable holding a CapacityLimiter. Going by its name it backs the
# default limiter for worker threads; the code that reads and sets it is outside
# this section, so confirm there.
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
  1718. #
  1719. # Operating system signals
  1720. #
  1721. class _SignalReceiver:
  1722. def __init__(self, signals: tuple[Signals, ...]):
  1723. self._signals = signals
  1724. self._loop = get_running_loop()
  1725. self._signal_queue: deque[Signals] = deque()
  1726. self._future: asyncio.Future = asyncio.Future()
  1727. self._handled_signals: set[Signals] = set()
  1728. def _deliver(self, signum: Signals) -> None:
  1729. self._signal_queue.append(signum)
  1730. if not self._future.done():
  1731. self._future.set_result(None)
  1732. def __enter__(self) -> _SignalReceiver:
  1733. for sig in set(self._signals):
  1734. self._loop.add_signal_handler(sig, self._deliver, sig)
  1735. self._handled_signals.add(sig)
  1736. return self
  1737. def __exit__(
  1738. self,
  1739. exc_type: type[BaseException] | None,
  1740. exc_val: BaseException | None,
  1741. exc_tb: TracebackType | None,
  1742. ) -> None:
  1743. for sig in self._handled_signals:
  1744. self._loop.remove_signal_handler(sig)
  1745. def __aiter__(self) -> _SignalReceiver:
  1746. return self
  1747. async def __anext__(self) -> Signals:
  1748. await AsyncIOBackend.checkpoint()
  1749. if not self._signal_queue:
  1750. self._future = asyncio.Future()
  1751. await self._future
  1752. return self._signal_queue.popleft()
  1753. #
  1754. # Testing and debugging
  1755. #
  1756. class AsyncIOTaskInfo(TaskInfo):
  1757. def __init__(self, task: asyncio.Task):
  1758. task_state = _task_states.get(task)
  1759. if task_state is None:
  1760. parent_id = None
  1761. else:
  1762. parent_id = task_state.parent_id
  1763. coro = task.get_coro()
  1764. assert coro is not None, "created TaskInfo from a completed Task"
  1765. super().__init__(id(task), parent_id, task.get_name(), coro)
  1766. self._task = weakref.ref(task)
  1767. def has_pending_cancellation(self) -> bool:
  1768. if not (task := self._task()):
  1769. # If the task isn't around anymore, it won't have a pending cancellation
  1770. return False
  1771. if task._must_cancel: # type: ignore[attr-defined]
  1772. return True
  1773. elif (
  1774. isinstance(task._fut_waiter, asyncio.Future) # type: ignore[attr-defined]
  1775. and task._fut_waiter.cancelled() # type: ignore[attr-defined]
  1776. ):
  1777. return True
  1778. if task_state := _task_states.get(task):
  1779. if cancel_scope := task_state.cancel_scope:
  1780. return cancel_scope._effectively_cancelled
  1781. return False
class TestRunner(abc.TestRunner):
    """
    Test runner that executes async tests and fixtures in one long-lived task.

    Coroutines are handed to a single "runner task" through a memory object
    stream, and their outcomes are relayed back through futures.

    :param debug: passed on to the ``Runner`` as its ``debug`` argument
    :param use_uvloop: if ``True`` and no ``loop_factory`` is given, use uvloop's
        event loop factory (winloop's on Windows)
    :param loop_factory: passed on to the ``Runner`` as its ``loop_factory``
        argument

    """

    # Created lazily, together with the runner task, in _call_in_runner_task()
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]

    def __init__(
        self,
        *,
        debug: bool | None = None,
        use_uvloop: bool = False,
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
    ) -> None:
        if use_uvloop and loop_factory is None:
            if sys.platform != "win32":
                import uvloop

                loop_factory = uvloop.new_event_loop
            else:
                import winloop

                loop_factory = winloop.new_event_loop

        self._runner = Runner(debug=debug, loop_factory=loop_factory)
        # Exceptions collected by _exception_handler() and run_test(), raised
        # later by _raise_async_exceptions()
        self._exceptions: list[BaseException] = []
        self._runner_task: asyncio.Task | None = None

    def __enter__(self) -> TestRunner:
        self._runner.__enter__()
        self.get_loop().set_exception_handler(self._exception_handler)
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self._runner.__exit__(exc_type, exc_val, exc_tb)

    def get_loop(self) -> AbstractEventLoop:
        """Return the event loop of the underlying runner."""
        return self._runner.get_loop()

    def _exception_handler(
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
    ) -> None:
        """
        Event loop exception handler that records ``Exception`` instances.

        Contexts without an ``Exception`` instance under the ``"exception"`` key
        are passed on to the loop's default exception handler.
        """
        if isinstance(context.get("exception"), Exception):
            self._exceptions.append(context["exception"])
        else:
            loop.default_exception_handler(context)

    def _raise_async_exceptions(self) -> None:
        # Re-raise any exceptions raised in asynchronous callbacks
        if self._exceptions:
            # Swap in a fresh list so that each exception is reported only once
            exceptions, self._exceptions = self._exceptions, []
            if len(exceptions) == 1:
                raise exceptions[0]
            elif exceptions:
                raise BaseExceptionGroup(
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
                )

    async def _run_tests_and_fixtures(
        self,
        receive_stream: MemoryObjectReceiveStream[
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
        ],
    ) -> None:
        """
        Body of the runner task.

        Await each awaitable received from ``receive_stream`` and relay its
        result, exception or cancellation to the future it was sent with.
        """
        from _pytest.outcomes import OutcomeException

        with receive_stream, self._send_stream:
            async for coro, future in receive_stream:
                try:
                    retval = await coro
                except CancelledError as exc:
                    if not future.cancelled():
                        future.cancel(*exc.args)

                    # A cancellation always ends the runner task
                    raise
                except BaseException as exc:
                    if not future.cancelled():
                        future.set_exception(exc)

                    # Keep serving after ordinary exceptions and pytest
                    # outcomes; anything else ends the runner task
                    if not isinstance(exc, (Exception, OutcomeException)):
                        raise
                else:
                    if not future.cancelled():
                        future.set_result(retval)

    async def _call_in_runner_task(
        self,
        func: Callable[P, Awaitable[T_Retval]],
        /,
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> T_Retval:
        """
        Call ``func`` and have the runner task await the resulting awaitable.

        The stream and the runner task are created on first use.

        :return: the outcome relayed back by the runner task
        """
        if not self._runner_task:
            self._send_stream, receive_stream = create_memory_object_stream[
                tuple[Awaitable[Any], asyncio.Future]
            ](1)
            self._runner_task = self.get_loop().create_task(
                self._run_tests_and_fixtures(receive_stream)
            )

        coro = func(*args, **kwargs)
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
        self._send_stream.send_nowait((coro, future))
        return await future

    def run_asyncgen_fixture(
        self,
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
        kwargs: dict[str, Any],
    ) -> Iterable[T_Retval]:
        """
        Run an async generator fixture, yielding the value it produces.

        The generator is advanced once before the yield and once more after it.

        :raises RuntimeError: if the generator does not finish on the second
            step
        """
        asyncgen = fixture_func(**kwargs)
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
            self._call_in_runner_task(asyncgen.asend, None)
        )
        self._raise_async_exceptions()

        yield fixturevalue

        try:
            self.get_loop().run_until_complete(
                self._call_in_runner_task(asyncgen.asend, None)
            )
        except StopAsyncIteration:
            # The expected outcome: the generator finished
            self._raise_async_exceptions()
        else:
            self.get_loop().run_until_complete(asyncgen.aclose())
            raise RuntimeError("Async generator fixture did not stop")

    def run_fixture(
        self,
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
        kwargs: dict[str, Any],
    ) -> T_Retval:
        """Run a coroutine fixture in the runner task and return its result."""
        retval = self.get_loop().run_until_complete(
            self._call_in_runner_task(fixture_func, **kwargs)
        )
        self._raise_async_exceptions()
        return retval

    def run_test(
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
    ) -> None:
        """
        Run a coroutine test function in the runner task.

        An ``Exception`` raised by the test is reported together with any
        exceptions collected from asynchronous callbacks.
        """
        try:
            self.get_loop().run_until_complete(
                self._call_in_runner_task(test_func, **kwargs)
            )
        except Exception as exc:
            self._exceptions.append(exc)

        self._raise_async_exceptions()
  1912. class AsyncIOBackend(AsyncBackend):
  1913. @classmethod
  1914. def run(
  1915. cls,
  1916. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  1917. args: tuple[Unpack[PosArgsT]],
  1918. kwargs: dict[str, Any],
  1919. options: dict[str, Any],
  1920. ) -> T_Retval:
  1921. @wraps(func)
  1922. async def wrapper() -> T_Retval:
  1923. task = cast(asyncio.Task, current_task())
  1924. task.set_name(get_callable_name(func))
  1925. _task_states[task] = TaskState(None, None)
  1926. try:
  1927. return await func(*args)
  1928. finally:
  1929. del _task_states[task]
  1930. debug = options.get("debug", None)
  1931. loop_factory = options.get("loop_factory", None)
  1932. if loop_factory is None and options.get("use_uvloop", False):
  1933. if sys.platform != "win32":
  1934. import uvloop
  1935. loop_factory = uvloop.new_event_loop
  1936. else:
  1937. import winloop
  1938. loop_factory = winloop.new_event_loop
  1939. with Runner(debug=debug, loop_factory=loop_factory) as runner:
  1940. return runner.run(wrapper())
  1941. @classmethod
  1942. def current_token(cls) -> object:
  1943. return get_running_loop()
  1944. @classmethod
  1945. def current_time(cls) -> float:
  1946. return get_running_loop().time()
  1947. @classmethod
  1948. def cancelled_exception_class(cls) -> type[BaseException]:
  1949. return CancelledError
  1950. @classmethod
  1951. async def checkpoint(cls) -> None:
  1952. await sleep(0)
  1953. @classmethod
  1954. async def checkpoint_if_cancelled(cls) -> None:
  1955. task = current_task()
  1956. if task is None:
  1957. return
  1958. try:
  1959. cancel_scope = _task_states[task].cancel_scope
  1960. except KeyError:
  1961. return
  1962. while cancel_scope:
  1963. if cancel_scope.cancel_called:
  1964. await sleep(0)
  1965. elif cancel_scope.shield:
  1966. break
  1967. else:
  1968. cancel_scope = cancel_scope._parent_scope
  1969. @classmethod
  1970. async def cancel_shielded_checkpoint(cls) -> None:
  1971. with CancelScope(shield=True):
  1972. await sleep(0)
  1973. @classmethod
  1974. async def sleep(cls, delay: float) -> None:
  1975. await sleep(delay)
  1976. @classmethod
  1977. def create_cancel_scope(
  1978. cls, *, deadline: float = math.inf, shield: bool = False
  1979. ) -> CancelScope:
  1980. return CancelScope(deadline=deadline, shield=shield)
  1981. @classmethod
  1982. def current_effective_deadline(cls) -> float:
  1983. if (task := current_task()) is None:
  1984. return math.inf
  1985. try:
  1986. cancel_scope = _task_states[task].cancel_scope
  1987. except KeyError:
  1988. return math.inf
  1989. deadline = math.inf
  1990. while cancel_scope:
  1991. deadline = min(deadline, cancel_scope.deadline)
  1992. if cancel_scope._cancel_called:
  1993. deadline = -math.inf
  1994. break
  1995. elif cancel_scope.shield:
  1996. break
  1997. else:
  1998. cancel_scope = cancel_scope._parent_scope
  1999. return deadline
  2000. @classmethod
  2001. def create_task_group(cls) -> abc.TaskGroup:
  2002. return TaskGroup()
  2003. @classmethod
  2004. def create_event(cls) -> abc.Event:
  2005. return Event()
  2006. @classmethod
  2007. def create_lock(cls, *, fast_acquire: bool) -> abc.Lock:
  2008. return Lock(fast_acquire=fast_acquire)
  2009. @classmethod
  2010. def create_semaphore(
  2011. cls,
  2012. initial_value: int,
  2013. *,
  2014. max_value: int | None = None,
  2015. fast_acquire: bool = False,
  2016. ) -> abc.Semaphore:
  2017. return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)
  2018. @classmethod
  2019. def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
  2020. return CapacityLimiter(total_tokens)
    @classmethod
    async def run_sync_in_worker_thread(  # type: ignore[return]
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: abc.CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Queue ``func(*args)`` on a worker thread and wait for its outcome.

        :param func: the callable to hand to the worker
        :param args: positional arguments for ``func``
        :param abandon_on_cancel: if ``False``, the wait for the result happens
            inside a shielded cancel scope
        :param limiter: the capacity limiter to hold while the call is in
            progress; the current default thread limiter is used if omitted
        :return: the result delivered through the future handed to the worker
        """
        await cls.checkpoint()

        # If this is the first run in this event loop thread, set up the necessary
        # variables
        try:
            idle_workers = _threadpool_idle_workers.get()
            workers = _threadpool_workers.get()
        except LookupError:
            idle_workers = deque()
            workers = set()
            _threadpool_idle_workers.set(idle_workers)
            _threadpool_workers.set(workers)

        async with limiter or cls.current_default_thread_limiter():
            with CancelScope(shield=not abandon_on_cancel) as scope:
                future = asyncio.Future[T_Retval]()
                root_task = find_root_task()
                if not idle_workers:
                    # No idle worker to reuse: start a new one and arrange for
                    # it to be stopped when the root task is done
                    worker = WorkerThread(root_task, workers, idle_workers)
                    worker.start()
                    workers.add(worker)
                    root_task.add_done_callback(
                        worker.stop, context=contextvars.Context()
                    )
                else:
                    # Reuse the worker at the right end of the deque
                    worker = idle_workers.pop()

                    # Prune any other workers that have been idle for MAX_IDLE_TIME
                    # seconds or longer
                    now = cls.current_time()
                    while idle_workers:
                        if (
                            now - idle_workers[0].idle_since
                            < WorkerThread.MAX_IDLE_TIME
                        ):
                            break

                        expired_worker = idle_workers.popleft()
                        expired_worker.root_task.remove_done_callback(
                            expired_worker.stop
                        )
                        expired_worker.stop()

                # The worker gets a copy of the current context in which the
                # current async library has been set to None
                context = copy_context()
                context.run(set_current_async_library, None)

                # Pick the cancel scope handed to the worker: the scope created
                # above when abandoning on cancel (or when it has no parent),
                # otherwise its parent scope
                if abandon_on_cancel or scope._parent_scope is None:
                    worker_scope = scope
                else:
                    worker_scope = scope._parent_scope

                worker.queue.put_nowait((context, func, args, future, worker_scope))
                return await future
  2075. @classmethod
  2076. def check_cancelled(cls) -> None:
  2077. scope: CancelScope | None = threadlocals.current_cancel_scope
  2078. while scope is not None:
  2079. if scope.cancel_called:
  2080. raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
  2081. if scope.shield:
  2082. return
  2083. scope = scope._parent_scope
    @classmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Run ``func(*args)`` in the event loop and block until it has finished.

        :param func: the coroutine function to run
        :param args: positional arguments for ``func``
        :param token: the event loop to run in; if falsy, the native token
            stored in ``threadlocals`` is used instead
        :return: the return value of ``func``
        :raises RunFinishedError: if the event loop has been closed
        """

        async def task_wrapper() -> T_Retval:
            __tracebackhide__ = True
            # ``scope`` is a closure variable that is assigned further down,
            # before this coroutine starts running
            if scope is not None:
                task = cast(asyncio.Task, current_task())
                _task_states[task] = TaskState(None, scope)
                scope._tasks.add(task)
            try:
                return await func(*args)
            except CancelledError as exc:
                # Convert to the cancellation exception of concurrent.futures
                raise concurrent.futures.CancelledError(str(exc)) from None
            finally:
                if scope is not None:
                    scope._tasks.discard(task)

        loop = cast(
            "AbstractEventLoop", token or threadlocals.current_token.native_token
        )
        if loop.is_closed():
            raise RunFinishedError

        # Submit the coroutine from within a copy of the current context in
        # which the current async library has been set to "asyncio"
        context = copy_context()
        context.run(set_current_async_library, "asyncio")
        scope = getattr(threadlocals, "current_cancel_scope", None)
        f: concurrent.futures.Future[T_Retval] = context.run(
            asyncio.run_coroutine_threadsafe, task_wrapper(), loop=loop
        )
        return f.result()
    @classmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call ``func(*args)`` in the event loop thread and block for the result.

        :param func: the callable to run
        :param args: positional arguments for ``func``
        :param token: the event loop to run in; if falsy, the native token
            stored in ``threadlocals`` is used instead
        :return: the return value of ``func``
        :raises RunFinishedError: if the event loop has been closed
        """

        @wraps(func)
        def wrapper() -> None:
            try:
                set_current_async_library("asyncio")
                f.set_result(func(*args))
            except BaseException as exc:
                # Relay every failure to the waiting thread; only exceptions
                # that are not Exception subclasses also propagate from here
                f.set_exception(exc)
                if not isinstance(exc, Exception):
                    raise

        loop = cast(
            "AbstractEventLoop", token or threadlocals.current_token.native_token
        )
        if loop.is_closed():
            raise RunFinishedError

        f: concurrent.futures.Future[T_Retval] = Future()
        loop.call_soon_threadsafe(wrapper)
        return f.result()
  2140. @classmethod
  2141. async def open_process(
  2142. cls,
  2143. command: StrOrBytesPath | Sequence[StrOrBytesPath],
  2144. *,
  2145. stdin: int | IO[Any] | None,
  2146. stdout: int | IO[Any] | None,
  2147. stderr: int | IO[Any] | None,
  2148. **kwargs: Any,
  2149. ) -> Process:
  2150. await cls.checkpoint()
  2151. if isinstance(command, PathLike):
  2152. command = os.fspath(command)
  2153. if isinstance(command, (str, bytes)):
  2154. process = await asyncio.create_subprocess_shell(
  2155. command,
  2156. stdin=stdin,
  2157. stdout=stdout,
  2158. stderr=stderr,
  2159. **kwargs,
  2160. )
  2161. else:
  2162. process = await asyncio.create_subprocess_exec(
  2163. *command,
  2164. stdin=stdin,
  2165. stdout=stdout,
  2166. stderr=stderr,
  2167. **kwargs,
  2168. )
  2169. stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
  2170. stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
  2171. stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
  2172. return Process(process, stdin_stream, stdout_stream, stderr_stream)
  2173. @classmethod
  2174. def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
  2175. create_task(
  2176. _shutdown_process_pool_on_exit(workers),
  2177. name="AnyIO process pool shutdown task",
  2178. )
  2179. find_root_task().add_done_callback(
  2180. partial(_forcibly_shutdown_process_pool_on_exit, workers) # type:ignore[arg-type]
  2181. )
  2182. @classmethod
  2183. async def connect_tcp(
  2184. cls, host: str, port: int, local_address: IPSockAddrType | None = None
  2185. ) -> abc.SocketStream:
  2186. transport, protocol = cast(
  2187. tuple[asyncio.Transport, StreamProtocol],
  2188. await get_running_loop().create_connection(
  2189. StreamProtocol, host, port, local_addr=local_address
  2190. ),
  2191. )
  2192. transport.pause_reading()
  2193. return SocketStream(transport, protocol)
  2194. @classmethod
  2195. async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
  2196. await cls.checkpoint()
  2197. loop = get_running_loop()
  2198. raw_socket = socket.socket(socket.AF_UNIX)
  2199. raw_socket.setblocking(False)
  2200. while True:
  2201. try:
  2202. raw_socket.connect(path)
  2203. except BlockingIOError:
  2204. f: asyncio.Future = asyncio.Future()
  2205. loop.add_writer(raw_socket, f.set_result, None)
  2206. f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
  2207. await f
  2208. except BaseException:
  2209. raw_socket.close()
  2210. raise
  2211. else:
  2212. return UNIXSocketStream(raw_socket)
  2213. @classmethod
  2214. def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
  2215. return TCPSocketListener(sock)
  2216. @classmethod
  2217. def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
  2218. return UNIXSocketListener(sock)
  2219. @classmethod
  2220. async def create_udp_socket(
  2221. cls,
  2222. family: AddressFamily,
  2223. local_address: IPSockAddrType | None,
  2224. remote_address: IPSockAddrType | None,
  2225. reuse_port: bool,
  2226. ) -> UDPSocket | ConnectedUDPSocket:
  2227. transport, protocol = await get_running_loop().create_datagram_endpoint(
  2228. DatagramProtocol,
  2229. local_addr=local_address,
  2230. remote_addr=remote_address,
  2231. family=family,
  2232. reuse_port=reuse_port,
  2233. )
  2234. if protocol.exception:
  2235. transport.close()
  2236. raise protocol.exception
  2237. if not remote_address:
  2238. return UDPSocket(transport, protocol)
  2239. else:
  2240. return ConnectedUDPSocket(transport, protocol)
  2241. @classmethod
  2242. async def create_unix_datagram_socket( # type: ignore[override]
  2243. cls, raw_socket: socket.socket, remote_path: str | bytes | None
  2244. ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
  2245. await cls.checkpoint()
  2246. loop = get_running_loop()
  2247. if remote_path:
  2248. while True:
  2249. try:
  2250. raw_socket.connect(remote_path)
  2251. except BlockingIOError:
  2252. f: asyncio.Future = asyncio.Future()
  2253. loop.add_writer(raw_socket, f.set_result, None)
  2254. f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
  2255. await f
  2256. except BaseException:
  2257. raw_socket.close()
  2258. raise
  2259. else:
  2260. return ConnectedUNIXDatagramSocket(raw_socket)
  2261. else:
  2262. return UNIXDatagramSocket(raw_socket)
  2263. @classmethod
  2264. async def getaddrinfo(
  2265. cls,
  2266. host: bytes | str | None,
  2267. port: str | int | None,
  2268. *,
  2269. family: int | AddressFamily = 0,
  2270. type: int | SocketKind = 0,
  2271. proto: int = 0,
  2272. flags: int = 0,
  2273. ) -> Sequence[
  2274. tuple[
  2275. AddressFamily,
  2276. SocketKind,
  2277. int,
  2278. str,
  2279. tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
  2280. ]
  2281. ]:
  2282. return await get_running_loop().getaddrinfo(
  2283. host, port, family=family, type=type, proto=proto, flags=flags
  2284. )
  2285. @classmethod
  2286. async def getnameinfo(
  2287. cls, sockaddr: IPSockAddrType, flags: int = 0
  2288. ) -> tuple[str, str]:
  2289. return await get_running_loop().getnameinfo(sockaddr, flags)
    @classmethod
    async def wait_readable(cls, obj: FileDescriptorLike) -> None:
        """
        Wait until the given descriptor is reported as readable.

        :param obj: an integer file descriptor, or an object with a ``fileno()``
            method
        :raises BusyResourceError: if a read waiter is already registered for the
            same descriptor
        :raises ClosedResourceError: if the wait is ended with a ``False`` result,
            which is what ``notify_closing()`` sets on pending waiters
        """
        # Fetch the fd -> future map of pending read waiters, creating and storing
        # it on first use
        try:
            read_events = _read_events.get()
        except LookupError:
            read_events = {}
            _read_events.set(read_events)

        fd = obj if isinstance(obj, int) else obj.fileno()
        if read_events.get(fd):
            raise BusyResourceError("reading from")

        loop = get_running_loop()
        fut: asyncio.Future[bool] = loop.create_future()

        def cb() -> None:
            # Readiness callback. ``remove_reader`` is bound further down, after the
            # callback has been registered; the closure resolves it at call time.
            try:
                del read_events[fd]
            except KeyError:
                # The entry is already gone, so the reader has been (or is being)
                # unregistered elsewhere
                pass
            else:
                remove_reader(fd)

            try:
                fut.set_result(True)
            except asyncio.InvalidStateError:
                # The future was already completed or cancelled
                pass

        try:
            loop.add_reader(fd, cb)
        except NotImplementedError:
            # This loop does not implement add_reader(); register with the selector
            # from anyio._core._asyncio_selector_thread instead.
            # NOTE(review): presumably this covers the Windows proactor loop — confirm
            from anyio._core._asyncio_selector_thread import get_selector

            selector = get_selector()
            selector.add_reader(fd, cb)
            remove_reader = selector.remove_reader
        else:
            remove_reader = loop.remove_reader

        read_events[fd] = fut
        try:
            success = await fut
        finally:
            # Runs on normal completion and when the await raises (e.g. on
            # cancellation): drop the registration unless cb() or notify_closing()
            # has already removed the map entry
            try:
                del read_events[fd]
            except KeyError:
                pass
            else:
                remove_reader(fd)

        # A False result means the waiter was released by notify_closing()
        if not success:
            raise ClosedResourceError
    @classmethod
    async def wait_writable(cls, obj: FileDescriptorLike) -> None:
        """
        Wait until the given descriptor is reported as writable.

        :param obj: an integer file descriptor, or an object with a ``fileno()``
            method
        :raises BusyResourceError: if a write waiter is already registered for the
            same descriptor
        :raises ClosedResourceError: if the wait is ended with a ``False`` result,
            which is what ``notify_closing()`` sets on pending waiters
        """
        # Fetch the fd -> future map of pending write waiters, creating and storing
        # it on first use
        try:
            write_events = _write_events.get()
        except LookupError:
            write_events = {}
            _write_events.set(write_events)

        fd = obj if isinstance(obj, int) else obj.fileno()
        if write_events.get(fd):
            raise BusyResourceError("writing to")

        loop = get_running_loop()
        fut: asyncio.Future[bool] = loop.create_future()

        def cb() -> None:
            # Readiness callback. ``remove_writer`` is bound further down, after the
            # callback has been registered; the closure resolves it at call time.
            try:
                del write_events[fd]
            except KeyError:
                # The entry is already gone, so the writer has been (or is being)
                # unregistered elsewhere
                pass
            else:
                remove_writer(fd)

            try:
                fut.set_result(True)
            except asyncio.InvalidStateError:
                # The future was already completed or cancelled
                pass

        try:
            loop.add_writer(fd, cb)
        except NotImplementedError:
            # This loop does not implement add_writer(); register with the selector
            # from anyio._core._asyncio_selector_thread instead.
            # NOTE(review): presumably this covers the Windows proactor loop — confirm
            from anyio._core._asyncio_selector_thread import get_selector

            selector = get_selector()
            selector.add_writer(fd, cb)
            remove_writer = selector.remove_writer
        else:
            remove_writer = loop.remove_writer

        write_events[fd] = fut
        try:
            success = await fut
        finally:
            # Runs on normal completion and when the await raises (e.g. on
            # cancellation): drop the registration unless cb() or notify_closing()
            # has already removed the map entry
            try:
                del write_events[fd]
            except KeyError:
                pass
            else:
                remove_writer(fd)

        # A False result means the waiter was released by notify_closing()
        if not success:
            raise ClosedResourceError
  2378. @classmethod
  2379. def notify_closing(cls, obj: FileDescriptorLike) -> None:
  2380. fd = obj if isinstance(obj, int) else obj.fileno()
  2381. loop = get_running_loop()
  2382. try:
  2383. write_events = _write_events.get()
  2384. except LookupError:
  2385. pass
  2386. else:
  2387. try:
  2388. fut = write_events.pop(fd)
  2389. except KeyError:
  2390. pass
  2391. else:
  2392. try:
  2393. fut.set_result(False)
  2394. except asyncio.InvalidStateError:
  2395. pass
  2396. try:
  2397. loop.remove_writer(fd)
  2398. except NotImplementedError:
  2399. from anyio._core._asyncio_selector_thread import get_selector
  2400. get_selector().remove_writer(fd)
  2401. try:
  2402. read_events = _read_events.get()
  2403. except LookupError:
  2404. pass
  2405. else:
  2406. try:
  2407. fut = read_events.pop(fd)
  2408. except KeyError:
  2409. pass
  2410. else:
  2411. try:
  2412. fut.set_result(False)
  2413. except asyncio.InvalidStateError:
  2414. pass
  2415. try:
  2416. loop.remove_reader(fd)
  2417. except NotImplementedError:
  2418. from anyio._core._asyncio_selector_thread import get_selector
  2419. get_selector().remove_reader(fd)
  2420. @classmethod
  2421. async def wrap_listener_socket(cls, sock: socket.socket) -> SocketListener:
  2422. return TCPSocketListener(sock)
  2423. @classmethod
  2424. async def wrap_stream_socket(cls, sock: socket.socket) -> SocketStream:
  2425. transport, protocol = await get_running_loop().create_connection(
  2426. StreamProtocol, sock=sock
  2427. )
  2428. return SocketStream(transport, protocol)
  2429. @classmethod
  2430. async def wrap_unix_stream_socket(cls, sock: socket.socket) -> UNIXSocketStream:
  2431. return UNIXSocketStream(sock)
  2432. @classmethod
  2433. async def wrap_udp_socket(cls, sock: socket.socket) -> UDPSocket:
  2434. transport, protocol = await get_running_loop().create_datagram_endpoint(
  2435. DatagramProtocol, sock=sock
  2436. )
  2437. return UDPSocket(transport, protocol)
  2438. @classmethod
  2439. async def wrap_connected_udp_socket(cls, sock: socket.socket) -> ConnectedUDPSocket:
  2440. transport, protocol = await get_running_loop().create_datagram_endpoint(
  2441. DatagramProtocol, sock=sock
  2442. )
  2443. return ConnectedUDPSocket(transport, protocol)
  2444. @classmethod
  2445. async def wrap_unix_datagram_socket(cls, sock: socket.socket) -> UNIXDatagramSocket:
  2446. return UNIXDatagramSocket(sock)
  2447. @classmethod
  2448. async def wrap_connected_unix_datagram_socket(
  2449. cls, sock: socket.socket
  2450. ) -> ConnectedUNIXDatagramSocket:
  2451. return ConnectedUNIXDatagramSocket(sock)
  2452. @classmethod
  2453. def current_default_thread_limiter(cls) -> CapacityLimiter:
  2454. try:
  2455. return _default_thread_limiter.get()
  2456. except LookupError:
  2457. limiter = CapacityLimiter(40)
  2458. _default_thread_limiter.set(limiter)
  2459. return limiter
  2460. @classmethod
  2461. def open_signal_receiver(
  2462. cls, *signals: Signals
  2463. ) -> AbstractContextManager[AsyncIterator[Signals]]:
  2464. return _SignalReceiver(signals)
  2465. @classmethod
  2466. def get_current_task(cls) -> TaskInfo:
  2467. return AsyncIOTaskInfo(current_task()) # type: ignore[arg-type]
  2468. @classmethod
  2469. def get_running_tasks(cls) -> Sequence[TaskInfo]:
  2470. return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
  2471. @classmethod
  2472. async def wait_all_tasks_blocked(cls) -> None:
  2473. await cls.checkpoint()
  2474. this_task = current_task()
  2475. while True:
  2476. for task in all_tasks():
  2477. if task is this_task:
  2478. continue
  2479. waiter = task._fut_waiter # type: ignore[attr-defined]
  2480. if waiter is None or waiter.done():
  2481. await sleep(0.1)
  2482. break
  2483. else:
  2484. return
  2485. @classmethod
  2486. def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
  2487. return TestRunner(**options)
# Module-level name bound to this module's backend implementation class.
# NOTE(review): presumably looked up by name when the backend is loaded — confirm.
backend_class = AsyncIOBackend