| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532 |
- # Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
- #
- # This module is part of GitPython and is released under the
- # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
- """Module containing :class:`IndexFile`, an Index implementation facilitating all kinds
- of index manipulations such as querying and merging."""
- __all__ = ["IndexFile", "CheckoutError", "StageType"]
- import contextlib
- import datetime
- import glob
- from io import BytesIO
- import os
- import os.path as osp
- from stat import S_ISLNK
- import subprocess
- import sys
- import tempfile
- from gitdb.base import IStream
- from gitdb.db import MemoryDB
- from git.compat import defenc, force_bytes
- import git.diff as git_diff
- from git.exc import CheckoutError, GitCommandError, GitError, InvalidGitRepositoryError
- from git.objects import Blob, Commit, Object, Submodule, Tree
- from git.objects.util import Serializable
- from git.util import (
- Actor,
- LazyMixin,
- LockedFD,
- join_path_native,
- file_contents_ro,
- to_native_path_linux,
- unbare_repo,
- to_bin_sha,
- )
- from .fun import (
- S_IFGITLINK,
- aggressive_tree_merge,
- entry_key,
- read_cache,
- run_commit_hook,
- stat_mode_to_index_mode,
- write_cache,
- write_tree_from_cache,
- )
- from .typ import BaseIndexEntry, IndexEntry, StageType
- from .util import TemporaryFileSwap, post_clear_cache, default_index, git_working_dir
- # typing -----------------------------------------------------------------------------
- from typing import (
- Any,
- BinaryIO,
- Callable,
- Dict,
- Generator,
- IO,
- Iterable,
- Iterator,
- List,
- NoReturn,
- Sequence,
- TYPE_CHECKING,
- Tuple,
- Union,
- )
- from git.types import Literal, PathLike
- if TYPE_CHECKING:
- from subprocess import Popen
- from git.refs.reference import Reference
- from git.repo import Repo
- Treeish = Union[Tree, Commit, str, bytes]
- # ------------------------------------------------------------------------------------
@contextlib.contextmanager
def _named_temporary_file_for_subprocess(directory: PathLike) -> Generator[str, None, None]:
    """Provide the name of a temporary file that git subprocesses are able to open.

    :param directory:
        The directory in which the file is created.

    :return:
        A context manager object that creates the file and provides its name on entry,
        and deletes it on exit.
    """
    if sys.platform != "win32":
        # NamedTemporaryFile deletes the file by itself once its context is left.
        with tempfile.NamedTemporaryFile(dir=directory) as tmp_file:
            yield tmp_file.name
        return

    # On Windows only the name is kept: the descriptor is closed immediately and the
    # file is removed explicitly afterwards.
    # NOTE(review): presumably because a file held open by NamedTemporaryFile cannot be
    # reopened by another process on Windows - confirm.
    handle, tmp_name = tempfile.mkstemp(dir=directory)
    os.close(handle)
    try:
        yield tmp_name
    finally:
        os.remove(tmp_name)
- class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
- """An Index that can be manipulated using a native implementation in order to save
- git command function calls wherever possible.
- This provides custom merging facilities allowing to merge without actually changing
- your index or your working tree. This way you can perform your own test merges based
- on the index only without having to deal with the working copy. This is useful in
- case of partial working trees.
- Entries:
- The index contains an entries dict whose keys are tuples of type
- :class:`~git.index.typ.IndexEntry` to facilitate access.
- You may read the entries dict or manipulate it using IndexEntry instance, i.e.::
- index.entries[index.entry_key(index_entry_instance)] = index_entry_instance
- Make sure you use :meth:`index.write() <write>` once you are done manipulating the
- index directly before operating on it using the git command.
- """
- __slots__ = ("repo", "version", "entries", "_extension_data", "_file_path")
- _VERSION = 2
- """The latest version we support."""
- S_IFGITLINK = S_IFGITLINK
- """Flags for a submodule."""
def __init__(self, repo: "Repo", file_path: Union[PathLike, None] = None) -> None:
    """Set up this index for `repo`, backed by `file_path` if one is given.

    :param repo:
        The repository this index belongs to.

    :param file_path:
        Path of the index file to represent. If it is ``None`` (or otherwise falsy),
        the path returned by :meth:`_index_path` is used instead.

    :note:
        The ``entries`` attribute is not assigned here.
    """
    self.repo = repo
    self.version = self._VERSION
    self._extension_data = b""
    if not file_path:
        # Needs ``self.repo`` to be assigned already.
        file_path = self._index_path()
    self._file_path: PathLike = file_path
def _set_cache_(self, attr: str) -> None:
    """Provide the value of the attribute named `attr` on demand.

    ``entries`` is filled by deserializing the file at our index path; if that file
    cannot be opened, ``entries`` becomes an empty dict. Every other attribute is
    delegated to the superclass.
    """
    if attr != "entries":
        super()._set_cache_(attr)
        return

    try:
        index_fd = os.open(self._file_path, os.O_RDONLY)
    except OSError:
        # In new repositories, there may be no index, which means we are empty.
        self.entries: Dict[Tuple[PathLike, StageType], IndexEntry] = {}
        return

    # The descriptor is closed as soon as the contents have been obtained, even if
    # obtaining them fails.
    try:
        contents = file_contents_ro(index_fd, stream=True, allow_mmap=True)
    finally:
        os.close(index_fd)

    self._deserialize(contents)
def _index_path(self) -> PathLike:
    """
    :return:
        Path of the file named ``index`` inside the repository's git directory.

    :raise git.exc.GitCommandError:
        If the repository has no git directory set.
    """
    git_dir = self.repo.git_dir
    if not git_dir:
        raise GitCommandError("No git directory given to join index path")
    return join_path_native(git_dir, "index")
@property
def path(self) -> PathLike:
    """
    :return:
        Path to the index file this instance represents.
    """
    return self._file_path
def _delete_entries_cache(self) -> None:
    """Remove the cached ``entries`` attribute, if present, so it can be recreated."""
    # An AttributeError from the deletion is deliberately ignored.
    # FIXME: Look into whether this suppression can be removed (it was introduced
    # because the deletion failed with AttributeError in Python 2.6.5).
    with contextlib.suppress(AttributeError):
        del self.entries
- # { Serializable Interface
def _deserialize(self, stream: IO) -> "IndexFile":
    """Populate this instance from the index data that ``read_cache`` parses out of
    `stream`.

    :param stream:
        Stream handed to ``read_cache``.

    :return:
        self
    """
    # The fourth value returned by read_cache is not used here.
    version, entries, extension_data, _content_sha = read_cache(stream)
    self.version = version
    self.entries = entries
    self._extension_data = extension_data
    return self
def _entries_sorted(self) -> List[IndexEntry]:
    """
    :return:
        New list of all entries, ordered by path first and by stage second.
    """

    def path_then_stage(entry: IndexEntry) -> Tuple[PathLike, int]:
        return (entry.path, entry.stage)

    return sorted(self.entries.values(), key=path_then_stage)
