localinterfaces.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. """Utilities for identifying local IP addresses."""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import annotations
  5. import os
  6. import re
  7. import socket
  8. import subprocess
  9. from collections.abc import Callable, Iterable, Mapping, Sequence
  10. from subprocess import PIPE, Popen
  11. from typing import Any
  12. from warnings import warn
  13. LOCAL_IPS: list[str] = []
  14. PUBLIC_IPS: list[str] = []
  15. LOCALHOST: str = ""
  16. def _uniq_stable(elems: Iterable) -> list:
  17. """uniq_stable(elems) -> list
  18. Return from an iterable, a list of all the unique elements in the input,
  19. maintaining the order in which they first appear.
  20. """
  21. seen = set()
  22. value = []
  23. for x in elems:
  24. if x not in seen:
  25. value.append(x)
  26. seen.add(x)
  27. return value
  28. def _get_output(cmd: str | Sequence[str]) -> str:
  29. """Get output of a command, raising IOError if it fails"""
  30. startupinfo = None
  31. if os.name == "nt":
  32. startupinfo = subprocess.STARTUPINFO() # type:ignore[attr-defined]
  33. startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type:ignore[attr-defined]
  34. p = Popen(cmd, stdout=PIPE, stderr=PIPE, startupinfo=startupinfo) # noqa
  35. stdout, stderr = p.communicate()
  36. if p.returncode:
  37. msg = "Failed to run {}: {}".format(cmd, stderr.decode("utf8", "replace"))
  38. raise OSError(msg)
  39. return stdout.decode("utf8", "replace")
  40. def _only_once(f: Callable) -> Callable:
  41. """decorator to only run a function once"""
  42. f.called = False # type:ignore[attr-defined]
  43. def wrapped(**kwargs: Any) -> Any:
  44. if f.called: # type:ignore[attr-defined]
  45. return
  46. ret = f(**kwargs)
  47. f.called = True # type:ignore[attr-defined]
  48. return ret
  49. return wrapped
  50. def _requires_ips(f: Callable) -> Callable:
  51. """decorator to ensure load_ips has been run before f"""
  52. def ips_loaded(*args: Any, **kwargs: Any) -> Any:
  53. _load_ips()
  54. return f(*args, **kwargs)
  55. return ips_loaded
  56. # subprocess-parsing ip finders
  57. class NoIPAddresses(Exception): # noqa
  58. pass
  59. def _populate_from_list(addrs: Sequence[str]) -> None:
  60. """populate local and public IPs from flat list of all IPs"""
  61. _populate_from_dict({"all": addrs})
  62. def _populate_from_dict(addrs: Mapping[str, Sequence[str]]) -> None:
  63. """populate local and public IPs from dict of {'en0': 'ip'}"""
  64. if not addrs:
  65. raise NoIPAddresses()
  66. global LOCALHOST
  67. public_ips = []
  68. local_ips = []
  69. for iface, ip_list in addrs.items():
  70. for ip in ip_list:
  71. local_ips.append(ip)
  72. if not LOCALHOST and (iface.startswith("lo") or ip.startswith("127.")):
  73. LOCALHOST = ip
  74. if not iface.startswith("lo") and not ip.startswith(("127.", "169.254.")):
  75. # don't include link-local address in public_ips
  76. public_ips.append(ip)
  77. if not LOCALHOST or LOCALHOST == "127.0.0.1":
  78. LOCALHOST = "127.0.0.1"
  79. local_ips.insert(0, LOCALHOST)
  80. local_ips.extend(["0.0.0.0", ""]) # noqa: S104
  81. LOCAL_IPS[:] = _uniq_stable(local_ips)
  82. PUBLIC_IPS[:] = _uniq_stable(public_ips)
  83. _ifconfig_ipv4_pat = re.compile(r"inet\b.*?(\d+\.\d+\.\d+\.\d+)", re.IGNORECASE)
  84. def _load_ips_ifconfig() -> None:
  85. """load ip addresses from `ifconfig` output (posix)"""
  86. try:
  87. out = _get_output("ifconfig")
  88. except OSError:
  89. # no ifconfig, it's usually in /sbin and /sbin is not on everyone's PATH
  90. out = _get_output("/sbin/ifconfig")
  91. lines = out.splitlines()
  92. addrs = []
  93. for line in lines:
  94. m = _ifconfig_ipv4_pat.match(line.strip())
  95. if m:
  96. addrs.append(m.group(1))
  97. _populate_from_list(addrs)
  98. def _load_ips_ip() -> None:
  99. """load ip addresses from `ip addr` output (Linux)"""
  100. out = _get_output(["ip", "-f", "inet", "addr"])
  101. lines = out.splitlines()
  102. addrs = []
  103. for line in lines:
  104. blocks = line.lower().split()
  105. if (len(blocks) >= 2) and (blocks[0] == "inet"):
  106. addrs.append(blocks[1].split("/")[0])
  107. _populate_from_list(addrs)
  108. _ipconfig_ipv4_pat = re.compile(r"ipv4.*?(\d+\.\d+\.\d+\.\d+)$", re.IGNORECASE)
  109. def _load_ips_ipconfig() -> None:
  110. """load ip addresses from `ipconfig` output (Windows)"""
  111. out = _get_output("ipconfig")
  112. lines = out.splitlines()
  113. addrs = []
  114. for line in lines:
  115. m = _ipconfig_ipv4_pat.match(line.strip())
  116. if m:
  117. addrs.append(m.group(1))
  118. _populate_from_list(addrs)
  119. def _load_ips_psutil() -> None:
  120. """load ip addresses with psutil"""
  121. import psutil
  122. addr_dict: dict[str, list[str]] = {}
  123. # dict of iface_name: address_list, eg
  124. # {"lo": [snicaddr(family=<AddressFamily.AF_INET>, address="127.0.0.1",
  125. # ...), snicaddr(family=<AddressFamily.AF_INET6>, ...)]}
  126. for iface, ifaddresses in psutil.net_if_addrs().items():
  127. addr_dict[iface] = [
  128. address_data.address
  129. for address_data in ifaddresses
  130. if address_data.family == socket.AF_INET
  131. ]
  132. _populate_from_dict(addr_dict)
  133. def _load_ips_netifaces() -> None:
  134. """load ip addresses with netifaces"""
  135. import netifaces
  136. addr_dict: dict[str, list[str]] = {}
  137. # list of iface names, 'lo0', 'eth0', etc.
  138. for iface in netifaces.interfaces():
  139. # list of ipv4 addrinfo dicts
  140. addr_dict[iface] = []
  141. ipv4s = netifaces.ifaddresses(iface).get(netifaces.AF_INET, [])
  142. for entry in ipv4s:
  143. addr = entry.get("addr")
  144. if addr:
  145. addr_dict[iface].append(addr)
  146. _populate_from_dict(addr_dict)
  147. def _load_ips_gethostbyname() -> None:
  148. """load ip addresses with socket.gethostbyname_ex
  149. This can be slow.
  150. """
  151. global LOCALHOST
  152. try:
  153. LOCAL_IPS[:] = socket.gethostbyname_ex("localhost")[2]
  154. except OSError:
  155. # assume common default
  156. LOCAL_IPS[:] = ["127.0.0.1"]
  157. try:
  158. hostname = socket.gethostname()
  159. PUBLIC_IPS[:] = socket.gethostbyname_ex(hostname)[2]
  160. # try hostname.local, in case hostname has been short-circuited to loopback
  161. if not hostname.endswith(".local") and all(ip.startswith("127") for ip in PUBLIC_IPS):
  162. PUBLIC_IPS[:] = socket.gethostbyname_ex(socket.gethostname() + ".local")[2]
  163. except OSError:
  164. pass
  165. finally:
  166. PUBLIC_IPS[:] = _uniq_stable(PUBLIC_IPS)
  167. LOCAL_IPS.extend(PUBLIC_IPS)
  168. # include all-interface aliases: 0.0.0.0 and ''
  169. LOCAL_IPS.extend(["0.0.0.0", ""]) # noqa
  170. LOCAL_IPS[:] = _uniq_stable(LOCAL_IPS)
  171. LOCALHOST = LOCAL_IPS[0]
  172. def _load_ips_dumb() -> None:
  173. """Fallback in case of unexpected failure"""
  174. global LOCALHOST
  175. LOCALHOST = "127.0.0.1"
  176. LOCAL_IPS[:] = [LOCALHOST, "0.0.0.0", ""] # noqa
  177. PUBLIC_IPS[:] = []
  178. @_only_once
  179. def _load_ips(suppress_exceptions: bool = True) -> None:
  180. """load the IPs that point to this machine
  181. This function will only ever be called once.
  182. If will use psutil to do it quickly if available.
  183. If not, it will use netifaces to do it quickly if available.
  184. Then it will fallback on parsing the output of ifconfig / ip addr / ipconfig, as appropriate.
  185. Finally, it will fallback on socket.gethostbyname_ex, which can be slow.
  186. """
  187. try:
  188. # first priority, use psutil
  189. try:
  190. return _load_ips_psutil()
  191. except ImportError:
  192. pass
  193. # second priority, use netifaces
  194. try:
  195. return _load_ips_netifaces()
  196. except ImportError:
  197. pass
  198. # second priority, parse subprocess output (how reliable is this?)
  199. if os.name == "nt":
  200. try:
  201. return _load_ips_ipconfig()
  202. except (OSError, NoIPAddresses):
  203. pass
  204. else:
  205. try:
  206. return _load_ips_ip()
  207. except (OSError, NoIPAddresses):
  208. pass
  209. try:
  210. return _load_ips_ifconfig()
  211. except (OSError, NoIPAddresses):
  212. pass
  213. # lowest priority, use gethostbyname
  214. return _load_ips_gethostbyname()
  215. except Exception as e:
  216. if not suppress_exceptions:
  217. raise
  218. # unexpected error shouldn't crash, load dumb default values instead.
  219. warn("Unexpected error discovering local network interfaces: %s" % e, stacklevel=2)
  220. _load_ips_dumb()
  221. @_requires_ips
  222. def local_ips() -> list[str]:
  223. """return the IP addresses that point to this machine"""
  224. return LOCAL_IPS
  225. @_requires_ips
  226. def public_ips() -> list[str]:
  227. """return the IP addresses for this machine that are visible to other machines"""
  228. return PUBLIC_IPS
  229. @_requires_ips
  230. def localhost() -> str:
  231. """return ip for localhost (almost always 127.0.0.1)"""
  232. return LOCALHOST
  233. @_requires_ips
  234. def is_local_ip(ip: str) -> bool:
  235. """does `ip` point to this machine?"""
  236. return ip in LOCAL_IPS
  237. @_requires_ips
  238. def is_public_ip(ip: str) -> bool:
  239. """is `ip` a publicly visible address?"""
  240. return ip in PUBLIC_IPS