| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835 |
- """Wrappers for forwarding stdout/stderr over zmq"""
- # Copyright (c) IPython Development Team.
- # Distributed under the terms of the Modified BSD License.
- import asyncio
- import atexit
- import contextvars
- import io
- import os
- import sys
- import threading
- import traceback
- import warnings
- from binascii import b2a_hex
- from collections import defaultdict, deque
- from collections.abc import Callable
- from io import StringIO, TextIOBase
- from threading import local
- from typing import Any
- import zmq
- from jupyter_client.session import extract_header
- from tornado.ioloop import IOLoop
- from zmq.eventloop.zmqstream import ZMQStream
- # -----------------------------------------------------------------------------
- # Globals
- # -----------------------------------------------------------------------------
# Process roles returned by IOPubThread._check_mp_mode():
# MASTER is the process that owns the IOPub socket, CHILD is a forked subprocess.
MASTER, CHILD = 0, 1

# Maximum number of bytes fetched per os.read() call when draining the
# redirected stdout/stderr pipe in OutStream._watch_pipe_fd().
PIPE_BUFFER_SIZE = 1000
- # -----------------------------------------------------------------------------
- # IO classes
- # -----------------------------------------------------------------------------
class IOPubThread:
    """An object for sending IOPub messages in a background thread.

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    """

    def __init__(self, socket, pipe=False, session=False):
        """Create IOPub thread

        Parameters
        ----------
        socket : zmq.PUB Socket
            the socket on which messages will be sent.
        pipe : bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        session : object, optional
            Object providing ``msg_header`` and ``serialize``; used to build
            ``iopub_welcome`` messages on XPUB sockets.  ``False`` (the
            default) or ``None`` disables welcome messages.
        """
        self.socket = socket
        self.session = session
        self._stopped = False
        self.background_socket = BackgroundSocket(self)
        self._master_pid = os.getpid()
        self._pipe_flag = pipe
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        self._local = threading.local()
        # callbacks waiting to be run in the IO thread (see schedule/_handle_event)
        self._events: deque[Callable[..., Any]] = deque()
        # PUSH sockets created per calling thread, so they can be closed explicitly
        self._event_pipes: dict[threading.Thread, Any] = {}
        self._event_pipe_gc_lock: threading.Lock = threading.Lock()
        self._event_pipe_gc_seconds: float = 10
        self._event_pipe_gc_task: asyncio.Task[Any] | None = None
        self._setup_event_pipe()
        self._setup_xpub_listener()
        self.thread = threading.Thread(target=self._thread_main, name="IOPub")
        self.thread.daemon = True
        self.thread.pydev_do_not_trace = True  # type:ignore[attr-defined]
        self.thread.is_pydev_daemon_thread = True  # type:ignore[attr-defined]
        self.thread.name = "IOPub"

    def _setup_xpub_listener(self):
        """Setup listener for XPUB subscription events"""
        # Checks the socket is not a DummySocket
        if not hasattr(self.socket, "getsockopt"):
            return

        socket_type = self.socket.getsockopt(zmq.TYPE)
        if socket_type == zmq.XPUB:
            self._xpub_stream = ZMQStream(self.socket, self.io_loop)
            self._xpub_stream.on_recv(self._handle_subscription)

    def _handle_subscription(self, frames):
        """Handle subscription/unsubscription events from XPUB socket

        XPUB sockets receive:
        - subscribe: single frame with b'\\x01' + topic
        - unsubscribe: single frame with b'\\x00' + topic

        Only subscribe events trigger a welcome message; every other frame
        (unsubscribe, empty, or with a topic that is not valid UTF-8) is ignored.
        """
        for frame in frames:
            # An empty frame carries no event byte; indexing it would raise
            # IndexError inside the IO loop callback, so skip it.
            if not frame or frame[0] != 1:
                continue
            try:
                subscription = frame[1:].decode("utf-8")
            except UnicodeDecodeError:
                continue
            self._send_welcome_message(subscription)

    def _send_welcome_message(self, subscription):
        """Send iopub_welcome message for new subscription

        Parameters
        ----------
        subscription : str
            The subscription topic (UTF-8 decoded)
        """
        # TODO: This early return is for backward-compatibility with ipyparallel.
        # This should be removed when ipykernel has been released with support of
        # xpub and ipyparallel has been updated to pass the session parameter
        # to IOPubThread upon construction.
        # (NB: the call to fix is here:
        # https://github.com/ipython/ipyparallel/blob/main/ipyparallel/engine/app.py#L679
        #
        # The constructor default is ``False`` while older callers may pass
        # ``None``; both mean "no session", so both must be checked here.
        if self.session is None or self.session is False:
            return

        content = {"subscription": subscription}
        header = self.session.msg_header("iopub_welcome")
        msg = {
            "header": header,
            "parent_header": {},
            "metadata": {},
            "content": content,
            "buffers": [],
        }
        msg_list = self.session.serialize(msg)

        # A non-empty topic is prepended as the first frame of the message.
        if subscription:
            identity = subscription.encode("utf-8")
            full_msg = [identity, *msg_list]
        else:
            full_msg = msg_list

        # Send directly on socket (we're already in IO thread context)
        self.socket.send_multipart(full_msg)

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""

        def _start_event_gc():
            self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

        self.io_loop.run_sync(_start_event_gc)

        if not self._stopped:
            # avoid race if stop called before start thread gets here
            # probably only comes up in tests
            self.io_loop.start()

        if self._event_pipe_gc_task is not None:
            # cancel gc task to avoid pending task warnings
            async def _cancel():
                self._event_pipe_gc_task.cancel()  # type:ignore[union-attr]

            if not self._stopped:
                self.io_loop.run_sync(_cancel)
            else:
                self._event_pipe_gc_task.cancel()

        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        # random inproc endpoint name, so every IOPubThread gets its own endpoint
        _uuid = b2a_hex(os.urandom(16)).decode("ascii")
        iface = self._event_interface = f"inproc://{_uuid}"
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    async def _run_event_pipe_gc(self):
        """Task to run event pipe gc continuously"""
        while True:
            await asyncio.sleep(self._event_pipe_gc_seconds)
            try:
                await self._event_pipe_gc()
            except Exception as e:
                # report and keep looping: one failed pass must not end the task
                print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)

    async def _event_pipe_gc(self):
        """run a single garbage collection on event pipes"""
        if not self._event_pipes:
            # don't acquire the lock if there's nothing to do
            return
        with self._event_pipe_gc_lock:
            # iterate over a snapshot, since entries are deleted while looping
            for thread, socket in list(self._event_pipes.items()):
                if not thread.is_alive():
                    socket.close()
                    del self._event_pipes[thread]

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
            # associate event pipes to their threads
            # so they can be closed explicitly
            # implicit close on __del__ throws a ResourceWarning
            with self._event_pipe_gc_lock:
                self._event_pipes[threading.current_thread()] = event_pipe
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for _ in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses"""
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn(
                f"Couldn't bind IOPub Pipe to 127.0.0.1: {e}"
                "\nsubprocess output will be unavailable.",
                stacklevel=2,
            )
            # without a bound pipe, behave as if pipe=False had been requested
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)

    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess

        The first frame must equal this thread's pipe UUID; the remaining
        frames are forwarded through :meth:`send_multipart`.
        """
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            # print() does not interpolate %-style arguments, so format explicitly
            print(f"Bad pipe message: {msg}", file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        """Create a fresh context and PUSH socket connected to the master's pipe port.

        Returns
        -------
        tuple
            ``(context, socket)``
        """
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000  # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        """Return True if the current pid is the one that created this object."""
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary"""
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.name = "IOPub"
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread

        Raises
        ------
        TimeoutError
            If the thread has not terminated 30 seconds after the stop request.
        """
        self._stopped = True
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)

        self.thread.join(timeout=30)
        if self.thread.is_alive():
            # avoid infinite hang if stop fails
            msg = "IOPub thread did not terminate in 30 seconds"
            raise TimeoutError(msg)
        # close *all* event pipes, created in any thread
        # event pipes can only be used from other threads while self.thread.is_alive()
        # so after thread.join, this should be safe
        for event_pipe in self._event_pipes.values():
            event_pipe.close()

    def close(self):
        """Close the IOPub thread."""
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        """True once :meth:`close` has dropped the socket."""
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b"")
        else:
            f()

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda: self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        if self.closed:
            return

        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't teardown politely, use ctx.term to ensure send has completed
            ctx, pipe_out = self._setup_pipe_out()
            pipe_out.send_multipart([self._pipe_uuid, *msg], *args, **kwargs)
            pipe_out.close()
            ctx.term()
