| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- """Utilities for identifying local IP addresses."""
- # Copyright (c) Jupyter Development Team.
- # Distributed under the terms of the Modified BSD License.
- from __future__ import annotations
- import os
- import re
- import socket
- import subprocess
- from collections.abc import Callable, Iterable, Mapping, Sequence
- from subprocess import PIPE, Popen
- from typing import Any
- from warnings import warn
# Module-level caches.  They start empty and are filled in place by whichever
# loader _load_ips() ends up using.

# Addresses that point to this machine, including the all-interface aliases
# "0.0.0.0" and "" once a loader has run.
LOCAL_IPS: list[str] = []
# Addresses for this machine that are meant to be visible to other machines.
PUBLIC_IPS: list[str] = []
# Loopback address to use for "localhost"; empty until a loader has run.
LOCALHOST: str = ""
def _uniq_stable(elems: Iterable) -> list:
    """uniq_stable(elems) -> list

    Build a list holding each distinct element of ``elems`` exactly once,
    in the order of first appearance. Elements must be hashable.
    """
    # dict keys are unique and keep insertion order, so they are exactly the
    # first occurrences in their original sequence
    return list(dict.fromkeys(elems))
def _get_output(cmd: str | Sequence[str]) -> str:
    """Run ``cmd`` and return what it wrote to stdout, as text.

    stdout and stderr are both captured and decoded as UTF-8, replacing any
    undecodable bytes. On Windows the process is started with the
    ``STARTF_USESHOWWINDOW`` flag set in its startup info.

    Raises OSError if the command exits with a non-zero return code; the
    message includes the command and its decoded stderr.
    """
    if os.name == "nt":
        startupinfo = subprocess.STARTUPINFO()  # type:ignore[attr-defined]
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW  # type:ignore[attr-defined]
    else:
        startupinfo = None

    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, startupinfo=startupinfo)  # noqa
    raw_out, raw_err = proc.communicate()
    if not proc.returncode:
        return raw_out.decode("utf8", "replace")

    err_text = raw_err.decode("utf8", "replace")
    raise OSError(f"Failed to run {cmd}: {err_text}")
def _only_once(f: Callable) -> Callable:
    """decorator to only run a function once

    The first call that returns normally marks ``f`` as called (the flag is
    stored as ``f.called``); every later call does nothing and returns None.
    If ``f`` raises, the flag is left unset, so the next call runs ``f`` again.

    Positional and keyword arguments are both forwarded to ``f``, and the
    returned wrapper keeps ``f``'s name and docstring.
    """
    # function-scope import: functools is not among this module's imports
    from functools import wraps

    @wraps(f)
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        if f.called:  # type:ignore[attr-defined]
            return None
        ret = f(*args, **kwargs)
        # only reached when f returned normally; a failure stays retryable
        f.called = True  # type:ignore[attr-defined]
        return ret

    # Initialised after wraps() has run so the flag is not copied into the
    # wrapper's own __dict__, where it would be a stale, misleading duplicate.
    f.called = False  # type:ignore[attr-defined]
    return wrapped
def _requires_ips(f: Callable) -> Callable:
    """decorator to ensure load_ips has been run before f

    Every call to the decorated function first calls ``_load_ips()`` and then
    forwards all arguments to ``f``, returning its result. The wrapper keeps
    ``f``'s name and docstring so the decorated functions stay introspectable.
    """
    # function-scope import: functools is not among this module's imports
    from functools import wraps

    @wraps(f)
    def ips_loaded(*args: Any, **kwargs: Any) -> Any:
        _load_ips()
        return f(*args, **kwargs)

    return ips_loaded
- # subprocess-parsing ip finders
class NoIPAddresses(Exception):  # noqa
    """Raised when there are no addresses to populate the IP lists from."""
def _populate_from_list(addrs: Sequence[str]) -> None:
    """populate local and public IPs from flat list of all IPs

    The addresses carry no interface information, so they are passed to
    ``_populate_from_dict`` under the single placeholder interface ``"all"``.

    Raises NoIPAddresses if ``addrs`` is empty.
    """
    if not addrs:
        # {"all": []} is a non-empty mapping, so _populate_from_dict would not
        # notice that nothing was found; raise here so callers that catch
        # NoIPAddresses can move on to another discovery method.
        raise NoIPAddresses()
    _populate_from_dict({"all": addrs})
