handlers.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. """pyzmq logging handlers.
  2. This mainly defines the PUBHandler object for publishing logging messages over
  3. a zmq.PUB socket.
  4. The PUBHandler can be used with the regular logging module, as in::
  5. >>> import logging
  6. >>> handler = PUBHandler('tcp://127.0.0.1:12345')
  7. >>> handler.root_topic = 'foo'
  8. >>> logger = logging.getLogger('foobar')
  9. >>> logger.setLevel(logging.DEBUG)
  10. >>> logger.addHandler(handler)
  11. Or using ``dictConfig``, as in::
  12. >>> from logging.config import dictConfig
  13. >>> socket = Context.instance().socket(PUB)
  14. >>> socket.connect('tcp://127.0.0.1:12345')
  15. >>> dictConfig({
  16. >>> 'version': 1,
  17. >>> 'handlers': {
  18. >>> 'zmq': {
  19. >>> 'class': 'zmq.log.handlers.PUBHandler',
  20. >>> 'level': logging.DEBUG,
  21. >>> 'root_topic': 'foo',
  22. >>> 'interface_or_socket': socket
  23. >>> }
  24. >>> },
  25. >>> 'root': {
  26. >>> 'level': 'DEBUG',
  27. >>> 'handlers': ['zmq'],
  28. >>> }
  29. >>> })
  30. After this point, all messages logged by ``logger`` will be published on the
  31. PUB socket.
  32. Code adapted from StarCluster:
  33. https://github.com/jtriley/StarCluster/blob/StarCluster-0.91/starcluster/logger.py
  34. """
  35. from __future__ import annotations
  36. import logging
  37. from copy import copy
  38. import zmq
  39. # Copyright (C) PyZMQ Developers
  40. # Distributed under the terms of the Modified BSD License.
# Separator between the topic and the message text inside a log message.
# PUBHandler.emit splits str(record.msg) on its first occurrence, and
# TopicLogger joins the topic and the message with it.
TOPIC_DELIM = "::" # delimiter for splitting topics on the receiving end.
  42. class PUBHandler(logging.Handler):
  43. """A basic logging handler that emits log messages through a PUB socket.
  44. Takes a PUB socket already bound to interfaces or an interface to bind to.
  45. Example::
  46. sock = context.socket(zmq.PUB)
  47. sock.bind('inproc://log')
  48. handler = PUBHandler(sock)
  49. Or::
  50. handler = PUBHandler('inproc://loc')
  51. These are equivalent.
  52. Log messages handled by this handler are broadcast with ZMQ topics
  53. ``this.root_topic`` comes first, followed by the log level
  54. (DEBUG,INFO,etc.), followed by any additional subtopics specified in the
  55. message by: log.debug("subtopic.subsub::the real message")
  56. """
  57. ctx: zmq.Context
  58. socket: zmq.Socket
  59. def __init__(
  60. self,
  61. interface_or_socket: str | zmq.Socket,
  62. context: zmq.Context | None = None,
  63. root_topic: str = '',
  64. ) -> None:
  65. logging.Handler.__init__(self)
  66. self.root_topic = root_topic
  67. self.formatters = {
  68. logging.DEBUG: logging.Formatter(
  69. "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
  70. ),
  71. logging.INFO: logging.Formatter("%(message)s\n"),
  72. logging.WARN: logging.Formatter(
  73. "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
  74. ),
  75. logging.ERROR: logging.Formatter(
  76. "%(levelname)s %(filename)s:%(lineno)d - %(message)s - %(exc_info)s\n"
  77. ),
  78. logging.CRITICAL: logging.Formatter(
  79. "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
  80. ),
  81. }
  82. if isinstance(interface_or_socket, zmq.Socket):
  83. self.socket = interface_or_socket
  84. self.ctx = self.socket.context
  85. else:
  86. self.ctx = context or zmq.Context()
  87. self.socket = self.ctx.socket(zmq.PUB)
  88. self.socket.bind(interface_or_socket)
  89. @property
  90. def root_topic(self) -> str:
  91. return self._root_topic
  92. @root_topic.setter
  93. def root_topic(self, value: str):
  94. self.setRootTopic(value)
  95. def setRootTopic(self, root_topic: str):
  96. """Set the root topic for this handler.
  97. This value is prepended to all messages published by this handler, and it
  98. defaults to the empty string ''. When you subscribe to this socket, you must
  99. set your subscription to an empty string, or to at least the first letter of
  100. the binary representation of this string to ensure you receive any messages
  101. from this handler.
  102. If you use the default empty string root topic, messages will begin with
  103. the binary representation of the log level string (INFO, WARN, etc.).
  104. Note that ZMQ SUB sockets can have multiple subscriptions.
  105. """
  106. if isinstance(root_topic, bytes):
  107. root_topic = root_topic.decode("utf8")
  108. self._root_topic = root_topic
  109. def setFormatter(self, fmt, level=logging.NOTSET):
  110. """Set the Formatter for this handler.
  111. If no level is provided, the same format is used for all levels. This
  112. will overwrite all selective formatters set in the object constructor.
  113. """
  114. if level == logging.NOTSET:
  115. for fmt_level in self.formatters.keys():
  116. self.formatters[fmt_level] = fmt
  117. else:
  118. self.formatters[level] = fmt
  119. def format(self, record):
  120. """Format a record."""
  121. return self.formatters[record.levelno].format(record)
  122. def emit(self, record):
  123. """Emit a log message on my socket."""
  124. # LogRecord.getMessage explicitly allows msg to be anything _castable_ to a str
  125. try:
  126. topic, msg = str(record.msg).split(TOPIC_DELIM, 1)
  127. except ValueError:
  128. topic = ""
  129. else:
  130. # copy to avoid mutating LogRecord in-place
  131. record = copy(record)
  132. record.msg = msg
  133. try:
  134. bmsg = self.format(record).encode("utf8")
  135. except Exception:
  136. self.handleError(record)
  137. return
  138. topic_list = []
  139. if self.root_topic:
  140. topic_list.append(self.root_topic)
  141. topic_list.append(record.levelname)
  142. if topic:
  143. topic_list.append(topic)
  144. btopic = '.'.join(topic_list).encode("utf8", "replace")
  145. self.socket.send_multipart([btopic, bmsg])
  146. class TopicLogger(logging.Logger):
  147. """A simple wrapper that takes an additional argument to log methods.
  148. All the regular methods exist, but instead of one msg argument, two
  149. arguments: topic, msg are passed.
  150. That is::
  151. logger.debug('msg')
  152. Would become::
  153. logger.debug('topic.sub', 'msg')
  154. """
  155. def log(self, level, topic, msg, *args, **kwargs):
  156. """Log 'msg % args' with level and topic.
  157. To pass exception information, use the keyword argument exc_info
  158. with a True value::
  159. logger.log(level, "zmq.fun", "We have a %s",
  160. "mysterious problem", exc_info=1)
  161. """
  162. logging.Logger.log(self, level, f'{topic}{TOPIC_DELIM}{msg}', *args, **kwargs)
  163. # Generate the methods of TopicLogger, since they are just adding a
  164. # topic prefix to a message.
  165. for name in "debug warn warning error critical fatal".split():
  166. try:
  167. meth = getattr(logging.Logger, name)
  168. except AttributeError:
  169. # some methods are missing, e.g. Logger.warn was removed from Python 3.13
  170. continue
  171. setattr(
  172. TopicLogger,
  173. name,
  174. lambda self, level, topic, msg, *args, **kwargs: meth(
  175. self, level, topic + TOPIC_DELIM + msg, *args, **kwargs
  176. ),
  177. )