class BackgroundSocket:
    """Wrapper around IOPub thread that provides zmq send[_multipart]"""

    # class-level default so attribute lookup of ``io_thread`` never falls
    # through to __getattr__
    io_thread = None

    def __init__(self, io_thread):
        """Initialize the socket.

        Parameters
        ----------
        io_thread : IOPubThread
            The thread object whose ``send_multipart`` and ``socket`` are wrapped.
        """
        self.io_thread = io_thread

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility

        Attributes found on the underlying socket are returned with a
        DeprecationWarning.

        Raises
        ------
        AttributeError
            For magic (``__x__``) names, and for names the socket does not have.
        """
        if attr.startswith("__") and attr.endswith("__"):
            # don't wrap magic methods
            # (object defines no __getattr__ to delegate to, so raise explicitly)
            msg = f"{type(self).__name__!r} object has no attribute {attr!r}"
            raise AttributeError(msg)
        assert self.io_thread is not None
        if hasattr(self.io_thread.socket, attr):
            warnings.warn(
                f"Accessing zmq Socket attribute {attr} on BackgroundSocket"
                f" is deprecated since ipykernel 4.3.0"
                f" use .io_thread.socket.{attr}",
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(self.io_thread.socket, attr)
        msg = f"{type(self).__name__!r} object has no attribute {attr!r}"
        raise AttributeError(msg)

    def __setattr__(self, attr, value):
        """Set an attribute on the socket.

        ``io_thread`` and magic names are stored on this wrapper; anything else
        is forwarded to the underlying socket with a DeprecationWarning.
        """
        if attr == "io_thread" or (attr.startswith("__") and attr.endswith("__")):
            super().__setattr__(attr, value)
        else:
            warnings.warn(
                f"Setting zmq Socket attribute {attr} on BackgroundSocket"
                f" is deprecated since ipykernel 4.3.0"
                f" use .io_thread.socket.{attr}",
                DeprecationWarning,
                stacklevel=2,
            )
            assert self.io_thread is not None
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        """Send a message to the socket."""
        # a single frame is sent as a one-part multipart message
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        assert self.io_thread is not None
        return self.io_thread.send_multipart(*args, **kwargs)
- class OutStream(TextIOBase):
- """A file like object that publishes the stream to a 0MQ PUB socket.
- Output is handed off to an IO Thread
- """
- # timeout for flush to avoid infinite hang
- # in case of misbehavior
- flush_timeout = 10
- # The time interval between automatic flushes, in seconds.
- flush_interval = 0.2
- topic = None
- encoding = "UTF-8"
- _exc: Any | None = None
- def fileno(self):
- """
- Things like subprocess will peak and write to the fileno() of stderr/stdout.
- """
- if getattr(self, "_original_stdstream_copy", None) is not None:
- return self._original_stdstream_copy
- msg = "fileno"
- raise io.UnsupportedOperation(msg)
- def _watch_pipe_fd(self):
- """
- We've redirected standards streams 0 and 1 into a pipe.
- We need to watch in a thread and redirect them to the right places.
- 1) the ZMQ channels to show in notebook interfaces,
- 2) the original stdout/err, to capture errors in terminals.
- We cannot schedule this on the ioloop thread, as this might be blocking.
- """
- if self._fid is None:
- return
- try:
- bts = os.read(self._fid, PIPE_BUFFER_SIZE)
- while bts and self._should_watch:
- self.write(bts.decode(errors="replace"))
- os.write(self._original_stdstream_copy, bts)
- bts = os.read(self._fid, PIPE_BUFFER_SIZE)
- except Exception:
- self._exc = sys.exc_info()
- def __init__(
- self,
- session,
- pub_thread,
- name,
- pipe=None,
- echo=None,
- *,
- watchfd=True,
- isatty=False,
- ):
- """
- Parameters
- ----------
- session : object
- the session object
- pub_thread : threading.Thread
- the publication thread
- name : str {'stderr', 'stdout'}
- the name of the standard stream to replace
- pipe : object
- the pipe object
- echo : bool
- whether to echo output
- watchfd : bool (default, True)
- Watch the file descriptor corresponding to the replaced stream.
- This is useful if you know some underlying code will write directly
- the file descriptor by its number. It will spawn a watching thread,
- that will swap the give file descriptor for a pipe, read from the
- pipe, and insert this into the current Stream.
- isatty : bool (default, False)
- Indication of whether this stream has terminal capabilities (e.g. can handle colors)
- """
- if pipe is not None:
- warnings.warn(
- "pipe argument to OutStream is deprecated and ignored since ipykernel 4.2.3.",
- DeprecationWarning,
- stacklevel=2,
- )
- # This is necessary for compatibility with Python built-in streams
- self.session = session
- self._fid = None
- if not isinstance(pub_thread, IOPubThread):
- # Backward-compat: given socket, not thread. Wrap in a thread.
- warnings.warn(
- "Since IPykernel 4.3, OutStream should be created with "
- "IOPubThread, not %r" % pub_thread,
- DeprecationWarning,
- stacklevel=2,
- )
- pub_thread = IOPubThread(pub_thread, session=self.session)
- pub_thread.start()
- self.pub_thread = pub_thread
- self.name = name
- self.topic = b"stream." + name.encode()
- self._parent_header: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
- "parent_header"
- )
- self._parent_header.set({})
- self._parent_header_global = {}
- self._master_pid = os.getpid()
- self._flush_pending = False
- self._subprocess_flush_pending = False
- self._io_loop = pub_thread.io_loop
- self._buffer_lock = threading.RLock()
- self._buffers = defaultdict(StringIO)
- self.echo = None
- self._isatty = bool(isatty)
- self._should_watch = False
- self._local = local()
- if (
- (
- watchfd
- and (
- (sys.platform.startswith("linux") or sys.platform.startswith("darwin"))
- # Pytest set its own capture. Don't redirect from within pytest.
- and ("PYTEST_CURRENT_TEST" not in os.environ)
- )
- )
- # allow forcing watchfd (mainly for tests)
- or watchfd == "force"
- ):
- self._should_watch = True
- self._setup_stream_redirects(name)
- if echo:
- if hasattr(echo, "read") and hasattr(echo, "write"):
- # make sure we aren't trying to echo on the FD we're watching!
- # that would cause an infinite loop, always echoing on itself
- if self._should_watch:
- try:
- echo_fd = echo.fileno()
- except Exception:
- echo_fd = None
- if echo_fd is not None and echo_fd == self._original_stdstream_fd:
- # echo on the _copy_ we made during
- # this is the actual terminal FD now
- echo = io.TextIOWrapper(
- io.FileIO(
- self._original_stdstream_copy,
- "w",
- )
- )
- self.echo = echo
- else:
- msg = "echo argument must be a file-like object"
- raise ValueError(msg)
- @property
- def parent_header(self):
- try:
- # asyncio or thread-specific
- return self._parent_header.get()
- except LookupError:
- # global (fallback)
- return self._parent_header_global
- @parent_header.setter
- def parent_header(self, value):
- self._parent_header_global = value
- return self._parent_header.set(value)
- def isatty(self):
- """Return a bool indicating whether this is an 'interactive' stream.
- Returns:
- Boolean
- """
- return self._isatty
- def _setup_stream_redirects(self, name):
- pr, pw = os.pipe()
- fno = self._original_stdstream_fd = getattr(sys, name).fileno()
- self._original_stdstream_copy = os.dup(fno)
- os.dup2(pw, fno)
- self._fid = pr
- self._exc = None
- self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
- self.watch_fd_thread.daemon = True
- self.watch_fd_thread.start()
- def _is_master_process(self):
- return os.getpid() == self._master_pid
- def set_parent(self, parent):
- """Set the parent header."""
- self.parent_header = extract_header(parent)
- def close(self):
- """Close the stream."""
- if self._should_watch:
- self._should_watch = False
- # thread won't wake unless there's something to read
- # writing something after _should_watch will not be echoed
- os.write(self._original_stdstream_fd, b"\0")
- self.watch_fd_thread.join()
- # restore original FDs
- os.dup2(self._original_stdstream_copy, self._original_stdstream_fd)
- os.close(self._original_stdstream_copy)
- if self._exc:
- etype, value, tb = self._exc
- traceback.print_exception(etype, value, tb)
- self.pub_thread = None
- @property
- def closed(self):
- return self.pub_thread is None
- def _schedule_flush(self):
- """schedule a flush in the IO thread
- call this on write, to indicate that flush should be called soon.
- """
- if self._flush_pending:
- return
- self._flush_pending = True
- # add_timeout has to be handed to the io thread via event pipe
- def _schedule_in_thread():
- self._io_loop.call_later(self.flush_interval, self._flush)
- self.pub_thread.schedule(_schedule_in_thread)
- def flush(self):
- """trigger actual zmq send
- send will happen in the background thread
- """
- if (
- self.pub_thread
- and self.pub_thread.thread is not None
- and self.pub_thread.thread.is_alive()
- and self.pub_thread.thread.ident != threading.current_thread().ident
- ):
- # request flush on the background thread
- self.pub_thread.schedule(self._flush)
- # wait for flush to actually get through, if we can.
- evt = threading.Event()
- self.pub_thread.schedule(evt.set)
- # and give a timeout to avoid
- if not evt.wait(self.flush_timeout):
- # write directly to __stderr__ instead of warning because
- # if this is happening sys.stderr may be the problem.
- print("IOStream.flush timed out", file=sys.__stderr__)
- else:
- self._flush()
- def _flush(self):
- """This is where the actual send happens.
- _flush should generally be called in the IO thread,
- unless the thread has been destroyed (e.g. forked subprocess).
- """
- self._flush_pending = False
- self._subprocess_flush_pending = False
- if self.echo is not None:
- try:
- self.echo.flush()
- except OSError as e:
- if self.echo is not sys.__stderr__:
- print(f"Flush failed: {e}", file=sys.__stderr__)
- for parent, data in self._flush_buffers():
- if data:
- # FIXME: this disables Session's fork-safe check,
- # since pub_thread is itself fork-safe.
- # There should be a better way to do this.
- self.session.pid = os.getpid()
- content = {"name": self.name, "text": data}
- msg = self.session.msg("stream", content, parent=parent)
- # Each transform either returns a new
- # message or None. If None is returned,
- # the message has been 'used' and we return.
- for hook in self._hooks:
- msg = hook(msg)
- if msg is None:
- return
- self.session.send(
- self.pub_thread,
- msg,
- ident=self.topic,
- )
- def write(self, string: str) -> int | None: # type:ignore[override]
- """Write to current stream after encoding if necessary
- Returns
- -------
- len : int
- number of items from input parameter written to stream.
- """
- parent = self.parent_header
- if not isinstance(string, str):
- msg = f"write() argument must be str, not {type(string)}" # type:ignore[unreachable]
- raise TypeError(msg)
- if self.echo is not None:
- try:
- self.echo.write(string)
- except OSError as e:
- if self.echo is not sys.__stderr__:
- print(f"Write failed: {e}", file=sys.__stderr__)
- if self.pub_thread is None:
- msg = "I/O operation on closed file"
- raise ValueError(msg)
- is_child = not self._is_master_process()
- # only touch the buffer in the IO thread to avoid races
- with self._buffer_lock:
- self._buffers[frozenset(parent.items())].write(string)
- if is_child:
- # mp.Pool cannot be trusted to flush promptly (or ever),
- # and this helps.
- if self._subprocess_flush_pending:
- return None
- self._subprocess_flush_pending = True
- # We can not rely on self._io_loop.call_later from a subprocess
- self.pub_thread.schedule(self._flush)
- else:
- self._schedule_flush()
- return len(string)
- def writelines(self, sequence):
- """Write lines to the stream."""
- if self.pub_thread is None:
- msg = "I/O operation on closed file"
- raise ValueError(msg)
- for string in sequence:
- self.write(string)
- def writable(self):
- """Test whether the stream is writable."""
- return True
- def _flush_buffers(self):
- """clear the current buffer and return the current buffer data."""
- buffers = self._rotate_buffers()
- for frozen_parent, buffer in buffers.items():
- data = buffer.getvalue()
- buffer.close()
- yield dict(frozen_parent), data
- def _rotate_buffers(self):
- """Returns the current buffer and replaces it with an empty buffer."""
- with self._buffer_lock:
- old_buffers = self._buffers
- self._buffers = defaultdict(StringIO)
- return old_buffers
- @property
- def _hooks(self):
- if not hasattr(self._local, "hooks"):
- # create new list for a new thread
- self._local.hooks = []
- return self._local.hooks
- def register_hook(self, hook):
- """
- Registers a hook with the thread-local storage.
- Parameters
- ----------
- hook : Any callable object
- Returns
- -------
- Either a publishable message, or `None`.
- The hook callable must return a message from
- the __call__ method if they still require the
- `session.send` method to be called after transformation.
- Returning `None` will halt that execution path, and
- session.send will not be called.
- """
- self._hooks.append(hook)
- def unregister_hook(self, hook):
- """
- Un-registers a hook with the thread-local storage.
- Parameters
- ----------
- hook : Any callable object which has previously been
- registered as a hook.
- Returns
- -------
- bool - `True` if the hook was removed, `False` if it wasn't
- found.
- """
- try:
- self._hooks.remove(hook)
- return True
- except ValueError:
- return False
|