def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> "IndexFile":
    """Write the sorted entries, and optionally the extension data, to `stream`.

    :param stream:
        Stream handed to ``write_cache``.

    :param ignore_extension_data:
        If ``True``, ``None`` is passed to ``write_cache`` in place of the stored
        extension data.

    :return:
        self
    """
    sorted_entries = self._entries_sorted()
    stored_extension_data = self._extension_data
    extension_data: Union[None, bytes] = None if ignore_extension_data else stored_extension_data
    write_cache(sorted_entries, stream, extension_data)
    return self
- # } END serializable interface
def write(
    self,
    file_path: Union[None, PathLike] = None,
    ignore_extension_data: bool = False,
) -> None:
    """Write the current state to our file path or to the given one.

    :param file_path:
        If ``None``, we write to the stored file path this index was initialized
        with. Otherwise we write to the given file path. Please note that this will
        change the `file_path` of this index to the one you gave.

    :param ignore_extension_data:
        If ``True``, the extension data read from the index (such as the TREE
        extension) is not written to disk. Use this if you have altered the index and
        would like to use :manpage:`git-write-tree(1)` afterwards to create a tree
        representing your written changes. If this data is present in the written
        index, :manpage:`git-write-tree(1)` will instead write the stored/cached tree.
        Alternatively, use :meth:`write_tree` to handle this case automatically.
    """
    # Touch the entries so they are read before the write lock is taken; otherwise
    # the read would only happen while streaming. This matters when the index is
    # written right away without having been changed.
    self.entries  # noqa: B018

    target_path = file_path or self._file_path
    locked_file = LockedFD(target_path)
    out_stream = locked_file.open(write=True, stream=True)

    try:
        self._serialize(out_stream, ignore_extension_data)
    except BaseException:
        # Whatever went wrong, undo the locked write and let the error propagate.
        locked_file.rollback()
        raise
    else:
        locked_file.commit()

    # Make sure we represent what we have written.
    if file_path is not None:
        self._file_path = file_path
@post_clear_cache
@default_index
def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> "IndexFile":
    """Merge the given `rhs` treeish into the current index, optionally taking a
    common base treeish into account.

    Unlike :func:`from_tree`, this uses the already existing index as the left side
    of the merge.

    :param rhs:
        Treeish reference pointing to the 'other' side of the merge.

    :param base:
        Optional treeish reference pointing to the common base of `rhs` and this
        index, which equals lhs.

    :return:
        self (containing the merge and possibly unmerged entries in case of
        conflicts)

    :raise git.exc.GitCommandError:
        If there is a merge conflict. The error will be raised at the first
        conflicting path. If you want to have proper merge resolution to be done by
        yourself, you have to commit the changed index (or make a valid tree from
        it) and retry with a three-way :meth:`index.from_tree <from_tree>` call.
    """
    # --aggressive : handle more merge cases
    # -i           : ignore working tree status
    # -m           : do an actual merge
    options: List[Union[Treeish, str]] = ["--aggressive", "-i", "-m"]
    trees: List[Union[Treeish, str]] = [rhs] if base is None else [base, rhs]

    # The arguments are handed over as one list, base (if any) before rhs.
    self.repo.git.read_tree(options + trees)
    return self
@classmethod
def new(cls, repo: "Repo", *tree_sha: Union[str, Tree]) -> "IndexFile":
    """Merge the given treeish revisions into a new index which is returned.

    This method behaves like ``git-read-tree --aggressive`` when doing the merge.

    :param repo:
        The repository treeish are located in.

    :param tree_sha:
        20 byte or 40 byte tree sha or tree objects.

    :return:
        New :class:`IndexFile` instance. Its path will be undefined.
        If you intend to write such a merged Index, supply an alternate
        ``file_path`` to its :meth:`write` method.
    """
    binary_shas: List[bytes] = [to_bin_sha(str(tree)) for tree in tree_sha]
    merged_entries = aggressive_tree_merge(repo.odb, binary_shas)

    index = cls(repo)
    # Key every merged entry by (path, stage), as the entries dict expects.
    index.entries = {
        (base_entry.path, base_entry.stage): IndexEntry.from_base(base_entry) for base_entry in merged_entries
    }
    return index
@classmethod
def from_tree(cls, repo: "Repo", *treeish: Treeish, **kwargs: Any) -> "IndexFile":
    R"""Merge the given treeish revisions into a new index which is returned.
    The original index will remain unaltered.

    :param repo:
        The repository treeish are located in.

    :param treeish:
        One, two or three :class:`~git.objects.tree.Tree` objects,
        :class:`~git.objects.commit.Commit`\s or 40 byte hexshas.

        The result changes according to the amount of trees:

        1. If 1 Tree is given, it will just be read into a new index.
        2. If 2 Trees are given, they will be merged into a new index using a two
           way merge algorithm. Tree 1 is the 'current' tree, tree 2 is the 'other'
           one. It behaves like a fast-forward.
        3. If 3 Trees are given, a 3-way merge will be performed with the first tree
           being the common ancestor of tree 2 and tree 3. Tree 2 is the 'current'
           tree, tree 3 is the 'other' one.

    :param kwargs:
        Additional arguments passed to :manpage:`git-read-tree(1)`.

    :return:
        New :class:`IndexFile` instance. It will point to a temporary index location
        which does not exist anymore. If you intend to write such a merged Index,
        supply an alternate ``file_path`` to its :meth:`write` method.

    :raise ValueError:
        If fewer than one or more than three treeish are given.

    :note:
        In the three-way merge case, ``--aggressive`` will be specified to
        automatically resolve more cases in a commonly correct manner. Specify
        ``trivial=True`` as a keyword argument to override that.

        As the underlying :manpage:`git-read-tree(1)` command takes into account the
        current index, it will be temporarily moved out of the way to prevent any
        unexpected interference.
    """
    tree_count = len(treeish)
    if not 1 <= tree_count <= 3:
        raise ValueError("Please specify between 1 and 3 treeish, got %i" % tree_count)

    read_tree_args: List[Union[Treeish, str]] = []
    if tree_count > 1:
        # --reset      : drop unmerged entries when reading our index and merging,
        #                ignoring that working tree and index may be out of date
        # --aggressive : handle non-trivial cases the way a real merge does
        read_tree_args += ["--reset", "--aggressive"]

    # The temporary file lives in the .git directory to be sure renaming works -
    # /tmp/ directories could be on another device.
    with _named_temporary_file_for_subprocess(repo.git_dir) as tmp_index:
        read_tree_args.append("--index-output=%s" % tmp_index)
        read_tree_args.extend(treeish)

        # Move the current index out of the way - otherwise the merge may fail as it
        # considers existing entries. Moving it essentially clears the index; there
        # is no 'soft' way to do it. TemporaryFileSwap puts the original file back.
        current_index_path = join_path_native(repo.git_dir, "index")
        with TemporaryFileSwap(current_index_path):
            repo.git.read_tree(*read_tree_args, **kwargs)
            merged_index = cls(repo, tmp_index)
            # Force reading the file now, as the temp-file is deleted on exit.
            merged_index.entries  # noqa: B018
            return merged_index
