| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- """Windows-specific implementation of process utilities.
- This file is only meant to be imported by process.py, not by end-users.
- """
- import ctypes
- import os
- import subprocess
- import sys
- import time
- from ctypes import POINTER, c_int
- from ctypes.wintypes import HLOCAL, LPCWSTR
- from subprocess import STDOUT
- from threading import Thread
- from types import TracebackType
- from typing import List, Optional
- from . import py3compat
- from ._process_common import arg_split as py_arg_split
- from ._process_common import process_handler, read_no_interrupt
- from .encoding import DEFAULT_ENCODING
- class AvoidUNCPath:
- """A context manager to protect command execution from UNC paths.
- In the Win32 API, commands can't be invoked with the cwd being a UNC path.
- This context manager temporarily changes directory to the 'C:' drive on
- entering, and restores the original working directory on exit.
- The context manager returns the starting working directory *if* it made a
- change and None otherwise, so that users can apply the necessary adjustment
- to their system calls in the event of a change.
- Examples
- --------
- ::
- cmd = 'dir'
- with AvoidUNCPath() as path:
- if path is not None:
- cmd = '"pushd %s &&"%s' % (path, cmd)
- os.system(cmd)
- """
- def __enter__(self) -> Optional[str]:
- self.path = os.getcwd()
- self.is_unc_path = self.path.startswith(r"\\")
- if self.is_unc_path:
- # change to c drive (as cmd.exe cannot handle UNC addresses)
- os.chdir("C:")
- return self.path
- else:
- # We return None to signal that there was no change in the working
- # directory
- return None
- def __exit__(
- self,
- exc_type: Optional[type[BaseException]],
- exc_value: Optional[BaseException],
- traceback: Optional[TracebackType],
- ) -> None:
- if self.is_unc_path:
- os.chdir(self.path)
- def _system_body(p: subprocess.Popen[bytes]) -> int:
- """Callback for _system."""
- enc = DEFAULT_ENCODING
- # Dec 2024: in both of these functions, I'm not sure why we .splitlines()
- # the bytes and then decode each line individually instead of just decoding
- # the whole thing at once.
- def stdout_read() -> None:
- try:
- assert p.stdout is not None
- for byte_line in (read_no_interrupt(p.stdout) or b"").splitlines():
- line = byte_line.decode(enc, "replace")
- print(line, file=sys.stdout)
- except Exception as e:
- print(f"Error reading stdout: {e}", file=sys.stderr)
- def stderr_read() -> None:
- try:
- assert p.stderr is not None
- for byte_line in (read_no_interrupt(p.stderr) or b"").splitlines():
- line = byte_line.decode(enc, "replace")
- print(line, file=sys.stderr)
- except Exception as e:
- print(f"Error reading stderr: {e}", file=sys.stderr)
- stdout_thread = Thread(target=stdout_read)
- stderr_thread = Thread(target=stderr_read)
- stdout_thread.start()
- stderr_thread.start()
- # Wait to finish for returncode. Unfortunately, Python has a bug where
- # wait() isn't interruptible (https://bugs.python.org/issue28168) so poll in
- # a loop instead of just doing `return p.wait()`
- while True:
- result = p.poll()
- if result is None:
- time.sleep(0.01)
- else:
- break
- # Join the threads to ensure they complete before returning
- stdout_thread.join()
- stderr_thread.join()
- return result
- def system(cmd: str) -> Optional[int]:
- """Win32 version of os.system() that works with network shares.
- Note that this implementation returns None, as meant for use in IPython.
- Parameters
- ----------
- cmd : str or list
- A command to be executed in the system shell.
- Returns
- -------
- int : child process' exit code.
- """
- # The controller provides interactivity with both
- # stdin and stdout
- # import _process_win32_controller
- # _process_win32_controller.system(cmd)
- with AvoidUNCPath() as path:
- if path is not None:
- cmd = '"pushd %s &&"%s' % (path, cmd)
- res = process_handler(cmd, _system_body)
- return res
- def getoutput(cmd: str) -> str:
- """Return standard output of executing cmd in a shell.
- Accepts the same arguments as os.system().
- Parameters
- ----------
- cmd : str or list
- A command to be executed in the system shell.
- Returns
- -------
- stdout : str
- """
- with AvoidUNCPath() as path:
- if path is not None:
- cmd = '"pushd %s &&"%s' % (path, cmd)
- out = process_handler(cmd, lambda p: p.communicate()[0], STDOUT)
- if out is None:
- out = b""
- return py3compat.decode(out)
- try:
- windll = ctypes.windll # type: ignore [attr-defined]
- CommandLineToArgvW = windll.shell32.CommandLineToArgvW
- CommandLineToArgvW.arg_types = [LPCWSTR, POINTER(c_int)]
- CommandLineToArgvW.restype = POINTER(LPCWSTR)
- LocalFree = windll.kernel32.LocalFree
- LocalFree.res_type = HLOCAL
- LocalFree.arg_types = [HLOCAL]
- def arg_split(
- commandline: str, posix: bool = False, strict: bool = True
- ) -> List[str]:
- """Split a command line's arguments in a shell-like manner.
- This is a special version for windows that use a ctypes call to CommandLineToArgvW
- to do the argv splitting. The posix parameter is ignored.
- If strict=False, process_common.arg_split(...strict=False) is used instead.
- """
- # CommandLineToArgvW returns path to executable if called with empty string.
- if commandline.strip() == "":
- return []
- if not strict:
- # not really a cl-arg, fallback on _process_common
- return py_arg_split(commandline, posix=posix, strict=strict)
- argvn = c_int()
- result_pointer = CommandLineToArgvW(commandline.lstrip(), ctypes.byref(argvn))
- try:
- result_array_type = LPCWSTR * argvn.value
- result = [
- arg
- for arg in result_array_type.from_address(
- ctypes.addressof(result_pointer.contents)
- )
- if arg is not None
- ]
- finally:
- # for side effects
- _ = LocalFree(result_pointer)
- return result
- except AttributeError:
- arg_split = py_arg_split
- def check_pid(pid: int) -> bool:
- # OpenProcess returns 0 if no such process (of ours) exists
- # positive int otherwise
- return bool(windll.kernel32.OpenProcess(1, 0, pid))
|