| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- """type annotations for async sockets"""
- from __future__ import annotations
- from asyncio import Future
- from pickle import DEFAULT_PROTOCOL
- from typing import Any, Awaitable, Literal, Sequence, TypeVar, overload
- import zmq as _zmq
- class _AsyncPoller(_zmq.Poller):
- _socket_class: type[_AsyncSocket]
- def poll(self, timeout=-1) -> Awaitable[list[tuple[Any, int]]]: ... # type: ignore
- T = TypeVar("T", bound="_AsyncSocket")
- class _AsyncSocket(_zmq.Socket[Future]):
- @classmethod
- def from_socket(cls: type[T], socket: _zmq.Socket, io_loop: Any = None) -> T: ...
- def send( # type: ignore
- self,
- data: Any,
- flags: int = 0,
- copy: bool = True,
- track: bool = False,
- routing_id: int | None = None,
- group: str | None = None,
- ) -> Awaitable[_zmq.MessageTracker | None]: ...
- @overload # type: ignore
- def recv(self, flags: int = 0, *, track: bool = False) -> Awaitable[bytes]: ...
- @overload
- def recv(
- self, flags: int = 0, *, copy: Literal[True], track: bool = False
- ) -> Awaitable[bytes]: ...
- @overload
- def recv(
- self, flags: int = 0, *, copy: Literal[False], track: bool = False
- ) -> Awaitable[_zmq.Frame]: ...
- @overload
- def recv(
- self, flags: int = 0, copy: bool = True, track: bool = False
- ) -> Awaitable[bytes | _zmq.Frame]: ...
- def recv_into( # type: ignore
- self, buffer: Any, /, *, nbytes: int = 0, flags: int = 0
- ) -> Awaitable[int]: ...
- def send_multipart( # type: ignore
- self,
- msg_parts: Sequence,
- flags: int = 0,
- copy: bool = True,
- track: bool = False,
- routing_id: int | None = None,
- group: str | None = None,
- ) -> Awaitable[_zmq.MessageTracker | None]: ...
- @overload # type: ignore
- def recv_multipart(
- self, flags: int = 0, *, track: bool = False
- ) -> Awaitable[list[bytes]]: ...
- @overload
- def recv_multipart(
- self, flags: int = 0, *, copy: Literal[True], track: bool = False
- ) -> Awaitable[list[bytes]]: ...
- @overload
- def recv_multipart(
- self, flags: int = 0, *, copy: Literal[False], track: bool = False
- ) -> Awaitable[list[_zmq.Frame]]: ...
- @overload
- def recv_multipart(
- self, flags: int = 0, copy: bool = True, track: bool = False
- ) -> Awaitable[list[bytes] | list[_zmq.Frame]]: ...
- # serialization wrappers
- def send_string( # type: ignore
- self,
- u: str,
- flags: int = 0,
- copy: bool = True,
- *,
- encoding: str = 'utf-8',
- **kwargs,
- ) -> Awaitable[_zmq.Frame | None]: ...
- def recv_string( # type: ignore
- self, flags: int = 0, encoding: str = 'utf-8'
- ) -> Awaitable[str]: ...
- def send_pyobj( # type: ignore
- self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
- ) -> Awaitable[_zmq.Frame | None]: ...
- def recv_pyobj(self, flags: int = 0) -> Awaitable[Any]: ... # type: ignore
- def send_json( # type: ignore
- self, obj: Any, flags: int = 0, **kwargs
- ) -> Awaitable[_zmq.Frame | None]: ...
- def recv_json(self, flags: int = 0, **kwargs) -> Awaitable[Any]: ... # type: ignore
- def poll(self, timeout=-1) -> Awaitable[list[tuple[Any, int]]]: ... # type: ignore
|