def _populate_from_dict(addrs: Mapping[str, Sequence[str]]) -> None:
    """populate local and public IPs from dict of {'en0': ['ip', ...]}

    Every address goes into LOCAL_IPS. Addresses that are neither on an
    interface whose name starts with "lo" nor in 127.* / 169.254.* also go
    into PUBLIC_IPS. LOCALHOST, when not already set, becomes the first
    loopback address seen. Both lists are replaced in place, de-duplicated,
    and LOCAL_IPS always ends with the aliases "0.0.0.0" and "".

    Raises NoIPAddresses if the mapping itself is empty.
    """
    global LOCALHOST
    if not addrs:
        raise NoIPAddresses()

    everything = []
    visible = []
    for iface, ip_list in addrs.items():
        on_loopback_iface = iface.startswith("lo")
        for ip in ip_list:
            everything.append(ip)
            if not LOCALHOST and (on_loopback_iface or ip.startswith("127.")):
                LOCALHOST = ip
            # don't include loopback or link-local address in public_ips
            if not (on_loopback_iface or ip.startswith(("127.", "169.254."))):
                visible.append(ip)

    if LOCALHOST == "127.0.0.1" or not LOCALHOST:
        # make the conventional loopback address the first local entry
        LOCALHOST = "127.0.0.1"
        everything.insert(0, LOCALHOST)

    everything.extend(["0.0.0.0", ""])  # noqa: S104
    LOCAL_IPS[:] = _uniq_stable(everything)
    PUBLIC_IPS[:] = _uniq_stable(visible)
_ifconfig_ipv4_pat = re.compile(r"inet\b.*?(\d+\.\d+\.\d+\.\d+)", re.IGNORECASE)


def _load_ips_ifconfig() -> None:
    """load ip addresses from `ifconfig` output (posix)

    Runs ``ifconfig`` (retrying as ``/sbin/ifconfig`` if that fails with
    OSError), collects the first dotted-quad on every stripped line that
    starts with ``inet``, and passes the addresses to ``_populate_from_list``.
    """
    try:
        output = _get_output("ifconfig")
    except OSError:
        # no ifconfig, it's usually in /sbin and /sbin is not on everyone's PATH
        output = _get_output("/sbin/ifconfig")

    hits = (_ifconfig_ipv4_pat.match(row.strip()) for row in output.splitlines())
    _populate_from_list([hit.group(1) for hit in hits if hit])
def _load_ips_ip() -> None:
    """load ip addresses from `ip addr` output (Linux)

    Runs ``ip -f inet addr`` and, for each output line whose first
    whitespace-separated field is ``inet``, takes the second field with any
    ``/prefix`` suffix removed. The addresses go to ``_populate_from_list``.
    """
    output = _get_output(["ip", "-f", "inet", "addr"])
    tokenized = (row.lower().split() for row in output.splitlines())
    found = [
        fields[1].partition("/")[0]
        for fields in tokenized
        if len(fields) >= 2 and fields[0] == "inet"
    ]
    _populate_from_list(found)
_ipconfig_ipv4_pat = re.compile(r"ipv4.*?(\d+\.\d+\.\d+\.\d+)$", re.IGNORECASE)


def _load_ips_ipconfig() -> None:
    """load ip addresses from `ipconfig` output (Windows)

    Runs ``ipconfig`` and keeps the dotted-quad that ends every stripped line
    beginning with ``ipv4`` (case-insensitive). The addresses are passed to
    ``_populate_from_list``.
    """
    output = _get_output("ipconfig")
    hits = (_ipconfig_ipv4_pat.match(row.strip()) for row in output.splitlines())
    _populate_from_list([hit.group(1) for hit in hits if hit])
def _load_ips_psutil() -> None:
    """load ip addresses with psutil

    Builds ``{interface name: [IPv4 address, ...]}`` from
    ``psutil.net_if_addrs()`` and passes it to ``_populate_from_dict``.
    Raises ImportError if psutil is not installed.
    """
    import psutil

    # net_if_addrs() gives iface_name -> list of address records, eg
    # {"lo": [snicaddr(family=<AddressFamily.AF_INET>, address="127.0.0.1",
    # ...), snicaddr(family=<AddressFamily.AF_INET6>, ...)]}
    # Only the AF_INET records are kept; an interface may end up with [].
    by_iface: dict[str, list[str]] = {
        name: [record.address for record in records if record.family == socket.AF_INET]
        for name, records in psutil.net_if_addrs().items()
    }
    _populate_from_dict(by_iface)
def _load_ips_netifaces() -> None:
    """load ip addresses with netifaces

    Builds ``{interface name: [IPv4 address, ...]}`` from the netifaces
    package and passes it to ``_populate_from_dict``.
    Raises ImportError if netifaces is not installed.
    """
    import netifaces

    by_iface: dict[str, list[str]] = {}
    # interfaces() yields iface names, 'lo0', 'eth0', etc.
    for name in netifaces.interfaces():
        # ipv4 addrinfo dicts for this interface (none -> empty list)
        inet_entries = netifaces.ifaddresses(name).get(netifaces.AF_INET, [])
        candidates = (entry.get("addr") for entry in inet_entries)
        # skip entries with a missing or empty "addr"
        by_iface[name] = [addr for addr in candidates if addr]
    _populate_from_dict(by_iface)
