| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- from typing import Any, Callable, List, Optional, Set, Tuple, TypeVar, Union, overload
- from typing_extensions import Literal
- import zmq
- from .select import select_backend
# `Frame` declares an attribute named `bytes`; this alias keeps the builtin
# type reachable under a name that attribute cannot collide with.
_bytestr = bytes

# Type variable used where a method is typed as returning the same type as
# the object it was called on.
T = TypeVar("T")
class Frame:
    # Typing stub for the backend frame type; all bodies are elided with `...`.

    # Attributes declared on a frame.
    buffer: Any
    bytes: bytes  # this name is why methods below spell the builtin as `_bytestr`
    more: bool
    tracker: Any

    # Every argument is optional; `copy` and `copy_threshold` also accept None.
    def __init__(
        self,
        data: Any = None,
        track: bool = False,
        copy: bool | None = None,
        copy_threshold: int | None = None,
    ) -> None: ...

    # Typed so the result has the same type as the receiver.
    def copy_fast(self: T) -> T: ...

    # Option accessors keyed by an integer option id; values may be
    # int, bytes or str.
    def get(self, option: int) -> int | _bytestr | str: ...
    def set(self, option: int, value: int | _bytestr | str) -> None: ...
class Socket:
    # Typing stub for the backend socket type; all bodies are elided with `...`.

    underlying: int
    context: zmq.Context
    copy_threshold: int

    # specific option types
    FD: int

    def __init__(
        self,
        context: Context | None = None,
        socket_type: int = 0,
        shadow: int = 0,
        copy_threshold: int | None = zmq.COPY_THRESHOLD,
    ) -> None: ...

    def close(self, linger: int | None = ...) -> None: ...

    # Option accessors keyed by an integer option id.
    def get(self, option: int) -> int | bytes | str: ...
    def set(self, option: int, value: int | bytes | str) -> None: ...

    # Endpoint management. `connect` and `bind` are declared without a
    # return annotation in this stub.
    def connect(self, url: str): ...
    def disconnect(self, url: str) -> None: ...
    def bind(self, url: str): ...
    def unbind(self, url: str) -> None: ...

    def send(
        self,
        data: Any,
        flags: int = ...,
        copy: bool = ...,
        track: bool = ...,
    ) -> zmq.MessageTracker | None: ...

    # `recv` overloads: the return type depends on the `copy` argument.
    # Keyword `copy=False` -> a zmq.Frame.
    @overload
    def recv(
        self,
        flags: int = ...,
        *,
        copy: Literal[False],
        track: bool = ...,
    ) -> zmq.Frame: ...

    # Keyword `copy=True` -> bytes.
    @overload
    def recv(
        self,
        flags: int = ...,
        *,
        copy: Literal[True],
        track: bool = ...,
    ) -> bytes: ...

    # `copy` not passed -> bytes.
    @overload
    def recv(
        self,
        flags: int = ...,
        track: bool = False,
    ) -> bytes: ...

    # Fallback when `copy` is only known to be a bool -> either type.
    @overload
    def recv(
        self,
        flags: int | None = ...,
        copy: bool = ...,
        track: bool | None = False,
    ) -> zmq.Frame | bytes: ...

    # `buf` is positional-only; `nbytes` and `flags` are keyword-only.
    def recv_into(self, buf, /, *, nbytes: int = 0, flags: int = 0) -> int: ...

    def monitor(self, addr: str | None, events: int) -> None: ...

    # draft methods
    def join(self, group: str) -> None: ...
    def leave(self, group: str) -> None: ...
class Context:
    # Typing stub for the backend context type; all bodies are elided with `...`.

    underlying: int

    def __init__(self, io_threads: int = 1, shadow: int = 0) -> None: ...

    # Option accessors keyed by an integer option id.
    def get(self, option: int) -> int | bytes | str: ...
    def set(self, option: int, value: int | bytes | str) -> None: ...

    # Takes an integer socket type and is typed as returning a backend Socket.
    def socket(self, socket_type: int) -> Socket: ...

    def term(self) -> None: ...
# Module-level values declared by type only; no value is given in this stub.
IPC_PATH_MAX_LEN: int
PYZMQ_DRAFT_API: bool
# Takes a capability name and is typed as returning a bool.
# NOTE(review): presumably reports whether the library supports that
# capability — confirm against the backend implementation.
def has(capability: str) -> bool:
    ...
# No arguments; typed as returning a pair of bytes objects.
# NOTE(review): the order of the two keys is not visible here — confirm
# against the backend implementation.
def curve_keypair() -> tuple[bytes, bytes]:
    ...
# Takes a secret key as bytes and is typed as returning bytes.
def curve_public(secret_key: bytes) -> bytes:
    ...
- def strerror(errno: int | None = ...) -> str: ...
# No arguments; typed as returning an int.
def zmq_errno() -> int:
    ...
# No arguments; typed as returning a str.
def zmq_version() -> str:
    ...
# No arguments; typed as returning a 3-tuple of ints.
def zmq_version_info() -> tuple[int, int, int]:
    ...
# Takes a list of items (element type left open as Any) and an optional
# timeout that may be None; typed as returning a list of (Socket, int) pairs.
def zmq_poll(
    sockets: list[Any],
    timeout: int | None = ...,
) -> list[tuple[Socket, int]]: ...
# Takes a frontend and a backend socket plus an optional capture socket
# (default None); typed as returning an int.
def proxy(
    frontend: Socket,
    backend: Socket,
    capture: Socket | None = None,
) -> int: ...
# Same leading parameters as `proxy`, plus an optional `control` socket.
# The defaults of both optional sockets are elided in this stub.
def proxy_steerable(
    frontend: Socket,
    backend: Socket,
    capture: Socket | None = ...,
    control: Socket | None = ...,
) -> int: ...
- monitored_queue = Callable | None
|