brain_subprocess.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
# For details: https://github.com/pylint-dev/astroid/blob/main/LICENSE
# Copyright (c) https://github.com/pylint-dev/astroid/blob/main/CONTRIBUTORS.txt
  4. import textwrap
  5. from astroid import nodes
  6. from astroid.brain.helpers import register_module_extender
  7. from astroid.builder import parse
  8. from astroid.const import PY311_PLUS
  9. from astroid.manager import AstroidManager
  10. def _subprocess_transform() -> nodes.Module:
  11. communicate = (bytes("string", "ascii"), bytes("string", "ascii"))
  12. communicate_signature = "def communicate(self, input=None, timeout=None)"
  13. args = """\
  14. self, args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None,
  15. preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None,
  16. universal_newlines=None, startupinfo=None, creationflags=0, restore_signals=True,
  17. start_new_session=False, pass_fds=(), *, encoding=None, errors=None, text=None,
  18. user=None, group=None, extra_groups=None, umask=-1, pipesize=-1"""
  19. if PY311_PLUS:
  20. args += ", process_group=None"
  21. init = f"""
  22. def __init__({args}):
  23. pass"""
  24. wait_signature = "def wait(self, timeout=None)"
  25. ctx_manager = """
  26. def __enter__(self): return self
  27. def __exit__(self, *args): pass
  28. """
  29. py3_args = "args = []"
  30. check_output_signature = """
  31. check_output(
  32. args, *,
  33. stdin=None,
  34. stderr=None,
  35. shell=False,
  36. cwd=None,
  37. encoding=None,
  38. errors=None,
  39. universal_newlines=False,
  40. timeout=None,
  41. env=None,
  42. text=None,
  43. restore_signals=True,
  44. preexec_fn=None,
  45. pass_fds=(),
  46. input=None,
  47. bufsize=0,
  48. executable=None,
  49. close_fds=False,
  50. startupinfo=None,
  51. creationflags=0,
  52. start_new_session=False
  53. ):
  54. """.strip()
  55. code = textwrap.dedent(
  56. f"""
  57. def {check_output_signature}
  58. if universal_newlines:
  59. return ""
  60. return b""
  61. class Popen(object):
  62. returncode = pid = 0
  63. stdin = stdout = stderr = file()
  64. {py3_args}
  65. {communicate_signature}:
  66. return {communicate!r}
  67. {wait_signature}:
  68. return self.returncode
  69. def poll(self):
  70. return self.returncode
  71. def send_signal(self, signal):
  72. pass
  73. def terminate(self):
  74. pass
  75. def kill(self):
  76. pass
  77. {ctx_manager}
  78. @classmethod
  79. def __class_getitem__(cls, item):
  80. pass
  81. """
  82. )
  83. init_lines = textwrap.dedent(init).splitlines()
  84. indented_init = "\n".join(" " * 4 + line for line in init_lines)
  85. code += indented_init
  86. return parse(code)
  87. def register(manager: AstroidManager) -> None:
  88. register_module_extender(manager, "subprocess", _subprocess_transform)