manager.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812
  1. """Base class to manage a running kernel"""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. import asyncio
  5. import functools
  6. import os
  7. import re
  8. import signal
  9. import sys
  10. import typing as t
  11. import uuid
  12. import warnings
  13. from asyncio.futures import Future
  14. from concurrent.futures import Future as CFuture
  15. from contextlib import contextmanager
  16. from enum import Enum
  17. import zmq
  18. from jupyter_core.utils import run_sync
  19. from traitlets import (
  20. Any,
  21. Bool,
  22. Dict,
  23. DottedObjectName,
  24. Float,
  25. Instance,
  26. Type,
  27. Unicode,
  28. default,
  29. observe,
  30. observe_compat,
  31. )
  32. from traitlets.utils.importstring import import_item
  33. from . import kernelspec
  34. from .asynchronous import AsyncKernelClient
  35. from .blocking import BlockingKernelClient
  36. from .client import KernelClient
  37. from .connect import ConnectionFileMixin
  38. from .managerabc import KernelManagerABC
  39. from .provisioning import KernelProvisionerBase
  40. from .provisioning import KernelProvisionerFactory as KPF # noqa
  41. class _ShutdownStatus(Enum):
  42. """
  43. This is so far used only for testing in order to track the internal state of
  44. the shutdown logic, and verifying which path is taken for which
  45. missbehavior.
  46. """
  47. Unset = None
  48. ShutdownRequest = "ShutdownRequest"
  49. SigtermRequest = "SigtermRequest"
  50. SigkillRequest = "SigkillRequest"
  51. F = t.TypeVar("F", bound=t.Callable[..., t.Any])
  52. def _get_future() -> t.Union[Future, CFuture]:
  53. """Get an appropriate Future object"""
  54. try:
  55. asyncio.get_running_loop()
  56. return Future()
  57. except RuntimeError:
  58. # No event loop running, use concurrent future
  59. return CFuture()
  60. def in_pending_state(method: F) -> F:
  61. """Sets the kernel to a pending state by
  62. creating a fresh Future for the KernelManager's `ready`
  63. attribute. Once the method is finished, set the Future's results.
  64. """
  65. @t.no_type_check
  66. @functools.wraps(method)
  67. async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
  68. """Create a future for the decorated method."""
  69. if self._attempted_start or not self._ready:
  70. self._ready = _get_future()
  71. try:
  72. # call wrapped method, await, and set the result or exception.
  73. out = await method(self, *args, **kwargs)
  74. # Add a small sleep to ensure tests can capture the state before done
  75. await asyncio.sleep(0.01)
  76. if self.owns_kernel:
  77. self._ready.set_result(None)
  78. return out
  79. except Exception as e:
  80. self._ready.set_exception(e)
  81. self.log.exception(self._ready.exception())
  82. raise e
  83. return t.cast(F, wrapper)
  84. class KernelManager(ConnectionFileMixin):
  85. """Manages a single kernel in a subprocess on this host.
  86. This version starts kernels with Popen.
  87. """
  88. _ready: t.Union[Future, CFuture] | None
  89. def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
  90. """Initialize a kernel manager."""
  91. if args:
  92. warnings.warn(
  93. "Passing positional only arguments to "
  94. "`KernelManager.__init__` is deprecated since jupyter_client"
  95. " 8.6, and will become an error on future versions. Positional "
  96. " arguments have been ignored since jupyter_client 7.0",
  97. DeprecationWarning,
  98. stacklevel=2,
  99. )
  100. self._owns_kernel = kwargs.pop("owns_kernel", True)
  101. super().__init__(**kwargs)
  102. self._shutdown_status = _ShutdownStatus.Unset
  103. self._attempted_start = False
  104. self._ready = None
  105. _created_context: Bool = Bool(False)
  106. # The PyZMQ Context to use for communication with the kernel.
  107. context: Instance = Instance(zmq.Context)
  108. @default("context")
  109. def _context_default(self) -> zmq.Context:
  110. self._created_context = True
  111. return zmq.Context()
  112. # the class to create with our `client` method
  113. client_class: DottedObjectName = DottedObjectName(
  114. "jupyter_client.blocking.BlockingKernelClient", config=True
  115. )
  116. client_factory: Type = Type(klass=KernelClient, config=True)
  117. @default("client_factory")
  118. def _client_factory_default(self) -> Type:
  119. return import_item(self.client_class)
  120. @observe("client_class")
  121. def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
  122. self.client_factory = import_item(str(change["new"]))
  123. kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)
  124. # The kernel provisioner with which this KernelManager is communicating.
  125. # This will generally be a LocalProvisioner instance unless the kernelspec
  126. # indicates otherwise.
  127. provisioner: KernelProvisionerBase | None = None
  128. kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)
  129. @default("kernel_spec_manager")
  130. def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
  131. return kernelspec.KernelSpecManager(data_dir=self.data_dir)
  132. @observe("kernel_spec_manager")
  133. @observe_compat
  134. def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
  135. self._kernel_spec = None
  136. shutdown_wait_time: Float = Float(
  137. 5.0,
  138. config=True,
  139. help="Time to wait for a kernel to terminate before killing it, "
  140. "in seconds. When a shutdown request is initiated, the kernel "
  141. "will be immediately sent an interrupt (SIGINT), followed"
  142. "by a shutdown_request message, after 1/2 of `shutdown_wait_time`"
  143. "it will be sent a terminate (SIGTERM) request, and finally at "
  144. "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
  145. "and kill may be equivalent on windows. Note that this value can be"
  146. "overridden by the in-use kernel provisioner since shutdown times may"
  147. "vary by provisioned environment.",
  148. )
  149. kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)
  150. @observe("kernel_name")
  151. def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
  152. self._kernel_spec = None
  153. if change["new"] == "python":
  154. self.kernel_name = kernelspec.NATIVE_KERNEL_NAME
  155. _kernel_spec: kernelspec.KernelSpec | None = None
  156. @property
  157. def kernel_spec(self) -> kernelspec.KernelSpec | None:
  158. if self._kernel_spec is None and self.kernel_name != "":
  159. self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
  160. return self._kernel_spec
  161. cache_ports: Bool = Bool(
  162. False,
  163. config=True,
  164. help="True if the MultiKernelManager should cache ports for this KernelManager instance",
  165. )
  166. @default("cache_ports")
  167. def _default_cache_ports(self) -> bool:
  168. return self.transport == "tcp"
  169. @property
  170. def ready(self) -> t.Union[CFuture, Future]:
  171. """A future that resolves when the kernel process has started for the first time"""
  172. if not self._ready:
  173. self._ready = _get_future()
  174. return self._ready
  175. @property
  176. def ipykernel(self) -> bool:
  177. return self.kernel_name in {"python", "python2", "python3"}
  178. # Protected traits
  179. _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
  180. _control_socket: Any = Any()
  181. _restarter: Any = Any()
  182. autorestart: Bool = Bool(
  183. True, config=True, help="""Should we autorestart the kernel if it dies."""
  184. )
  185. shutting_down: bool = False
  186. def __del__(self) -> None:
  187. self._close_control_socket()
  188. self.cleanup_connection_file()
  189. # --------------------------------------------------------------------------
  190. # Kernel restarter
  191. # --------------------------------------------------------------------------
  192. def start_restarter(self) -> None:
  193. """Start the kernel restarter."""
  194. pass
  195. def stop_restarter(self) -> None:
  196. """Stop the kernel restarter."""
  197. pass
  198. def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
  199. """Register a callback to be called when a kernel is restarted"""
  200. if self._restarter is None:
  201. return
  202. self._restarter.add_callback(callback, event)
  203. def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
  204. """Unregister a callback to be called when a kernel is restarted"""
  205. if self._restarter is None:
  206. return
  207. self._restarter.remove_callback(callback, event)
  208. # --------------------------------------------------------------------------
  209. # create a Client connected to our Kernel
  210. # --------------------------------------------------------------------------
  211. def client(self, **kwargs: t.Any) -> BlockingKernelClient:
  212. """Create a client configured to connect to our kernel"""
  213. kw: dict = {}
  214. kw.update(self.get_connection_info(session=True))
  215. kw.update(
  216. {
  217. "connection_file": self.connection_file,
  218. "parent": self,
  219. }
  220. )
  221. # add kwargs last, for manual overrides
  222. kw.update(kwargs)
  223. return self.client_factory(**kw)
  224. # --------------------------------------------------------------------------
  225. # Kernel management
  226. # --------------------------------------------------------------------------
  227. def resolve_path(self, path: str) -> str | None:
  228. """Resolve path to given file."""
  229. assert self.provisioner is not None
  230. return self.provisioner.resolve_path(path)
  231. def update_env(self, *, env: t.Dict[str, str]) -> None:
  232. """
  233. Allow to update the environment of a kernel manager.
  234. This will take effect only after kernel restart when the new env is
  235. passed to the new kernel.
  236. This is useful as some of the information of the current kernel reflect
  237. the state of the session that started it, and those session information
  238. (like the attach file path, or name), are mutable.
  239. .. version-added: 8.5
  240. """
  241. # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
  242. if (
  243. isinstance(self._launch_args, dict)
  244. and "env" in self._launch_args
  245. and isinstance(self._launch_args["env"], dict) # type: ignore [unreachable]
  246. ):
  247. self._launch_args["env"].update(env) # type: ignore [unreachable]
  248. def format_kernel_cmd(self, extra_arguments: t.List[str] | None = None) -> t.List[str]:
  249. """Replace templated args (e.g. {connection_file})"""
  250. extra_arguments = extra_arguments or []
  251. assert self.kernel_spec is not None
  252. cmd = self.kernel_spec.argv + extra_arguments
  253. if cmd and cmd[0] in {
  254. "python",
  255. "python%i" % sys.version_info[0],
  256. "python%i.%i" % sys.version_info[:2],
  257. }:
  258. # executable is 'python' or 'python3', use sys.executable.
  259. # These will typically be the same,
  260. # but if the current process is in an env
  261. # and has been launched by abspath without
  262. # activating the env, python on PATH may not be sys.executable,
  263. # but it should be.
  264. cmd[0] = sys.executable
  265. # Make sure to use the realpath for the connection_file
  266. # On windows, when running with the store python, the connection_file path
  267. # is not usable by non python kernels because the path is being rerouted when
  268. # inside of a store app.
  269. # See this bug here: https://bugs.python.org/issue41196
  270. ns: t.Dict[str, t.Any] = {
  271. "connection_file": os.path.realpath(self.connection_file),
  272. "prefix": sys.prefix,
  273. }
  274. if self.kernel_spec: # type:ignore[truthy-bool]
  275. ns["resource_dir"] = self.kernel_spec.resource_dir
  276. assert isinstance(self._launch_args, dict)
  277. ns.update(self._launch_args)
  278. pat = re.compile(r"\{([A-Za-z0-9_]+)\}")
  279. def from_ns(match: t.Any) -> t.Any:
  280. """Get the key out of ns if it's there, otherwise no change."""
  281. return ns.get(match.group(1), match.group())
  282. return [pat.sub(from_ns, arg) for arg in cmd]
  283. async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
  284. """actually launch the kernel
  285. override in a subclass to launch kernel subprocesses differently
  286. Note that provisioners can now be used to customize kernel environments
  287. and
  288. """
  289. assert self.provisioner is not None
  290. connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
  291. assert self.provisioner.has_process
  292. # Provisioner provides the connection information. Load into kernel manager
  293. # and write the connection file, if not already done.
  294. self._reconcile_connection_info(connection_info)
  295. _launch_kernel = run_sync(_async_launch_kernel)
  296. # Control socket used for polite kernel shutdown
  297. def _connect_control_socket(self) -> None:
  298. if self._control_socket is None:
  299. self._control_socket = self._create_connected_socket("control")
  300. self._control_socket.linger = 100
  301. def _close_control_socket(self) -> None:
  302. if self._control_socket is None:
  303. return
  304. self._control_socket.close()
  305. self._control_socket = None
  306. async def _async_pre_start_kernel(
  307. self, **kw: t.Any
  308. ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
  309. """Prepares a kernel for startup in a separate process.
  310. If random ports (port=0) are being used, this method must be called
  311. before the channels are created.
  312. Parameters
  313. ----------
  314. `**kw` : optional
  315. keyword arguments that are passed down to build the kernel_cmd
  316. and launching the kernel (e.g. Popen kwargs).
  317. """
  318. self.shutting_down = False
  319. self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
  320. # save kwargs for use in restart
  321. # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
  322. self._launch_args = kw.copy()
  323. if self.provisioner is None: # will not be None on restarts
  324. self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
  325. self.kernel_id,
  326. self.kernel_spec,
  327. parent=self,
  328. )
  329. kw = await self.provisioner.pre_launch(**kw)
  330. kernel_cmd = kw.pop("cmd")
  331. return kernel_cmd, kw
  332. pre_start_kernel = run_sync(_async_pre_start_kernel)
  333. async def _async_post_start_kernel(self, **kw: t.Any) -> None:
  334. """Performs any post startup tasks relative to the kernel.
  335. Parameters
  336. ----------
  337. `**kw` : optional
  338. keyword arguments that were used in the kernel process's launch.
  339. """
  340. self.start_restarter()
  341. self._connect_control_socket()
  342. assert self.provisioner is not None
  343. await self.provisioner.post_launch(**kw)
  344. post_start_kernel = run_sync(_async_post_start_kernel)
  345. @in_pending_state
  346. async def _async_start_kernel(self, **kw: t.Any) -> None:
  347. """Starts a kernel on this host in a separate process.
  348. If random ports (port=0) are being used, this method must be called
  349. before the channels are created.
  350. Parameters
  351. ----------
  352. `**kw` : optional
  353. keyword arguments that are passed down to build the kernel_cmd
  354. and launching the kernel (e.g. Popen kwargs).
  355. """
  356. self._attempted_start = True
  357. kernel_cmd, kw = await self._async_pre_start_kernel(**kw)
  358. # launch the kernel subprocess
  359. self.log.debug("Starting kernel: %s", kernel_cmd)
  360. await self._async_launch_kernel(kernel_cmd, **kw)
  361. await self._async_post_start_kernel(**kw)
  362. start_kernel = run_sync(_async_start_kernel)
  363. async def _async_request_shutdown(self, restart: bool = False) -> None:
  364. """Send a shutdown request via control channel"""
  365. content = {"restart": restart}
  366. msg = self.session.msg("shutdown_request", content=content)
  367. # ensure control socket is connected
  368. self._connect_control_socket()
  369. self.session.send(self._control_socket, msg)
  370. assert self.provisioner is not None
  371. await self.provisioner.shutdown_requested(restart=restart)
  372. self._shutdown_status = _ShutdownStatus.ShutdownRequest
  373. request_shutdown = run_sync(_async_request_shutdown)
  374. async def _async_finish_shutdown(
  375. self,
  376. waittime: float | None = None,
  377. pollinterval: float = 0.1,
  378. restart: bool = False,
  379. ) -> None:
  380. """Wait for kernel shutdown, then kill process if it doesn't shutdown.
  381. This does not send shutdown requests - use :meth:`request_shutdown`
  382. first.
  383. """
  384. if waittime is None:
  385. waittime = max(self.shutdown_wait_time, 0)
  386. if self.provisioner: # Allow provisioner to override
  387. waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)
  388. try:
  389. await asyncio.wait_for(
  390. self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
  391. )
  392. except asyncio.TimeoutError:
  393. self.log.debug("Kernel is taking too long to finish, terminating")
  394. self._shutdown_status = _ShutdownStatus.SigtermRequest
  395. await self._async_send_kernel_sigterm()
  396. try:
  397. await asyncio.wait_for(
  398. self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
  399. )
  400. except asyncio.TimeoutError:
  401. self.log.debug("Kernel is taking too long to finish, killing")
  402. self._shutdown_status = _ShutdownStatus.SigkillRequest
  403. await self._async_kill_kernel(restart=restart)
  404. else:
  405. # Process is no longer alive, wait and clear
  406. if self.has_kernel:
  407. assert self.provisioner is not None
  408. await self.provisioner.wait()
  409. finish_shutdown = run_sync(_async_finish_shutdown)
  410. async def _async_cleanup_resources(self, restart: bool = False) -> None:
  411. """Clean up resources when the kernel is shut down"""
  412. if not restart:
  413. self.cleanup_connection_file()
  414. self.cleanup_ipc_files()
  415. self._close_control_socket()
  416. self.session.parent = None
  417. if self._created_context and not restart:
  418. self.context.destroy(linger=100)
  419. if self.provisioner:
  420. await self.provisioner.cleanup(restart=restart)
  421. cleanup_resources = run_sync(_async_cleanup_resources)
  422. @in_pending_state
  423. async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
  424. """Attempts to stop the kernel process cleanly.
  425. This attempts to shutdown the kernels cleanly by:
  426. 1. Sending it a shutdown message over the control channel.
  427. 2. If that fails, the kernel is shutdown forcibly by sending it
  428. a signal.
  429. Parameters
  430. ----------
  431. now : bool
  432. Should the kernel be forcible killed *now*. This skips the
  433. first, nice shutdown attempt.
  434. restart: bool
  435. Will this kernel be restarted after it is shutdown. When this
  436. is True, connection files will not be cleaned up.
  437. """
  438. if not self.owns_kernel:
  439. return
  440. self.shutting_down = True # Used by restarter to prevent race condition
  441. # Stop monitoring for restarting while we shutdown.
  442. self.stop_restarter()
  443. if self.has_kernel:
  444. await self._async_interrupt_kernel()
  445. if now:
  446. await self._async_kill_kernel()
  447. else:
  448. await self._async_request_shutdown(restart=restart)
  449. # Don't send any additional kernel kill messages immediately, to give
  450. # the kernel a chance to properly execute shutdown actions. Wait for at
  451. # most 1s, checking every 0.1s.
  452. await self._async_finish_shutdown(restart=restart)
  453. await self._async_cleanup_resources(restart=restart)
  454. shutdown_kernel = run_sync(_async_shutdown_kernel)
  455. async def _async_restart_kernel(
  456. self, now: bool = False, newports: bool = False, **kw: t.Any
  457. ) -> None:
  458. """Restarts a kernel with the arguments that were used to launch it.
  459. Parameters
  460. ----------
  461. now : bool, optional
  462. If True, the kernel is forcefully restarted *immediately*, without
  463. having a chance to do any cleanup action. Otherwise the kernel is
  464. given 1s to clean up before a forceful restart is issued.
  465. In all cases the kernel is restarted, the only difference is whether
  466. it is given a chance to perform a clean shutdown or not.
  467. newports : bool, optional
  468. If the old kernel was launched with random ports, this flag decides
  469. whether the same ports and connection file will be used again.
  470. If False, the same ports and connection file are used. This is
  471. the default. If True, new random port numbers are chosen and a
  472. new connection file is written. It is still possible that the newly
  473. chosen random port numbers happen to be the same as the old ones.
  474. `**kw` : optional
  475. Any options specified here will overwrite those used to launch the
  476. kernel.
  477. """
  478. if self._launch_args is None:
  479. msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
  480. raise RuntimeError(msg)
  481. # Stop currently running kernel.
  482. await self._async_shutdown_kernel(now=now, restart=True)
  483. if newports:
  484. self.cleanup_random_ports()
  485. # Start new kernel.
  486. self._launch_args.update(kw)
  487. await self._async_start_kernel(**self._launch_args)
  488. restart_kernel = run_sync(_async_restart_kernel)
  489. @property
  490. def owns_kernel(self) -> bool:
  491. return self._owns_kernel
  492. @property
  493. def has_kernel(self) -> bool:
  494. """Has a kernel process been started that we are actively managing."""
  495. return self.provisioner is not None and self.provisioner.has_process
  496. async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
  497. """similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
  498. if self.has_kernel:
  499. assert self.provisioner is not None
  500. await self.provisioner.terminate(restart=restart)
  501. _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)
  502. async def _async_kill_kernel(self, restart: bool = False) -> None:
  503. """Kill the running kernel.
  504. This is a private method, callers should use shutdown_kernel(now=True).
  505. """
  506. if self.has_kernel:
  507. assert self.provisioner is not None
  508. await self.provisioner.kill(restart=restart)
  509. # Wait until the kernel terminates.
  510. try:
  511. await asyncio.wait_for(self._async_wait(), timeout=5.0)
  512. except asyncio.TimeoutError:
  513. # Wait timed out, just log warning but continue - not much more we can do.
  514. self.log.warning("Wait for final termination of kernel timed out - continuing...")
  515. pass
  516. else:
  517. # Process is no longer alive, wait and clear
  518. if self.has_kernel:
  519. await self.provisioner.wait()
  520. _kill_kernel = run_sync(_async_kill_kernel)
  521. async def _async_interrupt_kernel(self) -> None:
  522. """Interrupts the kernel by sending it a signal.
  523. Unlike ``signal_kernel``, this operation is well supported on all
  524. platforms.
  525. """
  526. if not self.has_kernel and self._ready is not None:
  527. if isinstance(self._ready, CFuture):
  528. ready = asyncio.ensure_future(t.cast(Future[t.Any], self._ready))
  529. else:
  530. ready = self._ready
  531. # Wait for a shutdown if one is in progress.
  532. if self.shutting_down:
  533. await ready
  534. # Wait for a startup.
  535. await ready
  536. if self.has_kernel:
  537. assert self.kernel_spec is not None
  538. interrupt_mode = self.kernel_spec.interrupt_mode
  539. if interrupt_mode == "signal":
  540. await self._async_signal_kernel(signal.SIGINT)
  541. elif interrupt_mode == "message":
  542. msg = self.session.msg("interrupt_request", content={})
  543. self._connect_control_socket()
  544. self.session.send(self._control_socket, msg)
  545. else:
  546. msg = "Cannot interrupt kernel. No kernel is running!"
  547. raise RuntimeError(msg)
  548. interrupt_kernel = run_sync(_async_interrupt_kernel)
  549. async def _async_signal_kernel(self, signum: int) -> None:
  550. """Sends a signal to the process group of the kernel (this
  551. usually includes the kernel and any subprocesses spawned by
  552. the kernel).
  553. Note that since only SIGTERM is supported on Windows, this function is
  554. only useful on Unix systems.
  555. """
  556. if self.has_kernel:
  557. assert self.provisioner is not None
  558. await self.provisioner.send_signal(signum)
  559. else:
  560. msg = "Cannot signal kernel. No kernel is running!"
  561. raise RuntimeError(msg)
  562. signal_kernel = run_sync(_async_signal_kernel)
  563. async def _async_is_alive(self) -> bool:
  564. """Is the kernel process still running?"""
  565. if not self.owns_kernel:
  566. return True
  567. if self.has_kernel:
  568. assert self.provisioner is not None
  569. ret = await self.provisioner.poll()
  570. if ret is None:
  571. return True
  572. return False
  573. is_alive = run_sync(_async_is_alive)
  574. async def _async_wait(self, pollinterval: float = 0.1) -> None:
  575. # Use busy loop at 100ms intervals, polling until the process is
  576. # not alive. If we find the process is no longer alive, complete
  577. # its cleanup via the blocking wait(). Callers are responsible for
  578. # issuing calls to wait() using a timeout (see _kill_kernel()).
  579. while await self._async_is_alive():
  580. await asyncio.sleep(pollinterval)
  581. class AsyncKernelManager(KernelManager):
  582. """An async kernel manager."""
  583. # the class to create with our `client` method
  584. client_class: DottedObjectName = DottedObjectName(
  585. "jupyter_client.asynchronous.AsyncKernelClient", config=True
  586. )
  587. client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient", config=True)
  588. # The PyZMQ Context to use for communication with the kernel.
  589. context: Instance = Instance(zmq.asyncio.Context)
  590. @default("context")
  591. def _context_default(self) -> zmq.asyncio.Context:
  592. self._created_context = True
  593. return zmq.asyncio.Context()
  594. def client( # type:ignore[override]
  595. self, **kwargs: t.Any
  596. ) -> AsyncKernelClient:
  597. """Get a client for the manager."""
  598. return super().client(**kwargs) # type:ignore[return-value]
  599. _launch_kernel = KernelManager._async_launch_kernel # type:ignore[assignment]
  600. start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel # type:ignore[assignment]
  601. pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel # type:ignore[assignment]
  602. post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel # type:ignore[assignment]
  603. request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown # type:ignore[assignment]
  604. finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown # type:ignore[assignment]
  605. cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources # type:ignore[assignment]
  606. shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel # type:ignore[assignment]
  607. restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel # type:ignore[assignment]
  608. _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm # type:ignore[assignment]
  609. _kill_kernel = KernelManager._async_kill_kernel # type:ignore[assignment]
  610. interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel # type:ignore[assignment]
  611. signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel # type:ignore[assignment]
  612. is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive # type:ignore[assignment]
  613. KernelManagerABC.register(KernelManager)
  614. def start_new_kernel(
  615. startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
  616. ) -> t.Tuple[KernelManager, BlockingKernelClient]:
  617. """Start a new kernel, and return its Manager and Client"""
  618. km = KernelManager(kernel_name=kernel_name)
  619. km.start_kernel(**kwargs)
  620. kc = km.client()
  621. kc.start_channels()
  622. try:
  623. kc.wait_for_ready(timeout=startup_timeout)
  624. except RuntimeError:
  625. kc.stop_channels()
  626. km.shutdown_kernel()
  627. raise
  628. return km, kc
  629. async def start_new_async_kernel(
  630. startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
  631. ) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
  632. """Start a new kernel, and return its Manager and Client"""
  633. km = AsyncKernelManager(kernel_name=kernel_name)
  634. await km.start_kernel(**kwargs)
  635. kc = km.client()
  636. kc.start_channels()
  637. try:
  638. await kc.wait_for_ready(timeout=startup_timeout)
  639. except RuntimeError:
  640. kc.stop_channels()
  641. await km.shutdown_kernel()
  642. raise
  643. return (km, kc)
  644. @contextmanager
  645. def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
  646. """Context manager to create a kernel in a subprocess.
  647. The kernel is shut down when the context exits.
  648. Returns
  649. -------
  650. kernel_client: connected KernelClient instance
  651. """
  652. km, kc = start_new_kernel(**kwargs)
  653. try:
  654. yield kc
  655. finally:
  656. kc.stop_channels()
  657. km.shutdown_kernel(now=True)