- # UTILITIES
@unbare_repo
def _iter_expand_paths(self: "IndexFile", paths: Sequence[PathLike]) -> Iterator[PathLike]:
    """Expand the directories and globs in a list of paths to the corresponding file
    paths.

    :param paths:
        Relative or absolute paths. Relative ones are joined onto the working tree
        directory. A path may be a symlink, a glob pattern, a directory or a file.

    :return:
        Iterator over the resulting paths, with the working tree directory prefix
        (including the trailing separator) removed.

    :note:
        git will add items multiple times even if a glob overlapped with manually
        specified paths or if paths where specified multiple times - we respect that
        and do not prune.
    """

    def raise_exc(e: Exception) -> NoReturn:
        # os.walk swallows listing errors unless ``onerror`` re-raises them.
        raise e

    r = str(self.repo.working_tree_dir)
    rs = r + os.sep
    for path in paths:
        abs_path = os.fspath(path)
        if not osp.isabs(abs_path):
            abs_path = osp.join(r, path)
        # END make absolute path

        try:
            st = os.lstat(abs_path)  # Handles non-symlinks as well.
        except OSError:
            # The lstat call may fail as the path may contain globs as well.
            pass
        else:
            if S_ISLNK(st.st_mode):
                # Symlinks are yielded as they are and never followed.
                yield abs_path.replace(rs, "")
                continue
        # END check symlink

        # If the path is not already pointing to an existing file, resolve globs if possible.
        if not os.path.exists(abs_path) and ("?" in abs_path or "*" in abs_path or "[" in abs_path):
            resolved_paths = glob.glob(abs_path)
            # A glob() resolving to the same path we are feeding it with is a glob()
            # that failed to resolve. If we continued calling ourselves we'd endlessly
            # recurse. If the condition below evaluates to true then we are likely
            # dealing with a file whose name contains wildcard characters.
            if abs_path not in resolved_paths:
                # Reuse the matches computed above rather than globbing a second
                # time, so the check and the expansion see the same result.
                for f in self._iter_expand_paths(resolved_paths):
                    yield str(f).replace(rs, "")
                continue
        # END glob handling

        try:
            for root, _dirs, files in os.walk(abs_path, onerror=raise_exc):
                for rela_file in files:
                    # Add relative paths only.
                    yield osp.join(root.replace(rs, ""), rela_file)
                # END for each file in subdir
            # END for each subdirectory
        except OSError:
            # It was a file or something that could not be iterated.
            yield abs_path.replace(rs, "")
        # END path exception handling
    # END for each path
def _write_path_to_stdin(
    self,
    proc: "Popen",
    filepath: PathLike,
    item: PathLike,
    fmakeexc: Callable[..., GitError],
    fprogress: Callable[[PathLike, bool, PathLike], None],
    read_from_stdout: bool = True,
) -> Union[None, str]:
    """Write `filepath` to ``proc.stdin`` and report progress before and after.

    :param proc:
        Process whose stdin receives the path followed by a newline. Nothing is
        written if it has no stdin.

    :param filepath:
        The path to send.

    :param item:
        The item being handled; it is only passed on to `fprogress`.

    :param fmakeexc:
        Called without arguments to create the exception that is raised if writing
        to stdin fails with an :exc:`IOError`.

    :param fprogress:
        Called as ``fprogress(filepath, False, item)`` before the write and as
        ``fprogress(filepath, True, item)`` at the end.

    :param read_from_stdout:
        If ``True`` and the process has a stdout, one line is read from it after the
        item was sent to stdin.

    :return:
        The stripped line read from stdout, or ``None`` if nothing was read.

    :note:
        There is a bug in :manpage:`git-update-index(1)` that prevents it from
        sending reports just in time. This is why we have a version that tries to
        read stdout and one which doesn't. In fact, the stdout is not important as
        the piped-in files are processed anyway and just in time.

    :note:
        Newlines are essential here, git's behaviour is somewhat inconsistent on
        this depending on the version, hence we try our best to deal with newlines
        carefully. Usually the last newline will not be sent, instead we will close
        stdin to break the pipe.
    """
    fprogress(filepath, False, item)

    stdin = proc.stdin
    if stdin is not None:
        payload = ("%s\n" % filepath).encode(defenc)
        try:
            stdin.write(payload)
        except IOError as e:
            # Pipe broke, usually because some error happened.
            raise fmakeexc() from e
        stdin.flush()

    line: Union[None, str] = None
    if read_from_stdout and proc.stdout is not None:
        line = proc.stdout.readline().strip()

    fprogress(filepath, True, item)
    return line
def iter_blobs(
    self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True
) -> Iterator[Tuple[StageType, Blob]]:
    """
    :return:
        Iterator yielding ``(stage, Blob)`` tuples, one per index entry accepted by
        `predicate`. Each blob's ``size`` is set from its entry.

    :param predicate:
        Function(t) returning ``True`` if tuple(stage, Blob) should be yielded by
        the iterator. A default filter, the :class:`~git.index.typ.BlobFilter`, allows you
        to yield blobs only if they match a given list of paths.
    """
    for entry in self.entries.values():
        blob = entry.to_blob(self.repo)
        blob.size = entry.size
        candidate = (entry.stage, blob)
        if not predicate(candidate):
            continue
        yield candidate
def unmerged_blobs(self) -> Dict[PathLike, List[Tuple[StageType, Blob]]]:
    """
    :return:
        Dict(path : list(tuple(stage, Blob, ...))), being a dictionary associating a
        path in the index with a list containing sorted stage/blob pairs. Only
        entries whose stage is not 0 are included.

    :note:
        Blobs that have been removed in one side simply do not exist in the given
        stage. That is, a file removed on the 'other' branch whose entries are at
        stage 3 will not have a stage 3 entry.
    """
    blobs_by_path: Dict[PathLike, List[Tuple[StageType, Blob]]] = {}

    for stage, blob in self.iter_blobs(lambda stage_and_blob: stage_and_blob[0] != 0):
        if blob.path not in blobs_by_path:
            blobs_by_path[blob.path] = []
        blobs_by_path[blob.path].append((stage, blob))

    for stage_blob_pairs in blobs_by_path.values():
        stage_blob_pairs.sort()

    return blobs_by_path
@classmethod
def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]:
    """Forward all positional arguments to the module-level ``entry_key`` function.

    :return:
        Whatever that function returns; per the annotation, a ``(path, stage)`` key.
    """
    key = entry_key(*entry)
    return key
def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> "IndexFile":
    """Resolve the blobs given in blob iterator.

    For every blob, the index entries of its path at stages 1, 2 and 3 are removed
    and the blob is added as the new stage 0 entry.

    For each path there may only be one blob, otherwise a :exc:`ValueError` will be
    raised claiming the path is already at stage 0.

    :param iter_blobs:
        Iterator over the blobs to resolve.

    :raise ValueError:
        If one of the blobs already existed at stage 0.

    :return:
        self

    :note:
        You will have to write the index manually once you are done, i.e.
        ``index.resolve_blobs(blobs).write()``.
    """
    conflict_stages = (1, 2, 3)

    for blob in iter_blobs:
        resolved_key = (blob.path, 0)
        if resolved_key in self.entries:
            raise ValueError("Path %r already exists at stage 0" % str(blob.path))

        # Stages that have no entry for this path are simply skipped.
        for stage in conflict_stages:
            self.entries.pop((blob.path, stage), None)

        self.entries[resolved_key] = IndexEntry.from_blob(blob)

    return self
