heartbeat.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. """The client and server for a basic ping-pong style heartbeat."""
  2. # -----------------------------------------------------------------------------
  3. # Copyright (C) 2008-2011 The IPython Development Team
  4. #
  5. # Distributed under the terms of the BSD License. The full license is in
  6. # the file LICENSE, distributed as part of this software.
  7. # -----------------------------------------------------------------------------
  8. # -----------------------------------------------------------------------------
  9. # Imports
  10. # -----------------------------------------------------------------------------
  11. import errno
  12. import socket
  13. from pathlib import Path
  14. from threading import Thread
  15. import zmq
  16. from jupyter_client.localinterfaces import localhost
  17. # -----------------------------------------------------------------------------
  18. # Code
  19. # -----------------------------------------------------------------------------
  20. class Heartbeat(Thread):
  21. """A simple ping-pong style heartbeat that runs in a thread."""
  22. def __init__(self, context, addr=None):
  23. """Initialize the heartbeat thread."""
  24. if addr is None:
  25. addr = ("tcp", localhost(), 0)
  26. Thread.__init__(self, name="Heartbeat")
  27. self.context = context
  28. self.transport, self.ip, self.port = addr
  29. self.original_port = self.port
  30. if self.original_port == 0:
  31. self.pick_port()
  32. self.addr = (self.ip, self.port)
  33. self.daemon = True
  34. self.pydev_do_not_trace = True
  35. self.is_pydev_daemon_thread = True
  36. self.name = "Heartbeat"
  37. def pick_port(self):
  38. """Pick a port for the heartbeat."""
  39. if self.transport == "tcp":
  40. s = socket.socket()
  41. # '*' means all interfaces to 0MQ, which is '' to socket.socket
  42. s.bind(("" if self.ip == "*" else self.ip, 0))
  43. self.port = s.getsockname()[1]
  44. s.close()
  45. elif self.transport == "ipc":
  46. self.port = 1
  47. while Path(f"{self.ip}-{self.port}").exists():
  48. self.port = self.port + 1
  49. else:
  50. raise ValueError("Unrecognized zmq transport: %s" % self.transport)
  51. return self.port
  52. def _try_bind_socket(self):
  53. c = ":" if self.transport == "tcp" else "-"
  54. return self.socket.bind(f"{self.transport}://{self.ip}" + c + str(self.port))
  55. def _bind_socket(self):
  56. try:
  57. win_in_use = errno.WSAEADDRINUSE # type:ignore[attr-defined]
  58. except AttributeError:
  59. win_in_use = None
  60. # Try up to 100 times to bind a port when in conflict to avoid
  61. # infinite attempts in bad setups
  62. max_attempts = 1 if self.original_port else 100
  63. for attempt in range(max_attempts):
  64. try:
  65. self._try_bind_socket()
  66. except zmq.ZMQError as ze:
  67. if attempt == max_attempts - 1:
  68. raise
  69. # Raise if we have any error not related to socket binding
  70. if ze.errno != errno.EADDRINUSE and ze.errno != win_in_use:
  71. raise
  72. # Raise if we have any error not related to socket binding
  73. if self.original_port == 0:
  74. self.pick_port()
  75. else:
  76. raise
  77. else:
  78. return
  79. def run(self):
  80. """Run the heartbeat thread."""
  81. self.name = "Heartbeat"
  82. self.socket = self.context.socket(zmq.ROUTER)
  83. self.socket.linger = 1000
  84. try:
  85. self._bind_socket()
  86. except Exception:
  87. self.socket.close()
  88. raise
  89. while True:
  90. try:
  91. zmq.device(zmq.QUEUE, self.socket, self.socket) # type:ignore[attr-defined]
  92. except zmq.ZMQError as e:
  93. if e.errno == errno.EINTR:
  94. # signal interrupt, resume heartbeat
  95. continue
  96. if e.errno == zmq.ETERM:
  97. # context terminated, close socket and exit
  98. try:
  99. self.socket.close()
  100. except zmq.ZMQError:
  101. # suppress further errors during cleanup
  102. # this shouldn't happen, though
  103. pass
  104. break
  105. if e.errno == zmq.ENOTSOCK:
  106. # socket closed elsewhere, exit
  107. break
  108. raise
  109. else:
  110. break