iostream.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835
  1. """Wrappers for forwarding stdout/stderr over zmq"""
  2. # Copyright (c) IPython Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. import asyncio
  5. import atexit
  6. import contextvars
  7. import io
  8. import os
  9. import sys
  10. import threading
  11. import traceback
  12. import warnings
  13. from binascii import b2a_hex
  14. from collections import defaultdict, deque
  15. from collections.abc import Callable
  16. from io import StringIO, TextIOBase
  17. from threading import local
  18. from typing import Any
  19. import zmq
  20. from jupyter_client.session import extract_header
  21. from tornado.ioloop import IOLoop
  22. from zmq.eventloop.zmqstream import ZMQStream
  23. # -----------------------------------------------------------------------------
  24. # Globals
  25. # -----------------------------------------------------------------------------
  26. MASTER = 0
  27. CHILD = 1
  28. PIPE_BUFFER_SIZE = 1000
  29. # -----------------------------------------------------------------------------
  30. # IO classes
  31. # -----------------------------------------------------------------------------
  32. class IOPubThread:
  33. """An object for sending IOPub messages in a background thread
  34. Prevents a blocking main thread from delaying output from threads.
  35. IOPubThread(pub_socket).background_socket is a Socket-API-providing object
  36. whose IO is always run in a thread.
  37. """
  38. def __init__(self, socket, pipe=False, session=False):
  39. """Create IOPub thread
  40. Parameters
  41. ----------
  42. socket : zmq.PUB Socket
  43. the socket on which messages will be sent.
  44. pipe : bool
  45. Whether this process should listen for IOPub messages
  46. piped from subprocesses.
  47. """
  48. self.socket = socket
  49. self.session = session
  50. self._stopped = False
  51. self.background_socket = BackgroundSocket(self)
  52. self._master_pid = os.getpid()
  53. self._pipe_flag = pipe
  54. self.io_loop = IOLoop(make_current=False)
  55. if pipe:
  56. self._setup_pipe_in()
  57. self._local = threading.local()
  58. self._events: deque[Callable[..., Any]] = deque()
  59. self._event_pipes: dict[threading.Thread, Any] = {}
  60. self._event_pipe_gc_lock: threading.Lock = threading.Lock()
  61. self._event_pipe_gc_seconds: float = 10
  62. self._event_pipe_gc_task: asyncio.Task[Any] | None = None
  63. self._setup_event_pipe()
  64. self._setup_xpub_listener()
  65. self.thread = threading.Thread(target=self._thread_main, name="IOPub")
  66. self.thread.daemon = True
  67. self.thread.pydev_do_not_trace = True # type:ignore[attr-defined]
  68. self.thread.is_pydev_daemon_thread = True # type:ignore[attr-defined]
  69. self.thread.name = "IOPub"
  70. def _setup_xpub_listener(self):
  71. """Setup listener for XPUB subscription events"""
  72. # Checks the socket is not a DummySocket
  73. if not hasattr(self.socket, "getsockopt"):
  74. return
  75. socket_type = self.socket.getsockopt(zmq.TYPE)
  76. if socket_type == zmq.XPUB:
  77. self._xpub_stream = ZMQStream(self.socket, self.io_loop)
  78. self._xpub_stream.on_recv(self._handle_subscription)
  79. def _handle_subscription(self, frames):
  80. """Handle subscription/unsubscription events from XPUB socket
  81. XPUB sockets receive:
  82. - subscribe: single frame with b'\\x01' + topic
  83. - unsubscribe: single frame with b'\\x00' + topic
  84. """
  85. for frame in frames:
  86. event_type = frame[0]
  87. if event_type == 1:
  88. subscription = frame[1:] if len(frame) > 1 else b""
  89. try:
  90. subscription_str = subscription.decode("utf-8")
  91. except UnicodeDecodeError:
  92. continue
  93. self._send_welcome_message(subscription_str)
  94. def _send_welcome_message(self, subscription):
  95. """Send iopub_welcome message for new subscription
  96. Parameters
  97. ----------
  98. subscription : str
  99. The subscription topic (UTF-8 decoded)
  100. """
  101. # TODO: This early return is for backward-compatibility with ipyparallel.
  102. # This should be removed when ipykernel has been released with support of
  103. # xpub and ipyparallel has been updated to pass the session parameter
  104. # to IOPubThread upon construction.
  105. # (NB: the call to fix is here:
  106. # https://github.com/ipython/ipyparallel/blob/main/ipyparallel/engine/app.py#L679
  107. if self.session is None:
  108. return
  109. content = {"subscription": subscription}
  110. header = self.session.msg_header("iopub_welcome")
  111. msg = {
  112. "header": header,
  113. "parent_header": {},
  114. "metadata": {},
  115. "content": content,
  116. "buffers": [],
  117. }
  118. msg_list = self.session.serialize(msg)
  119. if subscription:
  120. identity = subscription.encode("utf-8")
  121. full_msg = [identity, *msg_list]
  122. else:
  123. full_msg = msg_list
  124. # Send directly on socket (we're already in IO thread context)
  125. self.socket.send_multipart(full_msg)
  126. def _thread_main(self):
  127. """The inner loop that's actually run in a thread"""
  128. def _start_event_gc():
  129. self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())
  130. self.io_loop.run_sync(_start_event_gc)
  131. if not self._stopped:
  132. # avoid race if stop called before start thread gets here
  133. # probably only comes up in tests
  134. self.io_loop.start()
  135. if self._event_pipe_gc_task is not None:
  136. # cancel gc task to avoid pending task warnings
  137. async def _cancel():
  138. self._event_pipe_gc_task.cancel() # type:ignore[union-attr]
  139. if not self._stopped:
  140. self.io_loop.run_sync(_cancel)
  141. else:
  142. self._event_pipe_gc_task.cancel()
  143. self.io_loop.close(all_fds=True)
  144. def _setup_event_pipe(self):
  145. """Create the PULL socket listening for events that should fire in this thread."""
  146. ctx = self.socket.context
  147. pipe_in = ctx.socket(zmq.PULL)
  148. pipe_in.linger = 0
  149. _uuid = b2a_hex(os.urandom(16)).decode("ascii")
  150. iface = self._event_interface = "inproc://%s" % _uuid
  151. pipe_in.bind(iface)
  152. self._event_puller = ZMQStream(pipe_in, self.io_loop)
  153. self._event_puller.on_recv(self._handle_event)
  154. async def _run_event_pipe_gc(self):
  155. """Task to run event pipe gc continuously"""
  156. while True:
  157. await asyncio.sleep(self._event_pipe_gc_seconds)
  158. try:
  159. await self._event_pipe_gc()
  160. except Exception as e:
  161. print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)
  162. async def _event_pipe_gc(self):
  163. """run a single garbage collection on event pipes"""
  164. if not self._event_pipes:
  165. # don't acquire the lock if there's nothing to do
  166. return
  167. with self._event_pipe_gc_lock:
  168. for thread, socket in list(self._event_pipes.items()):
  169. if not thread.is_alive():
  170. socket.close()
  171. del self._event_pipes[thread]
  172. @property
  173. def _event_pipe(self):
  174. """thread-local event pipe for signaling events that should be processed in the thread"""
  175. try:
  176. event_pipe = self._local.event_pipe
  177. except AttributeError:
  178. # new thread, new event pipe
  179. ctx = self.socket.context
  180. event_pipe = ctx.socket(zmq.PUSH)
  181. event_pipe.linger = 0
  182. event_pipe.connect(self._event_interface)
  183. self._local.event_pipe = event_pipe
  184. # associate event pipes to their threads
  185. # so they can be closed explicitly
  186. # implicit close on __del__ throws a ResourceWarning
  187. with self._event_pipe_gc_lock:
  188. self._event_pipes[threading.current_thread()] = event_pipe
  189. return event_pipe
  190. def _handle_event(self, msg):
  191. """Handle an event on the event pipe
  192. Content of the message is ignored.
  193. Whenever *an* event arrives on the event stream,
  194. *all* waiting events are processed in order.
  195. """
  196. # freeze event count so new writes don't extend the queue
  197. # while we are processing
  198. n_events = len(self._events)
  199. for _ in range(n_events):
  200. event_f = self._events.popleft()
  201. event_f()
  202. def _setup_pipe_in(self):
  203. """setup listening pipe for IOPub from forked subprocesses"""
  204. ctx = self.socket.context
  205. # use UUID to authenticate pipe messages
  206. self._pipe_uuid = os.urandom(16)
  207. pipe_in = ctx.socket(zmq.PULL)
  208. pipe_in.linger = 0
  209. try:
  210. self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
  211. except zmq.ZMQError as e:
  212. warnings.warn(
  213. "Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e
  214. + "\nsubprocess output will be unavailable.",
  215. stacklevel=2,
  216. )
  217. self._pipe_flag = False
  218. pipe_in.close()
  219. return
  220. self._pipe_in = ZMQStream(pipe_in, self.io_loop)
  221. self._pipe_in.on_recv(self._handle_pipe_msg)
  222. def _handle_pipe_msg(self, msg):
  223. """handle a pipe message from a subprocess"""
  224. if not self._pipe_flag or not self._is_master_process():
  225. return
  226. if msg[0] != self._pipe_uuid:
  227. print("Bad pipe message: %s", msg, file=sys.__stderr__)
  228. return
  229. self.send_multipart(msg[1:])
  230. def _setup_pipe_out(self):
  231. # must be new context after fork
  232. ctx = zmq.Context()
  233. pipe_out = ctx.socket(zmq.PUSH)
  234. pipe_out.linger = 3000 # 3s timeout for pipe_out sends before discarding the message
  235. pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
  236. return ctx, pipe_out
  237. def _is_master_process(self):
  238. return os.getpid() == self._master_pid
  239. def _check_mp_mode(self):
  240. """check for forks, and switch to zmq pipeline if necessary"""
  241. if not self._pipe_flag or self._is_master_process():
  242. return MASTER
  243. return CHILD
  244. def start(self):
  245. """Start the IOPub thread"""
  246. self.thread.name = "IOPub"
  247. self.thread.start()
  248. # make sure we don't prevent process exit
  249. # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
  250. atexit.register(self.stop)
  251. def stop(self):
  252. """Stop the IOPub thread"""
  253. self._stopped = True
  254. if not self.thread.is_alive():
  255. return
  256. self.io_loop.add_callback(self.io_loop.stop)
  257. self.thread.join(timeout=30)
  258. if self.thread.is_alive():
  259. # avoid infinite hang if stop fails
  260. msg = "IOPub thread did not terminate in 30 seconds"
  261. raise TimeoutError(msg)
  262. # close *all* event pipes, created in any thread
  263. # event pipes can only be used from other threads while self.thread.is_alive()
  264. # so after thread.join, this should be safe
  265. for _thread, event_pipe in self._event_pipes.items():
  266. event_pipe.close()
  267. def close(self):
  268. """Close the IOPub thread."""
  269. if self.closed:
  270. return
  271. self.socket.close()
  272. self.socket = None
  273. @property
  274. def closed(self):
  275. return self.socket is None
  276. def schedule(self, f):
  277. """Schedule a function to be called in our IO thread.
  278. If the thread is not running, call immediately.
  279. """
  280. if self.thread.is_alive():
  281. self._events.append(f)
  282. # wake event thread (message content is ignored)
  283. self._event_pipe.send(b"")
  284. else:
  285. f()
  286. def send_multipart(self, *args, **kwargs):
  287. """send_multipart schedules actual zmq send in my thread.
  288. If my thread isn't running (e.g. forked process), send immediately.
  289. """
  290. self.schedule(lambda: self._really_send(*args, **kwargs))
  291. def _really_send(self, msg, *args, **kwargs):
  292. """The callback that actually sends messages"""
  293. if self.closed:
  294. return
  295. mp_mode = self._check_mp_mode()
  296. if mp_mode != CHILD:
  297. # we are master, do a regular send
  298. self.socket.send_multipart(msg, *args, **kwargs)
  299. else:
  300. # we are a child, pipe to master
  301. # new context/socket for every pipe-out
  302. # since forks don't teardown politely, use ctx.term to ensure send has completed
  303. ctx, pipe_out = self._setup_pipe_out()
  304. pipe_out.send_multipart([self._pipe_uuid, *msg], *args, **kwargs)
  305. pipe_out.close()
  306. ctx.term()
  307. class BackgroundSocket:
  308. """Wrapper around IOPub thread that provides zmq send[_multipart]"""
  309. io_thread = None
  310. def __init__(self, io_thread):
  311. """Initialize the socket."""
  312. self.io_thread = io_thread
  313. def __getattr__(self, attr):
  314. """Wrap socket attr access for backward-compatibility"""
  315. if attr.startswith("__") and attr.endswith("__"):
  316. # don't wrap magic methods
  317. super().__getattr__(attr) # type:ignore[misc]
  318. assert self.io_thread is not None
  319. if hasattr(self.io_thread.socket, attr):
  320. warnings.warn(
  321. f"Accessing zmq Socket attribute {attr} on BackgroundSocket"
  322. f" is deprecated since ipykernel 4.3.0"
  323. f" use .io_thread.socket.{attr}",
  324. DeprecationWarning,
  325. stacklevel=2,
  326. )
  327. return getattr(self.io_thread.socket, attr)
  328. return super().__getattr__(attr) # type:ignore[misc]
  329. def __setattr__(self, attr, value):
  330. """Set an attribute on the socket."""
  331. if attr == "io_thread" or (attr.startswith("__") and attr.endswith("__")):
  332. super().__setattr__(attr, value)
  333. else:
  334. warnings.warn(
  335. f"Setting zmq Socket attribute {attr} on BackgroundSocket"
  336. f" is deprecated since ipykernel 4.3.0"
  337. f" use .io_thread.socket.{attr}",
  338. DeprecationWarning,
  339. stacklevel=2,
  340. )
  341. assert self.io_thread is not None
  342. setattr(self.io_thread.socket, attr, value)
  343. def send(self, msg, *args, **kwargs):
  344. """Send a message to the socket."""
  345. return self.send_multipart([msg], *args, **kwargs)
  346. def send_multipart(self, *args, **kwargs):
  347. """Schedule send in IO thread"""
  348. assert self.io_thread is not None
  349. return self.io_thread.send_multipart(*args, **kwargs)
  350. class OutStream(TextIOBase):
  351. """A file like object that publishes the stream to a 0MQ PUB socket.
  352. Output is handed off to an IO Thread
  353. """
  354. # timeout for flush to avoid infinite hang
  355. # in case of misbehavior
  356. flush_timeout = 10
  357. # The time interval between automatic flushes, in seconds.
  358. flush_interval = 0.2
  359. topic = None
  360. encoding = "UTF-8"
  361. _exc: Any | None = None
  362. def fileno(self):
  363. """
  364. Things like subprocess will peak and write to the fileno() of stderr/stdout.
  365. """
  366. if getattr(self, "_original_stdstream_copy", None) is not None:
  367. return self._original_stdstream_copy
  368. msg = "fileno"
  369. raise io.UnsupportedOperation(msg)
  370. def _watch_pipe_fd(self):
  371. """
  372. We've redirected standards streams 0 and 1 into a pipe.
  373. We need to watch in a thread and redirect them to the right places.
  374. 1) the ZMQ channels to show in notebook interfaces,
  375. 2) the original stdout/err, to capture errors in terminals.
  376. We cannot schedule this on the ioloop thread, as this might be blocking.
  377. """
  378. if self._fid is None:
  379. return
  380. try:
  381. bts = os.read(self._fid, PIPE_BUFFER_SIZE)
  382. while bts and self._should_watch:
  383. self.write(bts.decode(errors="replace"))
  384. os.write(self._original_stdstream_copy, bts)
  385. bts = os.read(self._fid, PIPE_BUFFER_SIZE)
  386. except Exception:
  387. self._exc = sys.exc_info()
  388. def __init__(
  389. self,
  390. session,
  391. pub_thread,
  392. name,
  393. pipe=None,
  394. echo=None,
  395. *,
  396. watchfd=True,
  397. isatty=False,
  398. ):
  399. """
  400. Parameters
  401. ----------
  402. session : object
  403. the session object
  404. pub_thread : threading.Thread
  405. the publication thread
  406. name : str {'stderr', 'stdout'}
  407. the name of the standard stream to replace
  408. pipe : object
  409. the pipe object
  410. echo : bool
  411. whether to echo output
  412. watchfd : bool (default, True)
  413. Watch the file descriptor corresponding to the replaced stream.
  414. This is useful if you know some underlying code will write directly
  415. the file descriptor by its number. It will spawn a watching thread,
  416. that will swap the give file descriptor for a pipe, read from the
  417. pipe, and insert this into the current Stream.
  418. isatty : bool (default, False)
  419. Indication of whether this stream has terminal capabilities (e.g. can handle colors)
  420. """
  421. if pipe is not None:
  422. warnings.warn(
  423. "pipe argument to OutStream is deprecated and ignored since ipykernel 4.2.3.",
  424. DeprecationWarning,
  425. stacklevel=2,
  426. )
  427. # This is necessary for compatibility with Python built-in streams
  428. self.session = session
  429. self._fid = None
  430. if not isinstance(pub_thread, IOPubThread):
  431. # Backward-compat: given socket, not thread. Wrap in a thread.
  432. warnings.warn(
  433. "Since IPykernel 4.3, OutStream should be created with "
  434. "IOPubThread, not %r" % pub_thread,
  435. DeprecationWarning,
  436. stacklevel=2,
  437. )
  438. pub_thread = IOPubThread(pub_thread, session=self.session)
  439. pub_thread.start()
  440. self.pub_thread = pub_thread
  441. self.name = name
  442. self.topic = b"stream." + name.encode()
  443. self._parent_header: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
  444. "parent_header"
  445. )
  446. self._parent_header.set({})
  447. self._parent_header_global = {}
  448. self._master_pid = os.getpid()
  449. self._flush_pending = False
  450. self._subprocess_flush_pending = False
  451. self._io_loop = pub_thread.io_loop
  452. self._buffer_lock = threading.RLock()
  453. self._buffers = defaultdict(StringIO)
  454. self.echo = None
  455. self._isatty = bool(isatty)
  456. self._should_watch = False
  457. self._local = local()
  458. if (
  459. (
  460. watchfd
  461. and (
  462. (sys.platform.startswith("linux") or sys.platform.startswith("darwin"))
  463. # Pytest set its own capture. Don't redirect from within pytest.
  464. and ("PYTEST_CURRENT_TEST" not in os.environ)
  465. )
  466. )
  467. # allow forcing watchfd (mainly for tests)
  468. or watchfd == "force"
  469. ):
  470. self._should_watch = True
  471. self._setup_stream_redirects(name)
  472. if echo:
  473. if hasattr(echo, "read") and hasattr(echo, "write"):
  474. # make sure we aren't trying to echo on the FD we're watching!
  475. # that would cause an infinite loop, always echoing on itself
  476. if self._should_watch:
  477. try:
  478. echo_fd = echo.fileno()
  479. except Exception:
  480. echo_fd = None
  481. if echo_fd is not None and echo_fd == self._original_stdstream_fd:
  482. # echo on the _copy_ we made during
  483. # this is the actual terminal FD now
  484. echo = io.TextIOWrapper(
  485. io.FileIO(
  486. self._original_stdstream_copy,
  487. "w",
  488. )
  489. )
  490. self.echo = echo
  491. else:
  492. msg = "echo argument must be a file-like object"
  493. raise ValueError(msg)
  494. @property
  495. def parent_header(self):
  496. try:
  497. # asyncio or thread-specific
  498. return self._parent_header.get()
  499. except LookupError:
  500. # global (fallback)
  501. return self._parent_header_global
  502. @parent_header.setter
  503. def parent_header(self, value):
  504. self._parent_header_global = value
  505. return self._parent_header.set(value)
  506. def isatty(self):
  507. """Return a bool indicating whether this is an 'interactive' stream.
  508. Returns:
  509. Boolean
  510. """
  511. return self._isatty
  512. def _setup_stream_redirects(self, name):
  513. pr, pw = os.pipe()
  514. fno = self._original_stdstream_fd = getattr(sys, name).fileno()
  515. self._original_stdstream_copy = os.dup(fno)
  516. os.dup2(pw, fno)
  517. self._fid = pr
  518. self._exc = None
  519. self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
  520. self.watch_fd_thread.daemon = True
  521. self.watch_fd_thread.start()
  522. def _is_master_process(self):
  523. return os.getpid() == self._master_pid
  524. def set_parent(self, parent):
  525. """Set the parent header."""
  526. self.parent_header = extract_header(parent)
  527. def close(self):
  528. """Close the stream."""
  529. if self._should_watch:
  530. self._should_watch = False
  531. # thread won't wake unless there's something to read
  532. # writing something after _should_watch will not be echoed
  533. os.write(self._original_stdstream_fd, b"\0")
  534. self.watch_fd_thread.join()
  535. # restore original FDs
  536. os.dup2(self._original_stdstream_copy, self._original_stdstream_fd)
  537. os.close(self._original_stdstream_copy)
  538. if self._exc:
  539. etype, value, tb = self._exc
  540. traceback.print_exception(etype, value, tb)
  541. self.pub_thread = None
  542. @property
  543. def closed(self):
  544. return self.pub_thread is None
  545. def _schedule_flush(self):
  546. """schedule a flush in the IO thread
  547. call this on write, to indicate that flush should be called soon.
  548. """
  549. if self._flush_pending:
  550. return
  551. self._flush_pending = True
  552. # add_timeout has to be handed to the io thread via event pipe
  553. def _schedule_in_thread():
  554. self._io_loop.call_later(self.flush_interval, self._flush)
  555. self.pub_thread.schedule(_schedule_in_thread)
  556. def flush(self):
  557. """trigger actual zmq send
  558. send will happen in the background thread
  559. """
  560. if (
  561. self.pub_thread
  562. and self.pub_thread.thread is not None
  563. and self.pub_thread.thread.is_alive()
  564. and self.pub_thread.thread.ident != threading.current_thread().ident
  565. ):
  566. # request flush on the background thread
  567. self.pub_thread.schedule(self._flush)
  568. # wait for flush to actually get through, if we can.
  569. evt = threading.Event()
  570. self.pub_thread.schedule(evt.set)
  571. # and give a timeout to avoid
  572. if not evt.wait(self.flush_timeout):
  573. # write directly to __stderr__ instead of warning because
  574. # if this is happening sys.stderr may be the problem.
  575. print("IOStream.flush timed out", file=sys.__stderr__)
  576. else:
  577. self._flush()
  578. def _flush(self):
  579. """This is where the actual send happens.
  580. _flush should generally be called in the IO thread,
  581. unless the thread has been destroyed (e.g. forked subprocess).
  582. """
  583. self._flush_pending = False
  584. self._subprocess_flush_pending = False
  585. if self.echo is not None:
  586. try:
  587. self.echo.flush()
  588. except OSError as e:
  589. if self.echo is not sys.__stderr__:
  590. print(f"Flush failed: {e}", file=sys.__stderr__)
  591. for parent, data in self._flush_buffers():
  592. if data:
  593. # FIXME: this disables Session's fork-safe check,
  594. # since pub_thread is itself fork-safe.
  595. # There should be a better way to do this.
  596. self.session.pid = os.getpid()
  597. content = {"name": self.name, "text": data}
  598. msg = self.session.msg("stream", content, parent=parent)
  599. # Each transform either returns a new
  600. # message or None. If None is returned,
  601. # the message has been 'used' and we return.
  602. for hook in self._hooks:
  603. msg = hook(msg)
  604. if msg is None:
  605. return
  606. self.session.send(
  607. self.pub_thread,
  608. msg,
  609. ident=self.topic,
  610. )
  611. def write(self, string: str) -> int | None: # type:ignore[override]
  612. """Write to current stream after encoding if necessary
  613. Returns
  614. -------
  615. len : int
  616. number of items from input parameter written to stream.
  617. """
  618. parent = self.parent_header
  619. if not isinstance(string, str):
  620. msg = f"write() argument must be str, not {type(string)}" # type:ignore[unreachable]
  621. raise TypeError(msg)
  622. if self.echo is not None:
  623. try:
  624. self.echo.write(string)
  625. except OSError as e:
  626. if self.echo is not sys.__stderr__:
  627. print(f"Write failed: {e}", file=sys.__stderr__)
  628. if self.pub_thread is None:
  629. msg = "I/O operation on closed file"
  630. raise ValueError(msg)
  631. is_child = not self._is_master_process()
  632. # only touch the buffer in the IO thread to avoid races
  633. with self._buffer_lock:
  634. self._buffers[frozenset(parent.items())].write(string)
  635. if is_child:
  636. # mp.Pool cannot be trusted to flush promptly (or ever),
  637. # and this helps.
  638. if self._subprocess_flush_pending:
  639. return None
  640. self._subprocess_flush_pending = True
  641. # We can not rely on self._io_loop.call_later from a subprocess
  642. self.pub_thread.schedule(self._flush)
  643. else:
  644. self._schedule_flush()
  645. return len(string)
  646. def writelines(self, sequence):
  647. """Write lines to the stream."""
  648. if self.pub_thread is None:
  649. msg = "I/O operation on closed file"
  650. raise ValueError(msg)
  651. for string in sequence:
  652. self.write(string)
  653. def writable(self):
  654. """Test whether the stream is writable."""
  655. return True
  656. def _flush_buffers(self):
  657. """clear the current buffer and return the current buffer data."""
  658. buffers = self._rotate_buffers()
  659. for frozen_parent, buffer in buffers.items():
  660. data = buffer.getvalue()
  661. buffer.close()
  662. yield dict(frozen_parent), data
  663. def _rotate_buffers(self):
  664. """Returns the current buffer and replaces it with an empty buffer."""
  665. with self._buffer_lock:
  666. old_buffers = self._buffers
  667. self._buffers = defaultdict(StringIO)
  668. return old_buffers
  669. @property
  670. def _hooks(self):
  671. if not hasattr(self._local, "hooks"):
  672. # create new list for a new thread
  673. self._local.hooks = []
  674. return self._local.hooks
  675. def register_hook(self, hook):
  676. """
  677. Registers a hook with the thread-local storage.
  678. Parameters
  679. ----------
  680. hook : Any callable object
  681. Returns
  682. -------
  683. Either a publishable message, or `None`.
  684. The hook callable must return a message from
  685. the __call__ method if they still require the
  686. `session.send` method to be called after transformation.
  687. Returning `None` will halt that execution path, and
  688. session.send will not be called.
  689. """
  690. self._hooks.append(hook)
  691. def unregister_hook(self, hook):
  692. """
  693. Un-registers a hook with the thread-local storage.
  694. Parameters
  695. ----------
  696. hook : Any callable object which has previously been
  697. registered as a hook.
  698. Returns
  699. -------
  700. bool - `True` if the hook was removed, `False` if it wasn't
  701. found.
  702. """
  703. try:
  704. self._hooks.remove(hook)
  705. return True
  706. except ValueError:
  707. return False