def update(self) -> "IndexFile":
    """Discard all cached entries so the index file is reread on demand.

    :note:
        This is a possibly dangerous operation as it will discard your changes to
        :attr:`index.entries <entries>`.

    :return:
        self
    """
    # Nothing is read here; dropping the cache allows a lazy reread later.
    self._delete_entries_cache()
    return self
def write_tree(self) -> Tree:
    """Write this index to a corresponding :class:`~git.objects.tree.Tree` object
    into the repository's object database and return it.

    :return:
        :class:`~git.objects.tree.Tree` object representing this index.

    :note:
        The tree will be written even if one or more objects the tree refers to does
        not yet exist in the object database. This could happen if you added entries
        to the index directly.

    :raise ValueError:
        If there are no entries in the cache.

    :raise git.exc.UnmergedEntriesError:
    """
    # No lock is obtained as we just flush our contents to disk as tree.
    # If we are a new index, the entries access will load our data accordingly.
    staging_db = MemoryDB()
    sorted_entries = self._entries_sorted()
    whole_range = slice(0, len(sorted_entries))
    binsha, tree_items = write_tree_from_cache(sorted_entries, staging_db, whole_range)

    # Copy changed trees only.
    staging_db.stream_copy(staging_db.sha_iter(), self.repo.odb)

    # Note: Additional deserialization could be saved if write_tree_from_cache would
    # return sorted tree entries.
    tree = Tree(self.repo, binsha, path="")
    tree._cache = tree_items
    return tree
def _process_diff_args(
    self,
    args: List[Union[PathLike, "git_diff.Diffable"]],
) -> List[Union[PathLike, "git_diff.Diffable"]]:
    """Remove the first occurrence of this index from `args`, if there is one.

    :param args:
        Diff arguments. The list is modified in place.

    :return:
        The same `args` list, without the first element equal to ``self``.
    """
    try:
        args.remove(self)
    except ValueError:
        # A list lookup signals absence with ValueError (not IndexError); `self` not
        # being among the arguments is fine.
        pass
    return args
def _to_relative_path(self, path: PathLike) -> PathLike:
    """
    :param path:
        Relative or absolute path. Relative paths are returned unchanged.

    :return:
        Version of `path` relative to the working tree directory of our repository.
        A trailing path separator on `path` is kept on the result.

    :raise git.exc.InvalidGitRepositoryError:
        If `path` is absolute and the repository is bare.

    :raise ValueError:
        If `path` is absolute and is neither the working tree directory itself nor
        located inside of it.
    """
    if not osp.isabs(path):
        return path
    if self.repo.bare:
        raise InvalidGitRepositoryError("require non-bare repository")

    working_tree_dir = str(self.repo.working_tree_dir)
    normalized_path = osp.normpath(path)
    # Compare against the directory *with* a trailing separator: a bare prefix test
    # would also accept sibling directories such as ``/repo2`` for ``/repo``.
    if working_tree_dir.endswith(os.sep):
        dir_prefix = working_tree_dir
    else:
        dir_prefix = working_tree_dir + os.sep
    if normalized_path != working_tree_dir and not normalized_path.startswith(dir_prefix):
        raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir))

    result = os.path.relpath(path, self.repo.working_tree_dir)
    # relpath drops a trailing separator; put it back if the input had one.
    if os.fspath(path).endswith(os.sep) and not result.endswith(os.sep):
        result += os.sep
    return result
def _preprocess_add_items(
    self, items: Union[PathLike, Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]]]
) -> Tuple[List[PathLike], List[BaseIndexEntry]]:
    """Split the items into two lists of path strings and BaseEntries.

    :param items:
        A single path, or a sequence mixing paths, blobs, submodules and index
        entries.

    :return:
        ``(paths, entries)``: paths are converted with :meth:`_to_relative_path`,
        blobs and submodules are turned into entries, entries are kept as they are.

    :raise TypeError:
        If an item is of none of the supported types.
    """
    path_types = (str, os.PathLike)
    collected_paths = []
    collected_entries = []

    # A lone path is treated like a one-element sequence.
    if isinstance(items, path_types):
        items = [items]

    for item in items:
        if isinstance(item, path_types):
            collected_paths.append(self._to_relative_path(item))
            continue
        if isinstance(item, (Blob, Submodule)):
            collected_entries.append(BaseIndexEntry.from_blob(item))
            continue
        if isinstance(item, BaseIndexEntry):
            collected_entries.append(item)
            continue
        raise TypeError("Invalid Type: %r" % item)

    return collected_paths, collected_entries
def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry:
    """Store file at filepath in the database and return the base index entry.

    :param filepath:
        Path of the file or symlink to store. For a symlink, the link target is
        stored instead of the content it points to.

    :param fprogress:
        Called as ``fprogress(filepath, False, filepath)`` before storing and as
        ``fprogress(filepath, True, filepath)`` afterwards.

    :return:
        Stage 0 entry carrying the index mode derived from the file's mode, the
        binary sha of the stored object and the path.

    :note:
        This needs the :func:`~git.index.util.git_working_dir` decorator active!
        This must be ensured in the calling code.
    """
    st = os.lstat(filepath)  # Handles non-symlinks as well.

    stream_source: BinaryIO
    if S_ISLNK(st.st_mode):
        # readlink returns a string, but bytes are needed for the stream.
        stream_source = BytesIO(force_bytes(os.readlink(filepath), encoding=defenc))
    else:
        stream_source = open(filepath, "rb")

    with stream_source as stream:
        fprogress(filepath, False, filepath)
        istream = self.repo.odb.store(IStream(Blob.type, st.st_size, stream))
        fprogress(filepath, True, filepath)

    entry_fields = (
        stat_mode_to_index_mode(st.st_mode),
        istream.binsha,
        0,
        to_native_path_linux(filepath),
    )
    return BaseIndexEntry(entry_fields)
@unbare_repo
@git_working_dir
def _entries_for_paths(
    self,
    paths: List[str],
    path_rewriter: Union[Callable, None],
    fprogress: Callable,
    entries: List[BaseIndexEntry],
) -> List[BaseIndexEntry]:
    """Create index entries for `paths`.

    :param paths:
        Paths to handle. If `path_rewriter` is set, this list is emptied in place.

    :param path_rewriter:
        Only its truthiness is used here. If set, each path is converted into an
        entry with a null sha that is appended to `entries`, and nothing is stored
        in the object database by this method.

    :param fprogress:
        Progress callback handed on to :meth:`_store_path`.

    :param entries:
        List that receives the entries created for `paths` when `path_rewriter` is
        set. It is modified in place.

    :return:
        The entries created by storing the (expanded) paths in the object database.
        This is empty when `path_rewriter` is set.
    """
    if path_rewriter:
        for path in paths:
            if osp.isabs(path):
                abspath = path
                gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1 :]
            else:
                gitrelative_path = path
                working_tree_dir = self.repo.working_tree_dir
                if working_tree_dir:
                    abspath = osp.join(working_tree_dir, gitrelative_path)
                else:
                    # Without a working tree directory there is nothing to join
                    # onto. Use the path as given instead of leaving `abspath`
                    # unbound or stale from an earlier iteration.
                    # NOTE(review): presumably git_working_dir makes the working
                    # tree the current directory, so this resolves there - confirm.
                    abspath = gitrelative_path
            # END obtain relative and absolute paths

            blob = Blob(
                self.repo,
                Blob.NULL_BIN_SHA,
                stat_mode_to_index_mode(os.stat(abspath).st_mode),
                to_native_path_linux(gitrelative_path),
            )
            entries.append(BaseIndexEntry.from_blob(blob))
        # END for each path

        # All paths are entries now, so none is left for the path handling below.
        del paths[:]
    # END rewrite paths

    # HANDLE PATHS
    return [self._store_path(filepath, fprogress) for filepath in self._iter_expand_paths(paths)]
