mmap_dict.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import json
  2. import mmap
  3. import os
  4. import struct
  5. from typing import List
  # Size, in bytes, given to a backing file when it is first created (64 KiB).
  _INITIAL_MMAP_SIZE = 64 * 1024

  # Compile each binary layout once and share it between the pack and unpack
  # helpers: a single native int, and a (value, timestamp) pair of doubles.
  _INT_STRUCT = struct.Struct(b'i')
  _TWO_DOUBLES_STRUCT = struct.Struct(b'dd')

  _pack_integer_func = _INT_STRUCT.pack
  _pack_two_doubles_func = _TWO_DOUBLES_STRUCT.pack
  _unpack_integer = _INT_STRUCT.unpack_from
  _unpack_two_doubles = _TWO_DOUBLES_STRUCT.unpack_from
  11. # struct.pack_into has atomicity issues because it will temporarily write 0 into
  12. # the mmap, resulting in false reads to 0 when experiencing a lot of writes.
  13. # Using direct assignment solves this issue.
  def _pack_two_doubles(data, pos, value, timestamp):
      """Store ``value`` and ``timestamp`` as two doubles at ``data[pos:pos + 16]``.

      The bytes are packed first and written with one slice assignment rather
      than ``struct.pack_into``, which can temporarily write zeros into the
      buffer and let a concurrent reader observe a false 0.
      """
      packed = _pack_two_doubles_func(value, timestamp)
      end = pos + 16
      data[pos:end] = packed
  def _pack_integer(data, pos, value):
      """Store ``value`` as a 4-byte native int at ``data[pos:pos + 4]``.

      The bytes are packed first and written with one slice assignment rather
      than ``struct.pack_into``, which can temporarily write zeros into the
      buffer and let a concurrent reader observe a false 0.
      """
      packed = _pack_integer_func(value)
      end = pos + 4
      data[pos:end] = packed
  def _read_all_values(data, used=0):
      """Yield (key, value, timestamp, pos). No locking is performed.

      Args:
          data: Buffer holding the file contents (an mmap or a bytes object).
          used: Number of bytes of ``data`` that are occupied. When it is not
              positive, the value is read from the 4-byte header at offset 0.

      Yields:
          ``(key, value, timestamp, pos)`` for every entry, where ``pos`` is
          the offset of the entry's two doubles inside ``data``.

      Raises:
          RuntimeError: If an entry has a negative key length or would extend
              past ``used``, i.e. the file is corrupted.
      """
      if used <= 0:
          # If not valid `used` value is passed in, read it from the file.
          used = _unpack_integer(data, 0)[0]
      pos = 8  # Entries start after the 4-byte header and 4 bytes of padding.
      while pos < used:
          encoded_len = _unpack_integer(data, pos)[0]
          # The key is padded with 1 to 8 bytes so the doubles are 8-byte aligned.
          padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
          # Check the whole entry (length prefix, padded key, two doubles) stays
          # inside the used region. A negative length is rejected explicitly:
          # it could otherwise move `pos` backwards and loop forever.
          if encoded_len < 0 or pos + 4 + padded_len + 16 > used:
              raise RuntimeError('Read beyond file size detected, file is corrupted.')
          pos += 4
          encoded_key = data[pos:pos + encoded_len]
          pos += padded_len
          value, timestamp = _unpack_two_doubles(data, pos)
          yield encoded_key.decode('utf-8'), value, timestamp, pos
          pos += 16
  class MmapedDict:
      """A dict of doubles, backed by an mmapped file.

      The file starts with a 4 byte int, indicating how much of it is used.
      Then 4 bytes of padding.
      There's then a number of entries, consisting of a 4 byte int which is the
      size of the next field, a utf-8 encoded string key, padding to a 8 byte
      alignment, and then a 8 byte float which is the value and a 8 byte float
      which is a UNIX timestamp in seconds.

      Not thread safe.
      """

      def __init__(self, filename, read_mode=False):
          """Open (and, when writable, initialise) the backing file.

          Args:
              filename: Path of the backing file.
              read_mode: When true, the file is opened and mapped read-only
                  and the key -> position index is not built.
          """
          self._f = open(filename, 'rb' if read_mode else 'a+b')
          self._fname = filename
          try:
              capacity = os.fstat(self._f.fileno()).st_size
              if capacity == 0:
                  self._f.truncate(_INITIAL_MMAP_SIZE)
                  capacity = _INITIAL_MMAP_SIZE
              self._capacity = capacity
              self._m = mmap.mmap(
                  self._f.fileno(), self._capacity,
                  access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)
          except Exception:
              # Do not leak the file handle when sizing or mapping fails.
              self._f.close()
              self._f = None
              raise

          self._positions = {}
          self._used = _unpack_integer(self._m, 0)[0]
          if self._used == 0:
              # Header not written yet: only the 8-byte header area is in use.
              self._used = 8
              if not read_mode:
                  # A read-only map cannot be written to, so the header is
                  # persisted only when the file was opened for writing.
                  _pack_integer(self._m, 0, self._used)
          elif not read_mode:
              for key, _, _, pos in self._read_all_values():
                  self._positions[key] = pos

      @staticmethod
      def read_all_values_from_file(filename):
          """Return a generator of (key, value, timestamp, pos) read from ``filename``.

          The file is read with ordinary file I/O into a bytes object; no mmap
          is created.
          """
          with open(filename, 'rb') as infp:
              # Read the first block of data, including the first 4 bytes which tell us
              # how much of the file (which is preallocated to _INITIAL_MMAP_SIZE bytes) is occupied.
              data = infp.read(mmap.PAGESIZE)
              used = _unpack_integer(data, 0)[0]
              if used > len(data):  # Then read in the rest, if needed.
                  data += infp.read(used - len(data))
          return _read_all_values(data, used)

      def _init_value(self, key):
          """Initialize a value. Lock must be held by caller."""
          encoded = key.encode('utf-8')
          # Pad to be 8-byte aligned.
          padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
          value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
          required = self._used + len(value)
          if required > self._capacity:
              # Work out the final doubled capacity first, so the file is
              # grown and remapped once rather than once per doubling.
              new_capacity = self._capacity
              while required > new_capacity:
                  new_capacity *= 2
              self._f.truncate(new_capacity)
              self._m = mmap.mmap(self._f.fileno(), new_capacity)
              self._capacity = new_capacity
          self._m[self._used:self._used + len(value)] = value

          # Update how much space we've used.
          self._used += len(value)
          _pack_integer(self._m, 0, self._used)
          # The two doubles are the last 16 bytes of the entry just written.
          self._positions[key] = self._used - 16

      def _read_all_values(self):
          """Yield (key, value, timestamp, pos). No locking is performed."""
          return _read_all_values(data=self._m, used=self._used)

      def read_all_values(self):
          """Yield (key, value, timestamp). No locking is performed."""
          for k, v, ts, _ in self._read_all_values():
              yield k, v, ts

      def read_value(self, key):
          """Return the ``(value, timestamp)`` pair stored for ``key``.

          A key with no known position is first initialised in the file with
          ``(0.0, 0.0)``.
          """
          if key not in self._positions:
              self._init_value(key)
          pos = self._positions[key]
          return _unpack_two_doubles(self._m, pos)

      def write_value(self, key, value, timestamp):
          """Store ``value`` and ``timestamp`` for ``key``, creating the entry if needed."""
          if key not in self._positions:
              self._init_value(key)
          pos = self._positions[key]
          _pack_two_doubles(self._m, pos, value, timestamp)

      def close(self):
          """Close the mmap and the backing file. Safe to call more than once."""
          if self._f:
              self._m.close()
              self._m = None
              self._f.close()
              self._f = None
  def mmap_key(metric_name: str, name: str, labelnames: List[str], labelvalues: List[str], help_text: str) -> str:
      """Build the JSON string used as a sample's key in the mmap file.

      Label names are paired positionally with label values. The result is the
      JSON encoding of ``[metric_name, name, labels, help_text]``.
      """
      label_map = {label: label_value for label, label_value in zip(labelnames, labelvalues)}
      key_parts = [metric_name, name, label_map, help_text]
      # Sorting the keys gives the labels a consistent order, so the same label
      # set always produces the same key string.
      return json.dumps(key_parts, sort_keys=True)