v1_compat.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. """Provides partial support for compatibility with Pydantic v1."""
  2. from __future__ import annotations
  3. import json
  4. from functools import cache
  5. from inspect import signature
  6. from operator import attrgetter
  7. from typing import TYPE_CHECKING, Any, Callable, ClassVar, Literal, overload
  8. import pydantic
  9. from .utils import IS_PYDANTIC_V2, to_json
  10. if TYPE_CHECKING:
  11. from typing import Protocol
  12. class V1Model(Protocol):
  13. # ------------------------------------------------------------------------------
  14. # NOTE: These aren't part of the original v1 BaseModel spec, but were added as
  15. # internal helpers and are (re-)declared here to satisfy mypy checks.
  16. @classmethod
  17. def _dump_json_vals(cls, values: dict, by_alias: bool) -> dict: ...
  18. # ------------------------------------------------------------------------------
  19. # These methods are part of the original v1 BaseModel spec.
  20. __config__: ClassVar[type]
  21. __fields__: ClassVar[dict[str, Any]]
  22. __fields_set__: set[str]
  23. @classmethod
  24. def update_forward_refs(cls, *args: Any, **kwargs: Any) -> None: ...
  25. @classmethod
  26. def construct(cls, *args: Any, **kwargs: Any) -> V1Model: ...
  27. @classmethod
  28. def parse_obj(cls, *args: Any, **kwargs: Any) -> V1Model: ...
  29. @classmethod
  30. def parse_raw(cls, *args: Any, **kwargs: Any) -> V1Model: ...
  31. def dict(self, **kwargs: Any) -> dict[str, Any]: ...
  32. def json(self, **kwargs: Any) -> str: ...
  33. def copy(self, **kwargs: Any) -> V1Model: ...
  34. # Maps {v2 -> v1} model config keys that were renamed in v2.
  35. # See: https://docs.pydantic.dev/latest/migration/#changes-to-config
  36. _V1_CONFIG_KEYS = {
  37. "populate_by_name": "allow_population_by_field_name",
  38. "str_to_lower": "anystr_lower",
  39. "str_strip_whitespace": "anystr_strip_whitespace",
  40. "str_to_upper": "anystr_upper",
  41. "ignored_types": "keep_untouched",
  42. "str_max_length": "max_anystr_length",
  43. "str_min_length": "min_anystr_length",
  44. "from_attributes": "orm_mode",
  45. "json_schema_extra": "schema_extra",
  46. "validate_default": "validate_all",
  47. }
  48. def convert_v2_config(v2_config: dict[str, Any]) -> dict[str, Any]:
  49. """Internal helper: Return a copy of the v2 ConfigDict with renamed v1 keys."""
  50. return {
  51. # Convert v2 config keys to v1 keys
  52. **{_V1_CONFIG_KEYS.get(k, k): v for k, v in v2_config.items()},
  53. # This is a v1-only config key. In v2 it no longer exists and is
  54. # effectively always True.
  55. "underscore_attrs_are_private": True,
  56. }
  57. # HACKS: In older python versions and/or pydantic v1, we have fewer
  58. # tools to help us resolve annotations reliably before the type is fully built.
  59. # String comparison is brittle, but it'll have to do.
  60. def _is_list_like_ann(ann_str: str) -> bool:
  61. # Handle "Optional[List[T]]", "List[T]", "list[T]"
  62. return ann_str.strip().lower().startswith(("list[", "optional[list["))
  63. def _is_str_like_ann(ann_str: str) -> bool:
  64. # Handle "Optional[str]", "str"
  65. return ann_str.strip().lower() in {"str", "optional[str]"}
  66. @cache # Reduce repeat introspection via `signature()`
  67. def allowed_arg_names(func: Callable) -> set[str]:
  68. """Internal helper: Return the names of args accepted by the given function."""
  69. return set(signature(func).parameters)
  70. # Pydantic BaseModels use a custom metaclass, but its namespace changed between
  71. # versions. In v1 import it via `from pydantic.main import ModelMetaclass`; in
  72. # v2 it lives in an internal module, so avoid importing it directly.
  73. PydanticModelMetaclass: type = type(pydantic.BaseModel)
  74. class V1MixinMetaclass(PydanticModelMetaclass):
  75. def __new__(
  76. cls,
  77. name: str,
  78. bases: tuple[type, ...],
  79. namespace: dict[str, Any],
  80. **kwargs: Any,
  81. ):
  82. # Type checks run in a Pydantic v2 environment, so tell mypy to analyze
  83. # certain types in here as if they were from v1.
  84. # Note that this code should never even run unless Pydantic v1 is detected.
  85. if TYPE_CHECKING:
  86. from pydantic.v1.fields import FieldInfo
  87. else:
  88. from pydantic.fields import FieldInfo
  89. # ------------------------------------------------------------------------------
  90. # Convert any inline model config, e.g.:
  91. # class MyModel(BaseModel): # BEFORE (v2)
  92. # model_config = ConfigDict(populate_by_name=True)
  93. #
  94. # class MyModel(BaseModel): # AFTER (v1)
  95. # class Config:
  96. # allow_population_by_field_name = True
  97. if config_dict := namespace.pop("model_config", None):
  98. namespace["Config"] = type("Config", (), convert_v2_config(config_dict))
  99. # ------------------------------------------------------------------------------
  100. # Rename v2 Field() args to their v1 equivalents, if possible
  101. if annotations := namespace.get("__annotations__"):
  102. for field_name, obj in namespace.items():
  103. if (
  104. # Process annotated `Field(...)` assignments
  105. isinstance(field := obj, FieldInfo)
  106. and (ann := annotations.get(field_name))
  107. ):
  108. # For list-like fields, we WOULD want to rename:
  109. # - `max_length (v2) -> max_items (v1)`
  110. # - `min_length (v2) -> min_items (v1)`
  111. # In v1: lists -> `{min,max}_items`; strings -> `{min,max}_length`.
  112. # In v2: lists OR strings -> `{min,max}_length`.
  113. #
  114. # HOWEVER, this does not play well with generated code that defers
  115. # type annotations via `from __future__ import annotations`.
  116. # See: https://github.com/pydantic/pydantic/issues/3745
  117. #
  118. # Pydantic v1 users will unfortunately have to skip validation
  119. # of length constraints on any list-like fields.
  120. if _is_list_like_ann(ann):
  121. field.max_items, field.max_length = None, None
  122. field.min_items, field.min_length = None, None
  123. # For str-like fields, rename:
  124. # - `pattern (v2) -> regex (v1)`
  125. elif _is_str_like_ann(ann):
  126. field.regex = field.extra.pop("pattern", None)
  127. return super().__new__(cls, name, bases, namespace, **kwargs)
  128. @property
  129. def model_fields(self) -> dict[str, Any]:
  130. return self.__fields__ # type: ignore[deprecated]
  131. # Mixin to maintain compatibility with Pydantic v1. These are best-effort
  132. # shims and cannot guarantee complete compatibility. Whenever possible, prefer
  133. # upgrading to Pydantic v2 for full support.
  134. class V1Mixin(metaclass=V1MixinMetaclass):
  135. # Internal compat helpers
  136. @classmethod
  137. def _dump_json_vals(cls, values: dict[str, Any], by_alias: bool) -> dict[str, Any]:
  138. """Reserialize values from `Json`-typed fields after dumping the model to dict."""
  139. # Get the expected keys (after `.model_dump()`) for `Json`-typed fields.
  140. # Note: In v1, `Json` fields have `ModelField.parse_json == True`
  141. json_fields = (f for f in cls.__fields__.values() if f.parse_json) # type: ignore[deprecated]
  142. get_key = attrgetter("alias" if by_alias else "name")
  143. json_field_keys = set(map(get_key, json_fields))
  144. return {
  145. # Only serialize `Json` fields with non-null values.
  146. k: to_json(v) if ((v is not None) and (k in json_field_keys)) else v
  147. for k, v in values.items()
  148. }
  149. # ------------------------------------------------------------------------------
  150. @classmethod
  151. def __try_update_forward_refs__(cls: type[V1Model], **localns: Any) -> None:
  152. if hasattr(sup := super(), "__try_update_forward_refs__"):
  153. sup.__try_update_forward_refs__(**localns)
  154. @classmethod
  155. def model_rebuild(cls, *args: Any, **kwargs: Any) -> None:
  156. return cls.update_forward_refs(*args, **kwargs)
  157. @classmethod
  158. def model_construct(cls, *args: Any, **kwargs: Any) -> V1Model:
  159. return cls.construct(*args, **kwargs)
  160. @classmethod
  161. def model_validate(cls, *args: Any, **kwargs: Any) -> V1Model:
  162. return cls.parse_obj(*args, **kwargs)
  163. @classmethod
  164. def model_validate_json(cls, *args: Any, **kwargs: Any) -> V1Model:
  165. return cls.parse_raw(*args, **kwargs)
  166. def model_dump(self: V1Model, **kwargs: Any) -> dict[str, Any]:
  167. # Pass only kwargs that are allowed in the V1 method.
  168. allowed_keys = allowed_arg_names(self.dict) & kwargs.keys()
  169. dict_ = self.dict(**{k: kwargs[k] for k in allowed_keys})
  170. # Hack: serialize `Json` fields correctly when `round_trip=True` in
  171. # pydantic v1.
  172. if kwargs.get("round_trip", False):
  173. by_alias: bool = kwargs.get("by_alias", False)
  174. return self._dump_json_vals(dict_, by_alias=by_alias)
  175. return dict_
  176. def model_dump_json(self: V1Model, **kwargs: Any) -> str:
  177. # Pass only kwargs that are allowed in the V1 method.
  178. allowed_keys = allowed_arg_names(self.json) & kwargs.keys()
  179. json_ = self.json(**{k: kwargs[k] for k in allowed_keys})
  180. # Hack: serialize `Json` fields correctly when `round_trip=True` in
  181. # pydantic v1.
  182. if kwargs.get("round_trip", False):
  183. by_alias: bool = kwargs.get("by_alias", False)
  184. dict_ = json.loads(json_)
  185. return json.dumps(self._dump_json_vals(dict_, by_alias=by_alias))
  186. return json_
  187. def model_copy(self: V1Model, **kwargs: Any) -> V1Model:
  188. # Pass only kwargs that are allowed in the V1 method.
  189. allowed_keys = allowed_arg_names(self.copy) & kwargs.keys()
  190. return self.copy(**{k: kwargs[k] for k in allowed_keys})
  191. @property
  192. def model_fields_set(self: V1Model) -> set[str]:
  193. return self.__fields_set__
  194. # Placeholder. Pydantic v2 is already compatible with itself, so no extra
  195. # mixins are required.
  196. class V2Mixin:
  197. pass
  198. # Pick the mixin type based on the detected Pydantic version.
  199. PydanticCompatMixin: type = V2Mixin if IS_PYDANTIC_V2 else V1Mixin
  200. # ----------------------------------------------------------------------------
  201. # Decorators and other pydantic helpers
  202. # ----------------------------------------------------------------------------
  203. if IS_PYDANTIC_V2:
  204. from pydantic import alias_generators
  205. # https://docs.pydantic.dev/latest/api/config/#pydantic.alias_generators.to_camel
  206. to_camel = alias_generators.to_camel # e.g. "foo_bar" -> "fooBar"
  207. # https://docs.pydantic.dev/latest/api/functional_validators/#pydantic.functional_validators.field_validator
  208. field_validator = pydantic.field_validator
  209. # https://docs.pydantic.dev/latest/api/functional_validators/#pydantic.functional_validators.model_validator
  210. model_validator = pydantic.model_validator
  211. # https://docs.pydantic.dev/latest/api/fields/#pydantic.fields.computed_field
  212. computed_field = pydantic.computed_field
  213. # https://docs.pydantic.dev/latest/api/aliases/#pydantic.aliases.AliasChoices
  214. AliasChoices = pydantic.AliasChoices
  215. else:
  216. from pydantic.utils import to_lower_camel
  217. V2ValidatorMode = Literal["before", "after", "wrap", "plain"]
  218. # NOTE:
  219. # - `to_lower_camel` in v1 equals `to_camel` in v2 (lowerCamelCase).
  220. # - `to_camel` in v1 equals `to_pascal` in v2 (UpperCamelCase).
  221. to_camel = to_lower_camel
  222. # Ensures we can use v2's `@field_validator` by invoking v1's `@validator`
  223. # if v1 is detected.
  224. def field_validator(
  225. *fields: str,
  226. mode: V2ValidatorMode = "after",
  227. check_fields: bool | None = None,
  228. **_: Any,
  229. ) -> Callable:
  230. return pydantic.validator( # type: ignore[deprecated]
  231. *fields,
  232. pre=(mode == "before"),
  233. always=True,
  234. check_fields=bool(check_fields),
  235. allow_reuse=True,
  236. )
  237. # Ensures we can use v2's `@model_validator` by invoking v1's `@root_validator`
  238. # if v1 is detected.
  239. def model_validator(*, mode: V2ValidatorMode, **_: Any) -> Callable:
  240. if mode == "after":
  241. def _decorator(v2_method: Callable) -> Any:
  242. # Patch the behavior for `@model_validator(mode="after")` in
  243. # v1. This is complicated because:
  244. # - In v2 it decorates an instance method, so the function takes
  245. # `self` as the first argument.
  246. # - In v1 `@root_validator(pre=False)` decorates a classmethod,
  247. # so the function takes `cls` as the first argument.
  248. def v1_method(
  249. cls: type[V1Model], values: dict[str, Any]
  250. ) -> dict[str, Any]:
  251. # Values should already be validated in an "after"
  252. # validator, so use `construct()` to instantiate without
  253. # revalidating.
  254. v_self = v2_method(cls.construct(**values))
  255. # Pydantic v1 expects the validator to return a
  256. # `{field_name -> value}` mapping.
  257. return {f: getattr(v_self, f) for f in v_self.__fields__}
  258. return pydantic.root_validator(pre=False, allow_reuse=True)( # type: ignore[call-overload]
  259. classmethod(v1_method)
  260. )
  261. return _decorator
  262. else:
  263. return pydantic.root_validator(pre=(mode == "before"), allow_reuse=True) # type: ignore[call-overload]
  264. @overload # type: ignore[no-redef]
  265. def computed_field(func: Callable | property, /) -> property: ...
  266. @overload
  267. def computed_field(
  268. func: None, /, **_: Any
  269. ) -> Callable[[Callable | property], property]: ...
  270. def computed_field(
  271. func: Callable | property | None = None, /, **_: Any
  272. ) -> property | Callable[[Callable | property], property]:
  273. """Compatibility wrapper for Pydantic v2's `computed_field` in v1."""
  274. def always_property(f: Callable | property) -> property:
  275. # Convert the method to a property only if needed
  276. return f if isinstance(f, property) else property(f)
  277. # Handle both decorator styles
  278. return always_property if (func is None) else always_property(func)
  279. class AliasChoices: # type: ignore [no-redef]
  280. """Placeholder for Pydantic v2's AliasChoices to retain partial v1 support."""
  281. aliases: list[str]
  282. def __init__(self, *aliases: str):
  283. self.aliases = list(aliases)