| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import time
- from threading import RLock
- from typing import Any, Callable, Dict, List
class EventSummarizer:
    """Utility that aggregates related log messages to reduce log spam.

    Two kinds of input are collected until ``clear()`` is called:

    * templated events (``add``), whose quantities are combined per template
      and rendered into a single message each, and
    * one-off messages (``add_once_per_interval``), which are deduplicated by
      a caller-supplied key until that key's TTL expires.

    Every method acquires ``self.lock`` before touching instance state.
    """

    def __init__(self) -> None:
        # Aggregated quantity per template. The value type is whatever the
        # caller passes as ``quantity`` / returns from ``aggregate``.
        self.events_by_key: Dict[str, Any] = {}
        # Messages to send in next summary batch.
        self.messages_to_send: List[str] = []
        # Tracks TTL of messages. A message will not be re-sent once it is
        # added here, until its TTL expires.
        self.throttled_messages: Dict[str, float] = {}
        # Event summarizer is used by the main thread and
        # by node launcher child threads.
        self.lock = RLock()

    def add(
        self, template: str, *, quantity: Any, aggregate: Callable[[Any, Any], Any]
    ) -> None:
        """Add a log message, which will be combined by template.

        Args:
            template: Format string with one placeholder for quantity.
            quantity: Quantity to aggregate.
            aggregate: Aggregation function used to combine the
                quantities. The result is inserted into the template to
                produce the final log message.
        """
        with self.lock:
            # Enforce proper sentence structure. The normalized template is
            # also the aggregation key, so "x {}" and "x {}." are combined.
            if not template.endswith("."):
                template += "."
            if template in self.events_by_key:
                self.events_by_key[template] = aggregate(
                    self.events_by_key[template], quantity
                )
            else:
                # First occurrence: stored as-is, ``aggregate`` is not called.
                self.events_by_key[template] = quantity

    def add_once_per_interval(self, message: str, key: str, interval_s: int) -> None:
        """Add a log message, which is throttled once per interval by a key.

        The message is dropped while ``key`` is still tracked in
        ``throttled_messages``; keys are only removed by ``clear()`` once
        their TTL has passed.

        Args:
            message: The message to log.
            key: The key to use to deduplicate the message.
            interval_s: Throttling interval in seconds.
        """
        with self.lock:
            if key not in self.throttled_messages:
                self.throttled_messages[key] = time.time() + interval_s
                self.messages_to_send.append(message)

    def summary(self) -> List[str]:
        """Generate the aggregated log summary of all added events.

        Returns:
            A new list with one formatted message per template (in insertion
            order), followed by the pending throttled messages.
        """
        with self.lock:
            out = [
                template.format(quantity)
                for template, quantity in self.events_by_key.items()
            ]
            out.extend(self.messages_to_send)
            return out

    def clear(self) -> None:
        """Clear the events added and expire throttle keys past their TTL."""
        with self.lock:
            self.events_by_key.clear()
            self.messages_to_send.clear()
            # Expire any messages that have reached their TTL. This allows them
            # to be sent again. The clock is read once so that every key is
            # compared against the same instant.
            now = time.time()
            expired = [
                key
                for key, deadline in self.throttled_messages.items()
                if now > deadline
            ]
            for key in expired:
                del self.throttled_messages[key]
|