| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- # This module is part of GitPython and is released under the
- # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
- """Standalone functions to accompany the index implementation and make it more
- versatile."""
- __all__ = [
- "write_cache",
- "read_cache",
- "write_tree_from_cache",
- "entry_key",
- "stat_mode_to_index_mode",
- "S_IFGITLINK",
- "run_commit_hook",
- "hook_path",
- ]
- from io import BytesIO
- import os
- import os.path as osp
- from pathlib import Path
- from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR
- import subprocess
- import sys
- from gitdb.base import IStream
- from gitdb.typ import str_tree_type
- from git.cmd import handle_process_output, safer_popen
- from git.compat import defenc, force_bytes, force_text, safe_decode
- from git.exc import HookExecutionError, UnmergedEntriesError
- from git.objects.fun import (
- traverse_tree_recursive,
- traverse_trees_recursive,
- tree_to_stream,
- )
- from git.util import IndexFileSHA1Writer, finalize_process
- from .typ import CE_EXTENDED, BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
- from .util import pack, unpack
- # typing -----------------------------------------------------------------------------
- from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast
- from git.types import PathLike
- if TYPE_CHECKING:
- from git.db import GitCmdObjectDB
- from git.objects.tree import TreeCacheTup
- from .base import IndexFile
- # ------------------------------------------------------------------------------------
- S_IFGITLINK = S_IFLNK | S_IFDIR
- """Flags for a submodule."""
- CE_NAMEMASK_INV = ~CE_NAMEMASK
- def hook_path(name: str, git_dir: PathLike) -> str:
- """:return: path to the given named hook in the given git repository directory"""
- return osp.join(git_dir, "hooks", name)
- def _has_file_extension(path: str) -> str:
- return osp.splitext(path)[1]
- def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
- """Run the commit hook of the given name. Silently ignore hooks that do not exist.
- :param name:
- Name of hook, like ``pre-commit``.
- :param index:
- :class:`~git.index.base.IndexFile` instance.
- :param args:
- Arguments passed to hook file.
- :raise git.exc.HookExecutionError:
- """
- hp = hook_path(name, index.repo.git_dir)
- if not os.access(hp, os.X_OK):
- return
- env = os.environ.copy()
- env["GIT_INDEX_FILE"] = safe_decode(os.fspath(index.path))
- env["GIT_EDITOR"] = ":"
- cmd = [hp]
- try:
- if sys.platform == "win32" and not _has_file_extension(hp):
- # Windows only uses extensions to determine how to open files
- # (doesn't understand shebangs). Try using bash to run the hook.
- relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
- cmd = ["bash.exe", relative_hp]
- process = safer_popen(
- cmd + list(args),
- env=env,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=index.repo.working_dir,
- )
- except Exception as ex:
- raise HookExecutionError(hp, ex) from ex
- else:
- stdout_list: List[str] = []
- stderr_list: List[str] = []
- handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process)
- stdout = "".join(stdout_list)
- stderr = "".join(stderr_list)
- if process.returncode != 0:
- stdout = force_text(stdout, defenc)
- stderr = force_text(stderr, defenc)
- raise HookExecutionError(hp, process.returncode, stderr, stdout)
- # END handle return code
- def stat_mode_to_index_mode(mode: int) -> int:
- """Convert the given mode from a stat call to the corresponding index mode and
- return it."""
- if S_ISLNK(mode): # symlinks
- return S_IFLNK
- if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
- return S_IFGITLINK
- return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit
- def write_cache(
- entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
- stream: IO[bytes],
- extension_data: Union[None, bytes] = None,
- ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
- ) -> None:
- """Write the cache represented by entries to a stream.
- :param entries:
- **Sorted** list of entries.
- :param stream:
- Stream to wrap into the AdapterStreamCls - it is used for final output.
- :param ShaStreamCls:
- Type to use when writing to the stream. It produces a sha while writing to it,
- before the data is passed on to the wrapped stream.
- :param extension_data:
- Any kind of data to write as a trailer, it must begin a 4 byte identifier,
- followed by its size (4 bytes).
- """
- # Wrap the stream into a compatible writer.
- stream_sha = ShaStreamCls(stream)
- tell = stream_sha.tell
- write = stream_sha.write
- # Header
- version = 3 if any(entry.extended_flags for entry in entries) else 2
- write(b"DIRC")
- write(pack(">LL", version, len(entries)))
- # Body
- for entry in entries:
- beginoffset = tell()
- write(entry.ctime_bytes) # ctime
- write(entry.mtime_bytes) # mtime
- path_str = str(entry.path)
- path: bytes = force_bytes(path_str, encoding=defenc)
- plen = len(path) & CE_NAMEMASK # Path length
- assert plen == len(path), "Path %s too long to fit into index" % entry.path
- flags = plen | (entry.flags & CE_NAMEMASK_INV) # Clear possible previous values.
- if entry.extended_flags:
- flags |= CE_EXTENDED
- write(
- pack(
- ">LLLLLL20sH",
- entry.dev,
- entry.inode,
- entry.mode,
- entry.uid,
- entry.gid,
- entry.size,
- entry.binsha,
- flags,
- )
- )
- if entry.extended_flags:
- write(pack(">H", entry.extended_flags))
- write(path)
- real_size = (tell() - beginoffset + 8) & ~7
- write(b"\0" * ((beginoffset + real_size) - tell()))
- # END for each entry
- # Write previously cached extensions data.
- if extension_data is not None:
- stream_sha.write(extension_data)
- # Write the sha over the content.
- stream_sha.write_sha()
- def read_header(stream: IO[bytes]) -> Tuple[int, int]:
- """Return tuple(version_long, num_entries) from the given stream."""
- type_id = stream.read(4)
- if type_id != b"DIRC":
- raise AssertionError("Invalid index file header: %r" % type_id)
- unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
- version, num_entries = unpacked
- assert version in (1, 2, 3), "Unsupported git index version %i, only 1, 2, and 3 are supported" % version
- return version, num_entries
- def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
- """
- :return:
- Key suitable to be used for the
- :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary.
- :param entry:
- One instance of type BaseIndexEntry or the path and the stage.
- """
- # def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
- # return isinstance(entry_key, tuple) and len(entry_key) == 2
- if len(entry) == 1:
- entry_first = entry[0]
- assert isinstance(entry_first, BaseIndexEntry)
- return (entry_first.path, entry_first.stage)
- else:
- # assert is_entry_key_tup(entry)
- entry = cast(Tuple[PathLike, int], entry)
- return entry
- # END handle entry
- def read_cache(
- stream: IO[bytes],
- ) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
- """Read a cache file from the given stream.
- :return:
- tuple(version, entries_dict, extension_data, content_sha)
- * *version* is the integer version number.
- * *entries_dict* is a dictionary which maps IndexEntry instances to a path at a
- stage.
- * *extension_data* is ``""`` or 4 bytes of type + 4 bytes of size + size bytes.
- * *content_sha* is a 20 byte sha on all cache file contents.
- """
- version, num_entries = read_header(stream)
- count = 0
- entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}
- read = stream.read
- tell = stream.tell
- while count < num_entries:
- beginoffset = tell()
- ctime = unpack(">8s", read(8))[0]
- mtime = unpack(">8s", read(8))[0]
- (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
- extended_flags = 0
- if flags & CE_EXTENDED:
- extended_flags = unpack(">H", read(2))[0]
- path_size = flags & CE_NAMEMASK
- path = read(path_size).decode(defenc)
- real_size = (tell() - beginoffset + 8) & ~7
- read((beginoffset + real_size) - tell())
- entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size, extended_flags))
- # entry_key would be the method to use, but we save the effort.
- entries[(path, entry.stage)] = entry
- count += 1
- # END for each entry
- # The footer contains extension data and a sha on the content so far.
- # Keep the extension footer,and verify we have a sha in the end.
- # Extension data format is:
- # 4 bytes ID
- # 4 bytes length of chunk
- # Repeated 0 - N times
- extension_data = stream.read(~0)
- assert len(extension_data) > 19, (
- "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)
- )
- content_sha = extension_data[-20:]
- # Truncate the sha in the end as we will dynamically create it anyway.
- extension_data = extension_data[:-20]
- return (version, entries, extension_data, content_sha)
- def write_tree_from_cache(
- entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
- ) -> Tuple[bytes, List["TreeCacheTup"]]:
- R"""Create a tree from the given sorted list of entries and put the respective
- trees into the given object database.
- :param entries:
- **Sorted** list of :class:`~git.index.typ.IndexEntry`\s.
- :param odb:
- Object database to store the trees in.
- :param si:
- Start index at which we should start creating subtrees.
- :param sl:
- Slice indicating the range we should process on the entries list.
- :return:
- tuple(binsha, list(tree_entry, ...))
- A tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name.
- """
- tree_items: List["TreeCacheTup"] = []
- ci = sl.start
- end = sl.stop
- while ci < end:
- entry = entries[ci]
- if entry.stage != 0:
- raise UnmergedEntriesError(entry)
- # END abort on unmerged
- ci += 1
- rbound = entry.path.find("/", si)
- if rbound == -1:
- # It's not a tree.
- tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
- else:
- # Find common base range.
- base = entry.path[si:rbound]
- xi = ci
- while xi < end:
- oentry = entries[xi]
- orbound = oentry.path.find("/", si)
- if orbound == -1 or oentry.path[si:orbound] != base:
- break
- # END abort on base mismatch
- xi += 1
- # END find common base
- # Enter recursion.
- # ci - 1 as we want to count our current item as well.
- sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
- tree_items.append((sha, S_IFDIR, base))
- # Skip ahead.
- ci = xi
- # END handle bounds
- # END for each entry
- # Finally create the tree.
- sio = BytesIO()
- tree_to_stream(tree_items, sio.write) # Writes to stream as bytes, but doesn't change tree_items.
- sio.seek(0)
- istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
- return (istream.binsha, tree_items)
- def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
- return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
- def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
- R"""
- :return:
- List of :class:`~git.index.typ.BaseIndexEntry`\s representing the aggressive
- merge of the given trees. All valid entries are on stage 0, whereas the
- conflicting ones are left on stage 1, 2 or 3, whereas stage 1 corresponds to the
- common ancestor tree, 2 to our tree and 3 to 'their' tree.
- :param tree_shas:
- 1, 2 or 3 trees as identified by their binary 20 byte shas. If 1 or two, the
- entries will effectively correspond to the last given tree. If 3 are given, a 3
- way merge is performed.
- """
- out: List[BaseIndexEntry] = []
- # One and two way is the same for us, as we don't have to handle an existing
- # index, instrea
- if len(tree_shas) in (1, 2):
- for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
- out.append(_tree_entry_to_baseindexentry(entry, 0))
- # END for each entry
- return out
- # END handle single tree
- if len(tree_shas) > 3:
- raise ValueError("Cannot handle %i trees at once" % len(tree_shas))
- # Three trees.
- for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
- if base is not None:
- # Base version exists.
- if ours is not None:
- # Ours exists.
- if theirs is not None:
- # It exists in all branches. Ff it was changed in both
- # its a conflict. Otherwise, we take the changed version.
- # This should be the most common branch, so it comes first.
- if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
- base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
- ):
- # Changed by both.
- out.append(_tree_entry_to_baseindexentry(base, 1))
- out.append(_tree_entry_to_baseindexentry(ours, 2))
- out.append(_tree_entry_to_baseindexentry(theirs, 3))
- elif base[0] != ours[0] or base[1] != ours[1]:
- # Only we changed it.
- out.append(_tree_entry_to_baseindexentry(ours, 0))
- else:
- # Either nobody changed it, or they did. In either
- # case, use theirs.
- out.append(_tree_entry_to_baseindexentry(theirs, 0))
- # END handle modification
- else:
- if ours[0] != base[0] or ours[1] != base[1]:
- # They deleted it, we changed it, conflict.
- out.append(_tree_entry_to_baseindexentry(base, 1))
- out.append(_tree_entry_to_baseindexentry(ours, 2))
- # else:
- # # We didn't change it, ignore.
- # pass
- # END handle our change
- # END handle theirs
- else:
- if theirs is None:
- # Deleted in both, its fine - it's out.
- pass
- else:
- if theirs[0] != base[0] or theirs[1] != base[1]:
- # Deleted in ours, changed theirs, conflict.
- out.append(_tree_entry_to_baseindexentry(base, 1))
- out.append(_tree_entry_to_baseindexentry(theirs, 3))
- # END theirs changed
- # else:
- # # Theirs didn't change.
- # pass
- # END handle theirs
- # END handle ours
- else:
- # All three can't be None.
- if ours is None:
- # Added in their branch.
- assert theirs is not None
- out.append(_tree_entry_to_baseindexentry(theirs, 0))
- elif theirs is None:
- # Added in our branch.
- out.append(_tree_entry_to_baseindexentry(ours, 0))
- else:
- # Both have it, except for the base, see whether it changed.
- if ours[0] != theirs[0] or ours[1] != theirs[1]:
- out.append(_tree_entry_to_baseindexentry(ours, 2))
- out.append(_tree_entry_to_baseindexentry(theirs, 3))
- else:
- # It was added the same in both.
- out.append(_tree_entry_to_baseindexentry(ours, 0))
- # END handle two items
- # END handle heads
- # END handle base exists
- # END for each entries tuple
- return out
|