win32.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904
  1. from __future__ import annotations
  2. import os
  3. import sys
  4. from abc import abstractmethod
  5. from asyncio import get_running_loop
  6. from contextlib import contextmanager
  7. from ..utils import SPHINX_AUTODOC_RUNNING
  8. assert sys.platform == "win32"
  9. # Do not import win32-specific stuff when generating documentation.
  10. # Otherwise RTD would be unable to generate docs for this module.
  11. if not SPHINX_AUTODOC_RUNNING:
  12. import msvcrt
  13. from ctypes import windll
  14. from ctypes import Array, byref, pointer
  15. from ctypes.wintypes import DWORD, HANDLE
  16. from typing import Callable, ContextManager, Iterable, Iterator, TextIO
  17. from prompt_toolkit.eventloop import run_in_executor_with_context
  18. from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles
  19. from prompt_toolkit.key_binding.key_processor import KeyPress
  20. from prompt_toolkit.keys import Keys
  21. from prompt_toolkit.mouse_events import MouseButton, MouseEventType
  22. from prompt_toolkit.win32_types import (
  23. INPUT_RECORD,
  24. KEY_EVENT_RECORD,
  25. MOUSE_EVENT_RECORD,
  26. STD_INPUT_HANDLE,
  27. EventTypes,
  28. )
  29. from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
  30. from .base import Input
  31. from .vt100_parser import Vt100Parser
  32. __all__ = [
  33. "Win32Input",
  34. "ConsoleInputReader",
  35. "raw_mode",
  36. "cooked_mode",
  37. "attach_win32_input",
  38. "detach_win32_input",
  39. ]
  40. # Win32 Constants for MOUSE_EVENT_RECORD.
  41. # See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str
  42. FROM_LEFT_1ST_BUTTON_PRESSED = 0x1
  43. RIGHTMOST_BUTTON_PRESSED = 0x2
  44. MOUSE_MOVED = 0x0001
  45. MOUSE_WHEELED = 0x0004
  46. # See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
  47. ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
  48. class _Win32InputBase(Input):
  49. """
  50. Base class for `Win32Input` and `Win32PipeInput`.
  51. """
  52. def __init__(self) -> None:
  53. self.win32_handles = _Win32Handles()
  54. @property
  55. @abstractmethod
  56. def handle(self) -> HANDLE:
  57. pass
  58. class Win32Input(_Win32InputBase):
  59. """
  60. `Input` class that reads from the Windows console.
  61. """
  62. def __init__(self, stdin: TextIO | None = None) -> None:
  63. super().__init__()
  64. self._use_virtual_terminal_input = _is_win_vt100_input_enabled()
  65. self.console_input_reader: Vt100ConsoleInputReader | ConsoleInputReader
  66. if self._use_virtual_terminal_input:
  67. self.console_input_reader = Vt100ConsoleInputReader()
  68. else:
  69. self.console_input_reader = ConsoleInputReader()
  70. def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
  71. """
  72. Return a context manager that makes this input active in the current
  73. event loop.
  74. """
  75. return attach_win32_input(self, input_ready_callback)
  76. def detach(self) -> ContextManager[None]:
  77. """
  78. Return a context manager that makes sure that this input is not active
  79. in the current event loop.
  80. """
  81. return detach_win32_input(self)
  82. def read_keys(self) -> list[KeyPress]:
  83. return list(self.console_input_reader.read())
  84. def flush_keys(self) -> list[KeyPress]:
  85. return self.console_input_reader.flush_keys()
  86. @property
  87. def closed(self) -> bool:
  88. return False
  89. def raw_mode(self) -> ContextManager[None]:
  90. return raw_mode(
  91. use_win10_virtual_terminal_input=self._use_virtual_terminal_input
  92. )
  93. def cooked_mode(self) -> ContextManager[None]:
  94. return cooked_mode()
  95. def fileno(self) -> int:
  96. # The windows console doesn't depend on the file handle, so
  97. # this is not used for the event loop (which uses the
  98. # handle instead). But it's used in `Application.run_system_command`
  99. # which opens a subprocess with a given stdin/stdout.
  100. return sys.stdin.fileno()
  101. def typeahead_hash(self) -> str:
  102. return "win32-input"
  103. def close(self) -> None:
  104. self.console_input_reader.close()
  105. @property
  106. def handle(self) -> HANDLE:
  107. return self.console_input_reader.handle
  108. class ConsoleInputReader:
  109. """
  110. :param recognize_paste: When True, try to discover paste actions and turn
  111. the event into a BracketedPaste.
  112. """
  113. # Keys with character data.
  114. mappings = {
  115. b"\x1b": Keys.Escape,
  116. b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@)
  117. b"\x01": Keys.ControlA, # Control-A (home)
  118. b"\x02": Keys.ControlB, # Control-B (emacs cursor left)
  119. b"\x03": Keys.ControlC, # Control-C (interrupt)
  120. b"\x04": Keys.ControlD, # Control-D (exit)
  121. b"\x05": Keys.ControlE, # Control-E (end)
  122. b"\x06": Keys.ControlF, # Control-F (cursor forward)
  123. b"\x07": Keys.ControlG, # Control-G
  124. b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
  125. b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
  126. b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
  127. b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
  128. b"\x0c": Keys.ControlL, # Control-L (clear; form feed)
  129. b"\x0d": Keys.ControlM, # Control-M (enter)
  130. b"\x0e": Keys.ControlN, # Control-N (14) (history forward)
  131. b"\x0f": Keys.ControlO, # Control-O (15)
  132. b"\x10": Keys.ControlP, # Control-P (16) (history back)
  133. b"\x11": Keys.ControlQ, # Control-Q
  134. b"\x12": Keys.ControlR, # Control-R (18) (reverse search)
  135. b"\x13": Keys.ControlS, # Control-S (19) (forward search)
  136. b"\x14": Keys.ControlT, # Control-T
  137. b"\x15": Keys.ControlU, # Control-U
  138. b"\x16": Keys.ControlV, # Control-V
  139. b"\x17": Keys.ControlW, # Control-W
  140. b"\x18": Keys.ControlX, # Control-X
  141. b"\x19": Keys.ControlY, # Control-Y (25)
  142. b"\x1a": Keys.ControlZ, # Control-Z
  143. b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-|
  144. b"\x1d": Keys.ControlSquareClose, # Control-]
  145. b"\x1e": Keys.ControlCircumflex, # Control-^
  146. b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
  147. b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.)
  148. }
  149. # Keys that don't carry character data.
  150. keycodes = {
  151. # Home/End
  152. 33: Keys.PageUp,
  153. 34: Keys.PageDown,
  154. 35: Keys.End,
  155. 36: Keys.Home,
  156. # Arrows
  157. 37: Keys.Left,
  158. 38: Keys.Up,
  159. 39: Keys.Right,
  160. 40: Keys.Down,
  161. 45: Keys.Insert,
  162. 46: Keys.Delete,
  163. # F-keys.
  164. 112: Keys.F1,
  165. 113: Keys.F2,
  166. 114: Keys.F3,
  167. 115: Keys.F4,
  168. 116: Keys.F5,
  169. 117: Keys.F6,
  170. 118: Keys.F7,
  171. 119: Keys.F8,
  172. 120: Keys.F9,
  173. 121: Keys.F10,
  174. 122: Keys.F11,
  175. 123: Keys.F12,
  176. }
  177. LEFT_ALT_PRESSED = 0x0002
  178. RIGHT_ALT_PRESSED = 0x0001
  179. SHIFT_PRESSED = 0x0010
  180. LEFT_CTRL_PRESSED = 0x0008
  181. RIGHT_CTRL_PRESSED = 0x0004
  182. def __init__(self, recognize_paste: bool = True) -> None:
  183. self._fdcon = None
  184. self.recognize_paste = recognize_paste
  185. # When stdin is a tty, use that handle, otherwise, create a handle from
  186. # CONIN$.
  187. self.handle: HANDLE
  188. if sys.stdin.isatty():
  189. self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  190. else:
  191. self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
  192. self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
  193. def close(self) -> None:
  194. "Close fdcon."
  195. if self._fdcon is not None:
  196. os.close(self._fdcon)
  197. def read(self) -> Iterable[KeyPress]:
  198. """
  199. Return a list of `KeyPress` instances. It won't return anything when
  200. there was nothing to read. (This function doesn't block.)
  201. http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
  202. """
  203. max_count = 2048 # Max events to read at the same time.
  204. read = DWORD(0)
  205. arrtype = INPUT_RECORD * max_count
  206. input_records = arrtype()
  207. # Check whether there is some input to read. `ReadConsoleInputW` would
  208. # block otherwise.
  209. # (Actually, the event loop is responsible to make sure that this
  210. # function is only called when there is something to read, but for some
  211. # reason this happened in the asyncio_win32 loop, and it's better to be
  212. # safe anyway.)
  213. if not wait_for_handles([self.handle], timeout=0):
  214. return
  215. # Get next batch of input event.
  216. windll.kernel32.ReadConsoleInputW(
  217. self.handle, pointer(input_records), max_count, pointer(read)
  218. )
  219. # First, get all the keys from the input buffer, in order to determine
  220. # whether we should consider this a paste event or not.
  221. all_keys = list(self._get_keys(read, input_records))
  222. # Fill in 'data' for key presses.
  223. all_keys = [self._insert_key_data(key) for key in all_keys]
  224. # Correct non-bmp characters that are passed as separate surrogate codes
  225. all_keys = list(self._merge_paired_surrogates(all_keys))
  226. if self.recognize_paste and self._is_paste(all_keys):
  227. gen = iter(all_keys)
  228. k: KeyPress | None
  229. for k in gen:
  230. # Pasting: if the current key consists of text or \n, turn it
  231. # into a BracketedPaste.
  232. data = []
  233. while k and (
  234. not isinstance(k.key, Keys)
  235. or k.key in {Keys.ControlJ, Keys.ControlM}
  236. ):
  237. data.append(k.data)
  238. try:
  239. k = next(gen)
  240. except StopIteration:
  241. k = None
  242. if data:
  243. yield KeyPress(Keys.BracketedPaste, "".join(data))
  244. if k is not None:
  245. yield k
  246. else:
  247. yield from all_keys
  248. def flush_keys(self) -> list[KeyPress]:
  249. # Method only needed for structural compatibility with `Vt100ConsoleInputReader`.
  250. return []
  251. def _insert_key_data(self, key_press: KeyPress) -> KeyPress:
  252. """
  253. Insert KeyPress data, for vt100 compatibility.
  254. """
  255. if key_press.data:
  256. return key_press
  257. if isinstance(key_press.key, Keys):
  258. data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "")
  259. else:
  260. data = ""
  261. return KeyPress(key_press.key, data)
  262. def _get_keys(
  263. self, read: DWORD, input_records: Array[INPUT_RECORD]
  264. ) -> Iterator[KeyPress]:
  265. """
  266. Generator that yields `KeyPress` objects from the input records.
  267. """
  268. for i in range(read.value):
  269. ir = input_records[i]
  270. # Get the right EventType from the EVENT_RECORD.
  271. # (For some reason the Windows console application 'cmder'
  272. # [http://gooseberrycreative.com/cmder/] can return '0' for
  273. # ir.EventType. -- Just ignore that.)
  274. if ir.EventType in EventTypes:
  275. ev = getattr(ir.Event, EventTypes[ir.EventType])
  276. # Process if this is a key event. (We also have mouse, menu and
  277. # focus events.)
  278. if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
  279. yield from self._event_to_key_presses(ev)
  280. elif isinstance(ev, MOUSE_EVENT_RECORD):
  281. yield from self._handle_mouse(ev)
  282. @staticmethod
  283. def _merge_paired_surrogates(key_presses: list[KeyPress]) -> Iterator[KeyPress]:
  284. """
  285. Combines consecutive KeyPresses with high and low surrogates into
  286. single characters
  287. """
  288. buffered_high_surrogate = None
  289. for key in key_presses:
  290. is_text = not isinstance(key.key, Keys)
  291. is_high_surrogate = is_text and "\ud800" <= key.key <= "\udbff"
  292. is_low_surrogate = is_text and "\udc00" <= key.key <= "\udfff"
  293. if buffered_high_surrogate:
  294. if is_low_surrogate:
  295. # convert high surrogate + low surrogate to single character
  296. fullchar = (
  297. (buffered_high_surrogate.key + key.key)
  298. .encode("utf-16-le", "surrogatepass")
  299. .decode("utf-16-le")
  300. )
  301. key = KeyPress(fullchar, fullchar)
  302. else:
  303. yield buffered_high_surrogate
  304. buffered_high_surrogate = None
  305. if is_high_surrogate:
  306. buffered_high_surrogate = key
  307. else:
  308. yield key
  309. if buffered_high_surrogate:
  310. yield buffered_high_surrogate
  311. @staticmethod
  312. def _is_paste(keys: list[KeyPress]) -> bool:
  313. """
  314. Return `True` when we should consider this list of keys as a paste
  315. event. Pasted text on windows will be turned into a
  316. `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably
  317. the best possible way to detect pasting of text and handle that
  318. correctly.)
  319. """
  320. # Consider paste when it contains at least one newline and at least one
  321. # other character.
  322. text_count = 0
  323. newline_count = 0
  324. for k in keys:
  325. if not isinstance(k.key, Keys):
  326. text_count += 1
  327. if k.key == Keys.ControlM:
  328. newline_count += 1
  329. return newline_count >= 1 and text_count >= 1
  330. def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> list[KeyPress]:
  331. """
  332. For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances.
  333. """
  334. assert isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown
  335. result: KeyPress | None = None
  336. control_key_state = ev.ControlKeyState
  337. u_char = ev.uChar.UnicodeChar
  338. # Use surrogatepass because u_char may be an unmatched surrogate
  339. ascii_char = u_char.encode("utf-8", "surrogatepass")
  340. # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the
  341. # unicode code point truncated to 1 byte. See also:
  342. # https://github.com/ipython/ipython/issues/10004
  343. # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389
  344. if u_char == "\x00":
  345. if ev.VirtualKeyCode in self.keycodes:
  346. result = KeyPress(self.keycodes[ev.VirtualKeyCode], "")
  347. else:
  348. if ascii_char in self.mappings:
  349. if self.mappings[ascii_char] == Keys.ControlJ:
  350. u_char = (
  351. "\n" # Windows sends \n, turn into \r for unix compatibility.
  352. )
  353. result = KeyPress(self.mappings[ascii_char], u_char)
  354. else:
  355. result = KeyPress(u_char, u_char)
  356. # First we handle Shift-Control-Arrow/Home/End (need to do this first)
  357. if (
  358. (
  359. control_key_state & self.LEFT_CTRL_PRESSED
  360. or control_key_state & self.RIGHT_CTRL_PRESSED
  361. )
  362. and control_key_state & self.SHIFT_PRESSED
  363. and result
  364. ):
  365. mapping: dict[str, str] = {
  366. Keys.Left: Keys.ControlShiftLeft,
  367. Keys.Right: Keys.ControlShiftRight,
  368. Keys.Up: Keys.ControlShiftUp,
  369. Keys.Down: Keys.ControlShiftDown,
  370. Keys.Home: Keys.ControlShiftHome,
  371. Keys.End: Keys.ControlShiftEnd,
  372. Keys.Insert: Keys.ControlShiftInsert,
  373. Keys.PageUp: Keys.ControlShiftPageUp,
  374. Keys.PageDown: Keys.ControlShiftPageDown,
  375. }
  376. result.key = mapping.get(result.key, result.key)
  377. # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys.
  378. if (
  379. control_key_state & self.LEFT_CTRL_PRESSED
  380. or control_key_state & self.RIGHT_CTRL_PRESSED
  381. ) and result:
  382. mapping = {
  383. Keys.Left: Keys.ControlLeft,
  384. Keys.Right: Keys.ControlRight,
  385. Keys.Up: Keys.ControlUp,
  386. Keys.Down: Keys.ControlDown,
  387. Keys.Home: Keys.ControlHome,
  388. Keys.End: Keys.ControlEnd,
  389. Keys.Insert: Keys.ControlInsert,
  390. Keys.Delete: Keys.ControlDelete,
  391. Keys.PageUp: Keys.ControlPageUp,
  392. Keys.PageDown: Keys.ControlPageDown,
  393. }
  394. result.key = mapping.get(result.key, result.key)
  395. # Turn 'Tab' into 'BackTab' when shift was pressed.
  396. # Also handle other shift-key combination
  397. if control_key_state & self.SHIFT_PRESSED and result:
  398. mapping = {
  399. Keys.Tab: Keys.BackTab,
  400. Keys.Left: Keys.ShiftLeft,
  401. Keys.Right: Keys.ShiftRight,
  402. Keys.Up: Keys.ShiftUp,
  403. Keys.Down: Keys.ShiftDown,
  404. Keys.Home: Keys.ShiftHome,
  405. Keys.End: Keys.ShiftEnd,
  406. Keys.Insert: Keys.ShiftInsert,
  407. Keys.Delete: Keys.ShiftDelete,
  408. Keys.PageUp: Keys.ShiftPageUp,
  409. Keys.PageDown: Keys.ShiftPageDown,
  410. }
  411. result.key = mapping.get(result.key, result.key)
  412. # Turn 'Space' into 'ControlSpace' when control was pressed.
  413. if (
  414. (
  415. control_key_state & self.LEFT_CTRL_PRESSED
  416. or control_key_state & self.RIGHT_CTRL_PRESSED
  417. )
  418. and result
  419. and result.data == " "
  420. ):
  421. result = KeyPress(Keys.ControlSpace, " ")
  422. # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot
  423. # detect this combination. But it's really practical on Windows.)
  424. if (
  425. (
  426. control_key_state & self.LEFT_CTRL_PRESSED
  427. or control_key_state & self.RIGHT_CTRL_PRESSED
  428. )
  429. and result
  430. and result.key == Keys.ControlJ
  431. ):
  432. return [KeyPress(Keys.Escape, ""), result]
  433. # Return result. If alt was pressed, prefix the result with an
  434. # 'Escape' key, just like unix VT100 terminals do.
  435. # NOTE: Only replace the left alt with escape. The right alt key often
  436. # acts as altgr and is used in many non US keyboard layouts for
  437. # typing some special characters, like a backslash. We don't want
  438. # all backslashes to be prefixed with escape. (Esc-\ has a
  439. # meaning in E-macs, for instance.)
  440. if result:
  441. meta_pressed = control_key_state & self.LEFT_ALT_PRESSED
  442. if meta_pressed:
  443. return [KeyPress(Keys.Escape, ""), result]
  444. else:
  445. return [result]
  446. else:
  447. return []
  448. def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> list[KeyPress]:
  449. """
  450. Handle mouse events. Return a list of KeyPress instances.
  451. """
  452. event_flags = ev.EventFlags
  453. button_state = ev.ButtonState
  454. event_type: MouseEventType | None = None
  455. button: MouseButton = MouseButton.NONE
  456. # Scroll events.
  457. if event_flags & MOUSE_WHEELED:
  458. if button_state > 0:
  459. event_type = MouseEventType.SCROLL_UP
  460. else:
  461. event_type = MouseEventType.SCROLL_DOWN
  462. else:
  463. # Handle button state for non-scroll events.
  464. if button_state == FROM_LEFT_1ST_BUTTON_PRESSED:
  465. button = MouseButton.LEFT
  466. elif button_state == RIGHTMOST_BUTTON_PRESSED:
  467. button = MouseButton.RIGHT
  468. # Move events.
  469. if event_flags & MOUSE_MOVED:
  470. event_type = MouseEventType.MOUSE_MOVE
  471. # No key pressed anymore: mouse up.
  472. if event_type is None:
  473. if button_state > 0:
  474. # Some button pressed.
  475. event_type = MouseEventType.MOUSE_DOWN
  476. else:
  477. # No button pressed.
  478. event_type = MouseEventType.MOUSE_UP
  479. data = ";".join(
  480. [
  481. button.value,
  482. event_type.value,
  483. str(ev.MousePosition.X),
  484. str(ev.MousePosition.Y),
  485. ]
  486. )
  487. return [KeyPress(Keys.WindowsMouseEvent, data)]
  488. class Vt100ConsoleInputReader:
  489. """
  490. Similar to `ConsoleInputReader`, but for usage when
  491. `ENABLE_VIRTUAL_TERMINAL_INPUT` is enabled. This assumes that Windows sends
  492. us the right vt100 escape sequences and we parse those with our vt100
  493. parser.
  494. (Using this instead of `ConsoleInputReader` results in the "data" attribute
  495. from the `KeyPress` instances to be more correct in edge cases, because
  496. this responds to for instance the terminal being in application cursor keys
  497. mode.)
  498. """
  499. def __init__(self) -> None:
  500. self._fdcon = None
  501. self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
  502. self._vt100_parser = Vt100Parser(
  503. lambda key_press: self._buffer.append(key_press)
  504. )
  505. # When stdin is a tty, use that handle, otherwise, create a handle from
  506. # CONIN$.
  507. self.handle: HANDLE
  508. if sys.stdin.isatty():
  509. self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  510. else:
  511. self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
  512. self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
  513. def close(self) -> None:
  514. "Close fdcon."
  515. if self._fdcon is not None:
  516. os.close(self._fdcon)
  517. def read(self) -> Iterable[KeyPress]:
  518. """
  519. Return a list of `KeyPress` instances. It won't return anything when
  520. there was nothing to read. (This function doesn't block.)
  521. http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
  522. """
  523. max_count = 2048 # Max events to read at the same time.
  524. read = DWORD(0)
  525. arrtype = INPUT_RECORD * max_count
  526. input_records = arrtype()
  527. # Check whether there is some input to read. `ReadConsoleInputW` would
  528. # block otherwise.
  529. # (Actually, the event loop is responsible to make sure that this
  530. # function is only called when there is something to read, but for some
  531. # reason this happened in the asyncio_win32 loop, and it's better to be
  532. # safe anyway.)
  533. if not wait_for_handles([self.handle], timeout=0):
  534. return []
  535. # Get next batch of input event.
  536. windll.kernel32.ReadConsoleInputW(
  537. self.handle, pointer(input_records), max_count, pointer(read)
  538. )
  539. # First, get all the keys from the input buffer, in order to determine
  540. # whether we should consider this a paste event or not.
  541. for key_data in self._get_keys(read, input_records):
  542. self._vt100_parser.feed(key_data)
  543. # Return result.
  544. result = self._buffer
  545. self._buffer = []
  546. return result
  547. def flush_keys(self) -> list[KeyPress]:
  548. """
  549. Flush pending keys and return them.
  550. (Used for flushing the 'escape' key.)
  551. """
  552. # Flush all pending keys. (This is most important to flush the vt100
  553. # 'Escape' key early when nothing else follows.)
  554. self._vt100_parser.flush()
  555. # Return result.
  556. result = self._buffer
  557. self._buffer = []
  558. return result
  559. def _get_keys(
  560. self, read: DWORD, input_records: Array[INPUT_RECORD]
  561. ) -> Iterator[str]:
  562. """
  563. Generator that yields `KeyPress` objects from the input records.
  564. """
  565. for i in range(read.value):
  566. ir = input_records[i]
  567. # Get the right EventType from the EVENT_RECORD.
  568. # (For some reason the Windows console application 'cmder'
  569. # [http://gooseberrycreative.com/cmder/] can return '0' for
  570. # ir.EventType. -- Just ignore that.)
  571. if ir.EventType in EventTypes:
  572. ev = getattr(ir.Event, EventTypes[ir.EventType])
  573. # Process if this is a key event. (We also have mouse, menu and
  574. # focus events.)
  575. if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
  576. u_char = ev.uChar.UnicodeChar
  577. if u_char != "\x00":
  578. yield u_char
  579. class _Win32Handles:
  580. """
  581. Utility to keep track of which handles are connectod to which callbacks.
  582. `add_win32_handle` starts a tiny event loop in another thread which waits
  583. for the Win32 handle to become ready. When this happens, the callback will
  584. be called in the current asyncio event loop using `call_soon_threadsafe`.
  585. `remove_win32_handle` will stop this tiny event loop.
  586. NOTE: We use this technique, so that we don't have to use the
  587. `ProactorEventLoop` on Windows and we can wait for things like stdin
  588. in a `SelectorEventLoop`. This is important, because our inputhook
  589. mechanism (used by IPython), only works with the `SelectorEventLoop`.
  590. """
  591. def __init__(self) -> None:
  592. self._handle_callbacks: dict[int, Callable[[], None]] = {}
  593. # Windows Events that are triggered when we have to stop watching this
  594. # handle.
  595. self._remove_events: dict[int, HANDLE] = {}
  596. def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None:
  597. """
  598. Add a Win32 handle to the event loop.
  599. """
  600. handle_value = handle.value
  601. if handle_value is None:
  602. raise ValueError("Invalid handle.")
  603. # Make sure to remove a previous registered handler first.
  604. self.remove_win32_handle(handle)
  605. loop = get_running_loop()
  606. self._handle_callbacks[handle_value] = callback
  607. # Create remove event.
  608. remove_event = create_win32_event()
  609. self._remove_events[handle_value] = remove_event
  610. # Add reader.
  611. def ready() -> None:
  612. # Tell the callback that input's ready.
  613. try:
  614. callback()
  615. finally:
  616. run_in_executor_with_context(wait, loop=loop)
  617. # Wait for the input to become ready.
  618. # (Use an executor for this, the Windows asyncio event loop doesn't
  619. # allow us to wait for handles like stdin.)
  620. def wait() -> None:
  621. # Wait until either the handle becomes ready, or the remove event
  622. # has been set.
  623. result = wait_for_handles([remove_event, handle])
  624. if result is remove_event:
  625. windll.kernel32.CloseHandle(remove_event)
  626. return
  627. else:
  628. loop.call_soon_threadsafe(ready)
  629. run_in_executor_with_context(wait, loop=loop)
  630. def remove_win32_handle(self, handle: HANDLE) -> Callable[[], None] | None:
  631. """
  632. Remove a Win32 handle from the event loop.
  633. Return either the registered handler or `None`.
  634. """
  635. if handle.value is None:
  636. return None # Ignore.
  637. # Trigger remove events, so that the reader knows to stop.
  638. try:
  639. event = self._remove_events.pop(handle.value)
  640. except KeyError:
  641. pass
  642. else:
  643. windll.kernel32.SetEvent(event)
  644. try:
  645. return self._handle_callbacks.pop(handle.value)
  646. except KeyError:
  647. return None
  648. @contextmanager
  649. def attach_win32_input(
  650. input: _Win32InputBase, callback: Callable[[], None]
  651. ) -> Iterator[None]:
  652. """
  653. Context manager that makes this input active in the current event loop.
  654. :param input: :class:`~prompt_toolkit.input.Input` object.
  655. :param input_ready_callback: Called when the input is ready to read.
  656. """
  657. win32_handles = input.win32_handles
  658. handle = input.handle
  659. if handle.value is None:
  660. raise ValueError("Invalid handle.")
  661. # Add reader.
  662. previous_callback = win32_handles.remove_win32_handle(handle)
  663. win32_handles.add_win32_handle(handle, callback)
  664. try:
  665. yield
  666. finally:
  667. win32_handles.remove_win32_handle(handle)
  668. if previous_callback:
  669. win32_handles.add_win32_handle(handle, previous_callback)
  670. @contextmanager
  671. def detach_win32_input(input: _Win32InputBase) -> Iterator[None]:
  672. win32_handles = input.win32_handles
  673. handle = input.handle
  674. if handle.value is None:
  675. raise ValueError("Invalid handle.")
  676. previous_callback = win32_handles.remove_win32_handle(handle)
  677. try:
  678. yield
  679. finally:
  680. if previous_callback:
  681. win32_handles.add_win32_handle(handle, previous_callback)
  682. class raw_mode:
  683. """
  684. ::
  685. with raw_mode(stdin):
  686. ''' the windows terminal is now in 'raw' mode. '''
  687. The ``fileno`` attribute is ignored. This is to be compatible with the
  688. `raw_input` method of `.vt100_input`.
  689. """
  690. def __init__(
  691. self, fileno: int | None = None, use_win10_virtual_terminal_input: bool = False
  692. ) -> None:
  693. self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  694. self.use_win10_virtual_terminal_input = use_win10_virtual_terminal_input
  695. def __enter__(self) -> None:
  696. # Remember original mode.
  697. original_mode = DWORD()
  698. windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode))
  699. self.original_mode = original_mode
  700. self._patch()
  701. def _patch(self) -> None:
  702. # Set raw
  703. ENABLE_ECHO_INPUT = 0x0004
  704. ENABLE_LINE_INPUT = 0x0002
  705. ENABLE_PROCESSED_INPUT = 0x0001
  706. new_mode = self.original_mode.value & ~(
  707. ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
  708. )
  709. if self.use_win10_virtual_terminal_input:
  710. new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT
  711. windll.kernel32.SetConsoleMode(self.handle, new_mode)
  712. def __exit__(self, *a: object) -> None:
  713. # Restore original mode
  714. windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
  715. class cooked_mode(raw_mode):
  716. """
  717. ::
  718. with cooked_mode(stdin):
  719. ''' The pseudo-terminal stdin is now used in cooked mode. '''
  720. """
  721. def _patch(self) -> None:
  722. # Set cooked.
  723. ENABLE_ECHO_INPUT = 0x0004
  724. ENABLE_LINE_INPUT = 0x0002
  725. ENABLE_PROCESSED_INPUT = 0x0001
  726. windll.kernel32.SetConsoleMode(
  727. self.handle,
  728. self.original_mode.value
  729. | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
  730. )
  731. def _is_win_vt100_input_enabled() -> bool:
  732. """
  733. Returns True when we're running Windows and VT100 escape sequences are
  734. supported.
  735. """
  736. hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  737. # Get original console mode.
  738. original_mode = DWORD(0)
  739. windll.kernel32.GetConsoleMode(hconsole, byref(original_mode))
  740. try:
  741. # Try to enable VT100 sequences.
  742. result: int = windll.kernel32.SetConsoleMode(
  743. hconsole, DWORD(ENABLE_VIRTUAL_TERMINAL_INPUT)
  744. )
  745. return result == 1
  746. finally:
  747. windll.kernel32.SetConsoleMode(hconsole, original_mode)