graphite.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #!/usr/bin/env python
  2. import logging
  3. import re
  4. import socket
  5. import threading
  6. import time
  7. from timeit import default_timer
  8. from typing import Callable, Tuple
  9. from ..registry import Collector, REGISTRY
  10. # Roughly, have to keep to what works as a file name.
  11. # We also remove periods, so labels can be distinguished.
  12. _INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]")
  13. def _sanitize(s):
  14. return _INVALID_GRAPHITE_CHARS.sub('_', s)
  15. class _RegularPush(threading.Thread):
  16. def __init__(self, pusher, interval, prefix):
  17. super().__init__()
  18. self._pusher = pusher
  19. self._interval = interval
  20. self._prefix = prefix
  21. def run(self):
  22. wait_until = default_timer()
  23. while True:
  24. while True:
  25. now = default_timer()
  26. if now >= wait_until:
  27. # May need to skip some pushes.
  28. while wait_until < now:
  29. wait_until += self._interval
  30. break
  31. # time.sleep can return early.
  32. time.sleep(wait_until - now)
  33. try:
  34. self._pusher.push(prefix=self._prefix)
  35. except OSError:
  36. logging.exception("Push failed")
  37. class GraphiteBridge:
  38. def __init__(self,
  39. address: Tuple[str, int],
  40. registry: Collector = REGISTRY,
  41. timeout_seconds: float = 30,
  42. _timer: Callable[[], float] = time.time,
  43. tags: bool = False,
  44. ):
  45. self._address = address
  46. self._registry = registry
  47. self._tags = tags
  48. self._timeout = timeout_seconds
  49. self._timer = _timer
  50. def push(self, prefix: str = '') -> None:
  51. now = int(self._timer())
  52. output = []
  53. prefixstr = ''
  54. if prefix:
  55. prefixstr = prefix + '.'
  56. for metric in self._registry.collect():
  57. for s in metric.samples:
  58. if s.labels:
  59. if self._tags:
  60. sep = ';'
  61. fmt = '{0}={1}'
  62. else:
  63. sep = '.'
  64. fmt = '{0}.{1}'
  65. labelstr = sep + sep.join(
  66. [fmt.format(
  67. _sanitize(k), _sanitize(v))
  68. for k, v in sorted(s.labels.items())])
  69. else:
  70. labelstr = ''
  71. output.append(f'{prefixstr}{_sanitize(s.name)}{labelstr} {float(s.value)} {now}\n')
  72. conn = socket.create_connection(self._address, self._timeout)
  73. conn.sendall(''.join(output).encode('ascii'))
  74. conn.close()
  75. def start(self, interval: float = 60.0, prefix: str = '') -> None:
  76. t = _RegularPush(self, interval, prefix)
  77. t.daemon = True
  78. t.start()