- def add(
- self,
- items: Union[PathLike, Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]]],
- force: bool = True,
- fprogress: Callable = lambda *args: None,
- path_rewriter: Union[Callable[..., PathLike], None] = None,
- write: bool = True,
- write_extension_data: bool = False,
- ) -> List[BaseIndexEntry]:
- R"""Add files from the working tree, specific blobs, or
- :class:`~git.index.typ.BaseIndexEntry`\s to the index.
- :param items:
- Multiple types of items are supported, types can be mixed within one call.
- Different types imply a different handling. File paths may generally be
- relative or absolute.
- - path string
- Strings denote a relative or absolute path into the repository pointing
- to an existing file, e.g., ``CHANGES``, ``lib/myfile.ext``,
- ``/home/gitrepo/lib/myfile.ext``.
- Absolute paths must start with working tree directory of this index's
- repository to be considered valid. For example, if it was initialized
- with a non-normalized path, like ``/root/repo/../repo``, absolute paths
- to be added must start with ``/root/repo/../repo``.
- Paths provided like this must exist. When added, they will be written
- into the object database.
- PathStrings may contain globs, such as ``lib/__init__*``. Or they can be
- directories like ``lib``, which will add all the files within the
- directory and subdirectories.
- This equals a straight :manpage:`git-add(1)`.
- They are added at stage 0.
- - :class:`~git.objects.blob.Blob` or
- :class:`~git.objects.submodule.base.Submodule` object
- Blobs are added as they are assuming a valid mode is set.
- The file they refer to may or may not exist in the file system, but must
- be a path relative to our repository.
- If their sha is null (40*0), their path must exist in the file system
- relative to the git repository as an object will be created from the
- data at the path.
- The handling now very much equals the way string paths are processed,
- except that the mode you have set will be kept. This allows you to
- create symlinks by settings the mode respectively and writing the target
- of the symlink directly into the file. This equals a default Linux
- symlink which is not dereferenced automatically, except that it can be
- created on filesystems not supporting it as well.
- Please note that globs or directories are not allowed in
- :class:`~git.objects.blob.Blob` objects.
- They are added at stage 0.
- - :class:`~git.index.typ.BaseIndexEntry` or type
- Handling equals the one of :class:`~git.objects.blob.Blob` objects, but
- the stage may be explicitly set. Please note that Index Entries require
- binary sha's.
- :param force:
- **CURRENTLY INEFFECTIVE**
- If ``True``, otherwise ignored or excluded files will be added anyway. As
- opposed to the :manpage:`git-add(1)` command, we enable this flag by default
- as the API user usually wants the item to be added even though they might be
- excluded.
- :param fprogress:
- Function with signature ``f(path, done=False, item=item)`` called for each
- path to be added, one time once it is about to be added where ``done=False``
- and once after it was added where ``done=True``.
- ``item`` is set to the actual item we handle, either a path or a
- :class:`~git.index.typ.BaseIndexEntry`.
- Please note that the processed path is not guaranteed to be present in the
- index already as the index is currently being processed.
- :param path_rewriter:
- Function, with signature ``(string) func(BaseIndexEntry)``, returning a path
- for each passed entry which is the path to be actually recorded for the
- object created from :attr:`entry.path <git.index.typ.BaseIndexEntry.path>`.
- This allows you to write an index which is not identical to the layout of
- the actual files on your hard-disk. If not ``None`` and `items` contain
- plain paths, these paths will be converted to Entries beforehand and passed
- to the path_rewriter. Please note that ``entry.path`` is relative to the git
- repository.
- :param write:
- If ``True``, the index will be written once it was altered. Otherwise the
- changes only exist in memory and are not available to git commands.
- :param write_extension_data:
- If ``True``, extension data will be written back to the index. This can lead
- to issues in case it is containing the 'TREE' extension, which will cause
- the :manpage:`git-commit(1)` command to write an old tree, instead of a new
- one representing the now changed index.
- This doesn't matter if you use :meth:`IndexFile.commit`, which ignores the
- 'TREE' extension altogether. You should set it to ``True`` if you intend to
- use :meth:`IndexFile.commit` exclusively while maintaining support for
- third-party extensions. Besides that, you can usually safely ignore the
- built-in extensions when using GitPython on repositories that are not
- handled manually at all.
- All current built-in extensions are listed here:
- https://git-scm.com/docs/index-format
- :return:
- List of :class:`~git.index.typ.BaseIndexEntry`\s representing the entries
- just actually added.
- :raise OSError:
- If a supplied path did not exist. Please note that
- :class:`~git.index.typ.BaseIndexEntry` objects that do not have a null sha
- will be added even if their paths do not exist.
- """
- # Sort the entries into strings and Entries.
- # Blobs are converted to entries automatically.
- # Paths can be git-added. For everything else we use git-update-index.
- paths, entries = self._preprocess_add_items(items)
- entries_added: List[BaseIndexEntry] = []
- # This code needs a working tree, so we try not to run it unless required.
- # That way, we are OK on a bare repository as well.
- # If there are no paths, the rewriter has nothing to do either.
- if paths:
- entries_added.extend(self._entries_for_paths(paths, path_rewriter, fprogress, entries))
- # HANDLE ENTRIES
- if entries:
- null_mode_entries = [e for e in entries if e.mode == 0]
- if null_mode_entries:
- raise ValueError(
- "At least one Entry has a null-mode - please use index.remove to remove files for clarity"
- )
- # END null mode should be remove
- # HANDLE ENTRY OBJECT CREATION
- # Create objects if required, otherwise go with the existing shas.
- null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA]
- if null_entries_indices:
- @git_working_dir
- def handle_null_entries(self: "IndexFile") -> None:
- for ei in null_entries_indices:
- null_entry = entries[ei]
- new_entry = self._store_path(null_entry.path, fprogress)
- # Update null entry.
- entries[ei] = BaseIndexEntry(
- (
- null_entry.mode,
- new_entry.binsha,
- null_entry.stage,
- null_entry.path,
- )
- )
- # END for each entry index
- # END closure
- handle_null_entries(self)
- # END null_entry handling
- # REWRITE PATHS
- # If we have to rewrite the entries, do so now, after we have generated all
- # object sha's.
- if path_rewriter:
- for i, e in enumerate(entries):
- entries[i] = BaseIndexEntry((e.mode, e.binsha, e.stage, path_rewriter(e)))
- # END for each entry
- # END handle path rewriting
- # Just go through the remaining entries and provide progress info.
- for i, entry in enumerate(entries):
- progress_sent = i in null_entries_indices
- if not progress_sent:
- fprogress(entry.path, False, entry)
- fprogress(entry.path, True, entry)
- # END handle progress
- # END for each entry
- entries_added.extend(entries)
- # END if there are base entries
- # FINALIZE
- # Add the new entries to this instance.
- for entry in entries_added:
- self.entries[(entry.path, 0)] = IndexEntry.from_base(entry)
- if write:
- self.write(ignore_extension_data=not write_extension_data)
- # END handle write
- return entries_added
def _items_to_rela_paths(
    self,
    items: Union[PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]],
) -> List[PathLike]:
    """Convert `items` into a list of repository-relative paths.

    :param items:
        Either a single path, or a sequence freely mixing paths (absolute or
        relative), index entries, blobs and submodules. For the object types only
        their ``path`` attribute is used.

    :return:
        One path per item, in input order, each one the result of
        :meth:`_to_relative_path`.

    :raise TypeError:
        If an item is neither path-like nor one of the supported object types.
    """
    path_types = (str, os.PathLike)
    object_types = (BaseIndexEntry, Blob, Submodule)

    # A lone path is treated as a one-element sequence.
    candidates = [items] if isinstance(items, path_types) else items

    rela_paths = []
    for candidate in candidates:
        # Object types are tested first, exactly like path types afterwards.
        if isinstance(candidate, object_types):
            raw_path = candidate.path
        elif isinstance(candidate, path_types):
            raw_path = candidate
        else:
            raise TypeError("Invalid item type: %r" % candidate)
        rela_paths.append(self._to_relative_path(raw_path))
    return rela_paths
