fun.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. # This module is part of GitPython and is released under the
  2. # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
  3. """Functions that are supposed to be as fast as possible."""
  4. __all__ = [
  5. "tree_to_stream",
  6. "tree_entries_from_data",
  7. "traverse_trees_recursive",
  8. "traverse_tree_recursive",
  9. ]
  10. from stat import S_ISDIR
  11. from git.compat import safe_decode, defenc
  12. # typing ----------------------------------------------
  13. from typing import (
  14. Callable,
  15. List,
  16. MutableSequence,
  17. Sequence,
  18. Tuple,
  19. TYPE_CHECKING,
  20. Union,
  21. overload,
  22. )
  23. if TYPE_CHECKING:
  24. from _typeshed import ReadableBuffer
  25. from git import GitCmdObjectDB
  26. EntryTup = Tuple[bytes, int, str] # Same as TreeCacheTup in tree.py.
  27. EntryTupOrNone = Union[EntryTup, None]
  28. # ---------------------------------------------------
  29. def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None:
  30. """Write the given list of entries into a stream using its ``write`` method.
  31. :param entries:
  32. **Sorted** list of tuples with (binsha, mode, name).
  33. :param write:
  34. A ``write`` method which takes a data string.
  35. """
  36. ord_zero = ord("0")
  37. bit_mask = 7 # 3 bits set.
  38. for binsha, mode, name in entries:
  39. mode_str = b""
  40. for i in range(6):
  41. mode_str = bytes([((mode >> (i * 3)) & bit_mask) + ord_zero]) + mode_str
  42. # END for each 8 octal value
  43. # git slices away the first octal if it's zero.
  44. if mode_str[0] == ord_zero:
  45. mode_str = mode_str[1:]
  46. # END save a byte
  47. # Here it comes: If the name is actually unicode, the replacement below will not
  48. # work as the binsha is not part of the ascii unicode encoding - hence we must
  49. # convert to an UTF-8 string for it to work properly. According to my tests,
  50. # this is exactly what git does, that is it just takes the input literally,
  51. # which appears to be UTF-8 on linux.
  52. if isinstance(name, str):
  53. name_bytes = name.encode(defenc)
  54. else:
  55. name_bytes = name # type: ignore[unreachable] # check runtime types - is always str?
  56. write(b"".join((mode_str, b" ", name_bytes, b"\0", binsha)))
  57. # END for each item
  58. def tree_entries_from_data(data: bytes) -> List[EntryTup]:
  59. """Read the binary representation of a tree and returns tuples of
  60. :class:`~git.objects.tree.Tree` items.
  61. :param data:
  62. Data block with tree data (as bytes).
  63. :return:
  64. list(tuple(binsha, mode, tree_relative_path), ...)
  65. """
  66. ord_zero = ord("0")
  67. space_ord = ord(" ")
  68. len_data = len(data)
  69. i = 0
  70. out = []
  71. while i < len_data:
  72. mode = 0
  73. # Read Mode
  74. # Some git versions truncate the leading 0, some don't.
  75. # The type will be extracted from the mode later.
  76. while data[i] != space_ord:
  77. # Move existing mode integer up one level being 3 bits and add the actual
  78. # ordinal value of the character.
  79. mode = (mode << 3) + (data[i] - ord_zero)
  80. i += 1
  81. # END while reading mode
  82. # Byte is space now, skip it.
  83. i += 1
  84. # Parse name, it is NULL separated.
  85. ns = i
  86. while data[i] != 0:
  87. i += 1
  88. # END while not reached NULL
  89. # Default encoding for strings in git is UTF-8.
  90. # Only use the respective unicode object if the byte stream was encoded.
  91. name_bytes = data[ns:i]
  92. name = safe_decode(name_bytes)
  93. # Byte is NULL, get next 20.
  94. i += 1
  95. sha = data[i : i + 20]
  96. i = i + 20
  97. out.append((sha, mode, name))
  98. # END for each byte in data stream
  99. return out
  100. def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone:
  101. """Return data entry matching the given name and tree mode or ``None``.
  102. Before the item is returned, the respective data item is set None in the `tree_data`
  103. list to mark it done.
  104. """
  105. try:
  106. item = tree_data[start_at]
  107. if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
  108. tree_data[start_at] = None
  109. return item
  110. except IndexError:
  111. pass
  112. # END exception handling
  113. for index, item in enumerate(tree_data):
  114. if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
  115. tree_data[index] = None
  116. return item
  117. # END if item matches
  118. # END for each item
  119. return None
  120. @overload
  121. def _to_full_path(item: None, path_prefix: str) -> None: ...
  122. @overload
  123. def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup: ...
  124. def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
  125. """Rebuild entry with given path prefix."""
  126. if not item:
  127. return item
  128. return (item[0], item[1], path_prefix + item[2])
  129. def traverse_trees_recursive(
  130. odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
  131. ) -> List[Tuple[EntryTupOrNone, ...]]:
  132. """
  133. :return:
  134. List of list with entries according to the given binary tree-shas.
  135. The result is encoded in a list
  136. of n tuple|None per blob/commit, (n == len(tree_shas)), where:
  137. * [0] == 20 byte sha
  138. * [1] == mode as int
  139. * [2] == path relative to working tree root
  140. The entry tuple is ``None`` if the respective blob/commit did not exist in the
  141. given tree.
  142. :param tree_shas:
  143. Iterable of shas pointing to trees. All trees must be on the same level.
  144. A tree-sha may be ``None``, in which case ``None``.
  145. :param path_prefix:
  146. A prefix to be added to the returned paths on this level.
  147. Set it ``""`` for the first iteration.
  148. :note:
  149. The ordering of the returned items will be partially lost.
  150. """
  151. trees_data: List[List[EntryTupOrNone]] = []
  152. nt = len(tree_shas)
  153. for tree_sha in tree_shas:
  154. if tree_sha is None:
  155. data: List[EntryTupOrNone] = []
  156. else:
  157. # Make new list for typing as list invariant.
  158. data = list(tree_entries_from_data(odb.stream(tree_sha).read()))
  159. # END handle muted trees
  160. trees_data.append(data)
  161. # END for each sha to get data for
  162. out: List[Tuple[EntryTupOrNone, ...]] = []
  163. # Find all matching entries and recursively process them together if the match is a
  164. # tree. If the match is a non-tree item, put it into the result.
  165. # Processed items will be set None.
  166. for ti, tree_data in enumerate(trees_data):
  167. for ii, item in enumerate(tree_data):
  168. if not item:
  169. continue
  170. # END skip already done items
  171. entries: List[EntryTupOrNone]
  172. entries = [None for _ in range(nt)]
  173. entries[ti] = item
  174. _sha, mode, name = item
  175. is_dir = S_ISDIR(mode) # Type mode bits
  176. # Find this item in all other tree data items.
  177. # Wrap around, but stop one before our current index, hence ti+nt, not
  178. # ti+1+nt.
  179. for tio in range(ti + 1, ti + nt):
  180. tio = tio % nt
  181. entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii)
  182. # END for each other item data
  183. # If we are a directory, enter recursion.
  184. if is_dir:
  185. out.extend(
  186. traverse_trees_recursive(
  187. odb,
  188. [((ei and ei[0]) or None) for ei in entries],
  189. path_prefix + name + "/",
  190. )
  191. )
  192. else:
  193. out.append(tuple(_to_full_path(e, path_prefix) for e in entries))
  194. # END handle recursion
  195. # Finally mark it done.
  196. tree_data[ii] = None
  197. # END for each item
  198. # We are done with one tree, set all its data empty.
  199. del tree_data[:]
  200. # END for each tree_data chunk
  201. return out
  202. def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
  203. """
  204. :return:
  205. List of entries of the tree pointed to by the binary `tree_sha`.
  206. An entry has the following format:
  207. * [0] 20 byte sha
  208. * [1] mode as int
  209. * [2] path relative to the repository
  210. :param path_prefix:
  211. Prefix to prepend to the front of all returned paths.
  212. """
  213. entries = []
  214. data = tree_entries_from_data(odb.stream(tree_sha).read())
  215. # Unpacking/packing is faster than accessing individual items.
  216. for sha, mode, name in data:
  217. if S_ISDIR(mode):
  218. entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + "/"))
  219. else:
  220. entries.append((sha, mode, path_prefix + name))
  221. # END for each item
  222. return entries