# _future.py (24 KB)
  1. """Future-returning APIs for coroutines."""
  2. # Copyright (c) PyZMQ Developers.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import warnings
  6. from asyncio import Future
  7. from collections import deque
  8. from functools import partial
  9. from itertools import chain
  10. from typing import (
  11. Any,
  12. Awaitable,
  13. Callable,
  14. NamedTuple,
  15. TypeVar,
  16. cast,
  17. )
  18. import zmq as _zmq
  19. from zmq import EVENTS, POLLIN, POLLOUT
  20. class _FutureEvent(NamedTuple):
  21. future: Future
  22. kind: str
  23. args: tuple
  24. kwargs: dict
  25. msg: Any
  26. timer: Any
  27. # These are incomplete classes and need a Mixin for compatibility with an eventloop
  28. # defining the following attributes:
  29. #
  30. # _Future
  31. # _READ
  32. # _WRITE
  33. # _default_loop()
  34. class _Async:
  35. """Mixin for common async logic"""
  36. _current_loop: Any = None
  37. _Future: type[Future]
  38. def _get_loop(self) -> Any:
  39. """Get event loop
  40. Notice if event loop has changed,
  41. and register init_io_state on activation of a new event loop
  42. """
  43. if self._current_loop is None:
  44. self._current_loop = self._default_loop()
  45. self._init_io_state(self._current_loop)
  46. return self._current_loop
  47. current_loop = self._default_loop()
  48. if current_loop is not self._current_loop:
  49. # warn? This means a socket is being used in multiple loops!
  50. self._current_loop = current_loop
  51. self._init_io_state(current_loop)
  52. return current_loop
  53. def _default_loop(self) -> Any:
  54. raise NotImplementedError("Must be implemented in a subclass")
  55. def _init_io_state(self, loop=None) -> None:
  56. pass
  57. class _AsyncPoller(_Async, _zmq.Poller):
  58. """Poller that returns a Future on poll, instead of blocking."""
  59. _socket_class: type[_AsyncSocket]
  60. _READ: int
  61. _WRITE: int
  62. raw_sockets: list[Any]
  63. def _watch_raw_socket(self, loop: Any, socket: Any, evt: int, f: Callable) -> None:
  64. """Schedule callback for a raw socket"""
  65. raise NotImplementedError()
  66. def _unwatch_raw_sockets(self, loop: Any, *sockets: Any) -> None:
  67. """Unschedule callback for a raw socket"""
  68. raise NotImplementedError()
  69. def poll(self, timeout=-1) -> Awaitable[list[tuple[Any, int]]]: # type: ignore
  70. """Return a Future for a poll event"""
  71. future = self._Future()
  72. if timeout == 0:
  73. try:
  74. result = super().poll(0)
  75. except Exception as e:
  76. future.set_exception(e)
  77. else:
  78. future.set_result(result)
  79. return future
  80. loop = self._get_loop()
  81. # register Future to be called as soon as any event is available on any socket
  82. watcher = self._Future()
  83. # watch raw sockets:
  84. raw_sockets: list[Any] = []
  85. def wake_raw(*args):
  86. if not watcher.done():
  87. watcher.set_result(None)
  88. watcher.add_done_callback(
  89. lambda f: self._unwatch_raw_sockets(loop, *raw_sockets)
  90. )
  91. wrapped_sockets: list[_AsyncSocket] = []
  92. def _clear_wrapper_io(f):
  93. for s in wrapped_sockets:
  94. s._clear_io_state()
  95. for socket, mask in self.sockets:
  96. if isinstance(socket, _zmq.Socket):
  97. if not isinstance(socket, self._socket_class):
  98. # it's a blocking zmq.Socket, wrap it in async
  99. socket = self._socket_class.from_socket(socket)
  100. wrapped_sockets.append(socket)
  101. if mask & _zmq.POLLIN:
  102. socket._add_recv_event('poll', future=watcher)
  103. if mask & _zmq.POLLOUT:
  104. socket._add_send_event('poll', future=watcher)
  105. else:
  106. raw_sockets.append(socket)
  107. evt = 0
  108. if mask & _zmq.POLLIN:
  109. evt |= self._READ
  110. if mask & _zmq.POLLOUT:
  111. evt |= self._WRITE
  112. self._watch_raw_socket(loop, socket, evt, wake_raw)
  113. def on_poll_ready(f):
  114. if future.done():
  115. return
  116. if watcher.cancelled():
  117. try:
  118. future.cancel()
  119. except RuntimeError:
  120. # RuntimeError may be called during teardown
  121. pass
  122. return
  123. if watcher.exception():
  124. future.set_exception(watcher.exception())
  125. else:
  126. try:
  127. result = super(_AsyncPoller, self).poll(0)
  128. except Exception as e:
  129. future.set_exception(e)
  130. else:
  131. future.set_result(result)
  132. watcher.add_done_callback(on_poll_ready)
  133. if wrapped_sockets:
  134. watcher.add_done_callback(_clear_wrapper_io)
  135. if timeout is not None and timeout > 0:
  136. # schedule cancel to fire on poll timeout, if any
  137. def trigger_timeout():
  138. if not watcher.done():
  139. watcher.set_result(None)
  140. timeout_handle = loop.call_later(1e-3 * timeout, trigger_timeout)
  141. def cancel_timeout(f):
  142. if hasattr(timeout_handle, 'cancel'):
  143. timeout_handle.cancel()
  144. else:
  145. loop.remove_timeout(timeout_handle)
  146. future.add_done_callback(cancel_timeout)
  147. def cancel_watcher(f):
  148. if not watcher.done():
  149. watcher.cancel()
  150. future.add_done_callback(cancel_watcher)
  151. return future
  152. class _NoTimer:
  153. @staticmethod
  154. def cancel():
  155. pass
  156. T = TypeVar("T", bound="_AsyncSocket")
  157. class _AsyncSocket(_Async, _zmq.Socket[Future]):
  158. # Warning : these class variables are only here to allow to call super().__setattr__.
  159. # They be overridden at instance initialization and not shared in the whole class
  160. _recv_futures = None
  161. _send_futures = None
  162. _state = 0
  163. _shadow_sock: _zmq.Socket
  164. _poller_class = _AsyncPoller
  165. _fd = None
  166. def __init__(
  167. self,
  168. context=None,
  169. socket_type=-1,
  170. io_loop=None,
  171. _from_socket: _zmq.Socket | None = None,
  172. **kwargs,
  173. ) -> None:
  174. if isinstance(context, _zmq.Socket):
  175. context, _from_socket = (None, context)
  176. if _from_socket is not None:
  177. super().__init__(shadow=_from_socket.underlying) # type: ignore
  178. self._shadow_sock = _from_socket
  179. else:
  180. super().__init__(context, socket_type, **kwargs) # type: ignore
  181. self._shadow_sock = _zmq.Socket.shadow(self.underlying)
  182. if io_loop is not None:
  183. warnings.warn(
  184. f"{self.__class__.__name__}(io_loop) argument is deprecated in pyzmq 22.2."
  185. " The currently active loop will always be used.",
  186. DeprecationWarning,
  187. stacklevel=3,
  188. )
  189. self._recv_futures = deque()
  190. self._send_futures = deque()
  191. self._state = 0
  192. self._fd = self._shadow_sock.FD
  193. @classmethod
  194. def from_socket(cls: type[T], socket: _zmq.Socket, io_loop: Any = None) -> T:
  195. """Create an async socket from an existing Socket"""
  196. return cls(_from_socket=socket, io_loop=io_loop)
  197. def close(self, linger: int | None = None) -> None:
  198. if not self.closed and self._fd is not None:
  199. event_list: list[_FutureEvent] = list(
  200. chain(self._recv_futures or [], self._send_futures or [])
  201. )
  202. for event in event_list:
  203. if not event.future.done():
  204. try:
  205. event.future.cancel()
  206. except RuntimeError:
  207. # RuntimeError may be called during teardown
  208. pass
  209. self._clear_io_state()
  210. super().close(linger=linger)
  211. close.__doc__ = _zmq.Socket.close.__doc__
  212. def get(self, key):
  213. result = super().get(key)
  214. if key == EVENTS:
  215. self._schedule_remaining_events(result)
  216. return result
  217. get.__doc__ = _zmq.Socket.get.__doc__
  218. def recv_multipart(
  219. self, flags: int = 0, copy: bool = True, track: bool = False
  220. ) -> Awaitable[list[bytes] | list[_zmq.Frame]]:
  221. """Receive a complete multipart zmq message.
  222. Returns a Future whose result will be a multipart message.
  223. """
  224. return self._add_recv_event(
  225. 'recv_multipart', kwargs=dict(flags=flags, copy=copy, track=track)
  226. )
  227. def recv( # type: ignore
  228. self, flags: int = 0, copy: bool = True, track: bool = False
  229. ) -> Awaitable[bytes | _zmq.Frame]:
  230. """Receive a single zmq frame.
  231. Returns a Future, whose result will be the received frame.
  232. Recommend using recv_multipart instead.
  233. """
  234. return self._add_recv_event(
  235. 'recv', kwargs=dict(flags=flags, copy=copy, track=track)
  236. )
  237. def recv_into( # type: ignore
  238. self, buf, /, *, nbytes: int = 0, flags: int = 0
  239. ) -> Awaitable[int]:
  240. """Receive a single zmq frame into a pre-allocated buffer.
  241. Returns a Future, whose result will be the number of bytes received.
  242. """
  243. return self._add_recv_event(
  244. 'recv_into', args=(buf,), kwargs=dict(nbytes=nbytes, flags=flags)
  245. )
  246. def send_multipart( # type: ignore
  247. self, msg_parts: Any, flags: int = 0, copy: bool = True, track=False, **kwargs
  248. ) -> Awaitable[_zmq.MessageTracker | None]:
  249. """Send a complete multipart zmq message.
  250. Returns a Future that resolves when sending is complete.
  251. """
  252. kwargs['flags'] = flags
  253. kwargs['copy'] = copy
  254. kwargs['track'] = track
  255. return self._add_send_event('send_multipart', msg=msg_parts, kwargs=kwargs)
  256. def send( # type: ignore
  257. self,
  258. data: Any,
  259. flags: int = 0,
  260. copy: bool = True,
  261. track: bool = False,
  262. **kwargs: Any,
  263. ) -> Awaitable[_zmq.MessageTracker | None]:
  264. """Send a single zmq frame.
  265. Returns a Future that resolves when sending is complete.
  266. Recommend using send_multipart instead.
  267. """
  268. kwargs['flags'] = flags
  269. kwargs['copy'] = copy
  270. kwargs['track'] = track
  271. kwargs.update(dict(flags=flags, copy=copy, track=track))
  272. return self._add_send_event('send', msg=data, kwargs=kwargs)
  273. def _deserialize(self, recvd, load):
  274. """Deserialize with Futures"""
  275. f = self._Future()
  276. def _chain(_):
  277. """Chain result through serialization to recvd"""
  278. if f.done():
  279. # chained future may be cancelled, which means nobody is going to get this result
  280. # if it's an error, that's no big deal (probably zmq.Again),
  281. # but if it's a successful recv, this is a dropped message!
  282. if not recvd.cancelled() and recvd.exception() is None:
  283. warnings.warn(
  284. # is there a useful stacklevel?
  285. # ideally, it would point to where `f.cancel()` was called
  286. f"Future {f} completed while awaiting {recvd}. A message has been dropped!",
  287. RuntimeWarning,
  288. )
  289. return
  290. if recvd.exception():
  291. f.set_exception(recvd.exception())
  292. else:
  293. buf = recvd.result()
  294. try:
  295. loaded = load(buf)
  296. except Exception as e:
  297. f.set_exception(e)
  298. else:
  299. f.set_result(loaded)
  300. recvd.add_done_callback(_chain)
  301. def _chain_cancel(_):
  302. """Chain cancellation from f to recvd"""
  303. if recvd.done():
  304. return
  305. if f.cancelled():
  306. recvd.cancel()
  307. f.add_done_callback(_chain_cancel)
  308. return f
  309. def poll(self, timeout=None, flags=_zmq.POLLIN) -> Awaitable[int]: # type: ignore
  310. """poll the socket for events
  311. returns a Future for the poll results.
  312. """
  313. if self.closed:
  314. raise _zmq.ZMQError(_zmq.ENOTSUP)
  315. p = self._poller_class()
  316. p.register(self, flags)
  317. poll_future = cast(Future, p.poll(timeout))
  318. future = self._Future()
  319. def unwrap_result(f):
  320. if future.done():
  321. return
  322. if poll_future.cancelled():
  323. try:
  324. future.cancel()
  325. except RuntimeError:
  326. # RuntimeError may be called during teardown
  327. pass
  328. return
  329. if f.exception():
  330. future.set_exception(poll_future.exception())
  331. else:
  332. evts = dict(poll_future.result())
  333. future.set_result(evts.get(self, 0))
  334. if poll_future.done():
  335. # hook up result if already done
  336. unwrap_result(poll_future)
  337. else:
  338. poll_future.add_done_callback(unwrap_result)
  339. def cancel_poll(future):
  340. """Cancel underlying poll if request has been cancelled"""
  341. if not poll_future.done():
  342. try:
  343. poll_future.cancel()
  344. except RuntimeError:
  345. # RuntimeError may be called during teardown
  346. pass
  347. future.add_done_callback(cancel_poll)
  348. return future
  349. def _add_timeout(self, future, timeout):
  350. """Add a timeout for a send or recv Future"""
  351. def future_timeout():
  352. if future.done():
  353. # future already resolved, do nothing
  354. return
  355. # raise EAGAIN
  356. future.set_exception(_zmq.Again())
  357. return self._call_later(timeout, future_timeout)
  358. def _call_later(self, delay, callback):
  359. """Schedule a function to be called later
  360. Override for different IOLoop implementations
  361. Tornado and asyncio happen to both have ioloop.call_later
  362. with the same signature.
  363. """
  364. return self._get_loop().call_later(delay, callback)
  365. @staticmethod
  366. def _remove_finished_future(future, event_list, event=None):
  367. """Make sure that futures are removed from the event list when they resolve
  368. Avoids delaying cleanup until the next send/recv event,
  369. which may never come.
  370. """
  371. # "future" instance is shared between sockets, but each socket has its own event list.
  372. if not event_list:
  373. return
  374. # only unconsumed events (e.g. cancelled calls)
  375. # will be present when this happens
  376. try:
  377. event_list.remove(event)
  378. except ValueError:
  379. # usually this will have been removed by being consumed
  380. return
  381. def _add_recv_event(
  382. self,
  383. kind: str,
  384. *,
  385. args: tuple | None = None,
  386. kwargs: dict[str, Any] | None = None,
  387. future: Future | None = None,
  388. ) -> Future:
  389. """Add a recv event, returning the corresponding Future"""
  390. f = future or self._Future()
  391. if args is None:
  392. args = ()
  393. if kwargs is None:
  394. kwargs = {}
  395. if kind.startswith('recv') and kwargs.get('flags', 0) & _zmq.DONTWAIT:
  396. # short-circuit non-blocking calls
  397. recv = getattr(self._shadow_sock, kind)
  398. try:
  399. r = recv(*args, **kwargs)
  400. except Exception as e:
  401. f.set_exception(e)
  402. else:
  403. f.set_result(r)
  404. return f
  405. timer = _NoTimer
  406. if hasattr(_zmq, 'RCVTIMEO'):
  407. timeout_ms = self._shadow_sock.rcvtimeo
  408. if timeout_ms >= 0:
  409. timer = self._add_timeout(f, timeout_ms * 1e-3)
  410. # we add it to the list of futures before we add the timeout as the
  411. # timeout will remove the future from recv_futures to avoid leaks
  412. _future_event = _FutureEvent(
  413. f, kind, args=args, kwargs=kwargs, msg=None, timer=timer
  414. )
  415. self._recv_futures.append(_future_event)
  416. if self._shadow_sock.get(EVENTS) & POLLIN:
  417. # recv immediately, if we can
  418. self._handle_recv()
  419. if self._recv_futures and _future_event in self._recv_futures:
  420. # Don't let the Future sit in _recv_events after it's done
  421. # no need to register this if we've already been handled
  422. # (i.e. immediately-resolved recv)
  423. f.add_done_callback(
  424. partial(
  425. self._remove_finished_future,
  426. event_list=self._recv_futures,
  427. event=_future_event,
  428. )
  429. )
  430. self._add_io_state(POLLIN)
  431. return f
  432. def _add_send_event(self, kind, msg=None, kwargs=None, future=None):
  433. """Add a send event, returning the corresponding Future"""
  434. f = future or self._Future()
  435. # attempt send with DONTWAIT if no futures are waiting
  436. # short-circuit for sends that will resolve immediately
  437. # only call if no send Futures are waiting
  438. if kind in ('send', 'send_multipart') and not self._send_futures:
  439. flags = kwargs.get('flags', 0)
  440. nowait_kwargs = kwargs.copy()
  441. nowait_kwargs['flags'] = flags | _zmq.DONTWAIT
  442. # short-circuit non-blocking calls
  443. send = getattr(self._shadow_sock, kind)
  444. # track if the send resolved or not
  445. # (EAGAIN if DONTWAIT is not set should proceed with)
  446. finish_early = True
  447. try:
  448. r = send(msg, **nowait_kwargs)
  449. except _zmq.Again as e:
  450. if flags & _zmq.DONTWAIT:
  451. f.set_exception(e)
  452. else:
  453. # EAGAIN raised and DONTWAIT not requested,
  454. # proceed with async send
  455. finish_early = False
  456. except Exception as e:
  457. f.set_exception(e)
  458. else:
  459. f.set_result(r)
  460. if finish_early:
  461. # short-circuit resolved, return finished Future
  462. # schedule wake for recv if there are any receivers waiting
  463. if self._recv_futures:
  464. self._schedule_remaining_events()
  465. return f
  466. timer = _NoTimer
  467. if hasattr(_zmq, 'SNDTIMEO'):
  468. timeout_ms = self._shadow_sock.get(_zmq.SNDTIMEO)
  469. if timeout_ms >= 0:
  470. timer = self._add_timeout(f, timeout_ms * 1e-3)
  471. # we add it to the list of futures before we add the timeout as the
  472. # timeout will remove the future from recv_futures to avoid leaks
  473. _future_event = _FutureEvent(
  474. f, kind, args=(), kwargs=kwargs, msg=msg, timer=timer
  475. )
  476. self._send_futures.append(_future_event)
  477. # Don't let the Future sit in _send_futures after it's done
  478. f.add_done_callback(
  479. partial(
  480. self._remove_finished_future,
  481. event_list=self._send_futures,
  482. event=_future_event,
  483. )
  484. )
  485. self._add_io_state(POLLOUT)
  486. return f
  487. def _handle_recv(self):
  488. """Handle recv events"""
  489. if not self._shadow_sock.get(EVENTS) & POLLIN:
  490. # event triggered, but state may have been changed between trigger and callback
  491. return
  492. f = None
  493. while self._recv_futures:
  494. f, kind, args, kwargs, _, timer = self._recv_futures.popleft()
  495. # skip any cancelled futures
  496. if f.done():
  497. f = None
  498. else:
  499. break
  500. if not self._recv_futures:
  501. self._drop_io_state(POLLIN)
  502. if f is None:
  503. return
  504. timer.cancel()
  505. if kind == 'poll':
  506. # on poll event, just signal ready, nothing else.
  507. f.set_result(None)
  508. return
  509. elif kind == 'recv_multipart':
  510. recv = self._shadow_sock.recv_multipart
  511. elif kind == 'recv':
  512. recv = self._shadow_sock.recv
  513. elif kind == 'recv_into':
  514. recv = self._shadow_sock.recv_into
  515. else:
  516. raise ValueError(f"Unhandled recv event type: {kind!r}")
  517. kwargs['flags'] |= _zmq.DONTWAIT
  518. try:
  519. result = recv(*args, **kwargs)
  520. except Exception as e:
  521. f.set_exception(e)
  522. else:
  523. f.set_result(result)
  524. def _handle_send(self):
  525. if not self._shadow_sock.get(EVENTS) & POLLOUT:
  526. # event triggered, but state may have been changed between trigger and callback
  527. return
  528. f = None
  529. while self._send_futures:
  530. f, kind, args, kwargs, msg, timer = self._send_futures.popleft()
  531. # skip any cancelled futures
  532. if f.done():
  533. f = None
  534. else:
  535. break
  536. if not self._send_futures:
  537. self._drop_io_state(POLLOUT)
  538. if f is None:
  539. return
  540. timer.cancel()
  541. if kind == 'poll':
  542. # on poll event, just signal ready, nothing else.
  543. f.set_result(None)
  544. return
  545. elif kind == 'send_multipart':
  546. send = self._shadow_sock.send_multipart
  547. elif kind == 'send':
  548. send = self._shadow_sock.send
  549. else:
  550. raise ValueError(f"Unhandled send event type: {kind!r}")
  551. kwargs['flags'] |= _zmq.DONTWAIT
  552. try:
  553. result = send(msg, **kwargs)
  554. except Exception as e:
  555. f.set_exception(e)
  556. else:
  557. f.set_result(result)
  558. # event masking from ZMQStream
  559. def _handle_events(self, fd=0, events=0):
  560. """Dispatch IO events to _handle_recv, etc."""
  561. if self._shadow_sock.closed:
  562. return
  563. zmq_events = self._shadow_sock.get(EVENTS)
  564. if zmq_events & _zmq.POLLIN:
  565. self._handle_recv()
  566. if zmq_events & _zmq.POLLOUT:
  567. self._handle_send()
  568. self._schedule_remaining_events()
  569. def _schedule_remaining_events(self, events=None):
  570. """Schedule a call to handle_events next loop iteration
  571. If there are still events to handle.
  572. """
  573. # edge-triggered handling
  574. # allow passing events in, in case this is triggered by retrieving events,
  575. # so we don't have to retrieve it twice.
  576. if self._state == 0:
  577. # not watching for anything, nothing to schedule
  578. return
  579. if events is None:
  580. events = self._shadow_sock.get(EVENTS)
  581. if events & self._state:
  582. self._call_later(0, self._handle_events)
  583. def _add_io_state(self, state):
  584. """Add io_state to poller."""
  585. if self._state != state:
  586. state = self._state = self._state | state
  587. self._update_handler(self._state)
  588. def _drop_io_state(self, state):
  589. """Stop poller from watching an io_state."""
  590. if self._state & state:
  591. self._state = self._state & (~state)
  592. self._update_handler(self._state)
  593. def _update_handler(self, state):
  594. """Update IOLoop handler with state.
  595. zmq FD is always read-only.
  596. """
  597. # ensure loop is registered and init_io has been called
  598. # if there are any events to watch for
  599. if state:
  600. self._get_loop()
  601. self._schedule_remaining_events()
  602. def _init_io_state(self, loop=None):
  603. """initialize the ioloop event handler"""
  604. if loop is None:
  605. loop = self._get_loop()
  606. loop.add_handler(self._shadow_sock, self._handle_events, self._READ)
  607. self._call_later(0, self._handle_events)
  608. def _clear_io_state(self):
  609. """unregister the ioloop event handler
  610. called once during close
  611. """
  612. fd = self._shadow_sock
  613. if self._shadow_sock.closed:
  614. fd = self._fd
  615. if self._current_loop is not None:
  616. self._current_loop.remove_handler(fd)