| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- # Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
- #
- # This module is part of GitPython and is released under the
- # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
- __all__ = ["TreeModifier", "Tree"]
- import os
- import sys
- import git.diff as git_diff
- from git.util import IterableList, join_path, to_bin_sha
- from . import util
- from .base import IndexObjUnion, IndexObject
- from .blob import Blob
- from .fun import tree_entries_from_data, tree_to_stream
- from .submodule.base import Submodule
- # typing -------------------------------------------------
- from typing import (
- Any,
- Callable,
- Dict,
- Iterable,
- Iterator,
- List,
- Tuple,
- TYPE_CHECKING,
- Type,
- Union,
- cast,
- )
- if sys.version_info >= (3, 8):
- from typing import Literal
- else:
- from typing_extensions import Literal
- from git.types import PathLike
- if TYPE_CHECKING:
- from io import BytesIO
- from git.repo import Repo
- TreeCacheTup = Tuple[bytes, int, str]
- TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
- # --------------------------------------------------------
- def cmp(a: str, b: str) -> int:
- return (a > b) - (a < b)
- class TreeModifier:
- """A utility class providing methods to alter the underlying cache in a list-like
- fashion.
- Once all adjustments are complete, the :attr:`_cache`, which really is a reference
- to the cache of a tree, will be sorted. This ensures it will be in a serializable
- state.
- """
- __slots__ = ("_cache",)
- def __init__(self, cache: List[TreeCacheTup]) -> None:
- self._cache = cache
- def _index_by_name(self, name: str) -> int:
- """:return: index of an item with name, or -1 if not found"""
- for i, t in enumerate(self._cache):
- if t[2] == name:
- return i
- # END found item
- # END for each item in cache
- return -1
- # { Interface
- def set_done(self) -> "TreeModifier":
- """Call this method once you are done modifying the tree information.
- This may be called several times, but be aware that each call will cause a sort
- operation.
- :return:
- self
- """
- self._cache.sort(key=lambda x: (x[2] + "/") if x[1] == Tree.tree_id << 12 else x[2])
- return self
- # } END interface
- # { Mutators
- def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
- """Add the given item to the tree.
- If an item with the given name already exists, nothing will be done, but a
- :exc:`ValueError` will be raised if the sha and mode of the existing item do not
- match the one you add, unless `force` is ``True``.
- :param sha:
- The 20 or 40 byte sha of the item to add.
- :param mode:
- :class:`int` representing the stat-compatible mode of the item.
- :param force:
- If ``True``, an item with your name and information will overwrite any
- existing item with the same name, no matter which information it has.
- :return:
- self
- """
- if "/" in name:
- raise ValueError("Name must not contain '/' characters")
- if (mode >> 12) not in Tree._map_id_to_type:
- raise ValueError("Invalid object type according to mode %o" % mode)
- sha = to_bin_sha(sha)
- index = self._index_by_name(name)
- item = (sha, mode, name)
- if index == -1:
- self._cache.append(item)
- else:
- if force:
- self._cache[index] = item
- else:
- ex_item = self._cache[index]
- if ex_item[0] != sha or ex_item[1] != mode:
- raise ValueError("Item %r existed with different properties" % name)
- # END handle mismatch
- # END handle force
- # END handle name exists
- return self
- def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
- """Add the given item to the tree. Its correctness is assumed, so it is the
- caller's responsibility to ensure that the input is correct.
- For more information on the parameters, see :meth:`add`.
- :param binsha:
- 20 byte binary sha.
- """
- assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
- tree_cache = (binsha, mode, name)
- self._cache.append(tree_cache)
- def __delitem__(self, name: str) -> None:
- """Delete an item with the given name if it exists."""
- index = self._index_by_name(name)
- if index > -1:
- del self._cache[index]
- # } END mutators
- class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
- R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
- other :class:`Tree`\s.
- See :manpage:`gitglossary(7)` on "tree object":
- https://git-scm.com/docs/gitglossary#def_tree_object
- Subscripting is supported, as with a list or dict:
- * Access a specific blob using the ``tree["filename"]`` notation.
- * You may likewise access by index, like ``blob = tree[0]``.
- """
- type: Literal["tree"] = "tree"
- __slots__ = ("_cache",)
- # Actual integer IDs for comparison.
- commit_id = 0o16 # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
- blob_id = 0o10
- symlink_id = 0o12
- tree_id = 0o04
- _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
- commit_id: Submodule,
- blob_id: Blob,
- symlink_id: Blob,
- # Tree ID added once Tree is defined.
- }
- def __init__(
- self,
- repo: "Repo",
- binsha: bytes,
- mode: int = tree_id << 12,
- path: Union[PathLike, None] = None,
- ):
- super().__init__(repo, binsha, mode, path)
- @classmethod
- def _get_intermediate_items(
- cls,
- index_object: IndexObjUnion,
- ) -> Union[Tuple["Tree", ...], Tuple[()]]:
- if index_object.type == "tree":
- return tuple(index_object._iter_convert_to_object(index_object._cache))
- return ()
- def _set_cache_(self, attr: str) -> None:
- if attr == "_cache":
- # Set the data when we need it.
- ostream = self.repo.odb.stream(self.binsha)
- self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
- else:
- super()._set_cache_(attr)
- # END handle attribute
- def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
- """Iterable yields tuples of (binsha, mode, name), which will be converted to
- the respective object representation.
- """
- for binsha, mode, name in iterable:
- path = join_path(self.path, name)
- try:
- yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
- except KeyError as e:
- raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
- # END for each item
- def join(self, file: PathLike) -> IndexObjUnion:
- """Find the named object in this tree's contents.
- :return:
- :class:`~git.objects.blob.Blob`, :class:`Tree`, or
- :class:`~git.objects.submodule.base.Submodule`
- :raise KeyError:
- If the given file or tree does not exist in this tree.
- """
- msg = "Blob or Tree named %r not found"
- file = os.fspath(file)
- if "/" in file:
- tree = self
- item = self
- tokens = file.split("/")
- for i, token in enumerate(tokens):
- item = tree[token]
- if item.type == "tree":
- tree = item
- else:
- # Safety assertion - blobs are at the end of the path.
- if i != len(tokens) - 1:
- raise KeyError(msg % file)
- return item
- # END handle item type
- # END for each token of split path
- if item == self:
- raise KeyError(msg % file)
- return item
- else:
- for info in self._cache:
- if info[2] == file: # [2] == name
- return self._map_id_to_type[info[1] >> 12](
- self.repo, info[0], info[1], join_path(self.path, info[2])
- )
- # END for each obj
- raise KeyError(msg % file)
- # END handle long paths
- def __truediv__(self, file: PathLike) -> IndexObjUnion:
- """The ``/`` operator is another syntax for joining.
- See :meth:`join` for details.
- """
- return self.join(file)
- @property
- def trees(self) -> List["Tree"]:
- """:return: list(Tree, ...) List of trees directly below this tree"""
- return [i for i in self if i.type == "tree"]
- @property
- def blobs(self) -> List[Blob]:
- """:return: list(Blob, ...) List of blobs directly below this tree"""
- return [i for i in self if i.type == "blob"]
- @property
- def cache(self) -> TreeModifier:
- """
- :return:
- An object allowing modification of the internal cache. This can be used to
- change the tree's contents. When done, make sure you call
- :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
- behaviour will be incorrect.
- :note:
- See :class:`TreeModifier` for more information on how to alter the cache.
- """
- return TreeModifier(self._cache)
- def traverse(
- self,
- predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
- prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
- depth: int = -1,
- branch_first: bool = True,
- visit_once: bool = False,
- ignore_self: int = 1,
- as_edge: bool = False,
- ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
- """For documentation, see
- `Traversable._traverse() <git.objects.util.Traversable._traverse>`.
- Trees are set to ``visit_once = False`` to gain more performance in the
- traversal.
- """
- # # To typecheck instead of using cast.
- # import itertools
- # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
- # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])
- # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
- # ret_tup = itertools.tee(ret, 2)
- # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
- # return ret_tup[0]
- return cast(
- Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
- super()._traverse(
- predicate, # type: ignore[arg-type]
- prune, # type: ignore[arg-type]
- depth,
- branch_first,
- visit_once,
- ignore_self,
- ),
- )
- def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
- """
- :return:
- :class:`~git.util.IterableList` with the results of the traversal as
- produced by :meth:`traverse`
- Tree -> IterableList[Union[Submodule, Tree, Blob]]
- """
- return super()._list_traverse(*args, **kwargs)
- # List protocol
- def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
- return list(self._iter_convert_to_object(self._cache[i:j]))
- def __iter__(self) -> Iterator[IndexObjUnion]:
- return self._iter_convert_to_object(self._cache)
- def __len__(self) -> int:
- return len(self._cache)
- def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
- if isinstance(item, int):
- info = self._cache[item]
- return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
- if isinstance(item, str):
- # compatibility
- return self.join(item)
- # END index is basestring
- raise TypeError("Invalid index type: %r" % item)
- def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
- if isinstance(item, IndexObject):
- for info in self._cache:
- if item.binsha == info[0]:
- return True
- # END compare sha
- # END for each entry
- # END handle item is index object
- # compatibility
- # Treat item as repo-relative path.
- else:
- path = self.path
- for info in self._cache:
- if item == join_path(path, info[2]):
- return True
- # END for each item
- return False
- def __reversed__(self) -> Iterator[IndexObjUnion]:
- return reversed(self._iter_convert_to_object(self._cache)) # type: ignore[call-overload]
- def _serialize(self, stream: "BytesIO") -> "Tree":
- """Serialize this tree into the stream. Assumes sorted tree data.
- :note:
- We will assume our tree data to be in a sorted state. If this is not the
- case, serialization will not generate a correct tree representation as these
- are assumed to be sorted by algorithms.
- """
- tree_to_stream(self._cache, stream.write)
- return self
- def _deserialize(self, stream: "BytesIO") -> "Tree":
- self._cache = tree_entries_from_data(stream.read())
- return self
- # END tree
- # Finalize map definition.
- Tree._map_id_to_type[Tree.tree_id] = Tree
|