monitor.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. """Module holding utility and convenience functions for zmq event monitoring."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import struct
  6. from typing import Awaitable, TypedDict, overload
  7. import zmq
  8. import zmq.asyncio
  9. from zmq.error import _check_version
  10. class _MonitorMessage(TypedDict):
  11. event: int
  12. value: int
  13. endpoint: bytes
  14. def parse_monitor_message(msg: list[bytes]) -> _MonitorMessage:
  15. """decode zmq_monitor event messages.
  16. Parameters
  17. ----------
  18. msg : list(bytes)
  19. zmq multipart message that has arrived on a monitor PAIR socket.
  20. First frame is::
  21. 16 bit event id
  22. 32 bit event value
  23. no padding
  24. Second frame is the endpoint as a bytestring
  25. Returns
  26. -------
  27. event : dict
  28. event description as dict with the keys `event`, `value`, and `endpoint`.
  29. """
  30. if len(msg) != 2 or len(msg[0]) != 6:
  31. raise RuntimeError(f"Invalid event message format: {msg}")
  32. event_id, value = struct.unpack("=hi", msg[0])
  33. event: _MonitorMessage = {
  34. 'event': zmq.Event(event_id),
  35. 'value': zmq.Event(value),
  36. 'endpoint': msg[1],
  37. }
  38. return event
  39. async def _parse_monitor_msg_async(
  40. awaitable_msg: Awaitable[list[bytes]],
  41. ) -> _MonitorMessage:
  42. """Like parse_monitor_msg, but awaitable
  43. Given awaitable message, return awaitable for the parsed monitor message.
  44. """
  45. msg = await awaitable_msg
  46. # 4.0-style event API
  47. return parse_monitor_message(msg)
  48. @overload
  49. def recv_monitor_message(
  50. socket: zmq.asyncio.Socket,
  51. flags: int = 0,
  52. ) -> Awaitable[_MonitorMessage]: ...
  53. @overload
  54. def recv_monitor_message(
  55. socket: zmq.Socket[bytes],
  56. flags: int = 0,
  57. ) -> _MonitorMessage: ...
  58. def recv_monitor_message(
  59. socket: zmq.Socket,
  60. flags: int = 0,
  61. ) -> _MonitorMessage | Awaitable[_MonitorMessage]:
  62. """Receive and decode the given raw message from the monitoring socket and return a dict.
  63. Requires libzmq ≥ 4.0
  64. The returned dict will have the following entries:
  65. event : int
  66. the event id as described in `libzmq.zmq_socket_monitor`
  67. value : int
  68. the event value associated with the event, see `libzmq.zmq_socket_monitor`
  69. endpoint : str
  70. the affected endpoint
  71. .. versionchanged:: 23.1
  72. Support for async sockets added.
  73. When called with a async socket,
  74. returns an awaitable for the monitor message.
  75. Parameters
  76. ----------
  77. socket : zmq.Socket
  78. The PAIR socket (created by other.get_monitor_socket()) on which to recv the message
  79. flags : int
  80. standard zmq recv flags
  81. Returns
  82. -------
  83. event : dict
  84. event description as dict with the keys `event`, `value`, and `endpoint`.
  85. """
  86. _check_version((4, 0), 'libzmq event API')
  87. # will always return a list
  88. msg = socket.recv_multipart(flags)
  89. # transparently handle asyncio socket,
  90. # returns a Future instead of a dict
  91. if isinstance(msg, Awaitable):
  92. return _parse_monitor_msg_async(msg)
  93. # 4.0-style event API
  94. return parse_monitor_message(msg)
  95. __all__ = ['parse_monitor_message', 'recv_monitor_message']