@post_clear_cache
@default_index
def remove(
    self,
    items: Union[PathLike, Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]]],
    working_tree: bool = False,
    **kwargs: Any,
) -> List[str]:
    R"""Remove the given items from the index and, optionally, from the working
    tree too.

    :param items:
        Multiple types of items are supported, and they may be freely mixed.

        - path string

          The path is removed at all stages. For a directory, pass the ``r=True``
          keyword argument to remove every file entry below it. Absolute paths
          are converted to paths relative to the git repository directory
          containing the working tree.

          The path string may include globs, such as ``*.c``.

        - :class:`~git.objects.blob.Blob` object

          Only the path portion is used in this case.

        - :class:`~git.index.typ.BaseIndexEntry` or compatible type

          Only the path is relevant here. The stage is ignored.

    :param working_tree:
        If ``True``, the entry is removed from the working tree as well, which
        physically deletes the respective file. This may fail if the file has
        uncommitted changes.

    :param kwargs:
        Additional keyword arguments to be passed to :manpage:`git-rm(1)`, such as
        ``r`` to allow recursive removal.

    :return:
        List(path_string, ...) of the repository-relative paths that were
        effectively removed.

        This is useful to know when directories or globs were provided.
    """
    # Without ``--cached`` git also deletes the files from the working tree.
    # ``--`` terminates option parsing so that paths are never read as flags.
    rm_args = ["--"] if working_tree else ["--cached", "--"]

    rela_paths = [os.fspath(p) for p in self._items_to_rela_paths(items)]
    rm_output = self.repo.git.rm(rm_args, rela_paths, **kwargs)

    # Every output line looks like ``rm 'path'``: strip the 4-character ``rm '``
    # prefix and the closing quote.
    return [line[4:-1] for line in rm_output.splitlines()]
