async_checker.py 3.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
  2. # For details: https://github.com/pylint-dev/pylint/blob/main/LICENSE
  3. # Copyright (c) https://github.com/pylint-dev/pylint/blob/main/CONTRIBUTORS.txt
  4. """Checker for anything related to the async protocol (PEP 492)."""
  5. from __future__ import annotations
  6. import sys
  7. from typing import TYPE_CHECKING
  8. import astroid
  9. import astroid.bases
  10. import astroid.exceptions
  11. from astroid import nodes
  12. from pylint import checkers
  13. from pylint.checkers import utils as checker_utils
  14. from pylint.checkers.utils import decorated_with
  15. if TYPE_CHECKING:
  16. from pylint.lint import PyLinter
  17. class AsyncChecker(checkers.BaseChecker):
  18. name = "async"
  19. msgs = {
  20. "E1700": (
  21. "Yield inside async function",
  22. "yield-inside-async-function",
  23. "Used when an `yield` or `yield from` statement is "
  24. "found inside an async function.",
  25. {"minversion": (3, 5)},
  26. ),
  27. "E1701": (
  28. "Async context manager '%s' doesn't implement __aenter__ and __aexit__.",
  29. "not-async-context-manager",
  30. "Used when an async context manager is used with an object "
  31. "that does not implement the async context management protocol.",
  32. {"minversion": (3, 5)},
  33. ),
  34. }
  35. def open(self) -> None:
  36. self._mixin_class_rgx = self.linter.config.mixin_class_rgx
  37. self._async_generators = ["contextlib.asynccontextmanager"]
  38. @checker_utils.only_required_for_messages("yield-inside-async-function")
  39. def visit_asyncfunctiondef(self, node: nodes.AsyncFunctionDef) -> None:
  40. for child in node.nodes_of_class(nodes.Yield):
  41. if child.scope() is node and (
  42. sys.version_info[:2] == (3, 5) or isinstance(child, nodes.YieldFrom)
  43. ):
  44. self.add_message("yield-inside-async-function", node=child)
  45. @checker_utils.only_required_for_messages("not-async-context-manager")
  46. def visit_asyncwith(self, node: nodes.AsyncWith) -> None:
  47. for ctx_mgr, _ in node.items:
  48. match inferred := checker_utils.safe_infer(ctx_mgr):
  49. case _ if not inferred:
  50. continue
  51. case nodes.AsyncFunctionDef():
  52. # Check if we are dealing with a function decorated
  53. # with contextlib.asynccontextmanager.
  54. if decorated_with(inferred, self._async_generators):
  55. continue
  56. case astroid.bases.AsyncGenerator():
  57. # Check if we are dealing with a function decorated
  58. # with contextlib.asynccontextmanager.
  59. if decorated_with(inferred.parent, self._async_generators):
  60. continue
  61. case _:
  62. try:
  63. inferred.getattr("__aenter__")
  64. inferred.getattr("__aexit__")
  65. except astroid.exceptions.NotFoundError:
  66. if isinstance(inferred, astroid.Instance):
  67. # If we do not know the bases of this class,
  68. # just skip it.
  69. if not checker_utils.has_known_bases(inferred):
  70. continue
  71. # Ignore mixin classes if they match the rgx option.
  72. if (
  73. "not-async-context-manager"
  74. in self.linter.config.ignored_checks_for_mixins
  75. and self._mixin_class_rgx.match(inferred.name)
  76. ):
  77. continue
  78. else:
  79. continue
  80. self.add_message(
  81. "not-async-context-manager", node=node, args=(inferred.name,)
  82. )
  def register(linter: PyLinter) -> None:
      """Create an ``AsyncChecker`` for *linter* and register it with that linter.

      Args:
          linter: The linter that receives the new checker.
      """
      async_checker = AsyncChecker(linter)
      linter.register_checker(async_checker)