def _load_ips_gethostbyname() -> None:
    """load ip addresses with socket.gethostbyname_ex

    This can be slow.

    LOCAL_IPS is seeded from resolving "localhost", PUBLIC_IPS from resolving
    this machine's hostname. PUBLIC_IPS is then appended to LOCAL_IPS together
    with the all-interface aliases, and LOCALHOST is set to the first entry of
    the de-duplicated LOCAL_IPS. Both lists are modified in place.
    """
    global LOCALHOST
    try:
        # index 2 of the gethostbyname_ex() result is the list of addresses
        LOCAL_IPS[:] = socket.gethostbyname_ex("localhost")[2]
    except OSError:
        # assume common default
        LOCAL_IPS[:] = ["127.0.0.1"]
    try:
        hostname = socket.gethostname()
        PUBLIC_IPS[:] = socket.gethostbyname_ex(hostname)[2]
        # try hostname.local, in case hostname has been short-circuited to loopback
        if not hostname.endswith(".local") and all(ip.startswith("127") for ip in PUBLIC_IPS):
            PUBLIC_IPS[:] = socket.gethostbyname_ex(socket.gethostname() + ".local")[2]
    except OSError:
        # best effort: a failed lookup leaves PUBLIC_IPS with whatever it
        # held at that point (possibly loopback-only or empty)
        pass
    finally:
        # runs whether or not the lookups succeeded, so LOCAL_IPS always
        # receives the current PUBLIC_IPS
        PUBLIC_IPS[:] = _uniq_stable(PUBLIC_IPS)
        LOCAL_IPS.extend(PUBLIC_IPS)
    # include all-interface aliases: 0.0.0.0 and ''
    LOCAL_IPS.extend(["0.0.0.0", ""])  # noqa
    LOCAL_IPS[:] = _uniq_stable(LOCAL_IPS)
    # the first "localhost" address if there was one, because it was added first
    LOCALHOST = LOCAL_IPS[0]
def _load_ips_dumb() -> None:
    """Fallback in case of unexpected failure

    Installs hard-coded defaults without inspecting the system: LOCALHOST is
    "127.0.0.1", LOCAL_IPS holds it plus the all-interface aliases, and
    PUBLIC_IPS is emptied. Both lists are updated in place.
    """
    global LOCALHOST
    loopback = "127.0.0.1"
    LOCALHOST = loopback
    LOCAL_IPS[:] = [loopback, "0.0.0.0", ""]  # noqa
    PUBLIC_IPS.clear()
@_only_once
def _load_ips(suppress_exceptions: bool = True) -> None:
    """load the IPs that point to this machine

    This function will only ever be run to completion once.

    It will use psutil to do it quickly if available.
    If not, it will use netifaces to do it quickly if available.
    Then it will fall back on parsing the output of ifconfig / ip addr / ipconfig, as appropriate.
    Finally, it will fall back on socket.gethostbyname_ex, which can be slow.

    Any other exception is re-raised when ``suppress_exceptions`` is false;
    otherwise a warning is issued and hard-coded defaults are loaded.
    """
    try:
        not_installed = (ImportError,)
        tool_failed = (OSError, NoIPAddresses)

        # (loader, exceptions that mean "try the next one"), best first
        attempts = [
            # first priority, use psutil
            (_load_ips_psutil, not_installed),
            # second priority, use netifaces
            (_load_ips_netifaces, not_installed),
        ]
        # third priority, parse subprocess output (how reliable is this?)
        if os.name == "nt":
            attempts.append((_load_ips_ipconfig, tool_failed))
        else:
            attempts.append((_load_ips_ip, tool_failed))
            attempts.append((_load_ips_ifconfig, tool_failed))

        for loader, tolerated in attempts:
            try:
                return loader()
            except tolerated:
                continue

        # lowest priority, use gethostbyname
        return _load_ips_gethostbyname()
    except Exception as e:
        if not suppress_exceptions:
            raise
        # unexpected error shouldn't crash, load dumb default values instead.
        warn(f"Unexpected error discovering local network interfaces: {e}", stacklevel=2)
    _load_ips_dumb()
@_requires_ips
def local_ips() -> list[str]:
    """return the IP addresses that point to this machine

    The module-level ``LOCAL_IPS`` list itself is returned, not a copy.
    """
    return LOCAL_IPS
@_requires_ips
def public_ips() -> list[str]:
    """return the IP addresses for this machine that are visible to other machines

    The module-level ``PUBLIC_IPS`` list itself is returned, not a copy.
    """
    return PUBLIC_IPS
@_requires_ips
def localhost() -> str:
    """return ip for localhost (almost always 127.0.0.1)

    This is the current value of the module-level ``LOCALHOST``.
    """
    return LOCALHOST
@_requires_ips
def is_local_ip(ip: str) -> bool:
    """does `ip` point to this machine?

    True when ``ip`` is an exact string match for an entry of ``LOCAL_IPS``.
    """
    return ip in LOCAL_IPS
@_requires_ips
def is_public_ip(ip: str) -> bool:
    """is `ip` a publicly visible address?

    True when ``ip`` is an exact string match for an entry of ``PUBLIC_IPS``.
    """
    return ip in PUBLIC_IPS
|