# event_summarizer.py

import time
from threading import RLock
from typing import Any, Callable, Dict, List
  4. class EventSummarizer:
  5. """Utility that aggregates related log messages to reduce log spam."""
  6. def __init__(self):
  7. self.events_by_key: Dict[str, int] = {}
  8. # Messages to send in next summary batch.
  9. self.messages_to_send: List[str] = []
  10. # Tracks TTL of messages. A message will not be re-sent once it is
  11. # added here, until its TTL expires.
  12. self.throttled_messages: Dict[str, float] = {}
  13. # Event summarizer is used by the main thread and
  14. # by node launcher child threads.
  15. self.lock = RLock()
  16. def add(
  17. self, template: str, *, quantity: Any, aggregate: Callable[[Any, Any], Any]
  18. ) -> None:
  19. """Add a log message, which will be combined by template.
  20. Args:
  21. template: Format string with one placeholder for quantity.
  22. quantity: Quantity to aggregate.
  23. aggregate: Aggregation function used to combine the
  24. quantities. The result is inserted into the template to
  25. produce the final log message.
  26. """
  27. with self.lock:
  28. # Enforce proper sentence structure.
  29. if not template.endswith("."):
  30. template += "."
  31. if template in self.events_by_key:
  32. self.events_by_key[template] = aggregate(
  33. self.events_by_key[template], quantity
  34. )
  35. else:
  36. self.events_by_key[template] = quantity
  37. def add_once_per_interval(self, message: str, key: str, interval_s: int):
  38. """Add a log message, which is throttled once per interval by a key.
  39. Args:
  40. message: The message to log.
  41. key: The key to use to deduplicate the message.
  42. interval_s: Throttling interval in seconds.
  43. """
  44. with self.lock:
  45. if key not in self.throttled_messages:
  46. self.throttled_messages[key] = time.time() + interval_s
  47. self.messages_to_send.append(message)
  48. def summary(self) -> List[str]:
  49. """Generate the aggregated log summary of all added events."""
  50. with self.lock:
  51. out = []
  52. for template, quantity in self.events_by_key.items():
  53. out.append(template.format(quantity))
  54. out.extend(self.messages_to_send)
  55. return out
  56. def clear(self) -> None:
  57. """Clear the events added."""
  58. with self.lock:
  59. self.events_by_key.clear()
  60. self.messages_to_send.clear()
  61. # Expire any messages that have reached their TTL. This allows them
  62. # to be sent again.
  63. for k, t in list(self.throttled_messages.items()):
  64. if time.time() > t:
  65. del self.throttled_messages[k]