direct_url.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. """PEP 610"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import urllib.parse
  6. from collections.abc import Iterable
  7. from dataclasses import dataclass
  8. from typing import Any, ClassVar, TypeVar, Union
# Names exported by a star-import of this module.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]

# Type variable tying the ``expected_type`` argument of the typed dictionary
# accessors (``_get`` / ``_get_required``) to their return type.
T = TypeVar("T")

# Presumably the file name under which the PEP 610 record is stored; it is not
# referenced anywhere else in this module -- confirm against callers.
DIRECT_URL_METADATA_NAME = "direct_url.json"

# Matches a user-info string made only of environment variable references:
# ``${NAME}`` optionally followed by ``:${NAME}``. Used to decide whether the
# credentials part of a URL may be kept when redacting.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  19. class DirectUrlValidationError(Exception):
  20. pass
  21. def _get(
  22. d: dict[str, Any], expected_type: type[T], key: str, default: T | None = None
  23. ) -> T | None:
  24. """Get value from dictionary and verify expected type."""
  25. if key not in d:
  26. return default
  27. value = d[key]
  28. if not isinstance(value, expected_type):
  29. raise DirectUrlValidationError(
  30. f"{value!r} has unexpected type for {key} (expected {expected_type})"
  31. )
  32. return value
  33. def _get_required(
  34. d: dict[str, Any], expected_type: type[T], key: str, default: T | None = None
  35. ) -> T:
  36. value = _get(d, expected_type, key, default)
  37. if value is None:
  38. raise DirectUrlValidationError(f"{key} must have a value")
  39. return value
  40. def _exactly_one_of(infos: Iterable[InfoType | None]) -> InfoType:
  41. infos = [info for info in infos if info is not None]
  42. if not infos:
  43. raise DirectUrlValidationError(
  44. "missing one of archive_info, dir_info, vcs_info"
  45. )
  46. if len(infos) > 1:
  47. raise DirectUrlValidationError(
  48. "more than one of archive_info, dir_info, vcs_info"
  49. )
  50. assert infos[0] is not None
  51. return infos[0]
  52. def _filter_none(**kwargs: Any) -> dict[str, Any]:
  53. """Make dict excluding None values."""
  54. return {k: v for k, v in kwargs.items() if v is not None}
  55. @dataclass
  56. class VcsInfo:
  57. name: ClassVar = "vcs_info"
  58. vcs: str
  59. commit_id: str
  60. requested_revision: str | None = None
  61. @classmethod
  62. def _from_dict(cls, d: dict[str, Any] | None) -> VcsInfo | None:
  63. if d is None:
  64. return None
  65. return cls(
  66. vcs=_get_required(d, str, "vcs"),
  67. commit_id=_get_required(d, str, "commit_id"),
  68. requested_revision=_get(d, str, "requested_revision"),
  69. )
  70. def _to_dict(self) -> dict[str, Any]:
  71. return _filter_none(
  72. vcs=self.vcs,
  73. requested_revision=self.requested_revision,
  74. commit_id=self.commit_id,
  75. )
  76. class ArchiveInfo:
  77. name = "archive_info"
  78. def __init__(
  79. self,
  80. hash: str | None = None,
  81. hashes: dict[str, str] | None = None,
  82. ) -> None:
  83. # set hashes before hash, since the hash setter will further populate hashes
  84. self.hashes = hashes
  85. self.hash = hash
  86. @property
  87. def hash(self) -> str | None:
  88. return self._hash
  89. @hash.setter
  90. def hash(self, value: str | None) -> None:
  91. if value is not None:
  92. # Auto-populate the hashes key to upgrade to the new format automatically.
  93. # We don't back-populate the legacy hash key from hashes.
  94. try:
  95. hash_name, hash_value = value.split("=", 1)
  96. except ValueError:
  97. raise DirectUrlValidationError(
  98. f"invalid archive_info.hash format: {value!r}"
  99. )
  100. if self.hashes is None:
  101. self.hashes = {hash_name: hash_value}
  102. elif hash_name not in self.hashes:
  103. self.hashes = self.hashes.copy()
  104. self.hashes[hash_name] = hash_value
  105. self._hash = value
  106. @classmethod
  107. def _from_dict(cls, d: dict[str, Any] | None) -> ArchiveInfo | None:
  108. if d is None:
  109. return None
  110. return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))
  111. def _to_dict(self) -> dict[str, Any]:
  112. return _filter_none(hash=self.hash, hashes=self.hashes)
  113. @dataclass
  114. class DirInfo:
  115. name: ClassVar = "dir_info"
  116. editable: bool = False
  117. @classmethod
  118. def _from_dict(cls, d: dict[str, Any] | None) -> DirInfo | None:
  119. if d is None:
  120. return None
  121. return cls(editable=_get_required(d, bool, "editable", default=False))
  122. def _to_dict(self) -> dict[str, Any]:
  123. return _filter_none(editable=self.editable or None)
# Any one of the three info records that a DirectUrl can carry in ``info``.
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  125. @dataclass
  126. class DirectUrl:
  127. url: str
  128. info: InfoType
  129. subdirectory: str | None = None
  130. def _remove_auth_from_netloc(self, netloc: str) -> str:
  131. if "@" not in netloc:
  132. return netloc
  133. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  134. if (
  135. isinstance(self.info, VcsInfo)
  136. and self.info.vcs == "git"
  137. and user_pass == "git"
  138. ):
  139. return netloc
  140. if ENV_VAR_RE.match(user_pass):
  141. return netloc
  142. return netloc_no_user_pass
  143. @property
  144. def redacted_url(self) -> str:
  145. """url with user:password part removed unless it is formed with
  146. environment variables as specified in PEP 610, or it is ``git``
  147. in the case of a git URL.
  148. """
  149. purl = urllib.parse.urlsplit(self.url)
  150. netloc = self._remove_auth_from_netloc(purl.netloc)
  151. surl = urllib.parse.urlunsplit(
  152. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  153. )
  154. return surl
  155. def validate(self) -> None:
  156. self.from_dict(self.to_dict())
  157. @classmethod
  158. def from_dict(cls, d: dict[str, Any]) -> DirectUrl:
  159. return DirectUrl(
  160. url=_get_required(d, str, "url"),
  161. subdirectory=_get(d, str, "subdirectory"),
  162. info=_exactly_one_of(
  163. [
  164. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  165. DirInfo._from_dict(_get(d, dict, "dir_info")),
  166. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  167. ]
  168. ),
  169. )
  170. def to_dict(self) -> dict[str, Any]:
  171. res = _filter_none(
  172. url=self.redacted_url,
  173. subdirectory=self.subdirectory,
  174. )
  175. res[self.info.name] = self.info._to_dict()
  176. return res
  177. @classmethod
  178. def from_json(cls, s: str) -> DirectUrl:
  179. return cls.from_dict(json.loads(s))
  180. def to_json(self) -> str:
  181. return json.dumps(self.to_dict(), sort_keys=True)
  182. def is_local_editable(self) -> bool:
  183. return isinstance(self.info, DirInfo) and self.info.editable