terminalmanager.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. """A MultiTerminalManager for use in the notebook webserver
  2. - raises HTTPErrors
  3. - creates REST API models
  4. """
  5. # Copyright (c) Jupyter Development Team.
  6. # Distributed under the terms of the Modified BSD License.
  7. from __future__ import annotations
  8. import typing as t
  9. from datetime import timedelta
  10. from jupyter_server._tz import isoformat, utcnow
  11. from jupyter_server.prometheus import metrics
  12. from terminado.management import NamedTermManager, PtyWithClients
  13. from tornado import web
  14. from tornado.ioloop import IOLoop, PeriodicCallback
  15. from traitlets import Integer
  16. from traitlets.config import LoggingConfigurable
# Short alias for the metric tracking how many terminals are currently running;
# TerminalManager increments, decrements and re-sets it as terminals come and go.
RUNNING_TOTAL = metrics.TERMINAL_CURRENTLY_RUNNING_TOTAL

# Type alias for the dict-shaped terminal model returned by the manager's
# create/get/list/get_terminal_model methods.
MODEL = t.Dict[str, t.Any]
  19. class TerminalManager(LoggingConfigurable, NamedTermManager): # type:ignore[misc]
  20. """A MultiTerminalManager for use in the notebook webserver"""
  21. _culler_callback = None
  22. _initialized_culler = False
  23. cull_inactive_timeout = Integer(
  24. 0,
  25. config=True,
  26. help="""Timeout (in seconds) in which a terminal has been inactive and ready to be culled.
  27. Values of 0 or lower disable culling.""",
  28. )
  29. cull_interval_default = 300 # 5 minutes
  30. cull_interval = Integer(
  31. cull_interval_default,
  32. config=True,
  33. help="""The interval (in seconds) on which to check for terminals exceeding the inactive timeout value.""",
  34. )
  35. # -------------------------------------------------------------------------
  36. # Methods for managing terminals
  37. # -------------------------------------------------------------------------
  38. def create(self, **kwargs: t.Any) -> MODEL:
  39. """Create a new terminal."""
  40. name, term = self.new_named_terminal(**kwargs)
  41. # Monkey-patch last-activity, similar to kernels. Should we need
  42. # more functionality per terminal, we can look into possible sub-
  43. # classing or containment then.
  44. term.last_activity = utcnow() # type:ignore[attr-defined]
  45. model = self.get_terminal_model(name)
  46. # Increase the metric by one because a new terminal was created
  47. RUNNING_TOTAL.inc()
  48. # Ensure culler is initialized
  49. self._initialize_culler()
  50. return model
  51. def get(self, name: str) -> MODEL:
  52. """Get terminal 'name'."""
  53. return self.get_terminal_model(name)
  54. def list(self) -> list[MODEL]:
  55. """Get a list of all running terminals."""
  56. models = [self.get_terminal_model(name) for name in self.terminals]
  57. # Update the metric below to the length of the list 'terms'
  58. RUNNING_TOTAL.set(len(models))
  59. return models
  60. async def terminate(self, name: str, force: bool = False) -> None:
  61. """Terminate terminal 'name'."""
  62. self._check_terminal(name)
  63. await super().terminate(name, force=force)
  64. # Decrease the metric below by one
  65. # because a terminal has been shutdown
  66. RUNNING_TOTAL.dec()
  67. async def terminate_all(self) -> None:
  68. """Terminate all terminals."""
  69. terms = list(self.terminals)
  70. for term in terms:
  71. await self.terminate(term, force=True)
  72. def get_terminal_model(self, name: str) -> MODEL:
  73. """Return a JSON-safe dict representing a terminal.
  74. For use in representing terminals in the JSON APIs.
  75. """
  76. self._check_terminal(name)
  77. term = self.terminals[name]
  78. return {
  79. "name": name,
  80. "last_activity": isoformat(term.last_activity), # type:ignore[attr-defined]
  81. }
  82. def _check_terminal(self, name: str) -> None:
  83. """Check a that terminal 'name' exists and raise 404 if not."""
  84. if name not in self.terminals:
  85. raise web.HTTPError(404, "Terminal not found: %s" % name)
  86. def _initialize_culler(self) -> None:
  87. """Start culler if 'cull_inactive_timeout' is greater than zero.
  88. Regardless of that value, set flag that we've been here.
  89. """
  90. if not self._initialized_culler and self.cull_inactive_timeout > 0: # noqa: SIM102
  91. if self._culler_callback is None:
  92. _ = IOLoop.current()
  93. if self.cull_interval <= 0: # handle case where user set invalid value
  94. self.log.warning(
  95. "Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
  96. self.cull_interval,
  97. self.cull_interval_default,
  98. )
  99. self.cull_interval = self.cull_interval_default
  100. self._culler_callback = PeriodicCallback(
  101. self._cull_terminals, 1000 * self.cull_interval
  102. )
  103. self.log.info(
  104. "Culling terminals with inactivity > %s seconds at %s second intervals ...",
  105. self.cull_inactive_timeout,
  106. self.cull_interval,
  107. )
  108. self._culler_callback.start()
  109. self._initialized_culler = True
  110. async def _cull_terminals(self) -> None:
  111. self.log.debug(
  112. "Polling every %s seconds for terminals inactive for > %s seconds...",
  113. self.cull_interval,
  114. self.cull_inactive_timeout,
  115. )
  116. # Create a separate list of terminals to avoid conflicting updates while iterating
  117. for name in list(self.terminals):
  118. try:
  119. await self._cull_inactive_terminal(name)
  120. except Exception as e:
  121. self.log.exception(
  122. "The following exception was encountered while checking the "
  123. "activity of terminal %s: %s",
  124. name,
  125. e,
  126. )
  127. async def _cull_inactive_terminal(self, name: str) -> None:
  128. try:
  129. term = self.terminals[name]
  130. except KeyError:
  131. return # KeyErrors are somewhat expected since the terminal can be terminated as the culling check is made.
  132. self.log.debug("name=%s, last_activity=%s", name, term.last_activity) # type:ignore[attr-defined]
  133. if hasattr(term, "last_activity"):
  134. dt_now = utcnow()
  135. dt_inactive = dt_now - term.last_activity
  136. # Compute idle properties
  137. is_time = dt_inactive > timedelta(seconds=self.cull_inactive_timeout)
  138. # Cull the kernel if all three criteria are met
  139. if is_time:
  140. inactivity = int(dt_inactive.total_seconds())
  141. self.log.warning(
  142. "Culling terminal '%s' due to %s seconds of inactivity.", name, inactivity
  143. )
  144. await self.terminate(name, force=True)
  145. def pre_pty_read_hook(self, ptywclients: PtyWithClients) -> None:
  146. """The pre-pty read hook."""
  147. ptywclients.last_activity = utcnow() # type:ignore[attr-defined]