| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- #!/usr/bin/env python
- import logging
- import re
- import socket
- import threading
- import time
- from timeit import default_timer
- from typing import Callable, Tuple
- from ..registry import Collector, REGISTRY
- # Roughly, have to keep to what works as a file name.
- # We also remove periods, so labels can be distinguished.
- _INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]")
- def _sanitize(s):
- return _INVALID_GRAPHITE_CHARS.sub('_', s)
- class _RegularPush(threading.Thread):
- def __init__(self, pusher, interval, prefix):
- super().__init__()
- self._pusher = pusher
- self._interval = interval
- self._prefix = prefix
- def run(self):
- wait_until = default_timer()
- while True:
- while True:
- now = default_timer()
- if now >= wait_until:
- # May need to skip some pushes.
- while wait_until < now:
- wait_until += self._interval
- break
- # time.sleep can return early.
- time.sleep(wait_until - now)
- try:
- self._pusher.push(prefix=self._prefix)
- except OSError:
- logging.exception("Push failed")
- class GraphiteBridge:
- def __init__(self,
- address: Tuple[str, int],
- registry: Collector = REGISTRY,
- timeout_seconds: float = 30,
- _timer: Callable[[], float] = time.time,
- tags: bool = False,
- ):
- self._address = address
- self._registry = registry
- self._tags = tags
- self._timeout = timeout_seconds
- self._timer = _timer
- def push(self, prefix: str = '') -> None:
- now = int(self._timer())
- output = []
- prefixstr = ''
- if prefix:
- prefixstr = prefix + '.'
- for metric in self._registry.collect():
- for s in metric.samples:
- if s.labels:
- if self._tags:
- sep = ';'
- fmt = '{0}={1}'
- else:
- sep = '.'
- fmt = '{0}.{1}'
- labelstr = sep + sep.join(
- [fmt.format(
- _sanitize(k), _sanitize(v))
- for k, v in sorted(s.labels.items())])
- else:
- labelstr = ''
- output.append(f'{prefixstr}{_sanitize(s.name)}{labelstr} {float(s.value)} {now}\n')
- conn = socket.create_connection(self._address, self._timeout)
- conn.sendall(''.join(output).encode('ascii'))
- conn.close()
- def start(self, interval: float = 60.0, prefix: str = '') -> None:
- t = _RegularPush(self, interval, prefix)
- t.daemon = True
- t.start()
|