| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- import io as StringIO
- import re
- import string
- from typing import Dict, Iterable, List, Match, Optional, TextIO, Tuple
- from .metrics_core import Metric
- from .samples import Sample
- from .validation import (
- _is_valid_legacy_metric_name, _validate_labelname, _validate_metric_name,
- )
- def text_string_to_metric_families(text: str) -> Iterable[Metric]:
- """Parse Prometheus text format from a unicode string.
- See text_fd_to_metric_families.
- """
- yield from text_fd_to_metric_families(StringIO.StringIO(text))
- ESCAPE_SEQUENCES = {
- '\\\\': '\\',
- '\\n': '\n',
- '\\"': '"',
- }
- def replace_escape_sequence(match: Match[str]) -> str:
- return ESCAPE_SEQUENCES[match.group(0)]
- HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
- ESCAPING_RE = re.compile(r'\\[\\n"]')
- def _replace_help_escaping(s: str) -> str:
- return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
- def _replace_escaping(s: str) -> str:
- return ESCAPING_RE.sub(replace_escape_sequence, s)
- def _is_character_escaped(s: str, charpos: int) -> bool:
- num_bslashes = 0
- while (charpos > num_bslashes
- and s[charpos - 1 - num_bslashes] == '\\'):
- num_bslashes += 1
- return num_bslashes % 2 == 1
- def parse_labels(labels_string: str, openmetrics: bool = False) -> Dict[str, str]:
- labels: Dict[str, str] = {}
- # Copy original labels
- sub_labels = labels_string.strip()
- if openmetrics and sub_labels and sub_labels[0] == ',':
- raise ValueError("leading comma: " + labels_string)
- try:
- # Process one label at a time
- while sub_labels:
- # The label name is before the equal, or if there's no equal, that's the
- # metric name.
-
- name_term, value_term, sub_labels = _next_term(sub_labels, openmetrics)
- if not value_term:
- if openmetrics:
- raise ValueError("empty term in line: " + labels_string)
- continue
-
- label_name, quoted_name = _unquote_unescape(name_term)
-
- if not quoted_name and not _is_valid_legacy_metric_name(label_name):
- raise ValueError("unquoted UTF-8 metric name")
-
- # Check for missing quotes
- if not value_term or value_term[0] != '"':
- raise ValueError
- # The first quote is guaranteed to be after the equal.
- # Make sure that the next unescaped quote is the last character.
- i = 1
- while i < len(value_term):
- i = value_term.index('"', i)
- if not _is_character_escaped(value_term[:i], i):
- break
- i += 1
- # The label value is between the first and last quote
- quote_end = i + 1
- if quote_end != len(value_term):
- raise ValueError("unexpected text after quote: " + labels_string)
- label_value, _ = _unquote_unescape(value_term)
- if label_name == '__name__':
- _validate_metric_name(label_name)
- else:
- _validate_labelname(label_name)
- if label_name in labels:
- raise ValueError("invalid line, duplicate label name: " + labels_string)
- labels[label_name] = label_value
- return labels
- except ValueError:
- raise ValueError("Invalid labels: " + labels_string)
-
- def _next_term(text: str, openmetrics: bool) -> Tuple[str, str, str]:
- """Extract the next comma-separated label term from the text. The results
- are stripped terms for the label name, label value, and then the remainder
- of the string including the final , or }.
-
- Raises ValueError if the term is empty and we're in openmetrics mode.
- """
-
- # There may be a leading comma, which is fine here.
- if text[0] == ',':
- text = text[1:]
- if not text:
- return "", "", ""
- if text[0] == ',':
- raise ValueError("multiple commas")
- splitpos = _next_unquoted_char(text, '=,}')
- if splitpos >= 0 and text[splitpos] == "=":
- labelname = text[:splitpos]
- text = text[splitpos + 1:]
- splitpos = _next_unquoted_char(text, ',}')
- else:
- labelname = "__name__"
- if splitpos == -1:
- splitpos = len(text)
- term = text[:splitpos]
- if not term and openmetrics:
- raise ValueError("empty term:", term)
-
- rest = text[splitpos:]
- return labelname, term.strip(), rest.strip()
- def _next_unquoted_char(text: str, chs: Optional[str], startidx: int = 0) -> int:
- """Return position of next unquoted character in tuple, or -1 if not found.
-
- It is always assumed that the first character being checked is not already
- inside quotes.
- """
- in_quotes = False
- if chs is None:
- chs = string.whitespace
- for i, c in enumerate(text[startidx:]):
- if c == '"' and not _is_character_escaped(text, startidx + i):
- in_quotes = not in_quotes
- if not in_quotes:
- if c in chs:
- return startidx + i
- return -1
- def _last_unquoted_char(text: str, chs: Optional[str]) -> int:
- """Return position of last unquoted character in list, or -1 if not found."""
- i = len(text) - 1
- in_quotes = False
- if chs is None:
- chs = string.whitespace
- while i > 0:
- if text[i] == '"' and not _is_character_escaped(text, i):
- in_quotes = not in_quotes
-
- if not in_quotes:
- if text[i] in chs:
- return i
- i -= 1
- return -1
- def _split_quoted(text, separator, maxsplit=0):
- """Splits on split_ch similarly to strings.split, skipping separators if
- they are inside quotes.
- """
- tokens = ['']
- x = 0
- while x < len(text):
- split_pos = _next_unquoted_char(text, separator, x)
- if split_pos == -1:
- tokens[-1] = text[x:]
- x = len(text)
- continue
- # If the first character is the separator keep going. This happens when
- # there are double whitespace characters separating symbols.
- if split_pos == x:
- x += 1
- continue
- if maxsplit > 0 and len(tokens) > maxsplit:
- tokens[-1] = text[x:]
- break
- tokens[-1] = text[x:split_pos]
- x = split_pos + 1
- tokens.append('')
- return tokens
- def _unquote_unescape(text):
- """Returns the string, and true if it was quoted."""
- if not text:
- return text, False
- quoted = False
- text = text.strip()
- if text[0] == '"':
- if len(text) == 1 or text[-1] != '"':
- raise ValueError("missing close quote")
- text = text[1:-1]
- quoted = True
- if "\\" in text:
- text = _replace_escaping(text)
- return text, quoted
- # If we have multiple values only consider the first
- def _parse_value_and_timestamp(s: str) -> Tuple[float, Optional[float]]:
- s = s.lstrip()
- separator = " "
- if separator not in s:
- separator = "\t"
- values = [value.strip() for value in s.split(separator) if value.strip()]
- if not values:
- return float(s), None
- value = _parse_value(values[0])
- timestamp = (_parse_value(values[-1]) / 1000) if len(values) > 1 else None
- return value, timestamp
- def _parse_value(value):
- value = ''.join(value)
- if value != value.strip() or '_' in value:
- raise ValueError(f"Invalid value: {value!r}")
- try:
- return int(value)
- except ValueError:
- return float(value)
-
- def _parse_sample(text):
- separator = " # "
- # Detect the labels in the text
- label_start = _next_unquoted_char(text, '{')
- if label_start == -1 or separator in text[:label_start]:
- # We don't have labels, but there could be an exemplar.
- name_end = _next_unquoted_char(text, ' \t')
- name = text[:name_end].strip()
- if not _is_valid_legacy_metric_name(name):
- raise ValueError("invalid metric name:" + text)
- # Parse the remaining text after the name
- remaining_text = text[name_end + 1:]
- value, timestamp = _parse_value_and_timestamp(remaining_text)
- return Sample(name, {}, value, timestamp)
- name = text[:label_start].strip()
- label_end = _next_unquoted_char(text[label_start:], '}') + label_start
- labels = parse_labels(text[label_start + 1:label_end], False)
- if not name:
- # Name might be in the labels
- if '__name__' not in labels:
- raise ValueError
- name = labels['__name__']
- del labels['__name__']
- elif '__name__' in labels:
- raise ValueError("metric name specified more than once")
- # Parsing labels succeeded, continue parsing the remaining text
- remaining_text = text[label_end + 1:]
- value, timestamp = _parse_value_and_timestamp(remaining_text)
- return Sample(name, labels, value, timestamp)
- def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
- """Parse Prometheus text format from a file descriptor.
- This is a laxer parser than the main Go parser,
- so successful parsing does not imply that the parsed
- text meets the specification.
- Yields Metric's.
- """
- name = ''
- documentation = ''
- typ = 'untyped'
- samples: List[Sample] = []
- allowed_names = []
- def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
- # Munge counters into OpenMetrics representation
- # used internally.
- if typ == 'counter':
- if name.endswith('_total'):
- name = name[:-6]
- else:
- new_samples = []
- for s in samples:
- new_samples.append(Sample(s[0] + '_total', *s[1:]))
- samples = new_samples
- metric = Metric(name, documentation, typ)
- metric.samples = samples
- return metric
- for line in fd:
- line = line.strip()
- if line.startswith('#'):
- parts = _split_quoted(line, None, 3)
- if len(parts) < 2:
- continue
- candidate_name, quoted = '', False
- if len(parts) > 2:
- # Ignore comment tokens
- if parts[1] != 'TYPE' and parts[1] != 'HELP':
- continue
- candidate_name, quoted = _unquote_unescape(parts[2])
- if not quoted and not _is_valid_legacy_metric_name(candidate_name):
- raise ValueError
- if parts[1] == 'HELP':
- if candidate_name != name:
- if name != '':
- yield build_metric(name, documentation, typ, samples)
- # New metric
- name = candidate_name
- typ = 'untyped'
- samples = []
- allowed_names = [candidate_name]
- if len(parts) == 4:
- documentation = _replace_help_escaping(parts[3])
- else:
- documentation = ''
- elif parts[1] == 'TYPE':
- if len(parts) < 4:
- raise ValueError
- if candidate_name != name:
- if name != '':
- yield build_metric(name, documentation, typ, samples)
- # New metric
- name = candidate_name
- documentation = ''
- samples = []
- typ = parts[3]
- allowed_names = {
- 'counter': [''],
- 'gauge': [''],
- 'summary': ['_count', '_sum', ''],
- 'histogram': ['_count', '_sum', '_bucket'],
- }.get(typ, [''])
- allowed_names = [name + n for n in allowed_names]
- elif line == '':
- # Ignore blank lines
- pass
- else:
- sample = _parse_sample(line)
- if sample.name not in allowed_names:
- if name != '':
- yield build_metric(name, documentation, typ, samples)
- # New metric, yield immediately as untyped singleton
- name = ''
- documentation = ''
- typ = 'untyped'
- samples = []
- allowed_names = []
- yield build_metric(sample[0], documentation, typ, [sample])
- else:
- samples.append(sample)
- if name != '':
- yield build_metric(name, documentation, typ, samples)
|