test_ptyprocess.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. # -*- coding: utf-8 -*-
  2. """winpty wrapper tests."""
  3. # Standard library imports
  4. import asyncio
  5. import os
  6. import signal
  7. import time
  8. import sys
  9. import re
  10. # Third party imports
  11. import pytest
  12. from flaky import flaky
  13. # Local imports
  14. from winpty import WinptyError
  15. from winpty.enums import Backend
  16. from winpty.ptyprocess import PtyProcess, which
  17. @pytest.fixture(scope='module', params=['WinPTY', 'ConPTY'])
  18. def pty_fixture(request):
  19. backend = request.param
  20. if os.environ.get('CI_RUNNING', None) == '1':
  21. if backend == 'ConPTY':
  22. os.environ['CI'] = '1'
  23. os.environ['CONPTY_CI'] = '1'
  24. if backend == 'WinPTY':
  25. os.environ.pop('CI', None)
  26. os.environ.pop('CONPTY_CI', None)
  27. backend = getattr(Backend, backend)
  28. def _pty_factory(cmd=None, env=None):
  29. cmd = cmd or 'cmd'
  30. try:
  31. pty = PtyProcess.spawn(cmd, env=env, backend=backend)
  32. except WinptyError:
  33. pytest.skip()
  34. return None
  35. return pty
  36. # time.sleep(10)
  37. _pty_factory.backend = request.param
  38. return _pty_factory
  39. @flaky(max_runs=40, min_passes=1)
  40. def test_read(pty_fixture):
  41. pty = pty_fixture()
  42. loc = os.getcwd()
  43. data = ''
  44. tries = 0
  45. while loc not in data and tries < 10:
  46. try:
  47. data += pty.read()
  48. except EOFError:
  49. pass
  50. tries += 1
  51. assert loc in data
  52. pty.terminate()
  53. time.sleep(2)
  54. @flaky(max_runs=40, min_passes=1)
  55. def test_write(pty_fixture):
  56. pty = pty_fixture()
  57. text = 'Eggs, ham and spam ünicode'
  58. pty.write(text)
  59. data = ''
  60. tries = 0
  61. while text not in data and tries < 10:
  62. try:
  63. data += pty.read()
  64. except EOFError:
  65. pass
  66. tries += 1
  67. assert text in data
  68. pty.terminate()
  69. @pytest.mark.xfail(reason="It fails sometimes due to long strings")
  70. @flaky(max_runs=40, min_passes=1)
  71. def test_isalive(pty_fixture):
  72. pty = pty_fixture()
  73. time.sleep(4)
  74. pty.write('echo \"foo\"\r\nexit\r\n')
  75. data = ''
  76. while True:
  77. try:
  78. print('Stuck')
  79. data += pty.read()
  80. except EOFError:
  81. break
  82. regex = re.compile(".*foo.*")
  83. assert regex.findall(data)
  84. assert not pty.isalive()
  85. pty.terminate()
  86. @pytest.mark.xfail(reason="It fails sometimes due to long strings")
  87. @flaky(max_runs=40, min_passes=1)
  88. def test_readline(pty_fixture):
  89. env = os.environ.copy()
  90. env['foo'] = 'bar'
  91. pty = pty_fixture(env=env)
  92. # Ensure that the echo print has its own CRLF
  93. pty.write('cls\r\n')
  94. pty.write('echo %foo%\r\n')
  95. data = ''
  96. tries = 0
  97. while 'bar' not in data and tries < 10:
  98. data = pty.readline()
  99. tries += 1
  100. assert 'bar' in data
  101. pty.terminate()
  102. def test_close(pty_fixture):
  103. pty = pty_fixture()
  104. pty.close()
  105. assert not pty.isalive()
  106. def test_flush(pty_fixture):
  107. pty = pty_fixture()
  108. pty.flush()
  109. pty.terminate()
  110. def test_intr(pty_fixture):
  111. pty = pty_fixture(cmd=[sys.executable, 'import time; time.sleep(10)'])
  112. pty.sendintr()
  113. assert pty.wait() != 0
  114. def test_send_control(pty_fixture):
  115. pty = pty_fixture(cmd=[sys.executable, 'import time; time.sleep(10)'])
  116. pty.sendcontrol('d')
  117. assert pty.wait() != 0
  118. @pytest.mark.skipif(which('cat') is None, reason="Requires cat on the PATH")
  119. def test_send_eof(pty_fixture):
  120. cat = pty_fixture('cat')
  121. cat.sendeof()
  122. assert cat.wait() == 0
  123. def test_isatty(pty_fixture):
  124. pty = pty_fixture()
  125. assert pty.isatty()
  126. pty.terminate()
  127. assert not pty.isatty()
  128. def test_wait(pty_fixture):
  129. pty = pty_fixture(cmd=[sys.executable, '--version'])
  130. assert pty.wait() == 0
  131. def test_exit_status(pty_fixture):
  132. pty = pty_fixture(cmd=[sys.executable])
  133. pty.write('import sys;sys.exit(1)\r\n')
  134. pty.wait()
  135. assert pty.exitstatus == 1
  136. # @pytest.mark.timeout(30)
  137. def test_kill_sigterm(pty_fixture):
  138. pty = pty_fixture()
  139. pty.write('echo \"foo\"\r\nsleep 1000\r\n')
  140. pty.read()
  141. pty.kill(signal.SIGTERM)
  142. while True:
  143. try:
  144. pty.read()
  145. except EOFError:
  146. break
  147. assert not pty.isalive()
  148. assert pty.exitstatus == signal.SIGTERM
  149. # @pytest.mark.timeout(30)
  150. def test_terminate(pty_fixture):
  151. pty = pty_fixture()
  152. pty.write('echo \"foo\"\r\nsleep 1000\r\n')
  153. pty.read()
  154. pty.terminate()
  155. while True:
  156. try:
  157. pty.read()
  158. except EOFError:
  159. break
  160. assert not pty.isalive()
  161. assert pty.closed
  162. # @pytest.mark.timeout(30)
  163. def test_terminate_force(pty_fixture):
  164. pty = pty_fixture()
  165. pty.write('echo \"foo\"\r\nsleep 1000\r\n')
  166. pty.read()
  167. pty.terminate(force=True)
  168. while True:
  169. try:
  170. pty.read()
  171. except EOFError:
  172. break
  173. assert not pty.isalive()
  174. assert pty.closed
  175. def test_terminate_loop(pty_fixture):
  176. pty = pty_fixture()
  177. loop = asyncio.SelectorEventLoop()
  178. asyncio.set_event_loop(loop)
  179. def reader():
  180. try:
  181. data = pty.read()
  182. except EOFError:
  183. loop.remove_reader(pty.fd)
  184. loop.stop()
  185. loop.add_reader(pty.fd, reader)
  186. loop.call_soon(pty.write, 'echo \"foo\"\r\nsleep 1000\r\n')
  187. loop.call_soon(pty.terminate, True)
  188. try:
  189. loop.run_forever()
  190. finally:
  191. loop.close()
  192. assert not pty.isalive()
  193. assert pty.closed
  194. def test_getwinsize(pty_fixture):
  195. pty = pty_fixture()
  196. assert pty.getwinsize() == (24, 80)
  197. pty.terminate()
  198. def test_setwinsize(pty_fixture):
  199. pty = pty_fixture()
  200. pty.setwinsize(50, 110)
  201. assert pty.getwinsize() == (50, 110)
  202. pty.terminate()
  203. pty = PtyProcess.spawn('cmd', dimensions=(60, 120))
  204. assert pty.getwinsize() == (60, 120)
  205. pty.terminate()