@post_clear_cache
@default_index
def move(
    self,
    items: Union[PathLike, Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]]],
    skip_errors: bool = False,
    **kwargs: Any,
) -> List[Tuple[str, str]]:
    """Rename/move the items. The last item is taken as the destination of the
    move operation.

    If the destination is a file, the first item (of two) must be a file as well.

    If the destination is a directory, it may be preceded by one or more
    directories or files.

    The working tree will be affected in non-bare repositories.

    :param items:
        Multiple types of items are supported, please see the :meth:`remove` method
        for reference.

    :param skip_errors:
        If ``True``, errors such as ones resulting from missing source files will be
        skipped (``-k`` is passed to git).

    :param kwargs:
        Additional arguments you would like to pass to :manpage:`git-mv(1)`, such as
        ``dry_run`` or ``force``.

    :return:
        List(tuple(source_path_string, destination_path_string), ...)

        A list of pairs, containing the source file moved as well as its actual
        destination. Relative to the repository root.

    :raise ValueError:
        If only one item was given.

    :raise git.exc.GitCommandError:
        If git could not handle your request.
    """
    mv_args = ["-k"] if skip_errors else []

    rela_paths = self._items_to_rela_paths(items)
    if len(rela_paths) < 2:
        raise ValueError("Please provide at least one source and one destination of the move operation")

    # Remember whether the caller requested a dry run, under either spelling.
    # Both keys are removed from kwargs; ``dry_run`` wins if both were given.
    short_flag = kwargs.pop("n", None)
    dry_run_requested = kwargs.pop("dry_run", short_flag)

    # Always perform a dry run first: its output tells us what git will actually
    # do, and that is what gets reported back to the caller.
    kwargs["dry_run"] = True
    report = self.repo.git.mv(mv_args, rela_paths, **kwargs).splitlines()

    # The first half of the report consists of 'Checking ' lines; only the
    # second half, the 'Renaming x to y' lines, is parsed.
    renames = []
    for line in report[len(report) // 2 :]:
        tokens = line.split(" to ")
        assert len(tokens) == 2, "Too many tokens in %s" % line
        # tokens[0] is 'Renaming x' (prefix is 9 characters), tokens[1] is 'y'.
        renames.append((tokens[0][9:], tokens[1]))

    if dry_run_requested:
        return renames

    # Now apply the actual operation.
    del kwargs["dry_run"]
    self.repo.git.mv(mv_args, rela_paths, **kwargs)
    return renames
def commit(
    self,
    message: str,
    parent_commits: Union[List[Commit], None] = None,
    head: bool = True,
    author: Union[None, Actor] = None,
    committer: Union[None, Actor] = None,
    author_date: Union[datetime.datetime, str, None] = None,
    commit_date: Union[datetime.datetime, str, None] = None,
    skip_hooks: bool = False,
) -> Commit:
    """Create a :class:`~git.objects.commit.Commit` object from the current default
    index file.

    The arguments are forwarded to
    :meth:`Commit.create_from_tree <git.objects.commit.Commit.create_from_tree>`;
    see there for their meaning.

    Unless `skip_hooks` is set, the ``pre-commit``, ``commit-msg`` and
    ``post-commit`` hooks are run. For ``commit-msg``, the message is written to
    the ``COMMIT_EDITMSG`` file, and read back from it once the hook has run.

    :note:
        If you have manually altered the :attr:`entries` member of this instance,
        don't forget to :meth:`write` your changes to disk beforehand.

    :note:
        Passing ``skip_hooks=True`` is the equivalent of using ``-n`` or
        ``--no-verify`` on the command line.

    :return:
        :class:`~git.objects.commit.Commit` object representing the new commit
    """
    hooks_enabled = not skip_hooks

    if hooks_enabled:
        run_commit_hook("pre-commit", self)

        # Hand the message to the commit-msg hook through the file, then use
        # whatever the file contains afterwards as the commit message.
        self._write_commit_editmsg(message)
        run_commit_hook("commit-msg", self, self._commit_editmsg_filepath())
        message = self._read_commit_editmsg()
        self._remove_commit_editmsg()

    new_commit = Commit.create_from_tree(
        self.repo,
        self.write_tree(),
        message,
        parent_commits,
        head,
        author=author,
        committer=committer,
        author_date=author_date,
        commit_date=commit_date,
    )

    if hooks_enabled:
        run_commit_hook("post-commit", self)
    return new_commit
def _write_commit_editmsg(self, message: str) -> None:
    """Write `message`, encoded with ``defenc``, to the ``COMMIT_EDITMSG`` file,
    replacing any previous content."""
    editmsg_path = self._commit_editmsg_filepath()
    with open(editmsg_path, "wb") as editmsg_fp:
        payload = message.encode(defenc)
        editmsg_fp.write(payload)
def _remove_commit_editmsg(self) -> None:
    """Delete the ``COMMIT_EDITMSG`` file."""
    editmsg_path = self._commit_editmsg_filepath()
    os.remove(editmsg_path)
def _read_commit_editmsg(self) -> str:
    """Return the content of the ``COMMIT_EDITMSG`` file, decoded with ``defenc``."""
    editmsg_path = self._commit_editmsg_filepath()
    with open(editmsg_path, "rb") as editmsg_fp:
        raw_message = editmsg_fp.read()
    return raw_message.decode(defenc)
def _commit_editmsg_filepath(self) -> str:
    """Return the path of the ``COMMIT_EDITMSG`` file, located directly inside the
    repository's ``common_dir``."""
    common_dir = self.repo.common_dir
    return osp.join(common_dir, "COMMIT_EDITMSG")
def _flush_stdin_and_wait(cls, proc: "Popen[bytes]", ignore_stdout: bool = False) -> bytes:
    """Finish feeding `proc`, optionally collect its output, and wait for it to exit.

    :param proc:
        The process to finish. Its ``stdin`` and ``stdout`` may be unset, in which
        case the respective step is skipped.

    :param ignore_stdout:
        If ``True``, stdout is closed without being read and ``b""`` is returned.

    :return:
        Everything read from the process' stdout, or ``b""`` if nothing was read.
    """
    stdin_io = proc.stdin
    if stdin_io:
        # Closing stdin signals end-of-input to the process.
        stdin_io.flush()
        stdin_io.close()

    stdout_io = proc.stdout
    output = b""
    if stdout_io:
        if not ignore_stdout:
            output = stdout_io.read()
        stdout_io.close()

    # Always wait for the process, whether or not stdout was piped, so that it
    # is reaped and a failure reported by wait() is never skipped.
    proc.wait()
    return output
@default_index
def checkout(
    self,
    paths: Union[None, Iterable[PathLike]] = None,
    force: bool = False,
    fprogress: Callable = lambda *args: None,
    **kwargs: Any,
) -> Union[None, Iterator[PathLike], Sequence[PathLike]]:
    """Check out the given paths or all files from the version known to the index
    into the working tree.

    :note:
        Be sure you have written pending changes using the :meth:`write` method in
        case you have altered the entries dictionary directly.

    :param paths:
        If ``None``, all paths in the index will be checked out.
        Otherwise an iterable of relative or absolute paths, or a single path
        (a string or an :class:`os.PathLike` object), pointing to files or
        directories in the index is expected.

    :param force:
        If ``True``, existing files will be overwritten even if they contain local
        modifications.
        If ``False``, these will trigger a :exc:`~git.exc.CheckoutError`.

    :param fprogress:
        See :meth:`IndexFile.add` for signature and explanation.

        The provided progress information will contain ``None`` as path and item if
        no explicit paths are given. Otherwise progress information will be sent
        prior to and after a file has been checked out.

    :param kwargs:
        Additional arguments to be passed to :manpage:`git-checkout-index(1)`.

    :return:
        Iterable yielding paths to files which have been checked out and are
        guaranteed to match the version stored in the index.

    :raise git.exc.CheckoutError:
        * If at least one file failed to be checked out. This is a summary, hence it
          will checkout as many files as it can anyway.
        * If one of files or directories do not exist in the index (as opposed to
          the original git command, which ignores them).

    :raise git.exc.GitCommandError:
        If error lines could not be parsed - this truly is an exceptional state.

    :note:
        The checkout is limited to checking out the files in the index. Files which
        are not in the index anymore and exist in the working tree will not be
        deleted. This behaviour is fundamentally different to ``head.checkout``,
        i.e. if you want :manpage:`git-checkout(1)`-like behaviour, use
        ``head.checkout`` instead of ``index.checkout``.
    """
    args = ["--index"]
    if force:
        args.append("--force")

    # Filled in by handle_stderr(); also handed to CheckoutError if the process
    # itself reports a failure.
    failed_files = []
    failed_reasons = []
    unknown_lines = []

    def handle_stderr(proc: "Popen[bytes]", iter_checked_out_files: Iterable[PathLike]) -> None:
        """Parse what the process wrote to stderr and raise if it reported failures."""
        stderr_IO = proc.stderr
        if not stderr_IO:
            return  # Nothing to parse without a stderr stream.

        stderr = stderr_IO.read().decode(defenc)

        # Messages carrying one of the command prefixes look like:
        #   git-checkout-index: this already exists
        command_prefixes = ("git checkout-index: ", "git-checkout-index: ")
        endings = (
            " already exists",
            " is not in the cache",
            " does not exist at stage",
            " is unmerged",
        )
        for line in stderr.splitlines():
            if not line.startswith(command_prefixes):
                # Lines without the command prefix use a few special formats.
                is_a_dir = " is a directory"
                unlink_issue = "unable to unlink old '"
                already_exists_issue = " already exists, no checkout"  # created by entry.c:checkout_entry(...)
                if line.endswith(is_a_dir):
                    failed_files.append(line[: -len(is_a_dir)])
                    failed_reasons.append(is_a_dir)
                elif line.startswith(unlink_issue):
                    failed_files.append(line[len(unlink_issue) : line.rfind("'")])
                    failed_reasons.append(unlink_issue)
                elif line.endswith(already_exists_issue):
                    failed_files.append(line[: -len(already_exists_issue)])
                    failed_reasons.append(already_exists_issue)
                else:
                    unknown_lines.append(line)
                continue
            # END special lines parsing

            for e in endings:
                if line.endswith(e):
                    # Both command prefixes are 20 characters long.
                    failed_files.append(line[20 : -len(e)])
                    failed_reasons.append(e)
                    break
                # END if ending matches
            # END for each possible ending
        # END for each line

        if unknown_lines:
            raise GitCommandError(("git-checkout-index",), 128, stderr)
        if failed_files:
            valid_files = list(set(iter_checked_out_files) - set(failed_files))
            raise CheckoutError(
                "Some files could not be checked out from the index due to local modifications",
                failed_files,
                valid_files,
                failed_reasons,
            )

    # END stderr handler

    if paths is None:
        args.append("--all")
        kwargs["as_process"] = 1
        fprogress(None, False, None)
        proc = self.repo.git.checkout_index(*args, **kwargs)
        proc.wait()
        fprogress(None, True, None)
        rval_iter = (e.path for e in self.entries.values())
        handle_stderr(proc, rval_iter)
        return rval_iter
    else:
        # A single path may be given as a string or as any os.PathLike object.
        # Path-like objects are generally not iterable, so they have to be
        # wrapped just like strings.
        if isinstance(paths, (str, os.PathLike)):
            paths = [paths]

        # Make sure we have our entries loaded before we start checkout_index, which
        # will hold a lock on it. We try to get the lock as well during our entries
        # initialization.
        self.entries  # noqa: B018

        args.append("--stdin")
        kwargs["as_process"] = True
        kwargs["istream"] = subprocess.PIPE
        proc = self.repo.git.checkout_index(args, **kwargs)

        # FIXME: Reading from GIL!
        def make_exc() -> GitCommandError:
            return GitCommandError(("git-checkout-index", *args), 128, proc.stderr.read())

        checked_out_files: List[PathLike] = []

        for path in paths:
            co_path = to_native_path_linux(self._to_relative_path(path))
            # If the item is not in the index, it could be a directory.
            path_is_directory = False

            try:
                self.entries[(co_path, 0)]
            except KeyError:
                folder = co_path
                if not folder.endswith("/"):
                    folder += "/"
                # Check out every index entry located below that directory.
                for entry in self.entries.values():
                    if os.fspath(entry.path).startswith(folder):
                        p = entry.path
                        self._write_path_to_stdin(proc, p, p, make_exc, fprogress, read_from_stdout=False)
                        checked_out_files.append(p)
                        path_is_directory = True
                    # END if entry is in directory
                # END for each entry
            # END path exception handling

            if not path_is_directory:
                self._write_path_to_stdin(proc, co_path, path, make_exc, fprogress, read_from_stdout=False)
                checked_out_files.append(co_path)
            # END path is a file
        # END for each path

        try:
            self._flush_stdin_and_wait(proc, ignore_stdout=True)
        except GitCommandError:
            # Without parsing stdout we don't know what failed.
            raise CheckoutError(  # noqa: B904
                "Some files could not be checked out from the index, probably because they didn't exist.",
                failed_files,
                [],
                failed_reasons,
            )

        handle_stderr(proc, checked_out_files)
        return checked_out_files
    # END paths handling
@default_index
def reset(
    self,
    commit: Union[Commit, "Reference", str] = "HEAD",
    working_tree: bool = False,
    paths: Union[None, Iterable[PathLike]] = None,
    head: bool = False,
    **kwargs: Any,
) -> "IndexFile":
    """Reset the index to reflect the tree at the given commit. This will not adjust
    our HEAD reference by default, as opposed to
    :meth:`HEAD.reset <git.refs.head.HEAD.reset>`.

    :param commit:
        Revision, :class:`~git.refs.reference.Reference` or
        :class:`~git.objects.commit.Commit` specifying the commit we should
        represent.

        If you want to specify a tree only, use :meth:`IndexFile.from_tree` and
        overwrite the default index.

    :param working_tree:
        If ``True``, the files in the working tree will reflect the changed index.
        If ``False``, the working tree will not be touched.
        Please note that changes to the working copy will be discarded without
        warning!

    :param head:
        If ``True``, the head will be set to the given commit. This is ``False`` by
        default, but if ``True``, this method behaves like
        :meth:`HEAD.reset <git.refs.head.HEAD.reset>`.

    :param paths:
        If given as an iterable of absolute or repository-relative paths, only these
        will be reset to their state at the given commit-ish. One-shot iterators
        such as generators are supported.

        Paths which do not exist at the commit are removed from the index.

    :param kwargs:
        Accepted for interface compatibility. This implementation does not use
        them.

    :note:
        :meth:`IndexFile.reset`, as opposed to
        :meth:`HEAD.reset <git.refs.head.HEAD.reset>`, will not delete any files in
        order to maintain a consistent working tree. Instead, it will just check out
        the files according to their state in the index.
        If you want :manpage:`git-reset(1)`-like behaviour, use
        :meth:`HEAD.reset <git.refs.head.HEAD.reset>` instead.

    :return:
        self
    """
    # What we actually want to do is to merge the tree into our existing index,
    # which is what git-read-tree does.
    new_inst = type(self).from_tree(self.repo, commit)
    if not paths:
        self.entries = new_inst.entries
    else:
        # The paths are walked here and once more by checkout() below. A one-shot
        # iterator would already be exhausted by then, so materialize it first.
        paths = list(paths)
        nie = new_inst.entries
        for path in paths:
            path = self._to_relative_path(path)
            try:
                key = entry_key(path, 0)
                self.entries[key] = nie[key]
            except KeyError:
                # If key is not in theirs, it mustn't be in ours.
                try:
                    del self.entries[key]
                except KeyError:
                    pass
                # END handle deletion keyerror
            # END handle keyerror
        # END for each path
    # END handle paths
    self.write()

    if working_tree:
        self.checkout(paths=paths, force=True)
    # END handle working tree

    if head:
        self.repo.head.set_commit(self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit)
    # END handle head change

    return self
# FIXME: This is documented to accept the same parameters as Diffable.diff, but this
# does not handle NULL_TREE for `other`. (The suppressed mypy error is about this.)
def diff(
    self,
    other: Union[  # type: ignore[override]
        Literal[git_diff.DiffConstants.INDEX],
        "Tree",
        "Commit",
        str,
        None,
    ] = git_diff.INDEX,
    paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
    create_patch: bool = False,
    **kwargs: Any,
) -> git_diff.DiffIndex[git_diff.Diff]:
    """Diff this index against the working copy or a :class:`~git.objects.tree.Tree`
    or :class:`~git.objects.commit.Commit` object.

    For documentation of the parameters and return values, see
    :meth:`Diffable.diff <git.diff.Diffable.diff>`.

    :note:
        Will only work with indices that represent the default git index as they
        have not been initialized with a stream.

    :raise AssertionError:
        If this index does not represent the default git index.

    :raise ValueError:
        If `other` is none of the supported kinds of objects.
    """
    # Only run if we are the default repository index.
    if self._file_path != self._index_path():
        # Format the method itself. Calling it here would run into this very
        # check again and recurse until a RecursionError is raised.
        raise AssertionError("Cannot call %r on indices that do not represent the default git index" % self.diff)

    # Index against index is always empty.
    if other is self.INDEX:
        return git_diff.DiffIndex()

    # Index against anything but None is a reverse diff with the respective item.
    # Handle existing -R flags properly.
    # Transform strings to the object so that we can call diff on it.
    if isinstance(other, str):
        other = self.repo.rev_parse(other)
    # END object conversion

    if isinstance(other, Object):  # For Tree or Commit.
        # The diff is computed from the other object's point of view, so invert
        # whatever R flag the caller passed.
        cur_val = kwargs.get("R", False)
        kwargs["R"] = not cur_val
        return other.diff(self.INDEX, paths, create_patch, **kwargs)
    # END diff against other item handling

    # If other is not None here, something is wrong.
    if other is not None:
        raise ValueError("other must be None, Diffable.INDEX, a Tree or Commit, was %r" % other)

    # Diff against working copy - can be handled by superclass natively.
    return super().diff(other, paths, create_patch, **kwargs)
|