| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627 |
- from __future__ import annotations
- import collections
- import itertools
- import operator
- from typing import TYPE_CHECKING, Generic
- from ..structs import (
- CT,
- KT,
- RT,
- DirectedGraph,
- IterableView,
- IteratorMapping,
- RequirementInformation,
- State,
- build_iter_view,
- )
- from .abstract import AbstractResolver, Result
- from .criterion import Criterion
- from .exceptions import (
- InconsistentCandidate,
- RequirementsConflicted,
- ResolutionImpossible,
- ResolutionTooDeep,
- ResolverException,
- )
- if TYPE_CHECKING:
- from collections.abc import Collection, Iterable, Mapping
- from ..providers import AbstractProvider, Preference
- from ..reporters import BaseReporter
- _OPTIMISTIC_BACKJUMPING_RATIO: float = 0.1
- def _build_result(state: State[RT, CT, KT]) -> Result[RT, CT, KT]:
- mapping = state.mapping
- all_keys: dict[int, KT | None] = {id(v): k for k, v in mapping.items()}
- all_keys[id(None)] = None
- graph: DirectedGraph[KT | None] = DirectedGraph()
- graph.add(None) # Sentinel as root dependencies' parent.
- connected: set[KT | None] = {None}
- for key, criterion in state.criteria.items():
- if not _has_route_to_root(state.criteria, key, all_keys, connected):
- continue
- if key not in graph:
- graph.add(key)
- for p in criterion.iter_parent():
- try:
- pkey = all_keys[id(p)]
- except KeyError:
- continue
- if pkey not in graph:
- graph.add(pkey)
- graph.connect(pkey, key)
- return Result(
- mapping={k: v for k, v in mapping.items() if k in connected},
- graph=graph,
- criteria=state.criteria,
- )
- class Resolution(Generic[RT, CT, KT]):
- """Stateful resolution object.
- This is designed as a one-off object that holds information to kick start
- the resolution process, and holds the results afterwards.
- """
- def __init__(
- self,
- provider: AbstractProvider[RT, CT, KT],
- reporter: BaseReporter[RT, CT, KT],
- ) -> None:
- self._p = provider
- self._r = reporter
- self._states: list[State[RT, CT, KT]] = []
- # Optimistic backjumping variables
- self._optimistic_backjumping_ratio = _OPTIMISTIC_BACKJUMPING_RATIO
- self._save_states: list[State[RT, CT, KT]] | None = None
- self._optimistic_start_round: int | None = None
- @property
- def state(self) -> State[RT, CT, KT]:
- try:
- return self._states[-1]
- except IndexError as e:
- raise AttributeError("state") from e
- def _push_new_state(self) -> None:
- """Push a new state into history.
- This new state will be used to hold resolution results of the next
- coming round.
- """
- base = self._states[-1]
- state = State(
- mapping=base.mapping.copy(),
- criteria=base.criteria.copy(),
- backtrack_causes=base.backtrack_causes[:],
- )
- self._states.append(state)
- def _add_to_criteria(
- self,
- criteria: dict[KT, Criterion[RT, CT]],
- requirement: RT,
- parent: CT | None,
- ) -> None:
- self._r.adding_requirement(requirement=requirement, parent=parent)
- identifier = self._p.identify(requirement_or_candidate=requirement)
- criterion = criteria.get(identifier)
- if criterion:
- incompatibilities = list(criterion.incompatibilities)
- else:
- incompatibilities = []
- matches = self._p.find_matches(
- identifier=identifier,
- requirements=IteratorMapping(
- criteria,
- operator.methodcaller("iter_requirement"),
- {identifier: [requirement]},
- ),
- incompatibilities=IteratorMapping(
- criteria,
- operator.attrgetter("incompatibilities"),
- {identifier: incompatibilities},
- ),
- )
- if criterion:
- information = list(criterion.information)
- information.append(RequirementInformation(requirement, parent))
- else:
- information = [RequirementInformation(requirement, parent)]
- criterion = Criterion(
- candidates=build_iter_view(matches),
- information=information,
- incompatibilities=incompatibilities,
- )
- if not criterion.candidates:
- raise RequirementsConflicted(criterion)
- criteria[identifier] = criterion
- def _remove_information_from_criteria(
- self, criteria: dict[KT, Criterion[RT, CT]], parents: Collection[KT]
- ) -> None:
- """Remove information from parents of criteria.
- Concretely, removes all values from each criterion's ``information``
- field that have one of ``parents`` as provider of the requirement.
- :param criteria: The criteria to update.
- :param parents: Identifiers for which to remove information from all criteria.
- """
- if not parents:
- return
- for key, criterion in criteria.items():
- criteria[key] = Criterion(
- criterion.candidates,
- [
- information
- for information in criterion.information
- if (
- information.parent is None
- or self._p.identify(information.parent) not in parents
- )
- ],
- criterion.incompatibilities,
- )
- def _get_preference(self, name: KT) -> Preference:
- return self._p.get_preference(
- identifier=name,
- resolutions=self.state.mapping,
- candidates=IteratorMapping(
- self.state.criteria,
- operator.attrgetter("candidates"),
- ),
- information=IteratorMapping(
- self.state.criteria,
- operator.attrgetter("information"),
- ),
- backtrack_causes=self.state.backtrack_causes,
- )
- def _is_current_pin_satisfying(
- self, name: KT, criterion: Criterion[RT, CT]
- ) -> bool:
- try:
- current_pin = self.state.mapping[name]
- except KeyError:
- return False
- return all(
- self._p.is_satisfied_by(requirement=r, candidate=current_pin)
- for r in criterion.iter_requirement()
- )
- def _get_updated_criteria(self, candidate: CT) -> dict[KT, Criterion[RT, CT]]:
- criteria = self.state.criteria.copy()
- for requirement in self._p.get_dependencies(candidate=candidate):
- self._add_to_criteria(criteria, requirement, parent=candidate)
- return criteria
- def _attempt_to_pin_criterion(self, name: KT) -> list[Criterion[RT, CT]]:
- criterion = self.state.criteria[name]
- causes: list[Criterion[RT, CT]] = []
- for candidate in criterion.candidates:
- try:
- criteria = self._get_updated_criteria(candidate)
- except RequirementsConflicted as e:
- self._r.rejecting_candidate(e.criterion, candidate)
- causes.append(e.criterion)
- continue
- # Check the newly-pinned candidate actually works. This should
- # always pass under normal circumstances, but in the case of a
- # faulty provider, we will raise an error to notify the implementer
- # to fix find_matches() and/or is_satisfied_by().
- satisfied = all(
- self._p.is_satisfied_by(requirement=r, candidate=candidate)
- for r in criterion.iter_requirement()
- )
- if not satisfied:
- raise InconsistentCandidate(candidate, criterion)
- self._r.pinning(candidate=candidate)
- self.state.criteria.update(criteria)
- # Put newly-pinned candidate at the end. This is essential because
- # backtracking looks at this mapping to get the last pin.
- self.state.mapping.pop(name, None)
- self.state.mapping[name] = candidate
- return []
- # All candidates tried, nothing works. This criterion is a dead
- # end, signal for backtracking.
- return causes
- def _patch_criteria(
- self, incompatibilities_from_broken: list[tuple[KT, list[CT]]]
- ) -> bool:
- # Create a new state from the last known-to-work one, and apply
- # the previously gathered incompatibility information.
- for k, incompatibilities in incompatibilities_from_broken:
- if not incompatibilities:
- continue
- try:
- criterion = self.state.criteria[k]
- except KeyError:
- continue
- matches = self._p.find_matches(
- identifier=k,
- requirements=IteratorMapping(
- self.state.criteria,
- operator.methodcaller("iter_requirement"),
- ),
- incompatibilities=IteratorMapping(
- self.state.criteria,
- operator.attrgetter("incompatibilities"),
- {k: incompatibilities},
- ),
- )
- candidates: IterableView[CT] = build_iter_view(matches)
- if not candidates:
- return False
- incompatibilities.extend(criterion.incompatibilities)
- self.state.criteria[k] = Criterion(
- candidates=candidates,
- information=list(criterion.information),
- incompatibilities=incompatibilities,
- )
- return True
- def _save_state(self) -> None:
- """Save states for potential rollback if optimistic backjumping fails."""
- if self._save_states is None:
- self._save_states = [
- State(
- mapping=s.mapping.copy(),
- criteria=s.criteria.copy(),
- backtrack_causes=s.backtrack_causes[:],
- )
- for s in self._states
- ]
- def _rollback_states(self) -> None:
- """Rollback states and disable optimistic backjumping."""
- self._optimistic_backjumping_ratio = 0.0
- if self._save_states:
- self._states = self._save_states
- self._save_states = None
- def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool:
- """Perform backjumping.
- When we enter here, the stack is like this::
- [ state Z ]
- [ state Y ]
- [ state X ]
- .... earlier states are irrelevant.
- 1. No pins worked for Z, so it does not have a pin.
- 2. We want to reset state Y to unpinned, and pin another candidate.
- 3. State X holds what state Y was before the pin, but does not
- have the incompatibility information gathered in state Y.
- Each iteration of the loop will:
- 1. Identify Z. The incompatibility is not always caused by the latest
- state. For example, given three requirements A, B and C, with
- dependencies A1, B1 and C1, where A1 and B1 are incompatible: the
- last state might be related to C, so we want to discard the
- previous state.
- 2. Discard Z.
- 3. Discard Y but remember its incompatibility information gathered
- previously, and the failure we're dealing with right now.
- 4. Push a new state Y' based on X, and apply the incompatibility
- information from Y to Y'.
- 5a. If this causes Y' to conflict, we need to backtrack again. Make Y'
- the new Z and go back to step 2.
- 5b. If the incompatibilities apply cleanly, end backtracking.
- """
- incompatible_reqs: Iterable[CT | RT] = itertools.chain(
- (c.parent for c in causes if c.parent is not None),
- (c.requirement for c in causes),
- )
- incompatible_deps = {self._p.identify(r) for r in incompatible_reqs}
- while len(self._states) >= 3:
- # Remove the state that triggered backtracking.
- del self._states[-1]
- # Optimistically backtrack to a state that caused the incompatibility
- broken_state = self.state
- while True:
- # Retrieve the last candidate pin and known incompatibilities.
- try:
- broken_state = self._states.pop()
- name, candidate = broken_state.mapping.popitem()
- except (IndexError, KeyError):
- raise ResolutionImpossible(causes) from None
- if (
- not self._optimistic_backjumping_ratio
- and name not in incompatible_deps
- ):
- # For safe backjumping only backjump if the current dependency
- # is not the same as the incompatible dependency
- break
- # On the first time a non-safe backjump is done the state
- # is saved so we can restore it later if the resolution fails
- if (
- self._optimistic_backjumping_ratio
- and self._save_states is None
- and name not in incompatible_deps
- ):
- self._save_state()
- # If the current dependencies and the incompatible dependencies
- # are overlapping then we have likely found a cause of the
- # incompatibility
- current_dependencies = {
- self._p.identify(d) for d in self._p.get_dependencies(candidate)
- }
- if not current_dependencies.isdisjoint(incompatible_deps):
- break
- # Fallback: We should not backtrack to the point where
- # broken_state.mapping is empty, so stop backtracking for
- # a chance for the resolution to recover
- if not broken_state.mapping:
- break
- # Guard: We need at least two state to remain to both
- # backtrack and push a new state
- if len(self._states) <= 1:
- raise ResolutionImpossible(causes)
- incompatibilities_from_broken = [
- (k, list(v.incompatibilities)) for k, v in broken_state.criteria.items()
- ]
- # Also mark the newly known incompatibility.
- incompatibilities_from_broken.append((name, [candidate]))
- self._push_new_state()
- success = self._patch_criteria(incompatibilities_from_broken)
- # It works! Let's work on this new state.
- if success:
- return True
- # State does not work after applying known incompatibilities.
- # Try the still previous state.
- # No way to backtrack anymore.
- return False
- def _extract_causes(
- self, criteron: list[Criterion[RT, CT]]
- ) -> list[RequirementInformation[RT, CT]]:
- """Extract causes from list of criterion and deduplicate"""
- return list({id(i): i for c in criteron for i in c.information}.values())
- def resolve(self, requirements: Iterable[RT], max_rounds: int) -> State[RT, CT, KT]:
- if self._states:
- raise RuntimeError("already resolved")
- self._r.starting()
- # Initialize the root state.
- self._states = [
- State(
- mapping=collections.OrderedDict(),
- criteria={},
- backtrack_causes=[],
- )
- ]
- for r in requirements:
- try:
- self._add_to_criteria(self.state.criteria, r, parent=None)
- except RequirementsConflicted as e:
- raise ResolutionImpossible(e.criterion.information) from e
- # The root state is saved as a sentinel so the first ever pin can have
- # something to backtrack to if it fails. The root state is basically
- # pinning the virtual "root" package in the graph.
- self._push_new_state()
- # Variables for optimistic backjumping
- optimistic_rounds_cutoff: int | None = None
- optimistic_backjumping_start_round: int | None = None
- for round_index in range(max_rounds):
- self._r.starting_round(index=round_index)
- # Handle if optimistic backjumping has been running for too long
- if self._optimistic_backjumping_ratio and self._save_states is not None:
- if optimistic_backjumping_start_round is None:
- optimistic_backjumping_start_round = round_index
- optimistic_rounds_cutoff = int(
- (max_rounds - round_index) * self._optimistic_backjumping_ratio
- )
- if optimistic_rounds_cutoff <= 0:
- self._rollback_states()
- continue
- elif optimistic_rounds_cutoff is not None:
- if (
- round_index - optimistic_backjumping_start_round
- >= optimistic_rounds_cutoff
- ):
- self._rollback_states()
- continue
- unsatisfied_names = [
- key
- for key, criterion in self.state.criteria.items()
- if not self._is_current_pin_satisfying(key, criterion)
- ]
- # All criteria are accounted for. Nothing more to pin, we are done!
- if not unsatisfied_names:
- self._r.ending(state=self.state)
- return self.state
- # keep track of satisfied names to calculate diff after pinning
- satisfied_names = set(self.state.criteria.keys()) - set(unsatisfied_names)
- if len(unsatisfied_names) > 1:
- narrowed_unstatisfied_names = list(
- self._p.narrow_requirement_selection(
- identifiers=unsatisfied_names,
- resolutions=self.state.mapping,
- candidates=IteratorMapping(
- self.state.criteria,
- operator.attrgetter("candidates"),
- ),
- information=IteratorMapping(
- self.state.criteria,
- operator.attrgetter("information"),
- ),
- backtrack_causes=self.state.backtrack_causes,
- )
- )
- else:
- narrowed_unstatisfied_names = unsatisfied_names
- # If there are no unsatisfied names use unsatisfied names
- if not narrowed_unstatisfied_names:
- raise RuntimeError("narrow_requirement_selection returned 0 names")
- # If there is only 1 unsatisfied name skip calling self._get_preference
- if len(narrowed_unstatisfied_names) > 1:
- # Choose the most preferred unpinned criterion to try.
- name = min(narrowed_unstatisfied_names, key=self._get_preference)
- else:
- name = narrowed_unstatisfied_names[0]
- failure_criterion = self._attempt_to_pin_criterion(name)
- if failure_criterion:
- causes = self._extract_causes(failure_criterion)
- # Backjump if pinning fails. The backjump process puts us in
- # an unpinned state, so we can work on it in the next round.
- self._r.resolving_conflicts(causes=causes)
- try:
- success = self._backjump(causes)
- except ResolutionImpossible:
- if self._optimistic_backjumping_ratio and self._save_states:
- failed_optimistic_backjumping = True
- else:
- raise
- else:
- failed_optimistic_backjumping = bool(
- not success
- and self._optimistic_backjumping_ratio
- and self._save_states
- )
- if failed_optimistic_backjumping and self._save_states:
- self._rollback_states()
- else:
- self.state.backtrack_causes[:] = causes
- # Dead ends everywhere. Give up.
- if not success:
- raise ResolutionImpossible(self.state.backtrack_causes)
- else:
- # discard as information sources any invalidated names
- # (unsatisfied names that were previously satisfied)
- newly_unsatisfied_names = {
- key
- for key, criterion in self.state.criteria.items()
- if key in satisfied_names
- and not self._is_current_pin_satisfying(key, criterion)
- }
- self._remove_information_from_criteria(
- self.state.criteria, newly_unsatisfied_names
- )
- # Pinning was successful. Push a new state to do another pin.
- self._push_new_state()
- self._r.ending_round(index=round_index, state=self.state)
- raise ResolutionTooDeep(max_rounds)
- class Resolver(AbstractResolver[RT, CT, KT]):
- """The thing that performs the actual resolution work."""
- base_exception = ResolverException
- def resolve( # type: ignore[override]
- self,
- requirements: Iterable[RT],
- max_rounds: int = 100,
- ) -> Result[RT, CT, KT]:
- """Take a collection of constraints, spit out the resolution result.
- The return value is a representation to the final resolution result. It
- is a tuple subclass with three public members:
- * `mapping`: A dict of resolved candidates. Each key is an identifier
- of a requirement (as returned by the provider's `identify` method),
- and the value is the resolved candidate.
- * `graph`: A `DirectedGraph` instance representing the dependency tree.
- The vertices are keys of `mapping`, and each edge represents *why*
- a particular package is included. A special vertex `None` is
- included to represent parents of user-supplied requirements.
- * `criteria`: A dict of "criteria" that hold detailed information on
- how edges in the graph are derived. Each key is an identifier of a
- requirement, and the value is a `Criterion` instance.
- The following exceptions may be raised if a resolution cannot be found:
- * `ResolutionImpossible`: A resolution cannot be found for the given
- combination of requirements. The `causes` attribute of the
- exception is a list of (requirement, parent), giving the
- requirements that could not be satisfied.
- * `ResolutionTooDeep`: The dependency tree is too deeply nested and
- the resolver gave up. This is usually caused by a circular
- dependency, but you can try to resolve this by increasing the
- `max_rounds` argument.
- """
- resolution = Resolution(self.provider, self.reporter)
- state = resolution.resolve(requirements, max_rounds=max_rounds)
- return _build_result(state)
- def _has_route_to_root(
- criteria: Mapping[KT, Criterion[RT, CT]],
- key: KT | None,
- all_keys: dict[int, KT | None],
- connected: set[KT | None],
- ) -> bool:
- if key in connected:
- return True
- if key not in criteria:
- return False
- assert key is not None
- for p in criteria[key].iter_parent():
- try:
- pkey = all_keys[id(p)]
- except KeyError:
- continue
- if pkey in connected:
- connected.add(key)
- return True
- if _has_route_to_root(criteria, pkey, all_keys, connected):
- connected.add(key)
- return True
- return False
|