_process_win32.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. """Windows-specific implementation of process utilities.
  2. This file is only meant to be imported by process.py, not by end-users.
  3. """
  4. import ctypes
  5. import os
  6. import subprocess
  7. import sys
  8. import time
  9. from ctypes import POINTER, c_int
  10. from ctypes.wintypes import HLOCAL, LPCWSTR
  11. from subprocess import STDOUT
  12. from threading import Thread
  13. from types import TracebackType
  14. from typing import List, Optional
  15. from . import py3compat
  16. from ._process_common import arg_split as py_arg_split
  17. from ._process_common import process_handler, read_no_interrupt
  18. from .encoding import DEFAULT_ENCODING
  19. class AvoidUNCPath:
  20. """A context manager to protect command execution from UNC paths.
  21. In the Win32 API, commands can't be invoked with the cwd being a UNC path.
  22. This context manager temporarily changes directory to the 'C:' drive on
  23. entering, and restores the original working directory on exit.
  24. The context manager returns the starting working directory *if* it made a
  25. change and None otherwise, so that users can apply the necessary adjustment
  26. to their system calls in the event of a change.
  27. Examples
  28. --------
  29. ::
  30. cmd = 'dir'
  31. with AvoidUNCPath() as path:
  32. if path is not None:
  33. cmd = '"pushd %s &&"%s' % (path, cmd)
  34. os.system(cmd)
  35. """
  36. def __enter__(self) -> Optional[str]:
  37. self.path = os.getcwd()
  38. self.is_unc_path = self.path.startswith(r"\\")
  39. if self.is_unc_path:
  40. # change to c drive (as cmd.exe cannot handle UNC addresses)
  41. os.chdir("C:")
  42. return self.path
  43. else:
  44. # We return None to signal that there was no change in the working
  45. # directory
  46. return None
  47. def __exit__(
  48. self,
  49. exc_type: Optional[type[BaseException]],
  50. exc_value: Optional[BaseException],
  51. traceback: Optional[TracebackType],
  52. ) -> None:
  53. if self.is_unc_path:
  54. os.chdir(self.path)
  55. def _system_body(p: subprocess.Popen[bytes]) -> int:
  56. """Callback for _system."""
  57. enc = DEFAULT_ENCODING
  58. # Dec 2024: in both of these functions, I'm not sure why we .splitlines()
  59. # the bytes and then decode each line individually instead of just decoding
  60. # the whole thing at once.
  61. def stdout_read() -> None:
  62. try:
  63. assert p.stdout is not None
  64. for byte_line in (read_no_interrupt(p.stdout) or b"").splitlines():
  65. line = byte_line.decode(enc, "replace")
  66. print(line, file=sys.stdout)
  67. except Exception as e:
  68. print(f"Error reading stdout: {e}", file=sys.stderr)
  69. def stderr_read() -> None:
  70. try:
  71. assert p.stderr is not None
  72. for byte_line in (read_no_interrupt(p.stderr) or b"").splitlines():
  73. line = byte_line.decode(enc, "replace")
  74. print(line, file=sys.stderr)
  75. except Exception as e:
  76. print(f"Error reading stderr: {e}", file=sys.stderr)
  77. stdout_thread = Thread(target=stdout_read)
  78. stderr_thread = Thread(target=stderr_read)
  79. stdout_thread.start()
  80. stderr_thread.start()
  81. # Wait to finish for returncode. Unfortunately, Python has a bug where
  82. # wait() isn't interruptible (https://bugs.python.org/issue28168) so poll in
  83. # a loop instead of just doing `return p.wait()`
  84. while True:
  85. result = p.poll()
  86. if result is None:
  87. time.sleep(0.01)
  88. else:
  89. break
  90. # Join the threads to ensure they complete before returning
  91. stdout_thread.join()
  92. stderr_thread.join()
  93. return result
  94. def system(cmd: str) -> Optional[int]:
  95. """Win32 version of os.system() that works with network shares.
  96. Note that this implementation returns None, as meant for use in IPython.
  97. Parameters
  98. ----------
  99. cmd : str or list
  100. A command to be executed in the system shell.
  101. Returns
  102. -------
  103. int : child process' exit code.
  104. """
  105. # The controller provides interactivity with both
  106. # stdin and stdout
  107. # import _process_win32_controller
  108. # _process_win32_controller.system(cmd)
  109. with AvoidUNCPath() as path:
  110. if path is not None:
  111. cmd = '"pushd %s &&"%s' % (path, cmd)
  112. res = process_handler(cmd, _system_body)
  113. return res
  114. def getoutput(cmd: str) -> str:
  115. """Return standard output of executing cmd in a shell.
  116. Accepts the same arguments as os.system().
  117. Parameters
  118. ----------
  119. cmd : str or list
  120. A command to be executed in the system shell.
  121. Returns
  122. -------
  123. stdout : str
  124. """
  125. with AvoidUNCPath() as path:
  126. if path is not None:
  127. cmd = '"pushd %s &&"%s' % (path, cmd)
  128. out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)
  129. if out is None:
  130. out = b""
  131. return py3compat.decode(out)
  132. try:
  133. windll = ctypes.windll # type: ignore [attr-defined]
  134. CommandLineToArgvW = windll.shell32.CommandLineToArgvW
  135. CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
  136. CommandLineToArgvW.restype = POINTER(LPCWSTR)
  137. LocalFree = windll.kernel32.LocalFree
  138. LocalFree.res_type = HLOCAL
  139. LocalFree.arg_types = [HLOCAL]
  140. def arg_split(
  141. commandline: str, posix: bool = False, strict: bool = True
  142. ) -> List[str]:
  143. """Split a command line's arguments in a shell-like manner.
  144. This is a special version for windows that use a ctypes call to CommandLineToArgvW
  145. to do the argv splitting. The posix parameter is ignored.
  146. If strict=False, process_common.arg_split(...strict=False) is used instead.
  147. """
  148. # CommandLineToArgvW returns path to executable if called with empty string.
  149. if commandline.strip() == "":
  150. return []
  151. if not strict:
  152. # not really a cl-arg, fallback on _process_common
  153. return py_arg_split(commandline, posix=posix, strict=strict)
  154. argvn = c_int()
  155. result_pointer = CommandLineToArgvW(commandline.lstrip(), ctypes.byref(argvn))
  156. try:
  157. result_array_type = LPCWSTR * argvn.value
  158. result = [
  159. arg
  160. for arg in result_array_type.from_address(
  161. ctypes.addressof(result_pointer.contents)
  162. )
  163. if arg is not None
  164. ]
  165. finally:
  166. # for side effects
  167. _ = LocalFree(result_pointer)
  168. return result
  169. except AttributeError:
  170. arg_split = py_arg_split
  171. def check_pid(pid: int) -> bool:
  172. # OpenProcess returns 0 if no such process (of ours) exists
  173. # positive int otherwise
  174. return bool(windll.kernel32.OpenProcess(1, 0, pid))