utils.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import asyncio
  2. import itertools
  3. import logging
  4. import subprocess
  5. import textwrap
  6. import types
  7. from typing import List
  8. class SubprocessCalledProcessError(subprocess.CalledProcessError):
  9. """The subprocess.CalledProcessError with stripped stdout."""
  10. LAST_N_LINES = 50
  11. def __init__(self, *args, cmd_index=None, **kwargs):
  12. self.cmd_index = cmd_index
  13. super().__init__(*args, **kwargs)
  14. @staticmethod
  15. def _get_last_n_line(str_data: str, last_n_lines: int) -> str:
  16. if last_n_lines < 0:
  17. return str_data
  18. lines = str_data.strip().split("\n")
  19. return "\n".join(lines[-last_n_lines:])
  20. def __str__(self):
  21. str_list = (
  22. []
  23. if self.cmd_index is None
  24. else [f"Run cmd[{self.cmd_index}] failed with the following details."]
  25. )
  26. str_list.append(super().__str__())
  27. out = {
  28. "stdout": self.stdout,
  29. "stderr": self.stderr,
  30. }
  31. for name, s in out.items():
  32. if s:
  33. subtitle = f"Last {self.LAST_N_LINES} lines of {name}:"
  34. last_n_line_str = self._get_last_n_line(s, self.LAST_N_LINES).strip()
  35. str_list.append(
  36. f"{subtitle}\n{textwrap.indent(last_n_line_str, ' ' * 4)}"
  37. )
  38. return "\n".join(str_list)
  39. async def check_output_cmd(
  40. cmd: List[str],
  41. *,
  42. logger: logging.Logger,
  43. cmd_index_gen: types.GeneratorType = itertools.count(1),
  44. **kwargs,
  45. ) -> str:
  46. """Run command with arguments and return its output.
  47. If the return code was non-zero it raises a CalledProcessError. The
  48. CalledProcessError object will have the return code in the returncode
  49. attribute and any output in the output attribute.
  50. Args:
  51. cmd: The cmdline should be a sequence of program arguments or else
  52. a single string or path-like object. The program to execute is
  53. the first item in cmd.
  54. logger: The logger instance.
  55. cmd_index_gen: The cmd index generator, default is itertools.count(1).
  56. kwargs: All arguments are passed to the create_subprocess_exec.
  57. Returns:
  58. The stdout of cmd.
  59. Raises:
  60. CalledProcessError: If the return code of cmd is not 0.
  61. """
  62. cmd_index = next(cmd_index_gen)
  63. logger.info("Run cmd[%s] %s", cmd_index, repr(cmd))
  64. proc = None
  65. try:
  66. proc = await asyncio.create_subprocess_exec(
  67. *cmd,
  68. stdout=asyncio.subprocess.PIPE,
  69. stderr=asyncio.subprocess.STDOUT,
  70. **kwargs,
  71. )
  72. # Use communicate instead of polling stdout:
  73. # * Avoid deadlocks due to streams pausing reading or writing and blocking the
  74. # child process. Please refer to:
  75. # https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process.stderr
  76. # * Avoid mixing multiple outputs of concurrent cmds.
  77. stdout, _ = await proc.communicate()
  78. except asyncio.exceptions.CancelledError as e:
  79. # since Python 3.9, when cancelled, the inner process needs to throw as it is
  80. # for asyncio to timeout properly https://bugs.python.org/issue40607
  81. raise e
  82. except BaseException as e:
  83. raise RuntimeError(f"Run cmd[{cmd_index}] got exception.") from e
  84. else:
  85. stdout = stdout.decode("utf-8")
  86. if stdout:
  87. logger.info("Output of cmd[%s]: %s", cmd_index, stdout)
  88. else:
  89. logger.info("No output for cmd[%s]", cmd_index)
  90. if proc.returncode != 0:
  91. raise SubprocessCalledProcessError(
  92. proc.returncode, cmd, output=stdout, cmd_index=cmd_index
  93. )
  94. return stdout
  95. finally:
  96. if proc is not None:
  97. # Kill process.
  98. try:
  99. proc.kill()
  100. except ProcessLookupError:
  101. pass
  102. # Wait process exit.
  103. await proc.wait()