auto_suggest.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. import re
  2. import asyncio
  3. import tokenize
  4. from io import StringIO
  5. from typing import List, Optional, Union, Tuple, ClassVar, Any
  6. from collections.abc import Callable, Generator
  7. import warnings
  8. import prompt_toolkit
  9. from prompt_toolkit.buffer import Buffer
  10. from prompt_toolkit.key_binding import KeyPressEvent
  11. from prompt_toolkit.key_binding.bindings import named_commands as nc
  12. from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion
  13. from prompt_toolkit.document import Document
  14. from prompt_toolkit.history import History
  15. from prompt_toolkit.shortcuts import PromptSession
  16. from prompt_toolkit.layout.processors import (
  17. Processor,
  18. Transformation,
  19. TransformationInput,
  20. )
  21. from IPython.core.getipython import get_ipython
  22. from IPython.utils.tokenutil import generate_tokens
  23. from .filters import pass_through
  24. def _get_query(document: Document):
  25. return document.lines[document.cursor_position_row]
  26. class AppendAutoSuggestionInAnyLine(Processor):
  27. """
  28. Append the auto suggestion to lines other than the last (appending to the
  29. last line is natively supported by the prompt toolkit).
  30. This has a private `_debug` attribute that can be set to True to display
  31. debug information as virtual suggestion on the end of any line. You can do
  32. so with:
  33. >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
  34. >>> AppendAutoSuggestionInAnyLine._debug = True
  35. """
  36. _debug: ClassVar[bool] = False
  37. def __init__(self, style: str = "class:auto-suggestion") -> None:
  38. self.style = style
  39. def apply_transformation(self, ti: TransformationInput) -> Transformation:
  40. """
  41. Apply transformation to the line that is currently being edited.
  42. This is a variation of the original implementation in prompt toolkit
  43. that allows to not only append suggestions to any line, but also to show
  44. multi-line suggestions.
  45. As transformation are applied on a line-by-line basis; we need to trick
  46. a bit, and elide any line that is after the line we are currently
  47. editing, until we run out of completions. We cannot shift the existing
  48. lines
  49. There are multiple cases to handle:
  50. The completions ends before the end of the buffer:
  51. We can resume showing the normal line, and say that some code may
  52. be hidden.
  53. The completions ends at the end of the buffer
  54. We can just say that some code may be hidden.
  55. And separately:
  56. The completions ends beyond the end of the buffer
  57. We need to both say that some code may be hidden, and that some
  58. lines are not shown.
  59. """
  60. last_line_number = ti.document.line_count - 1
  61. is_last_line = ti.lineno == last_line_number
  62. noop = lambda text: Transformation(
  63. fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
  64. )
  65. if ti.document.line_count == 1:
  66. return noop("noop:oneline")
  67. if ti.document.cursor_position_row == last_line_number and is_last_line:
  68. # prompt toolkit already appends something; just leave it be
  69. return noop("noop:last line and cursor")
  70. # first everything before the current line is unchanged.
  71. if ti.lineno < ti.document.cursor_position_row:
  72. return noop("noop:before cursor")
  73. buffer = ti.buffer_control.buffer
  74. if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
  75. return noop("noop:not eol")
  76. delta = ti.lineno - ti.document.cursor_position_row
  77. suggestions = buffer.suggestion.text.splitlines()
  78. if len(suggestions) == 0:
  79. return noop("noop: no suggestions")
  80. if prompt_toolkit.VERSION < (3, 0, 49):
  81. if len(suggestions) > 1 and prompt_toolkit.VERSION < (3, 0, 49):
  82. if ti.lineno == ti.document.cursor_position_row:
  83. return Transformation(
  84. fragments=ti.fragments
  85. + [
  86. (
  87. "red",
  88. "(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
  89. )
  90. ]
  91. )
  92. else:
  93. return Transformation(fragments=ti.fragments)
  94. elif len(suggestions) == 1:
  95. if ti.lineno == ti.document.cursor_position_row:
  96. return Transformation(
  97. fragments=ti.fragments + [(self.style, suggestions[0])]
  98. )
  99. return Transformation(fragments=ti.fragments)
  100. if delta == 0:
  101. suggestion = suggestions[0]
  102. return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
  103. if is_last_line:
  104. if delta < len(suggestions):
  105. suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
  106. return Transformation([(self.style, suggestion)])
  107. n_elided = len(suggestions)
  108. for i in range(len(suggestions)):
  109. ll = ti.get_line(last_line_number - i)
  110. el = "".join(l[1] for l in ll).strip()
  111. if el:
  112. break
  113. else:
  114. n_elided -= 1
  115. if n_elided:
  116. return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
  117. else:
  118. return Transformation(
  119. ti.get_line(last_line_number - len(suggestions) + 1)
  120. + ([(self.style, "shift-last-line")] if self._debug else [])
  121. )
  122. elif delta < len(suggestions):
  123. suggestion = suggestions[delta]
  124. return Transformation([(self.style, suggestion)])
  125. else:
  126. shift = ti.lineno - len(suggestions) + 1
  127. return Transformation(ti.get_line(shift))
  128. class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
  129. """
  130. A subclass of AutoSuggestFromHistory that allow navigation to next/previous
  131. suggestion from history. To do so it remembers the current position, but it
  132. state need to carefully be cleared on the right events.
  133. """
  134. skip_lines: int
  135. _connected_apps: list[PromptSession]
  136. # handle to the currently running llm task that appends suggestions to the
  137. # current buffer; we keep a handle to it in order to cancel it when there is a cursor movement, or
  138. # another request.
  139. _llm_task: asyncio.Task | None = None
  140. # This is the constructor of the LLM provider from jupyter-ai
  141. # to which we forward the request to generate inline completions.
  142. _init_llm_provider: Callable | None
  143. _llm_provider_instance: Any | None
  144. _llm_prefixer: Callable = lambda self, x: "wrong"
  145. def __init__(self):
  146. super().__init__()
  147. self.skip_lines = 0
  148. self._connected_apps = []
  149. self._llm_provider_instance = None
  150. self._init_llm_provider = None
  151. self._request_number = 0
  152. def reset_history_position(self, _: Buffer) -> None:
  153. self.skip_lines = 0
  154. def disconnect(self) -> None:
  155. self._cancel_running_llm_task()
  156. for pt_app in self._connected_apps:
  157. text_insert_event = pt_app.default_buffer.on_text_insert
  158. text_insert_event.remove_handler(self.reset_history_position)
  159. def connect(self, pt_app: PromptSession) -> None:
  160. self._connected_apps.append(pt_app)
  161. # note: `on_text_changed` could be used for a bit different behaviour
  162. # on character deletion (i.e. resetting history position on backspace)
  163. pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
  164. pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
  165. def get_suggestion(
  166. self, buffer: Buffer, document: Document
  167. ) -> Optional[Suggestion]:
  168. text = _get_query(document)
  169. if text.strip():
  170. for suggestion, _ in self._find_next_match(
  171. text, self.skip_lines, buffer.history
  172. ):
  173. return Suggestion(suggestion)
  174. return None
  175. def _dismiss(self, buffer, *args, **kwargs) -> None:
  176. self._cancel_running_llm_task()
  177. buffer.suggestion = None
  178. def _find_match(
  179. self, text: str, skip_lines: float, history: History, previous: bool
  180. ) -> Generator[Tuple[str, float], None, None]:
  181. """
  182. text : str
  183. Text content to find a match for, the user cursor is most of the
  184. time at the end of this text.
  185. skip_lines : float
  186. number of items to skip in the search, this is used to indicate how
  187. far in the list the user has navigated by pressing up or down.
  188. The float type is used as the base value is +inf
  189. history : History
  190. prompt_toolkit History instance to fetch previous entries from.
  191. previous : bool
  192. Direction of the search, whether we are looking previous match
  193. (True), or next match (False).
  194. Yields
  195. ------
  196. Tuple with:
  197. str:
  198. current suggestion.
  199. float:
  200. will actually yield only ints, which is passed back via skip_lines,
  201. which may be a +inf (float)
  202. """
  203. line_number = -1
  204. for string in reversed(list(history.get_strings())):
  205. for line in reversed(string.splitlines()):
  206. line_number += 1
  207. if not previous and line_number < skip_lines:
  208. continue
  209. # do not return empty suggestions as these
  210. # close the auto-suggestion overlay (and are useless)
  211. if line.startswith(text) and len(line) > len(text):
  212. yield line[len(text) :], line_number
  213. if previous and line_number >= skip_lines:
  214. return
  215. def _find_next_match(
  216. self, text: str, skip_lines: float, history: History
  217. ) -> Generator[Tuple[str, float], None, None]:
  218. return self._find_match(text, skip_lines, history, previous=False)
  219. def _find_previous_match(self, text: str, skip_lines: float, history: History):
  220. return reversed(
  221. list(self._find_match(text, skip_lines, history, previous=True))
  222. )
  223. def up(self, query: str, other_than: str, history: History) -> None:
  224. self._cancel_running_llm_task()
  225. for suggestion, line_number in self._find_next_match(
  226. query, self.skip_lines, history
  227. ):
  228. # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
  229. # we want to switch from 'very.b' to 'very.a' because a) if the
  230. # suggestion equals current text, prompt-toolkit aborts suggesting
  231. # b) user likely would not be interested in 'very' anyways (they
  232. # already typed it).
  233. if query + suggestion != other_than:
  234. self.skip_lines = line_number
  235. break
  236. else:
  237. # no matches found, cycle back to beginning
  238. self.skip_lines = 0
  239. def down(self, query: str, other_than: str, history: History) -> None:
  240. self._cancel_running_llm_task()
  241. for suggestion, line_number in self._find_previous_match(
  242. query, self.skip_lines, history
  243. ):
  244. if query + suggestion != other_than:
  245. self.skip_lines = line_number
  246. break
  247. else:
  248. # no matches found, cycle to end
  249. for suggestion, line_number in self._find_previous_match(
  250. query, float("Inf"), history
  251. ):
  252. if query + suggestion != other_than:
  253. self.skip_lines = line_number
  254. break
  255. def _cancel_running_llm_task(self) -> None:
  256. """
  257. Try to cancel the currently running llm_task if exists, and set it to None.
  258. """
  259. if self._llm_task is not None:
  260. if self._llm_task.done():
  261. self._llm_task = None
  262. return
  263. cancelled = self._llm_task.cancel()
  264. if cancelled:
  265. self._llm_task = None
  266. if not cancelled:
  267. warnings.warn(
  268. "LLM task not cancelled, does your provider support cancellation?"
  269. )
  270. @property
  271. def _llm_provider(self):
  272. """Lazy-initialized instance of the LLM provider.
  273. Do not use in the constructor, as `_init_llm_provider` can trigger slow side-effects.
  274. """
  275. if self._llm_provider_instance is None and self._init_llm_provider:
  276. self._llm_provider_instance = self._init_llm_provider()
  277. return self._llm_provider_instance
  278. async def _trigger_llm(self, buffer) -> None:
  279. """
  280. This will ask the current llm provider a suggestion for the current buffer.
  281. If there is a currently running llm task, it will cancel it.
  282. """
  283. # we likely want to store the current cursor position, and cancel if the cursor has moved.
  284. try:
  285. import jupyter_ai_magics
  286. except ModuleNotFoundError:
  287. jupyter_ai_magics = None
  288. if not self._llm_provider:
  289. warnings.warn("No LLM provider found, cannot trigger LLM completions")
  290. return
  291. if jupyter_ai_magics is None:
  292. warnings.warn("LLM Completion requires `jupyter_ai_magics` to be installed")
  293. self._cancel_running_llm_task()
  294. async def error_catcher(buffer):
  295. """
  296. This catches and log any errors, as otherwise this is just
  297. lost in the void of the future running task.
  298. """
  299. try:
  300. await self._trigger_llm_core(buffer)
  301. except Exception as e:
  302. get_ipython().log.error("error %s", e)
  303. raise
  304. # here we need a cancellable task so we can't just await the error caught
  305. self._llm_task = asyncio.create_task(error_catcher(buffer))
  306. await self._llm_task
  307. async def _trigger_llm_core(self, buffer: Buffer):
  308. """
  309. This is the core of the current llm request.
  310. Here we build a compatible `InlineCompletionRequest` and ask the llm
  311. provider to stream it's response back to us iteratively setting it as
  312. the suggestion on the current buffer.
  313. Unlike with JupyterAi, as we do not have multiple cells, the cell id
  314. is always set to `None`.
  315. We set the prefix to the current cell content, but could also insert the
  316. rest of the history or even just the non-fail history.
  317. In the same way, we do not have cell id.
  318. LLM provider may return multiple suggestion stream, but for the time
  319. being we only support one.
  320. Here we make the assumption that the provider will have
  321. stream_inline_completions, I'm not sure it is the case for all
  322. providers.
  323. """
  324. try:
  325. import jupyter_ai.completions.models as jai_models
  326. except ModuleNotFoundError:
  327. jai_models = None
  328. if not jai_models:
  329. raise ValueError("jupyter-ai is not installed")
  330. if not self._llm_provider:
  331. raise ValueError("No LLM provider found, cannot trigger LLM completions")
  332. hm = buffer.history.shell.history_manager
  333. prefix = self._llm_prefixer(hm)
  334. get_ipython().log.debug("prefix: %s", prefix)
  335. self._request_number += 1
  336. request_number = self._request_number
  337. request = jai_models.InlineCompletionRequest(
  338. number=request_number,
  339. prefix=prefix + buffer.document.text_before_cursor,
  340. suffix=buffer.document.text_after_cursor,
  341. mime="text/x-python",
  342. stream=True,
  343. path=None,
  344. language="python",
  345. cell_id=None,
  346. )
  347. async for reply_and_chunks in self._llm_provider.stream_inline_completions(
  348. request
  349. ):
  350. if self._request_number != request_number:
  351. # If a new suggestion was requested, skip processing this one.
  352. return
  353. if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
  354. if len(reply_and_chunks.list.items) > 1:
  355. raise ValueError(
  356. "Terminal IPython cannot deal with multiple LLM suggestions at once"
  357. )
  358. buffer.suggestion = Suggestion(
  359. reply_and_chunks.list.items[0].insertText
  360. )
  361. buffer.on_suggestion_set.fire()
  362. elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
  363. buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
  364. buffer.on_suggestion_set.fire()
  365. return
  366. async def llm_autosuggestion(event: KeyPressEvent):
  367. """
  368. Ask the AutoSuggester from history to delegate to ask an LLM for completion
  369. This will first make sure that the current buffer have _MIN_LINES (7)
  370. available lines to insert the LLM completion
  371. Provisional as of 8.32, may change without warnings
  372. """
  373. _MIN_LINES = 5
  374. provider = get_ipython().auto_suggest
  375. if not isinstance(provider, NavigableAutoSuggestFromHistory):
  376. return
  377. doc = event.current_buffer.document
  378. lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
  379. for _ in range(lines_to_insert):
  380. event.current_buffer.insert_text("\n", move_cursor=False, fire_event=False)
  381. await provider._trigger_llm(event.current_buffer)
  382. def accept_or_jump_to_end(event: KeyPressEvent):
  383. """Apply autosuggestion or jump to end of line."""
  384. buffer = event.current_buffer
  385. d = buffer.document
  386. after_cursor = d.text[d.cursor_position :]
  387. lines = after_cursor.split("\n")
  388. end_of_current_line = lines[0].strip()
  389. suggestion = buffer.suggestion
  390. if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
  391. buffer.insert_text(suggestion.text)
  392. else:
  393. nc.end_of_line(event)
  394. def accept(event: KeyPressEvent):
  395. """Accept autosuggestion"""
  396. buffer = event.current_buffer
  397. suggestion = buffer.suggestion
  398. if suggestion:
  399. buffer.insert_text(suggestion.text)
  400. else:
  401. nc.forward_char(event)
  402. def discard(event: KeyPressEvent):
  403. """Discard autosuggestion"""
  404. buffer = event.current_buffer
  405. buffer.suggestion = None
  406. def accept_word(event: KeyPressEvent):
  407. """Fill partial autosuggestion by word"""
  408. buffer = event.current_buffer
  409. suggestion = buffer.suggestion
  410. if suggestion:
  411. t = re.split(r"(\S+\s+)", suggestion.text)
  412. buffer.insert_text(next((x for x in t if x), ""))
  413. else:
  414. nc.forward_word(event)
  415. def accept_character(event: KeyPressEvent):
  416. """Fill partial autosuggestion by character"""
  417. b = event.current_buffer
  418. suggestion = b.suggestion
  419. if suggestion and suggestion.text:
  420. b.insert_text(suggestion.text[0])
  421. def accept_and_keep_cursor(event: KeyPressEvent):
  422. """Accept autosuggestion and keep cursor in place"""
  423. buffer = event.current_buffer
  424. old_position = buffer.cursor_position
  425. suggestion = buffer.suggestion
  426. if suggestion:
  427. buffer.insert_text(suggestion.text)
  428. buffer.cursor_position = old_position
  429. def accept_and_move_cursor_left(event: KeyPressEvent):
  430. """Accept autosuggestion and move cursor left in place"""
  431. accept_and_keep_cursor(event)
  432. nc.backward_char(event)
  433. def _update_hint(buffer: Buffer):
  434. if buffer.auto_suggest:
  435. suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
  436. buffer.suggestion = suggestion
  437. def backspace_and_resume_hint(event: KeyPressEvent):
  438. """Resume autosuggestions after deleting last character"""
  439. nc.backward_delete_char(event)
  440. _update_hint(event.current_buffer)
  441. def resume_hinting(event: KeyPressEvent):
  442. """Resume autosuggestions"""
  443. pass_through.reply(event)
  444. # Order matters: if update happened first and event reply second, the
  445. # suggestion would be auto-accepted if both actions are bound to same key.
  446. _update_hint(event.current_buffer)
  447. def up_and_update_hint(event: KeyPressEvent):
  448. """Go up and update hint"""
  449. current_buffer = event.current_buffer
  450. current_buffer.auto_up(count=event.arg)
  451. _update_hint(current_buffer)
  452. def down_and_update_hint(event: KeyPressEvent):
  453. """Go down and update hint"""
  454. current_buffer = event.current_buffer
  455. current_buffer.auto_down(count=event.arg)
  456. _update_hint(current_buffer)
  457. def accept_token(event: KeyPressEvent):
  458. """Fill partial autosuggestion by token"""
  459. b = event.current_buffer
  460. suggestion = b.suggestion
  461. if suggestion:
  462. prefix = _get_query(b.document)
  463. text = prefix + suggestion.text
  464. tokens: List[Optional[str]] = [None, None, None]
  465. substrings = [""]
  466. i = 0
  467. for token in generate_tokens(StringIO(text).readline):
  468. if token.type == tokenize.NEWLINE:
  469. index = len(text)
  470. else:
  471. index = text.index(token[1], len(substrings[-1]))
  472. substrings.append(text[:index])
  473. tokenized_so_far = substrings[-1]
  474. if tokenized_so_far.startswith(prefix):
  475. if i == 0 and len(tokenized_so_far) > len(prefix):
  476. tokens[0] = tokenized_so_far[len(prefix) :]
  477. substrings.append(tokenized_so_far)
  478. i += 1
  479. tokens[i] = token[1]
  480. if i == 2:
  481. break
  482. i += 1
  483. if tokens[0]:
  484. to_insert: str
  485. insert_text = substrings[-2]
  486. if tokens[1] and len(tokens[1]) == 1:
  487. insert_text = substrings[-1]
  488. to_insert = insert_text[len(prefix) :]
  489. b.insert_text(to_insert)
  490. return
  491. nc.forward_word(event)
  492. Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
  493. def _swap_autosuggestion(
  494. buffer: Buffer,
  495. provider: NavigableAutoSuggestFromHistory,
  496. direction_method: Callable,
  497. ):
  498. """
  499. We skip most recent history entry (in either direction) if it equals the
  500. current autosuggestion because if user cycles when auto-suggestion is shown
  501. they most likely want something else than what was suggested (otherwise
  502. they would have accepted the suggestion).
  503. """
  504. suggestion = buffer.suggestion
  505. if not suggestion:
  506. return
  507. query = _get_query(buffer.document)
  508. current = query + suggestion.text
  509. direction_method(query=query, other_than=current, history=buffer.history)
  510. new_suggestion = provider.get_suggestion(buffer, buffer.document)
  511. buffer.suggestion = new_suggestion
  512. def swap_autosuggestion_up(event: KeyPressEvent):
  513. """Get next autosuggestion from history."""
  514. shell = get_ipython()
  515. provider = shell.auto_suggest
  516. if not isinstance(provider, NavigableAutoSuggestFromHistory):
  517. return
  518. return _swap_autosuggestion(
  519. buffer=event.current_buffer, provider=provider, direction_method=provider.up
  520. )
  521. def swap_autosuggestion_down(event: KeyPressEvent):
  522. """Get previous autosuggestion from history."""
  523. shell = get_ipython()
  524. provider = shell.auto_suggest
  525. if not isinstance(provider, NavigableAutoSuggestFromHistory):
  526. return
  527. return _swap_autosuggestion(
  528. buffer=event.current_buffer,
  529. provider=provider,
  530. direction_method=provider.down,
  531. )